Skip to content

Commit

Permalink
[lldp_syncd] add new OIDs - lldpRemTable & lldpLocPortTable (sonic-net#5
Browse files Browse the repository at this point in the history
)

* [lldp_syncd] add new OIDs - lldpRemTable & lldpLocPortTable
* [lldp_syncd] add new OIDs - lldpLocalSystemData
* enhance code style, set default port descr to ' ' instead of ''
* [lldp_syncd] introduce cache to avoid writing to db every 5s
* review comments - merge conflict, exception handling, pep8
* LLDP_ENTRY_TABLE fix indentation
* review comments - move scrap_output() out of class, return '' for missing system capabilities
* review comments - test for isinstance dict, move _scrap_output()
  • Loading branch information
mykolaf authored and qiluo-msft committed Jul 18, 2018
1 parent 94f2700 commit 37dbf74
Show file tree
Hide file tree
Showing 7 changed files with 386 additions and 54 deletions.
1 change: 1 addition & 0 deletions src/lldp_syncd/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
logger.addHandler(logging.NullHandler())

from .daemon import LldpSyncDaemon
from .dbsyncd import DBSyncDaemon
204 changes: 151 additions & 53 deletions src/lldp_syncd/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from sonic_syncd import SonicSyncDaemon
from . import logger
from .conventions import LldpPortIdSubtype, LldpChassisIdSubtype
from .conventions import LldpPortIdSubtype, LldpChassisIdSubtype, LldpSystemCapabilitiesMap

LLDPD_TIME_FORMAT = '%H:%M:%S'

Expand Down Expand Up @@ -44,7 +44,8 @@ def parse_time(time_str):
"""
days, hour_min_secs = re.split(LLDPD_UPTIME_RE_SPLIT_PATTERN, time_str)
struct_time = time.strptime(hour_min_secs, LLDPD_TIME_FORMAT)
time_delta = datetime.timedelta(days=int(days), hours=struct_time.tm_hour, minutes=struct_time.tm_min,
time_delta = datetime.timedelta(days=int(days), hours=struct_time.tm_hour,
minutes=struct_time.tm_min,
seconds=struct_time.tm_sec)
return int(time_delta.total_seconds())

Expand All @@ -56,6 +57,7 @@ class LldpSyncDaemon(SonicSyncDaemon):
within the same Redis instance on a switch
"""
LLDP_ENTRY_TABLE = 'LLDP_ENTRY_TABLE'
LLDP_LOC_CHASSIS_TABLE = 'LLDP_LOC_CHASSIS'

@unique
class PortIdSubtypeMap(int, Enum):
Expand Down Expand Up @@ -109,19 +111,54 @@ class ChassisIdSubtypeMap(int, Enum):
# chassis = int(LldpChassisIdSubtype.chassisComponent) # (unsupported by lldpd)
local = int(LldpPortIdSubtype.local)

def get_sys_capability_list(self, if_attributes):
"""
Get a list of capabilities from interface attributes dictionary.
:param if_attributes: interface attributes
:return: list of capabilities
"""
try:
# [{'enabled': ..., 'type': 'capability1'}, {'enabled': ..., 'type': 'capability2'}]
capability_list = if_attributes['chassis'].values()[0]['capability']
# {'enabled': ..., 'type': 'capability'}
if isinstance(capability_list, dict):
capability_list = [capability_list]
except KeyError:
logger.error("Failed to get system capabilities")
return []
return capability_list

def parse_sys_capabilities(self, capability_list, enabled=False):
"""
Get a bit map of capabilities, accoding to textual convention.
:param capability_list: list of capabilities
:param enabled: if true, consider only the enabled capabilities
:return: string representing a bit map
"""
# chassis is incomplete, missing capabilities
if not capability_list:
return ""

sys_cap = 0x00
for capability in capability_list:
try:
if (not enabled) or capability["enabled"]:
sys_cap |= 128 >> LldpSystemCapabilitiesMap[capability["type"].lower()]
except KeyError:
logger.warning("Unknown capability {}".format(capability["type"]))
return "%0.2X 00" % sys_cap

def __init__(self, update_interval=None):
super(LldpSyncDaemon, self).__init__()
self._update_interval = update_interval or DEFAULT_UPDATE_INTERVAL
self.db_connector = SonicV2Connector()
self.db_connector.connect(self.db_connector.APPL_DB)

