Skip to content

Commit

Permalink
[caclmgrd] Prevent unnecessary iptables updates (#5312)
Browse files Browse the repository at this point in the history
When a large number of changes occur to the ACL table of Config DB, caclmgrd will get flooded with notifications, and previously, it would regenerate and apply the iptables rules for each change, which is unnecessary, as the iptables rules should only get applied once after the last change notification is received. If the ACL table contains a large number of control plane ACL rules, this could cause a large delay in caclmgrd getting the rules applied.

This patch causes caclmgrd to delay updating the iptables rules until it has not received a change notification for at least 0.5 seconds.
  • Loading branch information
jleveque authored Oct 19, 2020
1 parent b5043a2 commit edf4971
Showing 1 changed file with 92 additions and 10 deletions.
102 changes: 92 additions & 10 deletions files/image_config/caclmgrd/caclmgrd
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ try:
import os
import subprocess
import sys
import threading
import time

from sonic_py_common import daemon_base, device_info
from swsscommon import swsscommon
Expand All @@ -26,6 +28,8 @@ VERSION = "1.0"

SYSLOG_IDENTIFIER = "caclmgrd"

DEFAULT_NAMESPACE = ''


# ========================== Helper Functions =========================

Expand Down Expand Up @@ -75,21 +79,38 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
}
}

UPDATE_DELAY_SECS = 0.5

def __init__(self, log_identifier):
super(ControlPlaneAclManager, self).__init__(log_identifier)

# Update-thread-specific data per namespace
self.update_thread = {}
self.lock = {}
self.num_changes = {}

# Initialize update-thread-specific data for default namespace
self.update_thread[DEFAULT_NAMESPACE] = None
self.lock[DEFAULT_NAMESPACE] = threading.Lock()
self.num_changes[DEFAULT_NAMESPACE] = 0

SonicDBConfig.load_sonic_global_db_config()
self.config_db_map = {}
self.iptables_cmd_ns_prefix = {}
self.config_db_map[''] = ConfigDBConnector(use_unix_socket_path=True, namespace='')
self.config_db_map[''].connect()
self.iptables_cmd_ns_prefix[''] = ""
self.namespace_mgmt_ip = self.get_namespace_mgmt_ip(self.iptables_cmd_ns_prefix[''], '')
self.namespace_mgmt_ipv6 = self.get_namespace_mgmt_ipv6(self.iptables_cmd_ns_prefix[''], '')
self.config_db_map[DEFAULT_NAMESPACE] = ConfigDBConnector(use_unix_socket_path=True, namespace=DEFAULT_NAMESPACE)
self.config_db_map[DEFAULT_NAMESPACE].connect()
self.iptables_cmd_ns_prefix[DEFAULT_NAMESPACE] = ""
self.namespace_mgmt_ip = self.get_namespace_mgmt_ip(self.iptables_cmd_ns_prefix[DEFAULT_NAMESPACE], DEFAULT_NAMESPACE)
self.namespace_mgmt_ipv6 = self.get_namespace_mgmt_ipv6(self.iptables_cmd_ns_prefix[DEFAULT_NAMESPACE], DEFAULT_NAMESPACE)
self.namespace_docker_mgmt_ip = {}
self.namespace_docker_mgmt_ipv6 = {}

namespaces = device_info.get_all_namespaces()
for front_asic_namespace in namespaces['front_ns']:
self.update_thread[front_asic_namespace] = None
self.lock[front_asic_namespace] = threading.Lock()
self.num_changes[front_asic_namespace] = 0

self.config_db_map[front_asic_namespace] = ConfigDBConnector(use_unix_socket_path=True, namespace=front_asic_namespace)
self.config_db_map[front_asic_namespace].connect()
self.iptables_cmd_ns_prefix[front_asic_namespace] = "ip netns exec " + front_asic_namespace + " "
Expand All @@ -99,6 +120,10 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
front_asic_namespace)

for back_asic_namespace in namespaces['back_ns']:
self.update_thread[back_asic_namespace] = None
self.lock[back_asic_namespace] = threading.Lock()
self.num_changes[back_asic_namespace] = 0

