Source code for taf.testlib.linux.lldp.lldptool

# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``lldptool.py``

"""

import argparse
import os

import sys
import re
import pprint
from collections import defaultdict

LLDPAD_SERVICE = "lldpad"   # this is "boot.lldpad" on SLES
LLDPAD_PATH = "/usr/sbin/lldpad"   # this is "boot.lldpad" on SLES
LLDPTOOL = "lldptool"
LLDPAD_CONFIG_FILE = "/var/lib/lldpad/lldpad.conf"


[docs]class TlvNames(object): CHASSIS_ID = "Chassis ID TLV" PORT_ID = "Port ID TLV" TIME_TO_LIVE = "Time to Live TLV" PORT_DESCRIPTION = "Port Description TLV" SYSTEM_NAME = "System Name TLV" SYSTEM_DESCRIPTION = "System Description TLV" SYSTEM_CAPABILITIES = "System Capabilities TLV" MANAGEMENT_ADDRESS = "Management Address TLV" LLDP_MED_CAPABILITIES = "LLDP-MED Capabilities TLV" LLDP_MED_HARDWARE_REVISION = "LLDP-MED Hardware Revision TLV" LLDP_MED_FIRMWARE_REVISION = "LLDP-MED Firmware Revision TLV" LLDP_MED_SOFTWARE_REVISION = "LLDP-MED Software Revision TLV" LLDP_MED_SERIAL_NUMBER = "LLDP-MED Serial Number TLV" LLDP_MED_MANUFACTURER_NAME = "LLDP-MED Manufacturer Name TLV" LLDP_MED_MODEL_NAME = "LLDP-MED Model Name TLV" LLDP_MED_ASSET_ID = "LLDP-MED Asset ID TLV" MAC_PHY_CONFIGURATION_STATUS = "MAC/PHY Configuration Status TLV" LINK_AGGREGATION = "Link Aggregation TLV" MAXIMUM_FRAME_SIZE = "Maximum Frame Size TLV" EVB_DRAFT_0_2_CONFIGURATION = "EVB draft 0.2 Configuration TLV" IEEE_8021QAZ_ETS_CONFIGURATION = "IEEE 8021QAZ ETS Configuration TLV" IEEE_8021QAZ_PFC = "IEEE 8021QAZ PFC TLV" CEE_DCBX = "CEE DCBX TLV" END_OF_LLDPDU = "End of LLDPDU TLV"
lldptool_tni = """ Chassis ID TLV \tIPv4: 10.0.0.150 Port ID TLV \tIfname: Te 0/13 Time to Live TLV \t120 Port Description TLV \tTe 0/13 System Name TLV \tbroc150_jack System Description TLV \tCEE Switch System Capabilities TLV \tSystem capabilities: Bridge, Router \tEnabled capabilities: Bridge Management Address TLV \tIPv4: 10.0.0.150 Unknown interface subtype: 0 CEE DCBX TLV \tControl TLV: \t SeqNo: 1, AckNo: 3 \tPriority Groups TLV: \t Enabled, Not Willing, No Error \t PGID Priorities: 0:[0,1,5,7] 1:[3] 2:[4] 3:[2] 4:[6] \t PGID Percentages: 0:20% 1:10% 2:40% 3:20% 4:10% 5:0% 6:0% 7:0% \t Number of TC's supported: 8 \tPriority Flow Control TLV: \t Enabled, Not Willing, No Error \t PFC enabled priorities: 2, 3, 4, 6 \t Number of TC's supported: 8 \tApplication TLV: \t Enabled, Not Willing, No Error \t Ethertype: 0x8906, Priority Map: 0x04 \t TCP/UDP Port: 0x0cbc, Priority Map: 0x10 \tUnknown DCBX sub-TLV: 0000800080 \tUnknown DCBX sub-TLV: 0000800180 End of LLDPDU TLV """ cee_sub_tlv = """ CEE DCBX TLV \tControl TLV: \t SeqNo: 1, AckNo: 3 \tPriority Groups TLV: \t Enabled, Not Willing, No Error \t PGID Priorities: 0:[0,1,5,7] 1:[3] 2:[4] 3:[2] 4:[6] \t PGID Percentages: 0:20% 1:10% 2:40% 3:20% 4:10% 5:0% 6:0% 7:0% \t Number of TC's supported: 8 \tPriority Flow Control TLV: \t Enabled, Not Willing, No Error \t PFC enabled priorities: 2, 3, 4, 6 \t Number of TC's supported: 8 \tApplication TLV: \t Enabled, Not Willing, No Error \t Ethertype: 0x8906, Priority Map: 0x04 \t TCP/UDP Port: 0x0cbc, Priority Map: 0x10 \tUnknown DCBX sub-TLV: 0000800080 \tUnknown DCBX sub-TLV: 0000800180 """ lldptool_ti = """Chassis ID TLV \tMAC: 00:1b:21:87:ac:7d Port ID TLV \tMAC: 00:1b:21:87:ac:7d Time to Live TLV \t120 CEE DCBX TLV \tControl TLV: \t SeqNo: 8, AckNo: 2 \tPriority Groups TLV: \t Enabled, Willing, No Error \t PGID Priorities: 0:[0] 1:[1] 2:[2] 3:[3] 4:[4] 5:[5] 6:[6] 7:[7] \t PGID Percentages: 0:13% 1:13% 2:13% 3:13% 4:12% 5:12% 6:12% 7:12% \t Number of TC's supported: 8 \tPriority Flow Control TLV: \t Enabled, Willing, No Error \t PFC enabled priorities: none \t Number of TC's supported: 8 \tApplication TLV: \t Enabled, Willing, No Error \t Ethertype: 0x8906, Priority Map: 0x08 \t TCP/UDP Port: 0x0cbc, Priority Map: 0x10 \t Ethertype: 0x8914, Priority Map: 0x0f End of LLDPDU TLV """ ieee_lldptool_tni = ieee_lldptool_ti = """Chassis ID TLV \tMAC: a0:36:9f:0b:3f:9c Port ID TLV \tMAC: a0:36:9f:0b:3f:9c Time to Live TLV \t120 IEEE 8021QAZ ETS Configuration TLV \t Willing: no \t CBS: not supported \t MAX_TCS: 4 \t PRIO_MAP: 0:0 1:0 2:1 3:1 4:2 5:2 6:3 7:3 \t TC Bandwidth: 25% 25% 25% 25% 0% 0% 0% 0% \t TSA_MAP: 0:ets 1:ets 2:ets 3:ets 4:strict 5:strict 6:strict 7:strict IEEE 8021QAZ ETS Recommendation TLV \t PRIO_MAP: 0:0 1:0 2:1 3:1 4:2 5:2 6:3 7:3 \t TC Bandwidth: 25% 25% 25% 25% 0% 0% 0% 0% \t TSA_MAP: 0:ets 1:ets 2:ets 3:ets 4:strict 5:strict 6:strict 7:strict IEEE 8021QAZ PFC TLV \t Willing: no \t MACsec Bypass Capable: no \t PFC capable traffic classes: 4 \t PFC enabled: 1 2 5 7 IEEE 8021QAZ APP TLV \tApp#0: \t Priority: 2 \t Sel: 2 \t {S}TCP Port: 3260 \tApp#1: \t Priority: 5 \t Sel: 1 \t Ethertype: 0x8906 End of LLDPDU TLV """ lldptool_tni_with_oid = """ Chassis ID TLV \tMAC: 5c:26:0a:f1:c5:73 Port ID TLV \tIfname: Te1/0/2 Time to Live TLV \t120 Port Description TLV \tTe1/0/2 System Name TLV \tpowerconnect-155 System Description TLV \tPowerconnect 8024, 4.2.0.4, VxWorks 6.6 System Capabilities TLV \tSystem capabilities: Bridge, Router \tEnabled capabilities: Bridge Management Address TLV \tIPv4: 10.0.0.155 \tIfindex: 145 \tOID: 1.3.6.1.4.1.674.10895.3023 CEE DCBX TLV \tControl TLV: \t SeqNo: 1, AckNo: 2 \tPriority Flow Control TLV: \t Enabled, Not Willing, No Error \t PFC enabled priorities: 3, 4 \t Number of TC's supported: 2 \tPriority Groups TLV: \t Enabled, Not Willing, No Error \t PGID Priorities: 0:[0,1,2,5,6,7] 1:[3] 2:[4] \t PGID Percentages: 0:25% 1:30% 2:45% 3:0% 4:0% 5:0% 6:0% 7:0% \t Number of TC's supported: 3 \tApplication TLV: \t Enabled, Not Willing, No Error \t TCP/UDP Port: 0x0cbc, Priority Map: 0x10 \t Ethertype: 0x8906, Priority Map: 0x08 End of LLDPDU TLV """ unknown_tlvs = """\ CEE DCBX TLV \tControl TLV: \t SeqNo: 1, AckNo: 16777216 \tApplication TLV: \t Enabled, Not Willing, No Error \t Ethertype: 0x8906, Priority Map: 0x08 \tPriority Flow Control TLV: \t Enabled, Not Willing, No Error \t PFC enabled priorities: 3 \t Number of TC's supported: 8 \tPriority Groups TLV: \t Enabled, Not Willing, No Error \t PGID Priorities: 0:[0,1,2,4,5,6,7] 1:[3] \t PGID Percentages: 0:50% 1:50% 2:0% 3:0% 4:0% 5:0% 6:0% 7:0% \t Number of TC's supported: 2 Unidentified Org Specific TLV \tOUI: 0x000142, Subtype: 1, Info: 01 Unidentified Org Specific TLV \tOUI: 0x0080c2, Subtype: 1, Info: 0001 End of LLDPDU TLV """ lldptool_tni_evb_med = """\ Chassis ID TLV \tMAC: 90:e2:ba:75:27:08 Port ID TLV \tMAC: 90:e2:ba:75:27:08 Time to Live TLV \t120 System Capabilities TLV \tSystem capabilities: Station Only \tEnabled capabilities: Station Only LLDP-MED Capabilities TLV \tDevice Type: class1 \tCapabilities: LLDP-MED, Inventory LLDP-MED Hardware Revision TLV \tA09 LLDP-MED Firmware Revision TLV \t3.0.0 LLDP-MED Software Revision TLV \t3.11.0-15-generic LLDP-MED Serial Number TLV \t44454C4C-5700-1037-8035-B2C04F47 LLDP-MED Manufacturer Name TLV \tDell Inc. LLDP-MED Model Name TLV \tPowerEdge R710 LLDP-MED Asset ID TLV \t2W75GQ1 MAC/PHY Configuration Status TLV \tAuto-negotiation supported and enabled \tPMD auto-negotiation capabilities: 0x8001 \tMAU type: Unknown [0x0000] EVB draft 0.2 Configuration TLV \tsupported forwarding mode: (0x80) standard 802.1Q \tsupported capabilities: (0x7) RTE ECP VDP \tconfigured forwarding mode: (00) \tconfigured capabilities: (00) \tno. of supported VSIs: 0000 \tno. of configured VSIs: 0000 \tRTE: 15 IEEE 8021QAZ ETS Configuration TLV \t Willing: yes \t CBS: not supported \t MAX_TCS: 8 \t PRIO_MAP: 0:0 1:0 2:0 3:0 4:0 5:0 6:0 7:0 \t TC Bandwidth: 0% 0% 0% 0% 0% 0% 0% 0% \t TSA_MAP: 0:strict 1:strict 2:strict 3:strict 4:strict 5:strict 6:strict 7:strict IEEE 8021QAZ PFC TLV \t Willing: yes \t MACsec Bypass Capable: no \t PFC capable traffic classes: 8 \t PFC enabled: none End of LLDPDU TLV """ COLON_SPLIT_NO_WHITESPACE_RE = re.compile(r"\s*([^:]+)\s*:\s*(.*)\s*")
[docs]def comma_sep_ints(s): # autodetect int base because we may get 0x04 return tuple(int(p, 0) for p in s.split(','))
[docs]def yes_no_to_bool(s): return s == 'yes'
[docs]def tsa_value(s): key_vals = find_all_colon_key_vals(s) return dict((int(k), v) for k, v in key_vals)
[docs]def up2tc_value(s): key_vals = find_all_colon_key_vals(s) return dict((int(k), int(v)) for k, v in key_vals)
[docs]class State(object): def __init__(self, parser): self.parser = parser
[docs]class Start(State):
[docs] def tlv(self, l): self.parser.tlv_state.set_tlv_type(l) self.parser.state = self.parser.tlv_state
[docs] def non_tlv(self, l): # save error TLV self.parser.tlv_list.append((l, None)) return self
[docs] def tlv_data(self, _): self.parser.state = self.parser.error_state
[docs] def end_tlv(self, l): self.parser.tlv_list.append((l, None)) self.parser.state = self.parser.end_state
[docs]class TLV(State): def __init__(self, parser): State.__init__(self, parser) self.tlv_type = None self.name = None
[docs] def set_tlv_type(self, l): self.name = l
[docs] def tlv(self, _): self.tlv_type = None self.parser.state = self.parser.error_state
[docs] def non_tlv(self, _): self.tlv_type = None self.parser.state = self.parser.error_state
[docs] def tlv_data(self, l): self.parser.tlv_data_state.start_tlv(self.name, l) self.parser.state = self.parser.tlv_data_state
[docs] def end_tlv(self, _): self.parser.state = self.parser.error_state
[docs]class TLVData(State): def __init__(self, parser): State.__init__(self, parser) self.tlv_type = None self.tlv_values = None
[docs] def start_tlv(self, name, l): self.tlv_type = name self.tlv_values = [l]
[docs] def tlv(self, l): self.parser.add_tlv((self.tlv_type, self.tlv_values)) self.tlv_type = l self.tlv_values = [] self.parser.state = self.parser.tlv_data_state
[docs] def non_tlv(self, l): self.parser.add_tlv((self.tlv_type, self.tlv_values)) self.tlv_type = None self.tlv_values = [] self.parser.tlv_list.append((l, None)) self.parser.state = self.parser.start_state
[docs] def tlv_data(self, l): self.tlv_values.append(l)
[docs] def end_tlv(self, l): self.parser.add_tlv((self.tlv_type, self.tlv_values)) self.tlv_type = None self.tlv_values = [] self.parser.tlv_list.append((l, None)) self.parser.state = self.parser.end_state
[docs]class ErrorState(State): pass
[docs]class EndState(State): pass
[docs]class Parser(object): def __init__(self, tlv_handlers): self.tlv_handlers = tlv_handlers self.tlv_list = [] self.start_state = Start(self) self.tlv_state = TLV(self) self.tlv_data_state = TLVData(self) self.error_state = ErrorState(self) self.end_state = EndState(self) self.state = self.start_state # be careful we use \n TLV_RE = re.compile(r"^(\S" "[^\n]+)TLV") TLV_DATA_RE = re.compile(r"^\s+" "([^\n]+)") NON_TLV_RE = re.compile(r"^(\S" "[^\n]+)(?!TLV)") END_TLV_RE = re.compile("^End of LLDPDU TLV")
[docs] def add_tlv(self, tlv): res = self.tlv_handlers.get(tlv[0], subtype_handler)(tlv[1]) self.tlv_list.append((tlv[0], res))
[docs] def parse(self, lines): self.state = self.start_state self.tlv_list = [] # import pdb ; pdb.set_trace() if len(sys.argv) > 1 and sys.argv[1] == "rpdb": import rpdb2 rpdb2.start_embedded_debugger('foo', True, True) if isinstance(lines, str): lines = lines.splitlines() for l in lines: if self.state == self.error_state: raise SyntaxError(l) elif self.state == self.end_state: return self.tlv_list elif self.END_TLV_RE.match(l): self.state.end_tlv(l) elif self.TLV_RE.match(l): self.state.tlv(l) elif self.NON_TLV_RE.match(l): self.state.non_tlv(l) elif self.TLV_DATA_RE.match(l): self.state.tlv_data(l) if self.state == self.end_state: return self.tlv_list else: raise SyntaxError("EOF without End of LLDPDU TLV")
[docs]def control_tlv_handler(val): values = {} for line in val: values.update( [COLON_SPLIT_NO_WHITESPACE_RE.search(v).groups() for v in line.split(',')]) return values
[docs]def parse_enabled_willing_error(val): enabled, willing, error = [s.strip() for s in val.split(',')] values = {'Enable': (enabled == 'Enabled'), 'Willing': (willing == 'Willing'), 'Errors': error} return values
[docs]def pgid_priorities(pgid_prios): pgids = re.findall(r'(\d):\[([^]]+)\]', pgid_prios) pgid_map = dict((int(k), (int(sp) for sp in v.split(','))) for k, v in pgids) up2tc = {} for tc, ups in pgid_map.items(): for up in ups: up2tc[up] = tc return up2tc
[docs]def pgid_percentages(pgid_percents): percentages = re.findall(r'\d:(\d+)%', pgid_percents) return tuple(int(p) for p in percentages)
[docs]def priority_group_tlv_handler(val): handlers = { 'PGID Priorities': pgid_priorities, 'PGID Percentages': pgid_percentages, "Number of TC's supported": int, } values = {} values.update(parse_enabled_willing_error(val[0])) for v in val[1:]: tlv_name, tlv_val = v.split(":", 1) values[tlv_name] = handlers[tlv_name](tlv_val) return values
[docs]def priority_flow_control_tlv_handler(val): handlers = { 'PFC enabled priorities': priority_flow_control_enabled_handler, "Number of TC's supported": int, } values = {} values.update(parse_enabled_willing_error(val[0])) for v in val[1:]: tlv_name, tlv_val = v.split(":", 1) values[tlv_name] = handlers[tlv_name](tlv_val) return values
[docs]def priority_flow_control_enabled_handler(pfc_enabled): enabled_prios = dict((up, False) for up in range(8)) enabled_prios.update((int( p.strip()), True) for p in pfc_enabled.split(",") if 'none' not in p) return enabled_prios
[docs]def ieee_priority_flow_control_enabled_handler(pfc_enabled): enabled_prios = dict((up, False) for up in range(8)) enabled_prios.update((int( p.strip()), True) for p in pfc_enabled.split() if 'none' not in p) return enabled_prios
[docs]def application_tlv_handler(val): # Application TLV: # Enabled, Not Willing, No Error # Ethertype: 0x8906, Priority Map: 0x04 # TCP/UDP Port: 0x0cbc, Priority Map: 0x10 handlers = { 'Ethertype': int_auto_base, 'TCP/UDP Port': int_auto_base, } values = {} values.update(parse_enabled_willing_error(val[0])) selectors = defaultdict(dict) for app in val[1:]: selector, priority_map = [tuple( s.strip() for s in v.split(':', 1)) for v in app.split(',')] # values[selector]. converted_selector = (selector[0], handlers[selector[0]](selector[1])) selectors[converted_selector] = int(priority_map[1], 16) # if selector[0] == "Ethertype" and selector[1] == '0x8906': # pass # elif selector[0] == "TCP/UDP Port" and selector[1] == '0x0cbc': # pass values['Applications'] = dict(selectors) return values
SUB_TLV_HANDLERS = { 'Control TLV:': control_tlv_handler, 'Priority Groups TLV:': priority_group_tlv_handler, 'Priority Flow Control TLV:': priority_flow_control_tlv_handler, 'Application TLV:': application_tlv_handler, }
[docs]class CEESubTLVParser(object): def __init__(self, tlv_handlers=None): if tlv_handlers is None: self.tlv_handlers = SUB_TLV_HANDLERS else: self.tlv_handlers = tlv_handlers self.tlv_list = [] self.start_state = Start(self) self.tlv_state = TLV(self) self.tlv_data_state = TLVData(self) self.error_state = ErrorState(self) self.end_state = EndState(self) self.state = self.start_state # be careful mixing \t and \s TLV_RE = re.compile("^(?:\t| {8})" r"\S" "[^\n]+TLV") TLV_DATA_RE = re.compile("^(?:\t| {8})" r"\s+\S" "[^\n]+")
[docs] def add_tlv(self, tlv): res = self.tlv_handlers.get(tlv[0], subtype_handler)(tlv[1]) self.tlv_list.append((tlv[0], res))
[docs] def parse(self, lines): self.state = self.start_state self.tlv_list = [] # raw text TLV for already split into lines if os.linesep in lines: lines = lines.splitlines() for l in lines: if self.state == self.error_state: raise SyntaxError(l) elif self.TLV_RE.match(l): # use lstrip() because we just use the re for matching and # discard the groups self.state.tlv(l.lstrip()) elif self.TLV_DATA_RE.match(l): self.state.tlv_data(l.lstrip()) # fake an end tlv self.state.end_tlv('') return self.tlv_list[:-1]
[docs]class IEEEAppTLVParser(object): def __init__(self, tlv_handlers=None): if tlv_handlers is None: self.tlv_handlers = {} else: self.tlv_handlers = tlv_handlers self.tlv_list = [] self.start_state = Start(self) self.tlv_state = TLV(self) self.tlv_data_state = TLVData(self) self.error_state = ErrorState(self) self.end_state = EndState(self) self.state = self.start_state # be careful mixing \t and \s self.tlv_re = re.compile("^(?:\t| {8})" r"App#\d+:") self.tlv_data_re = re.compile("^(?:\t| {8})" r"\s+\S" "[^\n]+")
[docs] def add_tlv(self, tlv): res = self.tlv_handlers.get(tlv[0], subtype_handler)(tlv[1]) # TODO: We have a loop in the parser, so it is adding duplicates # for now prevent appending duplicates if res: if (tlv[0], res) not in self.tlv_list: self.tlv_list.append((tlv[0], res))
[docs] def parse(self, lines): self.state = self.start_state # raw text TLV for already split into lines if '\n' in lines: lines = lines.splitlines() for l in lines: if self.state == self.error_state: raise SyntaxError(l) elif self.tlv_re.match(l): self.state.tlv(l.lstrip()) elif self.tlv_data_re.match(l): self.state.tlv_data(l.lstrip()) # fake an end tlv self.state.end_tlv('') return [tlv for tlv in self.tlv_list[:-1] if tlv[1] is not None]
[docs]def non_subtype_handler(tlvs): return [t.strip() for t in tlvs]
[docs]def subtype_handler(tlvs): """ Args: tlvs Returns: list: list of subtype, value pairs """ values = [] for tlv in tlvs: if ':' in tlv: values.append([s.strip() for s in tlv.split(':', 1)]) else: values.append(tlv.strip()) return values
[docs]def sub_tlv_as_dict_handler_factory(sub_tlv_handlers): def save_tlvs_as_dict(tlvs): values = {} for tlv in tlvs: if ':' in tlv: name, value = COLON_SPLIT_NO_WHITESPACE_RE.search(tlv).groups() converted_value = sub_tlv_handlers.get( name, lambda x: x)(value) values[name] = converted_value else: v = tlv.strip() values[v] = v return values return save_tlvs_as_dict
IEEE_PFC_HANDLERS = { "PFC enabled": ieee_priority_flow_control_enabled_handler, "PFC capable traffic classes": int, "Willing": yes_no_to_bool, "MACsec Bypass Capable": yes_no_to_bool, }
[docs]def cee_sub_tlv_handler_into_list(val): p = CEESubTLVParser() return p.parse(val)
[docs]def cee_sub_tlv_handler(val): p = CEESubTLVParser() return dict(p.parse(val))
[docs]def ieee_app_tlv_handler(val): p = IEEEAppTLVParser() return dict(p.parse(val))
TLV_HANDLERS = { TlvNames.SYSTEM_NAME: non_subtype_handler, TlvNames.SYSTEM_DESCRIPTION: non_subtype_handler, TlvNames.PORT_DESCRIPTION: non_subtype_handler, TlvNames.CEE_DCBX: cee_sub_tlv_handler, 'IEEE 8021QAZ ETS Configuration TLV': sub_tlv_as_dict_handler_factory({}), 'IEEE 8021QAZ ETS Recommendation TLV': sub_tlv_as_dict_handler_factory({}), 'IEEE 8021QAZ PFC TLV': sub_tlv_as_dict_handler_factory(IEEE_PFC_HANDLERS), 'IEEE 8021QAZ APP TLV': IEEEAppTLVParser().parse, }
[docs]def parse_into_list(s): p = Parser(TLV_HANDLERS) return p.parse(s)
[docs]def parse(s): p = Parser(TLV_HANDLERS) return dict(p.parse(s))
[docs]def int_auto_base(n): """Convert string to an int, automatically detecting the base. """ return int(n, base=0)
[docs]class AppCollector(object): def __init__(self): super(AppCollector, self).__init__() self.apps = {} def __call__(self, *args, **kwargs): self.apps[comma_sep_ints(args[0])] = True return self.apps
SET_FIELD_NAMES_TRANSFORM = { 'enableTx': "advertise", }
[docs]def parse_set(s): set_fields = { 'enabled': priority_flow_control_enabled_handler, 'willing': yes_no_to_bool, 'enableTx': yes_no_to_bool, 'tsa': tsa_value, 'up2tc': up2tc_value, 'tcbw': comma_sep_ints, 'app': AppCollector(), } parser = argparse.ArgumentParser(prog="lldptool") parser.add_argument( "-g", action="store", default=False, dest="bridge_scope") parser.add_argument("-n", choices=("nb", "ncb", "nntpmrb", "nearest_bridge", "neareast_customer_bridge", "nearest_nontpmr_bridge"), dest="neighboor") parser.add_argument( "-T", action="store_true", default=False, dest="set_tlv") parser.add_argument( "-t", action="store_true", default=False, dest="get_tlv") parser.add_argument( "-L", action="store_true", default=False, dest="set_lldp") parser.add_argument( "-l", action="store_true", default=False, dest="get_lldp") parser.add_argument( "-r", action="store_true", default=False, dest="raw_client") parser.add_argument( "-R", action="store_true", default=False, dest="only_raw_client") parser.add_argument( "-c", action="store_true", default=False, dest="query_config") parser.add_argument("-i", action="store", dest="interface") parser.add_argument("-V", action="store", dest="tlvid") opts = parser.parse_args(s.split()) args = s[1:] # by default set max_tcs to 8, we can't even set it on the command line vals = {'max_tcs': 8, "interface": opts.interface} if not opts.query_config: for name, val in (t.split('=') for t in args): vals[SET_FIELD_NAMES_TRANSFORM.get( # TODO: AppCollector needs to be cleared # TODO: can we have multiple for every field? name, name)] = set_fields[name](val) return vals
[docs]def tlv_name_to_python_const(t): return re.sub("_TLV$", "", re.sub("[ ./-]", "_", t.upper()))
test_parse = [ ('Chassis ID TLV', [['IPv6', 'fe80::92e2:baff:fe75:2708']]), ('Port ID TLV', [['MAC', '90:e2:ba:75:27:08']]), ('Time to Live TLV', ['120']), ('Port Description TLV', ['Interface 6 as eth4']), ('System Name TLV', ['terminator13']), ('System Description TLV', [ 'Linux terminator13 3.11.0-15-generic #25~precise1-Ubuntu SMP ' 'Thu Jan 30 17:39:31 UTC 2014 x86_64']), ('System Capabilities TLV', [['System capabilities', 'Station Only'], ['Enabled capabilities', 'Station Only']]), ('Management Address TLV', [['IPv6', 'fe80::92e2:baff:fe75:2708'], ['Ifindex', '6']]), ('LLDP-MED Capabilities TLV', [['Device Type', 'class1'], ['Capabilities', 'LLDP-MED, Inventory']]), ('LLDP-MED Hardware Revision TLV', ['A09']), ('LLDP-MED Firmware Revision TLV', ['3.0.0']), ('LLDP-MED Software Revision TLV', ['3.11.0-15-generic']), ('LLDP-MED Serial Number TLV', ['44454C4C-5700-1037-8035-B2C04F47']), ('LLDP-MED Manufacturer Name TLV', ['Dell Inc.']), ('LLDP-MED Model Name TLV', ['PowerEdge R710']), ('LLDP-MED Asset ID TLV', ['2W75GQ1']), ('MAC/PHY Configuration Status TLV', ['Auto-negotiation supported and enabled', ['PMD auto-negotiation capabilities', '0x8001'], ['MAU type', 'Unknown [0x0000]']]), ('Link Aggregation TLV', ['Aggregation not capable', 'Currently not aggregated', ['Aggregated Port ID', '0']]), ('Maximum Frame Size TLV', ['1518']), ('IEEE 8021QAZ ETS Configuration TLV', {'CBS': 'not supported', 'MAX_TCS': '8', 'PRIO_MAP': '0:0 1:0 2:0 3:0 4:0 5:0 6:0 7:0 ', 'TC Bandwidth': '0% 0% 0% 0% 0% 0% 0% 0% ', 'TSA_MAP': '0:strict 1:strict 2:strict 3:strict 4:strict 5:strict ' '6:strict 7:strict ', 'Willing': 'yes'}), ('IEEE 8021QAZ PFC TLV', {'MACsec Bypass Capable': False, 'PFC capable traffic classes': 8, 'PFC enabled': {0: False, 1: False, 2: False, 3: False, 4: False, 5: False, 6: False, 7: False}, 'Willing': True}), ('End of LLDPDU TLV', None)] if __name__ == '__main__': # print parse(" Chassis ID TLV\nTLV\n") _p = Parser(TLV_HANDLERS) try: with open(sys.argv[1]) as infile: pprint.pprint(_p.parse(infile.read())) except (IndexError, IOError) as e: # pprint.pprint(_p.parse(lldptool_ti)) # pprint.pprint(_p.parse(ieee_lldptool_ti)) # pprint.pprint(_p.parse(lldptool_tni)) # pprint.pprint(_p.parse(lldptool_tni_with_oid)) # pprint.pprint(_p.parse(lldptool_tni_evb_med)) # for _t, _v in _p.parse(lldptool_tni_with_oid): # print '%s = "%s"' % (tlv_name_to_python_const(_t), _t) for _t, _v in test_parse: print('%s = "%s"' % (tlv_name_to_python_const(_t), _t)) # p = CEESubTLVParser({}) # pprint.pprint(_p.parse(cee_sub_tlv)) lldp_parser = Parser(TLV_HANDLERS)
[docs]def find_all_colon_key_vals(s): return re.findall(r'(\d):(\w+)', s)