def source_update(self):
"""
Invoke lldpctl and format as JSON
"""
cmd = ['/usr/sbin/lldpctl', '-f', 'json']
logger.debug("Invoking lldpctl with: {}".format(cmd))
self.chassis_cache = {}
self.interfaces_cache = {}

@staticmethod
def _scrap_output(cmd):
try:
# execute the subprocess command
lldpctl_output = subprocess.check_output(cmd)
Expand All @@ -135,8 +172,22 @@ def source_update(self):
except ValueError:
logger.exception("Failed to parse lldpctl output")
return None
else:
return lldpctl_json

return lldpctl_json

def source_update(self):
"""
Invoke lldpctl and format as JSON
"""
cmd = ['/usr/sbin/lldpctl', '-f', 'json']
logger.debug("Invoking lldpctl with: {}".format(cmd))
cmd_local = ['/usr/sbin/lldpcli', '-f', 'json', 'show', 'chassis']
logger.debug("Invoking lldpcli with: {}".format(cmd_local))

lldp_json = self._scrap_output(cmd)
lldp_json['lldp_loc_chassis'] = self._scrap_output(cmd_local)

return lldp_json

def parse_update(self, lldp_json):
"""
Expand All @@ -148,20 +199,19 @@ def parse_update(self, lldp_json):
LldpRemEntry ::= SEQUENCE {
lldpRemTimeMark TimeFilter,
*lldpRemLocalPortNum LldpPortNumber,
*lldpRemIndex Integer32,
lldpRemLocalPortNum LldpPortNumber,
lldpRemIndex Integer32,
lldpRemChassisIdSubtype LldpChassisIdSubtype,
lldpRemChassisId LldpChassisId,
lldpRemPortIdSubtype LldpPortIdSubtype,
lldpRemPortId LldpPortId,
lldpRemPortDesc SnmpAdminString,
lldpRemSysName SnmpAdminString,
lldpRemSysDesc SnmpAdminString,
*lldpRemSysCapSupported LldpSystemCapabilitiesMap,
*lldpRemSysCapEnabled LldpSystemCapabilitiesMap
lldpRemSysCapSupported LldpSystemCapabilitiesMap,
lldpRemSysCapEnabled LldpSystemCapabilitiesMap
}
"""
# TODO: *Implement
try:
interface_list = lldp_json['lldp'].get('interface') or []
parsed_interfaces = defaultdict(dict)
Expand All @@ -175,12 +225,45 @@ def parse_update(self, lldp_json):
if_attributes = interface_list[if_name]

if 'port' in if_attributes:
parsed_interfaces[if_name].update(self.parse_port(if_attributes['port']))
rem_port_keys = ('lldp_rem_port_id_subtype',
'lldp_rem_port_id',
'lldp_rem_port_desc')
parsed_port = zip(rem_port_keys, self.parse_port(if_attributes['port']))
parsed_interfaces[if_name].update(parsed_port)

if 'chassis' in if_attributes:
parsed_interfaces[if_name].update(self.parse_chassis(if_attributes['chassis']))
rem_chassis_keys = ('lldp_rem_chassis_id_subtype',
'lldp_rem_chassis_id',
'lldp_rem_sys_name',
'lldp_rem_sys_desc')
parsed_chassis = zip(rem_chassis_keys,
self.parse_chassis(if_attributes['chassis']))
parsed_interfaces[if_name].update(parsed_chassis)

# lldpRemTimeMark TimeFilter,
parsed_interfaces[if_name].update({'lldp_rem_time_mark': str(parse_time(if_attributes.get('age')))})
parsed_interfaces[if_name].update({'lldp_rem_time_mark':
str(parse_time(if_attributes.get('age')))})

# lldpRemIndex
parsed_interfaces[if_name].update({'lldp_rem_index': str(if_attributes.get('rid'))})

