# Source code for taf.testlib.snmpcmd

# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``snmpcmd.py``

`Module for SNMP specific functionality`

"""

import re
import time

from pyasn1.type import univ
from pysnmp.smi import builder, view
from pysnmp.entity.rfc3413.oneliner import cmdgen

from . import helpers
from . import loggers


class SNMPCmd(object):
    """SNMP specific functionality class.

    Wraps pysnmp's one-liner command generator to perform SNMP GET/SET/WALK
    requests against the switches of a test environment and to normalize the
    received values according to the MIB syntax.

    Args:
        config(list[dict]):  environment config
        env_switches(dict):  switches dictionary in format {switch_id: switch_object}
        mib_dir(str):  MIB module name

    """

    suite_logger = loggers.ClassLogger()

    def __init__(self, config, env_switches, mib_dir):
        """Initialize SNMPCmd class.

        Args:
            config(list[dict]):  environment config
            env_switches(dict):  switches dictionary in format {switch_id: switch_object}
            mib_dir(str):  MIB module name

        """
        self.switches = {}
        # Always remember mib_dir: snmp_set() falls back to it, so it must not
        # depend on whether a 'get_community' entry is present in the config.
        self.mib_dir = mib_dir

        # get community info from config file:
        for conf in config:
            if 'get_community' in conf:
                self.get_community = conf['get_community']
            if 'set_community' in conf:
                self.set_community = conf['set_community']

        # get switches ip addresses and ports
        for switch_id, switch in env_switches.items():
            sw_ipaddr = switch.ipaddr
            if 'sshtun_port' in switch.config:
                sw_port = 161
                if switch.config['sshtun_port'] != 22:
                    # The tunnel port digits around '0' are used as the two
                    # last octets of the 10.10.x.y address.
                    sw_ipaddr = "10.10.{0}.{1}".format(*str(switch.config['sshtun_port']).split('0'))
            else:
                sw_port = int(switch.port) - 8080 + 4700
            self.switches[switch_id] = {'host': sw_ipaddr, 'port': sw_port}

        self.mib_builder = builder.MibBuilder()
        mib_path = self.mib_builder.getMibPath() + (mib_dir, )
        self.mib_builder.setMibPath(*mib_path)
        self.mibViewController = view.MibViewController(self.mib_builder)

        # loading SNMP types as instances
        self.suite_logger.debug("Loading basic types from standard MIB modules")
        self.OctetString, Integer = self.mib_builder.importSymbols('ASN1', 'OctetString', 'Integer')[0:2]
        Counter32, Unsigned32, Counter64 = self.mib_builder.importSymbols(
            'SNMPv2-SMI', 'Counter32', 'Unsigned32', 'Counter64')[0:3]
        (_inet_address_type, self.InetAddress, InetAddressIPv4, InetAddressIPv6,
         InetAddressIPv4z, InetAddressIPv6z, InetAddressDNS) = self.mib_builder.importSymbols(
            'INET-ADDRESS-MIB', 'InetAddressType', 'InetAddress', 'InetAddressIPv4', 'InetAddressIPv6',
            'InetAddressIPv4z', 'InetAddressIPv6z', 'InetAddressDNS')[0:7]

        self.__integer = Integer()
        self.__counter32 = Counter32()
        self.__unsigned32 = Unsigned32()
        self.__counter64 = Counter64()
        self.__octetString = self.OctetString()

        # creating InetAddress types dict with keys corresponded to InetAddressType named values
        self.InetAddresses = {'ipv4': InetAddressIPv4(),
                              'ipv6': InetAddressIPv6(),
                              'ipv4z': InetAddressIPv4z(),
                              'ipv6z': InetAddressIPv6z(),
                              'dns': InetAddressDNS()}

    def _find_and_load_mib(self, mibs_dict, sym_name):
        """Find MIB name and load it to MibBuilder.

        Args:
            mibs_dict(dict):  dictionary that maps MIB names to collections of symbol names.
            sym_name(str):  MIB symbol name

        Returns:
            str:  Name of MIB in which symbol name is. None if MIB's name wasn't found.

        Examples::

            self._find_and_load_mib(helpers.MIBS_DICT, 'onsSwitchppControlRouteInterfaceMtu')

        """
        # searching MIB name for specified symbol name in specified MIBs dictionary
        mod_name = next((name for name, values in mibs_dict.items() if sym_name in values), None)
        if not mod_name:
            # symbol name wasn't found in MIBs in MIBs dictionary
            self.suite_logger.debug("MIB name for << %s >> wasn't found" % (sym_name,))
            return mod_name

        self.suite_logger.debug("MIB name for << %s >> found: << %s >>" % (sym_name, mod_name,))
        if mod_name in self.mib_builder.mibSymbols:
            self.suite_logger.debug("MIB << %s >> is already loaded" % (mod_name,))
            return mod_name

        # loading found MIB; a load failure is only logged (best effort) and
        # the name is still returned, exactly as callers have always seen it.
        try:
            self.mib_builder.loadModules(mod_name)
            self.suite_logger.debug("MIB << %s >> successful loaded" % (mod_name,))
        except Exception:
            self.suite_logger.debug("MIB << %s >> is not loaded" % mod_name)
        return mod_name

    def _get_oid(self, mod_name, sym_name):
        """Translate a MIB symbol into its OID.

        Args:
            mod_name(str):  MIB module name.
            sym_name(str):  MIB symbol name

        Returns:
            list:  OID of the symbol as a list of sub-identifiers.

        Examples::

            self._get_oid('ons_stat', 'onsSnmpAgentStatisticsPortId')

        """
        mib_node, = self.mib_builder.importSymbols(mod_name, sym_name)[0:1]
        listed_oid = list(mib_node.getName())
        self.suite_logger.debug("Transleted OID: %s" % (listed_oid, ))
        return listed_oid

    def _get_previous(self, mod_name, sym_name):
        """Getting OID and NodeName of previous SNMP element of sequence.

        Args:
            mod_name(str):  MIB module name.
            sym_name(str):  MIB symbol name

        Returns:
            tuple:  (OID as list, symbol name) of the previous element.

        Examples::

            self._get_previous('ONS-SWITCH-MIB', 'onsSwitchppControlBridgeInfoInbandIpNetMaskInetAddress')

        """
        mib_node, = self.mib_builder.importSymbols(mod_name, sym_name)[0:1]
        type_oid = list(mib_node.getName())
        # decreasing last oid member for 1
        type_oid[-1] -= 1
        # getting information for previous element
        oid, prev_names, _suffix = self.mibViewController.getNodeNameByOid(tuple(type_oid))
        return list(oid), prev_names[-1]

    def _normalize_result(self, mod_name, sym_name, result, to_oid=False):
        """Normalize SNMP GET result according syntax from MIB.

        Args:
            mod_name(str):  MIB module name.
            sym_name(str):  MIB symbol name
            result(list):  List with one SNMP GET result for all types except InetAddress,
                           for InetAddress - list with two elements ['InetAddress', 'InetAddressType'].
            to_oid(bool):  indicator of formatting given result for OID.

        Returns:
            str, int:  Normalized result according to syntax. A result list of any other
                       length is returned unchanged.

        Examples::

            self._normalize_result('ONS-SWITCH-MIB', 'onsSwitchppControlBridgeInfoInbandIpNetMaskInetAddress',
                                   [OctetString(hexValue='ffffff00'), Integer(1)])

        """
        mib_node, = self.mib_builder.importSymbols(mod_name, sym_name)[0:1]
        syntax = mib_node.getSyntax()

        # branch for basic SNMP types
        if len(result) == 1 and result != 'None':
            result = result[0]
            # formatting digital types
            if result.isSuperTypeOf(self.__integer) or result.isSuperTypeOf(self.__counter32) \
                    or result.isSuperTypeOf(self.__unsigned32) or result.isSuperTypeOf(self.__counter64):
                self.suite_logger.debug("DIGITAL result type found.")
                if to_oid is False:
                    # if DIGITAL result normalizing not for OID
                    subtype_named_values = syntax.subtype().getNamedValues()
                    if len(subtype_named_values.namedValues) > 0:
                        result = subtype_named_values.getName(result)
                    elif hasattr(syntax, 'displayHint') and syntax.displayHint is not None:
                        self.suite_logger.debug("Formatting result according to DISPLAY-HINT: \"%s\"" % syntax.displayHint)
                        # prefer an int; keep the formatted text if it is not numeric
                        try:
                            result = int(syntax.prettyOut(result))
                        except ValueError:
                            result = syntax.prettyOut(result)
                    else:
                        result = int(result)
                elif to_oid is True:
                    # normalizing DIGITAL result for OID
                    self.suite_logger.debug("Formatting result to use in OID")
                    result = int(result)
            # formatting string types
            elif result.isSuperTypeOf(self.__octetString) or isinstance(result, self.OctetString):
                self.suite_logger.debug("OCTET STRING result type found.")
                if not to_oid:
                    # normalizing OCTET STRING result not for OID
                    if hasattr(syntax, 'displayHint') and syntax.displayHint is not None:
                        self.suite_logger.debug("Formatting result according to DISPLAY-HINT: \"%s\"" % syntax.displayHint)
                        result = syntax.prettyOut(result)
                    else:
                        self.suite_logger.debug("No DISPLAY-HINT found.")
                        result = result.prettyPrint()
                else:
                    # normalizing OCTET STRING result for OID:
                    # variable-length strings are prefixed with their length
                    self.suite_logger.debug("Formatting result to use in OID")
                    res_len = "" if syntax.isFixedLength() else str(len(result)) + "."
                    result = res_len + ".".join(str(number) for number in result.asNumbers())
            else:
                self.suite_logger.debug("Unknown result type. Result \"%s\" didn't normalized." % (result, ))

        # branch for [InetAddress, InetAddressType] result
        elif len(result) == 2:
            if isinstance(syntax, self.InetAddress):
                self.suite_logger.debug("INET ADDRESS result type found.")
                raw_address = result[0]
                addr_type_sym_name = self._get_previous(mod_name, sym_name)[1]
                addr_type = self._normalize_result(mod_name, addr_type_sym_name, [result[1]])
                self.suite_logger.debug("Formatting result according to INET ADDRESS TYPE value: \"%s\" = \"%s\"" %
                                        (addr_type_sym_name, addr_type))
                result = self.InetAddresses[addr_type].prettyOut(raw_address)
                # normalizing InetAddress result for OID
                if to_oid is True:
                    self.suite_logger.debug("Formatting result to use in OID")
                    if addr_type == "dns":
                        # Build the index from the raw octets: the formatted
                        # value is text and has no asNumbers().
                        result = str(len(raw_address)) + "." + \
                            ".".join(str(number) for number in raw_address.asNumbers())
                    else:
                        result = ".".join(re.findall(r"[\w]+", result))
            else:
                self.suite_logger.debug("Unknown result type. Result \"%s\" didn't normalized." % (result[0], ))

        return result

    def _snmp_get_call(self, switch_id, arguments, community, version, to_oid=False, poll_timeout=20):
        """Getting data from source via SNMP.

        Args:
            switch_id(int):  ID of switch to get SNMP call to.
            arguments(list):  SNMP call (SNMP symbol name, index). Index can have inserted calls.
                              For 'v3' the last three items are the user name, [auth protocol, auth key]
                              and [priv protocol, priv key].
            community(str):  SNMP community to read.
            version(str):  version of SNMP protocol to use ('v2' or 'v3').
            to_oid(bool):  indicator of returned result's OID format
            poll_timeout(int):  timeout to appearing SNMP data.

        Returns:
            str, int:  Normalized received SNMP data, or the string 'None' when the MIB is unknown,
                       the call is malformed, the version is unsupported or no data was received.

        Examples::

            self._snmp_get_call(1, ['onsSwitchppControlBridgeInfoInbandIpNetMaskInetAddress', "1"], "sppCommunity", "v2", False, 20)
            self._snmp_get_call(1, ['onsSwitchppControlBridgeInfoInbandIpNetMaskInetAddress', ["1.{}.1", ["PortId", "1.2.3"]]], "sppCommunity", "v2", False, 20)

        """
        sym_name = arguments[0]
        # Resolve the index into a local instead of writing it back into
        # `arguments`, so the caller's call description stays reusable.
        index = arguments[1]
        if isinstance(index, list):
            # Making calls inserted to index
            if "{}" in index[0]:
                # formation of OID by calling inserted calls and substitution received values to index
                values_list = [self._snmp_get_call(switch_id, call, community, version, to_oid=True)
                               for call in index[1:]]
                index = index[0].format(*values_list)
            elif isinstance(index[0], str) and len(index) == 2:
                index = self._snmp_get_call(switch_id, index, community, version, to_oid=True)

        # finding MIB name by symbol parameter's name
        self.suite_logger.debug("Get parameter: << %s >>" % (sym_name,))
        mod_name = self._find_and_load_mib(helpers.MIBS_DICT, sym_name)
        # Return 'None' if MIB for parameter wasn't found
        if not mod_name:
            return 'None'

        # getting listed_oid(s):
        listed_oids = [self._get_oid(mod_name, sym_name)]
        # adding oid of previous element (must be InetAddressType) to list if param type is InetAddress
        mib_node, = self.mib_builder.importSymbols(mod_name, sym_name)[0:1]
        if isinstance(mib_node.getSyntax(), self.InetAddress):
            listed_oids.append(self._get_previous(mod_name, sym_name)[0])

        if len(arguments) == 2 or len(arguments) == 5:
            oid_index = [int(oid_element) for oid_element in str(index).split('.')]
            listed_oids = [(oid + oid_index) for oid in listed_oids]
        else:
            self.suite_logger.debug("Wrong number of arguments in call: %s" % arguments)
            return 'None'

        # performing snmpget procedure:
        if version == 'v2':
            ip_addr = self.switches[switch_id]['host']
            # NOTE(review): the v2 GET always targets port 161 while the v3 GET
            # and snmp_set() use self.switches[switch_id]['port'] - confirm
            # whether this is intentional.
            port = "161"
            end_time = time.time() + poll_timeout
            final_res = [univ.Null()]
            self.suite_logger.debug("Get OID: %s" % (listed_oids[0], ))
            # poll while at least one result list member is instance of univ.Null class
            while True:
                if time.time() >= end_time:
                    self.suite_logger.debug("Timeout exceeded and SNMP data is not appeared")
                    break
                if not any(isinstance(result, univ.Null) for result in final_res):
                    self.suite_logger.debug("SNMP Data is not None:<<< %s >>>" % (final_res, ))
                    break
                error_indication, error_status, error_index, var_binds = cmdgen.CommandGenerator().getCmd(
                    cmdgen.CommunityData('test-agent', community, 1),
                    cmdgen.UdpTransportTarget((ip_addr, port)),
                    *listed_oids)
                if len(var_binds) > 0:
                    final_res = list(res[1] for res in var_binds)
                    self.suite_logger.debug("Returned SNMP Data:<<< %s >>>" % (final_res, ))
                else:
                    self.suite_logger.debug("Returned SNMP response:<<< EMPTY >>>")
        elif version == 'v3':
            username = arguments[-3]
            auth, authpass = arguments[-2][0], arguments[-2][1]
            priv, privpass = arguments[-1][0], arguments[-1][1]
            authprtcl = {'MD5': cmdgen.usmHMACMD5AuthProtocol,
                         'SHA': cmdgen.usmHMACSHAAuthProtocol,
                         'no_auth': cmdgen.usmNoAuthProtocol}
            privprtcl = {'DES': cmdgen.usmDESPrivProtocol,
                         'AES': cmdgen.usmAesCfb128Protocol,
                         'no_priv': cmdgen.usmNoPrivProtocol}
            error_indication, error_status, error_index, var_binds = cmdgen.CommandGenerator().getCmd(
                cmdgen.UsmUserData(username, authKey=authpass, privKey=privpass,
                                   authProtocol=authprtcl[auth], privProtocol=privprtcl[priv]),
                cmdgen.UdpTransportTarget((self.switches[switch_id]['host'], self.switches[switch_id]['port'])),
                *listed_oids)
            final_res = list(res[1] for res in var_binds)
        else:
            # Previously an unknown version crashed on an unbound local;
            # report it the same way as the other failed lookups.
            self.suite_logger.debug("Unsupported SNMP version in call: %s" % (version, ))
            return 'None'

        if any(isinstance(result, univ.Null) for result in final_res):
            self.suite_logger.debug("Result is \'None\':<<< %s >>>" % (final_res[0].prettyPrint()))
            final_res = 'None'
        else:
            final_res = self._normalize_result(mod_name, sym_name, final_res, to_oid)
        self.suite_logger.debug("Normalized result:<<< %s >>>" % final_res)
        return final_res

    def snmp_get(self, elements_list, community, version, poll_timeout=20):
        """Walking through list of element to get and calling self._snmp_get_call() method.

        Args:
            elements_list(list):  List of dicts {switch_id: [(SNMP symbol name, index), ...]}.
                                  Index can have inserted calls.
            community(str):  SNMP community to read; self.get_community is used when empty.
            version(str):  version of SNMP protocol to use.
            poll_timeout(int):  timeout to appearing SNMP data.

        Returns:
            list:  List of SNMP-GET command results.

        Examples::

            self.snmp_get([{"1":[["onsSnmpAgentStatisticsPortId", "1"]]}], "sppCommunity", "v2")
            self.snmp_get([{"1":[["onsSnmpAgentStatisticsPortId", ["1.{}.3", ["onsSnmpAgentStatisticsPortKey", "2.4.5"]]]]}], "sppCommunity", "v2")

        """
        if not community:
            community = self.get_community
        # NOTE(review): `result` and `result_dict` are shared by all keys and
        # all dictionaries of elements_list, so results accumulate across
        # them. Kept as is because callers may compare against this shape.
        result = []
        result_dict = {}
        result_list = []
        for elements_dict in elements_list:
            for key, calls in elements_dict.items():
                if calls != [["readOnly"]]:
                    for arguments in calls:
                        result.append([self._snmp_get_call(int(key), arguments, community, version, False, poll_timeout)])
                else:
                    result = [["readOnly"]]
                result_dict[key] = result
            result_list.append(result_dict)
        return result_list

    def snmp_set(self, elements_list, community, mib_dir=None):
        """Setting values by SNMP.

        Args:
            elements_list(list):  List of dicts {switch_id: [call, ...]} where call is
                                  [symbol name, index, value, type] or [symbol name, value, type].
            community(str):  SNMP community to write; self.set_community is used when empty.
            mib_dir(str):  MIB module name; self.mib_dir is used when empty.

        Returns:
            list:  List of SNMP-SET command results. If a value cannot be converted to the
                   requested type, processing stops and the list collected so far is returned
                   with [["None"]] stored for the current switch.

        Examples::

            self.snmp_set(conf[test]['snmp_set'], "private")

        """
        # NOTE(review): mib_dir is passed to _get_oid() as a MIB module name,
        # while __init__ adds it to the MIB search path - confirm which meaning
        # is intended.
        if not mib_dir:
            mib_dir = self.mib_dir
        if not community:
            community = self.set_community
        # NOTE(review): as in snmp_get(), the accumulators are shared by all
        # keys and dictionaries; kept for compatibility with existing callers.
        result = []
        result_dict = {}
        result_list = []
        for elements_dict in elements_list:
            for key, calls in elements_dict.items():
                if calls != [["readOnly"]]:
                    for arguments in calls:
                        sym_name = str(arguments[0])
                        # positions of the type and value items depend on
                        # whether an index is present in the call
                        if len(arguments) == 4:
                            arg_type = 3
                            arg_value = 2
                        else:
                            arg_type = 2
                            arg_value = 1
                        try:
                            if arguments[arg_type] == 'INTEGER':
                                set_value = univ.Integer(int(arguments[arg_value]))
                            else:
                                # wrap the value itself, not its position
                                set_value = univ.OctetString(arguments[arg_value])
                        except Exception:
                            result_dict[key] = [["None"]]
                            # list.append() returns None, so append first and
                            # return the list itself.
                            result_list.append(result_dict)
                            return result_list
                        # getting mib-number:
                        listed_oid = self._get_oid(mib_dir, sym_name)
                        if len(arguments) == 4:
                            listed_oid.extend(int(oid_element) for oid_element in str(arguments[1]).split('.'))
                        mib_number = tuple(listed_oid)
                        target = self.switches[int(key)]
                        error_indication, error_status, error_index, var_binds = cmdgen.CommandGenerator().setCmd(
                            cmdgen.CommunityData('test-agent', community, 1),
                            cmdgen.UdpTransportTarget((target['host'], target['port'])),
                            (mib_number, set_value))
                        # NOTE(review): error_indication (e.g. a request
                        # timeout) is not reflected in the result - confirm.
                        final_res = error_status if error_index != 0 else 0
                        result.append(final_res)
                else:
                    result = ["readOnly"]
                result_dict[key] = [result]
            result_list.append(result_dict)
        return result_list

    @staticmethod
    def snmp_walk(community, host, port, oid):
        """Perform SNMP walk for submitted oid.

        Args:
            community(str):  SNMP community to read.
            host(str):  SNMP host.
            port(int):  SNMP host port.
            oid(str):  SNMP OID.

        Raises:
            CustomException:  on a transport/engine error or an SNMP error status.

        Returns:
            str:  '<name> = <value>' for the last variable binding of the first returned row,
                  or 'Empty Replay' when nothing was returned.

        """
        from testlib.custom_exceptions import CustomException
        cmd_gen = cmdgen.CommandGenerator()
        error_indication, error_status, error_index, var_binds = cmd_gen.nextCmd(
            cmdgen.CommunityData('test-agent', community, 1),
            cmdgen.UdpTransportTarget((host, port)),
            oid)
        # Check for errors and print out results
        if error_indication:
            raise CustomException(error_indication)
        if error_status:
            failed = var_binds[int(error_index) - 1] if error_index else None
            messages = ('%s at %s' % (error_status.prettyPrint(),  # pylint: disable=no-member
                                      failed or '?'))
            raise CustomException(messages)
        if var_binds:
            # only the message for the last binding of the first row is kept
            for name, val in var_binds[0]:
                messages = '%s = %s' % (name.prettyPrint(), val.prettyPrint())
        else:
            messages = 'Empty Replay'
        return messages