self.iptables_cmd_ns_prefix[back_asic_namespace] = "ip netns exec " + back_asic_namespace + " "
self.namespace_docker_mgmt_ip[back_asic_namespace] = self.get_namespace_mgmt_ip(self.iptables_cmd_ns_prefix[back_asic_namespace],
back_asic_namespace)
Expand Down Expand Up @@ -495,9 +520,44 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):

self.run_commands(iptables_cmds)

def check_and_update_control_plane_acls(self, namespace, num_changes):
"""
This function is intended to be spawned in a separate thread.
Its purpose is to prevent unnecessary iptables updates if we receive
multiple rapid ACL table update notifications. It sleeps for UPDATE_DELAY_SECS
then checks if any more ACL table updates were received in that window. If new
updates were received, it will sleep again and repeat the process until no
updates were received during the delay window, at which point it will update
iptables using the current ACL rules.
"""
while True:
# Sleep for our delay interval
time.sleep(self.UPDATE_DELAY_SECS)

with self.lock[namespace]:
if self.num_changes[namespace] > num_changes:
# More ACL table changes occurred since this thread was spawned
# spawn a new thread with the current number of changes
new_changes = self.num_changes[namespace] - num_changes
self.log_info("ACL config not stable for namespace '{}': {} changes detected in the past {} seconds. Skipping update ..."
.format(namespace, new_changes, self.UPDATE_DELAY_SECS))
num_changes = self.num_changes[namespace]
else:
if num_changes == self.num_changes[namespace] and num_changes > 0:
self.log_info("ACL config for namespace '{}' has not changed for {} seconds. Applying updates ..."
.format(namespace, self.UPDATE_DELAY_SECS))
self.update_control_plane_acls(namespace)
else:
self.log_error("Error updating ACLs for namespace '{}'".format(namespace))

# Re-initialize
self.num_changes[namespace] = 0
self.update_thread[namespace] = None
return

def run(self):
# Select Time-out for 10 Seconds
SELECT_TIMEOUT_MS = 1000 * 10
# Set select timeout to 1 second
SELECT_TIMEOUT_MS = 1000

self.log_info("Starting up ...")

Expand All @@ -515,7 +575,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
# Map of Namespace <--> susbcriber table's object
config_db_subscriber_table_map = {}

# Loop through all asic namespaces (if present) and host (namespace='')
# Loop through all asic namespaces (if present) and host namespace (DEFAULT_NAMESPACE)
for namespace in self.config_db_map.keys():
# Unconditionally update control plane ACLs once at start on given namespace
self.update_control_plane_acls(namespace)
Expand All @@ -540,14 +600,23 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
# Loop on select to see if any event happen on config db of any namespace
while True:
ctrl_plane_acl_notification = set()

# A brief sleep appears necessary in this loop or any spawned
# update threads will get stuck. Appears to be due to the sel.select() call.
# TODO: Eliminate the need for this sleep.
time.sleep(0.1)

(state, selectableObj) = sel.select(SELECT_TIMEOUT_MS)
# Continue if select is timeout or selectable object is not return
if state != swsscommon.Select.OBJECT:
continue
# Get the redisselect object from selectable object

# Get the redisselect object from selectable object
redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj)

# Get the corresponding namespace from redisselect db connector object
namespace = redisSelectObj.getDbConnector().getNamespace()

# Pop data of both Subscriber Table object of namespace that got config db acl table event
for table in config_db_subscriber_table_map[namespace]:
while True:
Expand All @@ -568,7 +637,20 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):

# Update the Control Plane ACL of the namespace that got config db acl table event
for namespace in ctrl_plane_acl_notification:
self.update_control_plane_acls(namespace)
with self.lock[namespace]:
if self.num_changes[namespace] == 0:
self.log_info("ACL change detected for namespace '{}'".format(namespace))

# Increment the number of change events we've received for this namespace
self.num_changes[namespace] += 1

# If an update thread is not already spawned for the namespace which we received
# the ACL table update event, spawn one now
if not self.update_thread[namespace]:
self.log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace))
self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls,
args=(namespace, self.num_changes[namespace]))
self.update_thread[namespace].start()

# ============================= Functions =============================

Expand Down

0 comments on commit edf4971

Please sign in to comment.