capability_list = self.get_sys_capability_list(if_attributes)
# lldpSysCapSupported
parsed_interfaces[if_name].update({'lldp_rem_sys_cap_supported':
self.parse_sys_capabilities(capability_list)})
# lldpSysCapEnabled
parsed_interfaces[if_name].update({'lldp_rem_sys_cap_enabled':
self.parse_sys_capabilities(
capability_list, enabled=True)})
if lldp_json['lldp_loc_chassis']:
loc_chassis_keys = ('lldp_loc_chassis_id_subtype',
'lldp_loc_chassis_id',
'lldp_loc_sys_name',
'lldp_loc_sys_desc')
parsed_chassis = zip(loc_chassis_keys,
self.parse_chassis(lldp_json['lldp_loc_chassis']
['local-chassis']['chassis']))
parsed_interfaces['local-chassis'].update(parsed_chassis)

return parsed_interfaces
except (KeyError, ValueError):
Expand All @@ -190,29 +273,25 @@ def parse_chassis(self, chassis_attributes):
try:
if 'id' in chassis_attributes and 'id' not in chassis_attributes['id']:
sys_name = ''
rem_attributes = chassis_attributes
attributes = chassis_attributes
id_attributes = chassis_attributes['id']
else:
(sys_name, rem_attributes) = chassis_attributes.items()[0]
id_attributes = rem_attributes.get('id', '')
(sys_name, attributes) = chassis_attributes.items()[0]
id_attributes = attributes.get('id', '')

chassis_id_subtype = str(self.ChassisIdSubtypeMap[id_attributes['type']].value)
chassis_id = id_attributes.get('value', '')
rem_desc = rem_attributes.get('descr', '')
descr = attributes.get('descr', '')
except (KeyError, ValueError):
logger.exception("Could not infer system information from: {}".format(chassis_attributes))
chassis_id_subtype = chassis_id = sys_name = rem_desc = ''

return {
# lldpRemChassisIdSubtype LldpChassisIdSubtype,
'lldp_rem_chassis_id_subtype': chassis_id_subtype,
# lldpRemChassisId LldpChassisId,
'lldp_rem_chassis_id': chassis_id,
# lldpRemSysName SnmpAdminString,
'lldp_rem_sys_name': sys_name,
# lldpRemSysDesc SnmpAdminString,
'lldp_rem_sys_desc': rem_desc,
}
logger.exception("Could not infer system information from: {}"
.format(chassis_attributes))
chassis_id_subtype = chassis_id = sys_name = descr = ''

return (chassis_id_subtype,
chassis_id,
sys_name,
descr,
)

def parse_port(self, port_attributes):
port_identifiers = port_attributes.get('id')
Expand All @@ -224,33 +303,52 @@ def parse_port(self, port_attributes):
logger.exception("Could not infer chassis subtype from: {}".format(port_attributes))
subtype, value = None

return {
# lldpRemPortIdSubtype LldpPortIdSubtype,
'lldp_rem_port_id_subtype': subtype,
# lldpRemPortId LldpPortId,
'lldp_rem_port_id': value,
# lldpRemSysDesc SnmpAdminString,
'lldp_rem_port_desc': port_attributes.get('descr', '')
}
return (subtype,
value,
port_attributes.get('descr', ''),
)

def cache_diff(self, cache, update):
"""
Find difference in keys between update and local cache dicts
:param cache: Local cache dict
:param update: Update dict
:return: new, changed, deleted keys tuple
"""
new_keys = [key for key in update.keys() if key not in cache.keys()]
changed_keys = list(set(key for key in update.keys() + cache.keys()
if update[key] != cache.get(key)))
deleted_keys = [key for key in cache.keys() if key not in update.keys()]
return new_keys, changed_keys, deleted_keys

def sync(self, parsed_update):
"""
Sync LLDP information to redis DB.
"""
logger.debug("Initiating LLDPd sync to Redis...")

# First, delete all entries from the LLDP_ENTRY_TABLE
client = self.db_connector.redis_clients[self.db_connector.APPL_DB]
pattern = '{}:*'.format(LldpSyncDaemon.LLDP_ENTRY_TABLE)
self.db_connector.delete_all_by_pattern(self.db_connector.APPL_DB, pattern)
# push local chassis data to APP DB
chassis_update = parsed_update.pop('local-chassis')
if chassis_update != self.chassis_cache:
self.db_connector.delete(self.db_connector.APPL_DB,
LldpSyncDaemon.LLDP_LOC_CHASSIS_TABLE)
for k, v in chassis_update.items():
self.db_connector.set(self.db_connector.APPL_DB,
LldpSyncDaemon.LLDP_LOC_CHASSIS_TABLE, k, v, blocking=True)
logger.debug("sync'd: {}".format(json.dumps(chassis_update, indent=3)))

