-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[caclmgrd] Prevent unnecessary iptables updates #5312
Changes from all commits
457b027
8060838
8aa91bb
15ef28b
c484ee9
afeffe2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,8 @@ try: | |
import os | ||
import subprocess | ||
import sys | ||
import threading | ||
import time | ||
|
||
from sonic_py_common import daemon_base, device_info | ||
from swsscommon import swsscommon | ||
|
@@ -26,6 +28,8 @@ VERSION = "1.0" | |
|
||
SYSLOG_IDENTIFIER = "caclmgrd" | ||
|
||
DEFAULT_NAMESPACE = '' | ||
|
||
|
||
# ========================== Helper Functions ========================= | ||
|
||
|
@@ -75,21 +79,38 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): | |
} | ||
} | ||
|
||
UPDATE_DELAY_SECS = 0.5 | ||
|
||
def __init__(self, log_identifier): | ||
super(ControlPlaneAclManager, self).__init__(log_identifier) | ||
|
||
# Update-thread-specific data per namespace | ||
self.update_thread = {} | ||
self.lock = {} | ||
self.num_changes = {} | ||
|
||
# Initialize update-thread-specific data for default namespace | ||
self.update_thread[DEFAULT_NAMESPACE] = None | ||
self.lock[DEFAULT_NAMESPACE] = threading.Lock() | ||
self.num_changes[DEFAULT_NAMESPACE] = 0 | ||
|
||
SonicDBConfig.load_sonic_global_db_config() | ||
self.config_db_map = {} | ||
self.iptables_cmd_ns_prefix = {} | ||
self.config_db_map[''] = ConfigDBConnector(use_unix_socket_path=True, namespace='') | ||
self.config_db_map[''].connect() | ||
self.iptables_cmd_ns_prefix[''] = "" | ||
self.namespace_mgmt_ip = self.get_namespace_mgmt_ip(self.iptables_cmd_ns_prefix[''], '') | ||
self.namespace_mgmt_ipv6 = self.get_namespace_mgmt_ipv6(self.iptables_cmd_ns_prefix[''], '') | ||
self.config_db_map[DEFAULT_NAMESPACE] = ConfigDBConnector(use_unix_socket_path=True, namespace=DEFAULT_NAMESPACE) | ||
self.config_db_map[DEFAULT_NAMESPACE].connect() | ||
self.iptables_cmd_ns_prefix[DEFAULT_NAMESPACE] = "" | ||
self.namespace_mgmt_ip = self.get_namespace_mgmt_ip(self.iptables_cmd_ns_prefix[DEFAULT_NAMESPACE], DEFAULT_NAMESPACE) | ||
self.namespace_mgmt_ipv6 = self.get_namespace_mgmt_ipv6(self.iptables_cmd_ns_prefix[DEFAULT_NAMESPACE], DEFAULT_NAMESPACE) | ||
self.namespace_docker_mgmt_ip = {} | ||
self.namespace_docker_mgmt_ipv6 = {} | ||
|
||
namespaces = device_info.get_all_namespaces() | ||
for front_asic_namespace in namespaces['front_ns']: | ||
self.update_thread[front_asic_namespace] = None | ||
self.lock[front_asic_namespace] = threading.Lock() | ||
self.num_changes[front_asic_namespace] = 0 | ||
|
||
self.config_db_map[front_asic_namespace] = ConfigDBConnector(use_unix_socket_path=True, namespace=front_asic_namespace) | ||
self.config_db_map[front_asic_namespace].connect() | ||
self.iptables_cmd_ns_prefix[front_asic_namespace] = "ip netns exec " + front_asic_namespace + " " | ||
|
@@ -99,6 +120,10 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): | |
front_asic_namespace) | ||
|
||
for back_asic_namespace in namespaces['back_ns']: | ||
self.update_thread[back_asic_namespace] = None | ||
self.lock[back_asic_namespace] = threading.Lock() | ||
self.num_changes[back_asic_namespace] = 0 | ||
|
||
self.iptables_cmd_ns_prefix[back_asic_namespace] = "ip netns exec " + back_asic_namespace + " " | ||
self.namespace_docker_mgmt_ip[back_asic_namespace] = self.get_namespace_mgmt_ip(self.iptables_cmd_ns_prefix[back_asic_namespace], | ||
back_asic_namespace) | ||
|
@@ -495,9 +520,44 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): | |
|
||
self.run_commands(iptables_cmds) | ||
|
||
def check_and_update_control_plane_acls(self, namespace, num_changes): | ||
""" | ||
This function is intended to be spawned in a separate thread. | ||
Its purpose is to prevent unnecessary iptables updates if we receive | ||
multiple rapid ACL table update notifications. It sleeps for UPDATE_DELAY_SECS | ||
then checks if any more ACL table updates were received in that window. If new | ||
updates were received, it will sleep again and repeat the process until no | ||
updates were received during the delay window, at which point it will update | ||
iptables using the current ACL rules. | ||
""" | ||
while True: | ||
# Sleep for our delay interval | ||
time.sleep(self.UPDATE_DELAY_SECS) | ||
|
||
with self.lock[namespace]: | ||
if self.num_changes[namespace] > num_changes: | ||
# More ACL table changes occurred since this thread was spawned | ||
# spawn a new thread with the current number of changes | ||
new_changes = self.num_changes[namespace] - num_changes | ||
self.log_info("ACL config not stable for namespace '{}': {} changes detected in the past {} seconds. Skipping update ..." | ||
.format(namespace, new_changes, self.UPDATE_DELAY_SECS)) | ||
num_changes = self.num_changes[namespace] | ||
else: | ||
if num_changes == self.num_changes[namespace] and num_changes > 0: | ||
self.log_info("ACL config for namespace '{}' has not changed for {} seconds. Applying updates ..." | ||
.format(namespace, self.UPDATE_DELAY_SECS)) | ||
self.update_control_plane_acls(namespace) | ||
else: | ||
self.log_error("Error updating ACLs for namespace '{}'".format(namespace)) | ||
|
||
# Re-initialize | ||
self.num_changes[namespace] = 0 | ||
self.update_thread[namespace] = None | ||
return | ||
|
||
def run(self): | ||
# Select Time-out for 10 Seconds | ||
SELECT_TIMEOUT_MS = 1000 * 10 | ||
# Set select timeout to 1 second | ||
SELECT_TIMEOUT_MS = 1000 | ||
|
||
self.log_info("Starting up ...") | ||
|
||
|
@@ -515,7 +575,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): | |
# Map of Namespace <--> susbcriber table's object | ||
config_db_subscriber_table_map = {} | ||
|
||
# Loop through all asic namespaces (if present) and host (namespace='') | ||
# Loop through all asic namespaces (if present) and host namespace (DEFAULT_NAMESPACE) | ||
for namespace in self.config_db_map.keys(): | ||
# Unconditionally update control plane ACLs once at start on given namespace | ||
self.update_control_plane_acls(namespace) | ||
|
@@ -540,14 +600,23 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): | |
# Loop on select to see if any event happen on config db of any namespace | ||
while True: | ||
ctrl_plane_acl_notification = set() | ||
|
||
# A brief sleep appears necessary in this loop or any spawned | ||
# update threads will get stuck. Appears to be due to the sel.select() call. | ||
# TODO: Eliminate the need for this sleep. | ||
time.sleep(0.1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jleveque Do we need this still ? Can we update SELECT_TIMEOUT_MS to take care of of this sleep ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I cannot explain the root cause of the need for this sleep. I wish I could. Adjusting |
||
|
||
(state, selectableObj) = sel.select(SELECT_TIMEOUT_MS) | ||
# Continue if select is timeout or selectable object is not return | ||
if state != swsscommon.Select.OBJECT: | ||
continue | ||
# Get the redisselect object from selectable object | ||
|
||
# Get the redisselect object from selectable object | ||
redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj) | ||
|
||
# Get the corresponding namespace from redisselect db connector object | ||
namespace = redisSelectObj.getDbConnector().getNamespace() | ||
|
||
# Pop data of both Subscriber Table object of namespace that got config db acl table event | ||
for table in config_db_subscriber_table_map[namespace]: | ||
while True: | ||
|
@@ -568,7 +637,20 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): | |
|
||
# Update the Control Plane ACL of the namespace that got config db acl table event | ||
for namespace in ctrl_plane_acl_notification: | ||
self.update_control_plane_acls(namespace) | ||
with self.lock[namespace]: | ||
if self.num_changes[namespace] == 0: | ||
self.log_info("ACL change detected for namespace '{}'".format(namespace)) | ||
|
||
# Increment the number of change events we've received for this namespace | ||
self.num_changes[namespace] += 1 | ||
|
||
# If an update thread is not already spawned for the namespace which we received | ||
# the ACL table update event, spawn one now | ||
if not self.update_thread[namespace]: | ||
self.log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace)) | ||
self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls, | ||
args=(namespace, self.num_changes[namespace])) | ||
self.update_thread[namespace].start() | ||
|
||
# ============================= Functions ============================= | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jleveque do we need num_changes > 0 check ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's basically a safeguard, in case
num_changes == 0
andself.num_changes[namespace] == 0
. We shouldn't get here in this situation, but if we do we will hit the else case and log an error message. It could be helpful for debugging.