# Repopulate LLDP_ENTRY_TABLE by adding all elements from parsed_update
for interface, if_attributes in parsed_update.items():
new, changed, deleted = self.cache_diff(self.interfaces_cache, parsed_update)
# Delete LLDP_ENTRIES which were modified or are missing
for interface in changed + deleted:
table_key = ':'.join([LldpSyncDaemon.LLDP_ENTRY_TABLE, interface])
self.db_connector.delete(self.db_connector.APPL_DB, table_key)
# Repopulate LLDP_ENTRY_TABLE by adding all changed elements
for interface in changed + new:
if re.match(SONIC_ETHERNET_RE_PATTERN, interface) is None:
logger.warning("Ignoring interface '{}'".format(interface))
continue
for k, v in if_attributes.items():
# port_table_key = LLDP_ENTRY_TABLE:INTERFACE_NAME;
table_key = ':'.join([LldpSyncDaemon.LLDP_ENTRY_TABLE, interface])
# port_table_key = LLDP_ENTRY_TABLE:INTERFACE_NAME;
table_key = ':'.join([LldpSyncDaemon.LLDP_ENTRY_TABLE, interface])
for k, v in parsed_update[interface].items():
self.db_connector.set(self.db_connector.APPL_DB, table_key, k, v, blocking=True)
logger.debug("sync'd: \n{}".format(json.dumps(if_attributes, indent=3)))
logger.debug("sync'd: \n{}".format(json.dumps(parsed_update[interface], indent=3)))
55 changes: 55 additions & 0 deletions src/lldp_syncd/dbsyncd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import subprocess
from swsssdk import ConfigDBConnector

from sonic_syncd import SonicSyncDaemon
from . import logger


class DBSyncDaemon(SonicSyncDaemon):
"""
A Thread that listens to changes in CONFIG DB,
and contains handlers to configure lldpd accordingly.
"""

def __init__(self):
super(DBSyncDaemon, self).__init__()
self.config_db = ConfigDBConnector()
self.config_db.connect()
logger.info("[lldp dbsyncd] Connected to configdb")
self.port_table = {}

def run_command(self, command):
p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
stdout = p.communicate()[0]
p.wait()
if p.returncode != 0:
logger.error("[lldp dbsyncd] command execution returned {}. "
"Command: '{}', stdout: '{}'".format(p.returncode, command, stdout))

def port_handler(self, key, data):
"""
Handle updates in 'PORT' table.
"""
# we're interested only in description for now
if self.port_table[key].get("description") != data.get("description"):
new_descr = data.get("description", " ")
logger.info("[lldp dbsyncd] Port {} description changed to {}."
.format(key, new_descr))
self.run_command("lldpcli configure lldp portidsubtype local {} description '{}'"
.format(key, new_descr))
# update local cache
self.port_table[key] = data

def run(self):
self.port_table = self.config_db.get_table('PORT')
# supply LLDP_LOC_ENTRY_TABLE and lldpd with correct values on start
for port_name, attributes in self.port_table.items():
self.run_command("lldpcli configure lldp portidsubtype local {} description '{}'"
.format(port_name, attributes.get("description", " ")))

# subscribe for further changes
self.config_db.subscribe('PORT', lambda table, key, data:
self.port_handler(key, data))

logger.info("[lldp dbsyncd] Subscribed to configdb PORT table")
self.config_db.listen()
4 changes: 4 additions & 0 deletions src/lldp_syncd/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from . import logger
from .daemon import LldpSyncDaemon
from .dbsyncd import DBSyncDaemon

DEFAULT_UPDATE_FREQUENCY = 10

Expand All @@ -8,8 +9,11 @@ def main(update_frequency=None):
try:
lldp_syncd = LldpSyncDaemon(update_frequency or DEFAULT_UPDATE_FREQUENCY)
logger.info('Starting SONiC LLDP sync daemon...')
dbsyncd = DBSyncDaemon()
lldp_syncd.start()
dbsyncd.start()
lldp_syncd.join()
dbsyncd.join()
except KeyboardInterrupt:
logger.info("ctrl-C captured, shutting down.")
except Exception:
Expand Down
Loading

0 comments on commit 37dbf74

Please sign in to comment.