# Source code for taf.testlib.common3

# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``common3.py``

`Testlib common functionality version 3.x`

"""

# Python built-in imports
import sys
import json
import os
import itertools
import traceback
from threading import Thread

import pytest

# Testlib imports
from .custom_exceptions import TAFCoreException
from . import entry_template
from . import environment
from . import loggers


VERSION = "3.0"

# Classes loaded from dev_ files, kept here so that other modules can access them.
custom_classes = {}


# Add soft exit with environment sanitizing before exit.
def softexit(message, env=None):
    """Sanitize the environment (when one is given) and stop the py.test run.

    Args:
        message(str):  Exit message passed to ``pytest.exit``
        env(Environment):  Environment instance to sanitize before exiting;
                           nothing is sanitized when it is None

    """
    needs_sanitizing = env is not None
    if needs_sanitizing:
        env.sanitize()
    pytest.exit(message)


# Expose the helper through the pytest namespace.
pytest.softexit = softexit


# Environment is inherited from dict to provide backward compatibility with TAFv1 suites
class Environment(dict):
    """Main class of all test environment.

    Notes:
        This class has to be used as base fixture in all test cases.
        It provides number of common methods to initialize, shutdown, cleanup environment functions which
        basically call appropriate methods of particular device classes.

    """

    class_logger = loggers.ClassLogger()

    def __init__(self, opts=None, **kwargs):
        """Read configuration files and create device objects.

        Args:
            opts(OptionParser):  py.test config.option object which contains all py.test cli options.

        Raises:
            TAFCoreException:  unexpected entry_type, or a related_id which is not present
                               in the environment config

        """
        super(Environment, self).__init__(**kwargs)
        self.opts = opts
        self._dict = {}
        self.config = self._get_conf(self.opts.env)
        self.setup = {}
        if self.opts.setup:
            self.setup = self._get_setup(self.opts.setup)
        # Map acroname to conf id
        self.dut_map = {}
        # Map config Id to entry instance
        self.id_map = {}
        # Environment properties
        self.env_prop = {}
        # Device classes
        self.__dev = {}
        # Map autoname to conf Id
        self.autoname_map = {}

        # Get device classes
        device_module_names = self._find_dev_modules()
        self._import_device_modules(device_module_names)

        # Make loaded classes from dev_ file accessible for other modules
        custom_classes.update(self.__dev)

        # Create env config according to setup
        new_config = [self.create_conf_entry(setup_entry) for setup_entry in self.setup['env']]

        # Create a set from related ids lists
        related_ids = set(itertools.chain.from_iterable(
            conf_entry.get('related_id', []) for conf_entry in new_config))
        new_config_ids = set(conf_entry['id'] for conf_entry in new_config)

        # Add related config entries from environment config if they are not already in new_config.
        for rid in related_ids - new_config_ids:
            related_conf = self._get_device_conf(rid)
            if related_conf is None:
                # Report the missing entry explicitly instead of leaking a bare StopIteration.
                message = "Related entry with id: '%s' is not found in environment config." % (rid, )
                raise TAFCoreException(message)
            new_config.append(related_conf)

        # Save updated config
        self.config = new_config

        self.class_logger.info("Preparing environment objects.")
        # Reading and appending config and creating instances
        for entry in self.config:
            self.class_logger.info(
                "Creating {0}:{1}:{2}".format(entry['entry_type'], entry['instance_type'], entry['id']))
            # Append related configs
            if "related_id" in entry:
                # NOTE(review): _append_related_confs is not defined in this class as visible here;
                # confirm it is provided elsewhere (e.g. by a subclass).
                entry['related_conf'] = self._append_related_confs(entry['related_id'])

            # Creating setup entries instances
            try:
                ename = self.__dev[entry['entry_type']]['NAME']
            except KeyError:
                message = ("Unexpected value for entry_type: '%s' specified with id: '%s' " +
                           "added in config.") % (entry['entry_type'], entry['id'])
                raise TAFCoreException(message)

            # always create a switch objects so that
            # all the switch related plugins that expect a switch attribute
            # fail gracefully
            if not hasattr(self, "switch"):
                setattr(self, "switch", {})

            if not hasattr(self, ename):
                setattr(self, ename, {})
            instances = getattr(self, ename)
            eid = len(instances) + 1

            # Append ID maps
            self.dut_map["%s%s" % (self.__dev[entry['entry_type']]['LINK_NAME'], eid)] = entry['id']

            # Create entry instance
            instance = self.__dev[entry['entry_type']][entry['instance_type']](entry, self.opts)
            instances[eid] = instance
            instance.env = self
            self.id_map[entry['id']] = instance

            # In case entry contains autoname Environment object will contain d_<autoname>
            # attribute.
            if entry.get('autoname', False):
                # Append autoname and Id
                setattr(self, "d_{0}".format(entry['autoname']), instance)
                self.autoname_map[entry['autoname']] = entry['id']

        # Pass required by entries related objects:
        for entry in self.config:
            if "related_id" in entry:
                self.id_map[entry['id']].related_obj = {
                    _id: self.id_map[_id] for _id in entry['related_id']}

        # To support heterogeneous setup we need to support multiple Cross connection types,
        # but allow user to be independent from this.
        # Cross object automatically detects connection owner and forward it to proper cross instance.
        self.cross = Cross(self.setup, self)

        # Append connections lists for cross entries.
        if "cross" in self.setup:
            for c_id in self.setup['cross']:
                self.id_map[c_id].connections = self.setup['cross'][c_id]

    # TODO: Add transparent support of multiple TG instances in one.

    def _import_device_modules(self, device_module_names):
        """Import dev_ modules and register the entry classes they declare.

        Args:
            device_module_names(list[str]):  Module names relative to the testlib package.

        """
        for mod_name in device_module_names:
            self.class_logger.debug("Loading %s module...", mod_name)
            try:
                new_module = __import__("testlib." + mod_name, fromlist=[mod_name])
            except ImportError:
                self.class_logger.warning("failed to import %s", mod_name, exc_info=True)
                # ignore modules that can't load, e.g. dependency problems such as tempest
                # instead failed when we try to instantiate the class
                continue
            # insert into global namespace
            globals()[mod_name] = new_module
            entry_type = new_module.ENTRY_TYPE
            if entry_type and entry_type not in self.__dev:
                self.__dev[entry_type] = {
                    "NAME": new_module.NAME,
                    "LINK_NAME": getattr(new_module, 'LINK_NAME', new_module.NAME),
                }
            for instance_name, entry_class in new_module.INSTANCES.items():
                if issubclass(entry_class, entry_template.GenericEntry):
                    self.class_logger.debug(
                        "Found entry_type {0}, instance_type {1}.".format(entry_type, instance_name))
                    self.__dev[entry_type][instance_name] = entry_class

    def create_conf_entry(self, setup_entry):
        """Merge a setup entry into the matching environment config entry.

        Args:
            setup_entry(dict):  Entry from the 'env' list of the setup file.

        Returns:
            dict:  Environment config entry with the same 'id' updated by setup_entry keys,
                   or setup_entry itself when the environment config has no such id.

        """
        # Search for id in environment config
        # Add environment entry in setup if it's found, or leave setup entry as is.
        conf_entry = next(
            (cfg_e for cfg_e in self.config if cfg_e['id'] == setup_entry['id']),
            setup_entry)
        # Updating env keys according to setup
        conf_entry.update(setup_entry)
        return conf_entry

    def _find_dev_modules(self):
        """Return names of all dev_*.py modules found under the testlib directory.

        Returns:
            list[str]:  Dotted module names relative to testlib (foo/dev_bar.py -> foo.dev_bar).

        """
        # extract this so we can override in unittests
        devices = []
        testlib_path = os.path.dirname(__file__)
        for root, _dirs, filenames in os.walk(testlib_path):
            for file_name in filenames:
                if file_name.startswith("dev_") and file_name.endswith(".py"):
                    rel_path = os.path.relpath(os.path.join(root, file_name), testlib_path)
                    # create module name relative to testlib
                    # foo/dev_bar.py -> foo.dev_bar
                    devices.append(os.path.splitext(rel_path)[0].replace(os.sep, '.'))
        return devices

    def _load_json(self, path_to_config, error_template):
        """Read and parse a json file.

        Args:
            path_to_config(str):  Path to the json file.
            error_template(str):  Error message with a single %s placeholder for the path.

        Raises:
            IOError:  the file cannot be read or parsed

        Returns:
            Parsed json content.

        """
        try:
            # The file is opened with the platform default encoding. The 'encoding' keyword
            # json.loads used to accept was ignored for str input on Python 3 and removed in 3.9.
            with open(path_to_config) as conf_file:
                return json.load(conf_file)
        except (IOError, OSError, ValueError):
            # ValueError covers malformed json and undecodable bytes.
            message = error_template % (path_to_config, )
            self.class_logger.error(message)
            raise IOError(message)

    def _get_conf(self, file_name=None):
        """Load environment config from file.

        Args:
            file_name(str):  Name of a json file with a test environment configuration.

        Raises:
            TAFCoreException:  configuration file is not found
            IOError:  error on reading configuration file

        Returns:
            Parsed content of the configuration file, or an empty dict when file_name is not set.

        Notes:
            This method shouldn't be used outside this class. Use "config" attribute to access environment configuration.

        """
        if not file_name:
            self.class_logger.info("Environment file isn't set. All configurations will be taken from setup file.")
            # Return empty dict
            return dict()
        path_to_config = environment.get_conf_file(conf_name=file_name, conf_type="env")
        if not path_to_config:
            message = "Specified configuration file %s not found." % (file_name, )
            raise TAFCoreException(message)
        return self._load_json(path_to_config, "Cannot read specified configuration: %s")

    def _get_setup(self, file_name):
        """Reads setup file based on provided name.

        Args:
            file_name(str):  Name of a json file with setup.

        Raises:
            TAFCoreException:  setup name is empty or setup file is not found
            IOError:  error on reading setup file

        Returns:
            Parsed content of the setup file.

        """
        if not file_name:
            message = "Setup name must be specified."
            raise TAFCoreException(message)
        path_to_config = environment.get_conf_file(conf_name=file_name, conf_type="setup")
        if not path_to_config:
            message = "Cannot find given setup %s" % (file_name, )
            raise TAFCoreException(message)
        return self._load_json(path_to_config, "Cannot read specified setup configuration: %s")

    def _get_device_conf(self, device_id):
        """Return config entry by given Id if one, else return None.

        Args:
            device_id(str):  Entry ID.

        Returns:
            dict:  Entry config.

        """
        return next((entry for entry in self.config if entry['id'] == device_id), None)

    def id2instance(self, device_id):
        """Returns entry instance by device id.

        Args:
            device_id(str):  Could be one of: device LINK_NAME, 'autoname' or 'id' from config.

        Returns:
            GenericEntry:  Entry instance, or None if no instance of that entry type has this id

        Examples::

            # by LINK_NAME
            env.id2instance("sw1")
            # by "autoname"
            env.id2instance("DEV2")
            # by ID
            env.id2instance("9")

        """
        dev_id = self.get_device_id(device_id)
        entry = [e for e in self.config if e['id'] == dev_id][0]
        instances = getattr(self, self.__dev[entry['entry_type']]['NAME'])
        return next((inst for inst in instances.values() if inst.id == dev_id), None)

    def safe_executor(self, obj, method, *args, **kwargs):
        """Invokes obj.method(*args, **kwargs) in try block and return error message with traceback.

        Args:
            obj(GenericEntry):  Entry instance
            method(str):  method name that has to be executed

        Returns:
            str:  Error message with traceback, or None if the call succeeded

        Warning:
            - Don't use in case obj.method has to return something.
            - Don't use in case an exception has to be handled by py.test.

        """
        try:
            self.class_logger.debug("Perform %s(*%s, **%s) on entry_type=%s, id=%s" %
                                    (method, args, kwargs, obj.type, obj.id))
            getattr(obj, method)(*args, **kwargs)
        except Exception:
            message = ("Error while call %s of entry_type=%s id=%s:\n%s" %
                       (method, obj.type, obj.id, traceback.format_exc()))
            self.class_logger.error(message)
            return message

    def parallelize(self, objects, method, safe=False):
        """Run objects method in multiple threads.

        Args:
            objects(list[GenericEntry]):  list of device objects.
            method(str):  method name that has to be executed.
            safe(bool):  Hide exception raisings, but print log message.

        Raises:
            Exception:  when safe is False, the first exception collected from the
                        worker threads is re-raised after all threads have finished

        Returns:
            None

        Examples::

            objects = [env.lhost[1], env.lhost[2]]
            env.parallelize(objects, "cleanup", False)

        """
        # (exception, formatted traceback) pairs collected from the worker threads.
        failures = []

        def executor(target_obj, method_name):
            # An exception escaping a thread target never reaches the caller, so it is
            # captured here and re-raised below once all threads are joined.
            try:
                getattr(target_obj, method_name)()
            except Exception as err:
                failures.append((err, traceback.format_exc()))

        func = self.safe_executor if safe else executor
        threads = [Thread(target=func, args=(obj, method)) for obj in objects]

        for thread in threads:
            thread.start()

        for thread in threads:
            thread.join()

        if failures:
            for _err, trace in failures:
                self.class_logger.error("Error while call %s in thread:\n%s" % (method, trace))
            raise failures[0][0]

    def _filter_by_entry_types(self, prio_dict, entry_types):
        """Return a copy of prio_dict which keeps only objects of the given entry types.

        Args:
            prio_dict(dict):  dict of lists where key = priority, value = list of device objects.
            entry_types(list[str]):  entry types to keep.

        Returns:
            dict:  dict with the same keys; lists may become empty.

        """
        return {
            level: [obj for obj in objs
                    if self._get_device_conf(obj.id)['entry_type'] in entry_types]
            for level, objs in prio_dict.items()}

    def ordered_action(self, action, prio, entry_types):
        """Perform action on entries with type in entry_types and ordered by prio.

        Args:
            action(str):  method name to execute.
            prio(str):  priority name to sort objects by.
            entry_types(list[str]):  entry types to apply action (apply action to all entry types if None).

        Returns:
            None

        """
        # Select all types in case list isn't set.
        if not entry_types:
            entry_types = list(self.__dev.keys())

        # Leave only selected entry types in prio_dict
        prio_dict = self._filter_by_entry_types(self._get_prio_dict(prio), entry_types)

        # Walk priorities in ascending order
        for level in sorted(prio_dict):
            objs = prio_dict[level]
            if len(objs) > 1 and self.opts.use_parallel_init:
                self.parallelize(objs, action)
            else:
                for obj in objs:
                    self.class_logger.debug("Perform %s() on entry_type=%s, id=%s" %
                                            (action, obj.type, obj.id))
                    getattr(obj, action)()

    def _get_prio_dict(self, prio):
        """Return dict of entries by prio.

        Args:
            prio(str):  Priority name to order dict by .

        Returns:
            dict:  dict of lists where key = priority, value = list of device objects.

        """
        prio_dict = {}
        for conf_entry in self.config:
            # Default prio value is 0 in case it's not set.
            level = conf_entry[prio] if prio in conf_entry else 0
            # Create/append list of device objects with the same priority.
            prio_dict.setdefault(level, []).append(self.id_map[conf_entry['id']])
        return prio_dict

    def initialize(self, entry_types=None):
        """Initialize test environment.

        Args:
            entry_types(list[str]):  List of entry types

        """
        self.class_logger.info("Initialize environment...")
        self.ordered_action("create", "sprio", entry_types)

    def cleanup(self, entry_types=None):
        """Cleaning environment.

        Args:
            entry_types(list[str]):  List of entry types

        """
        self.class_logger.info("Cleanup environment...")
        self.ordered_action("cleanup", "cprio", entry_types)

    def sanitize(self, entry_types=None):
        """Sanitizing environment.

        Args:
            entry_types(list[str]):  List of entry types

        """
        self.class_logger.info("Sanitizing environment...")
        self.ordered_action("sanitize", "kprio", entry_types)

    def check(self, entry_types=None):
        """Checking environment.

        Args:
            entry_types(list[str]):  List of entry types

        """
        self.class_logger.info("Check environment...")
        self.ordered_action("check", "tprio", entry_types)

    def shutdown(self, entry_types=None):
        """Stopping/Disconnecting environment.

        Args:
            entry_types(list[str]):  List of entry types (all entries are destroyed if not set)

        Note:
            This method cares to release all environment even an exception is raised during destroy process.

        """
        # Keep all error messages and print them at the end.
        # This object won't be append in case parallelize execution.
        error_messages = []

        prio_dict = self._get_prio_dict("kprio")
        # Honour the documented entry_types filter; an empty/None value keeps every entry.
        if entry_types:
            prio_dict = self._filter_by_entry_types(prio_dict, entry_types)

        # In further method calling we set safe flag or use safe_executor
        # to log and pass exceptions on destroy.
        for level in sorted(prio_dict):
            objs = prio_dict[level]
            if len(objs) > 1 and self.opts.use_parallel_init:
                self.parallelize(objs, "destroy", True)
            else:
                for obj in objs:
                    err_msg = self.safe_executor(obj, "destroy")
                    if err_msg:
                        error_messages.append(err_msg)

        if error_messages:
            message = "The following errors encountered on environment shutdown:\n%s" % ("".join(error_messages), )
            self.class_logger.error(message)
            # if stdout logging is disabled print error messages anyway
            if not loggers.LOG_STREAM:
                sys.stderr.write("ERROR:\n%s" % (message, ))
                sys.stderr.flush()

    def get_device_id(self, dut):
        """Search device in config object by device name.

        Args:
            dut(str):  Could be one of: device LINK_NAME, 'autoname' or 'id' from config.

        Raises:
            TAFCoreException:  unknown device type

        Returns:
            str, int:  Device id which configured.

        Examples (Config object like)::

            {
             "env": [
                     {"id": 5, "port_list": [["port1", 10000], ["port2", 40000]},
                     {"id": 9, "autoname": "DEV2", "port_list": [["port1", 10000], ["port2", 40000]}
                    ]
             "cross": {"ID": [[5, 1, 9, 2], [5, 2, 9, 1]]}
            }

        Result is::

            # by LINK_NAME
            env.get_device_id("sw1") == 5
            # by "autoname"
            env.get_device_id("DEV2") == 9
            # by ID
            env.get_device_id(9) == 9

        """
        # dut is already a device ID
        if dut in self.dut_map.values():
            return dut
        # dut is an acronym (LINK_NAME + index)
        if dut in self.dut_map:
            return self.dut_map[dut]
        # dut is an autoname
        if dut in self.autoname_map:
            return self.autoname_map[dut]
        # Raise an exception if invalid device type
        message = "This device type not found. This method supports only %s or %s device types." % (
            list(self.dut_map.keys()), list(self.autoname_map.keys()))
        raise TAFCoreException(message)

    @staticmethod
    def _port_entry(port_seq, port_id):
        """Return the item of port_seq for a 1-based port_id.

        Raises:
            IndexError:  port_id is below 1 or beyond the end of port_seq

        """
        # Port ids start from 1; without this check 0 and negative ids would
        # silently wrap around to the end of the sequence.
        if port_id < 1:
            raise IndexError(port_id)
        return port_seq[port_id - 1]

    def get_real_port_name(self, dut, port_id):
        """Search real port number/name by device name and port Id in config object.

        Args:
            dut(str):  Could be one of: device LINK_NAME, 'autoname' or 'id' from config.
            port_id(int):  Port Id from config object (ids starts from 1).

        Raises:
            TAFCoreException:  port_id is not found in configuration; device doesn't have ports or port_list attributes

        Returns:
            int, str:  Real port number/name or exception if there is no port with given Id in config.

        Examples (Config object like)::

            {
             "env": [
                     {"id": 99, "autoname": "DEV1", "port_list": [["port1", 10000], ["port2", 10000]},
                     {"id": 100, "ports": ["port10", 11]}
                    ]
             "cross": {"ID": [[99, 1, 100, 2], [99, 2, 100, 1]]}
            }

        Result is::

            # by LINK_NAME
            env.get_real_port_name("sw2", 2) == 11
            # by "autoname"
            env.get_real_port_name("DEV1", 1) == "port1"

        """
        # find device ID by acronym
        dev_id = self.get_device_id(dut)
        # find device object
        dev_obj = self.id_map[dev_id]

        # WARNING: We HAVE to check ports and port_list in objects instead of configs,
        # because some device classes modify port names.
        # E.g.: json doesn't support tuples, but ports have to be hashable type.
        port_list = getattr(dev_obj, "port_list", None)
        if port_list:
            try:
                return self._port_entry(port_list, port_id)[0]
            except IndexError:
                message = "Port ID %s is not found in 'port_list' of %s(%s)." % (port_id, dev_id, dut)
                raise TAFCoreException(message)

        ports = getattr(dev_obj, "ports", None)
        if ports:
            try:
                return self._port_entry(ports, port_id)
            except IndexError:
                message = "Port ID %s is not found in 'ports' of %s(%s)." % (port_id, dev_id, dut)
                raise TAFCoreException(message)

        message = "Device %s(%s) doesn't have 'ports' or 'port_list' attributes." % (dev_id, dut)
        raise TAFCoreException(message)

    def get_port_speed(self, dut, port_id):
        """Search speed port in config object namely in 'port_list' by device name and port Id.

        Args:
            dut(str):  Could be one of: device LINK_NAME, 'autoname' or 'id' from config.
            port_id(int):  Port Id from config object (ids starts from 1)

        Raises:
            TAFCoreException:  port is not present in configuration's 'port_list'

        Returns:
            int:  Port speed or exception if there is no port with given Id in config.

        Examples (Config object like)::

            {
             "env": [
                     {"id": 5, "autoname": "DEV1", "port_list": [["port1", 10000], ["port2", 40000]},
                     {"id": 9, "ports": ["port10", 11]}
                    ]
             "cross": {"ID": [[5, 1, 9, 2], [5, 2, 9, 1]]}
            }

        Result is::

            env.get_port_speed("sw1", 2) == 40000
            env.get_port_speed("DEV1", 1) == 10000

        """
        # find device id by acronym
        dev_id = self.get_device_id(dut)
        # find device config by id
        dev_config = self._get_device_conf(dev_id)
        if dev_config is None:
            return None
        # raise exception if not configured port_list
        if 'port_list' not in dev_config:
            message = "List of ports speed is not configured on device %s." % dut
            raise TAFCoreException(message)
        # find port_id and speed in port_list
        try:
            return self._port_entry(dev_config['port_list'], port_id)[1]
        except IndexError:
            # raise exception if no speed for port
            message = "Port id %s is not configured on device %s." % (port_id, dut)
            raise TAFCoreException(message)

    def get_ports(self, links=None):
        """Returns dictionary of ports based on links between devices.

        Args:
            links(list[list]):  List of devices in format [['dev1', 'dev2', number_of_links, port_speed], ] (list of lists).
                                Where: number_of_links - optional parameter(int or enum - "ALL"); port_speed - optional parameter.

        Raises:
            TAFCoreException:  wrong link format

        Returns:
            dict:  ports

        Notes:
            The test is skipped (pytest.skip) when a requested device is unknown or when
            the setup doesn't provide enough links (of the requested speed).
            The links argument is not modified.

        Examples (Config object like)::

            {
             "env": [
                     {"id": 99, "autoname": "DEV1", "port_list": [["port1", 10000], ["port2", 40000], ["port3", 10000]},
                     {"id": 100, "port_list": [["port10", 40000], [11, 10000], ["port12", 40000]}
                    ]
             "cross": {"ID": [[99, 1, 100, 2], [99, 2, 100, 1]]}
            }

        Result is::

            ports = env.get_ports([['sw1', 'sw2', 1], ])
            assert ports == {('sw2', 'sw1'): {1: "port10"}, ('sw1', 'sw2'): {1: "port1"}}

            # with optional parameter "port_speed"
            ports = env.get_ports([['sw1', 'sw2', 1, 10000], ])

            # Method returns all links between devices if no any optional parameters
            ports = env.get_ports([['sw1', 'sw2', ], ])

            # The same with enum "ALL"
            ports = env.get_ports([['sw1', 'sw2', "ALL"], ])

            # With optional parameters "port_speed" and "ALL"
            ports = env.get_ports([['sw1', 'sw2', "ALL", 40000], ])

            # Method returns all links between devices if no parameter;
            # in this case the result is keyed by the device ids used in setup 'cross'
            ports = env.get_ports()

        """
        if links:
            ports = self._get_ports_for_links(links)
        else:
            ports = self._get_ports_for_all_links()
        self.class_logger.debug("Got the following ports: %s." % (ports, ))
        return ports

    def _get_ports_for_links(self, links):
        """Collect ports for explicitly requested links (see get_ports)."""
        # Create empty prototype for ports dictionary
        ports = {}
        for link in links:
            # if not specified all devices
            if len(link) < 2:
                message = "At list is not specified devices."
                raise TAFCoreException(message)
            ports[(link[0], link[1])] = {}
            ports[(link[1], link[0])] = {}

        # Process each link in links
        for link in links:
            dev_a, dev_b = link[0], link[1]
            # if not specified number of links return all links between devices
            requested = "ALL" if len(link) == 2 else link[2]
            # if number of links specified zero then raise exception
            if requested == 0:
                message = "Number of links cannot equal zero."
                raise TAFCoreException(message)
            collect_all = requested == "ALL"
            # port_speed is taken into account only for 4-item links
            port_speed_flag = len(link) == 4
            port_speed = link[3] if port_speed_flag else None

            # Device ids do not depend on the setup link, resolve them once.
            try:
                wanted_pair = [self.get_device_id(dev_a), self.get_device_id(dev_b)]
            except TAFCoreException:
                message = "Insufficient devices count required for test"
                pytest.skip(message)

            direct_ports = ports[(dev_a, dev_b)]
            reverse_ports = ports[(dev_b, dev_a)]
            # link Ids counter
            link_id = 0
            # Process setups for each cross
            for cross_id in self.setup['cross']:
                # Each link in setup
                for setup_link in self.setup['cross'][cross_id]:
                    # Search for link in setup. Compare links by devices ID
                    if [setup_link[0], setup_link[2]] == wanted_pair:
                        port_ids = (setup_link[1], setup_link[3])
                    elif [setup_link[2], setup_link[0]] == wanted_pair:
                        port_ids = (setup_link[3], setup_link[1])
                    else:
                        continue
                    # All requested links are already collected
                    if not collect_all and not link_id < requested:
                        continue
                    if port_speed_flag and not (
                            self.get_port_speed(dev_a, port_ids[0]) ==
                            self.get_port_speed(dev_b, port_ids[1]) == port_speed):
                        continue
                    # Append ports
                    link_id += 1
                    direct_ports[link_id] = self.get_real_port_name(dev_a, port_ids[0])
                    reverse_ports[link_id] = self.get_real_port_name(dev_b, port_ids[1])

            ports_count = link_id if collect_all else requested
            # Verify that ports dictionary full filed
            if (len(direct_ports) < ports_count or len(reverse_ports) < ports_count or
                    not direct_ports or not reverse_ports):
                if port_speed_flag:
                    message = "No links with required speed {0}".format(port_speed)
                else:
                    message = "Insufficient links count required for test"
                pytest.skip(message)
        return ports

    def _get_ports_for_all_links(self):
        """Collect ports for every device pair connected in setup 'cross' (see get_ports)."""
        ports = {}
        # create tuples of existing device connection pairs
        for cross_id in self.setup['cross']:
            for setup_link in self.setup['cross'][cross_id]:
                ports[setup_link[0], setup_link[2]] = {}
                ports[setup_link[2], setup_link[0]] = {}

        # Process each tuple in ports
        for key in ports:
            wanted_pair = [self.get_device_id(key[0]), self.get_device_id(key[1])]
            # link Ids counter
            link_id = 0
            # Process setups for each cross
            for cross_id in self.setup['cross']:
                # Each link in setup
                for setup_link in self.setup['cross'][cross_id]:
                    # Search for link in setup. Compare links by devices ID
                    if [setup_link[0], setup_link[2]] == wanted_pair:
                        link_id += 1
                        ports[key][link_id] = self.get_real_port_name(key[0], setup_link[1])
                    elif [setup_link[2], setup_link[0]] == wanted_pair:
                        link_id += 1
                        ports[key][link_id] = self.get_real_port_name(key[0], setup_link[3])
        return ports
class Cross(dict):
    """New interface to cross object without device id.

    The dict content is copied from ``env.cross`` (when the environment has such
    attribute); every call is forwarded to the item which owns the given connection.

    """

    def __init__(self, setup, env):
        """Initialize Cross class.

        Args:
            setup(dict):  Setup configuration; its 'cross' item is read by the methods of this class.
            env(Environment):  Environment instance.

        """
        super(Cross, self).__init__()
        self.setup = setup
        self.env = env
        if hasattr(env, "cross"):
            self.update(env.cross)

    def get_device_id(self, connection):
        """Search device in setup object by given connection.

        Args:
            connection(list):  Connection info in format [sw1, port1, sw2, port2]

        Raises:
            Exception:  no device in connection

        Returns:
            int:  1-based position, within setup 'cross', of the device which owns the connection

        """
        connection_reverse = connection[2:] + connection[:2]
        # NOTE: the returned index relies on the iteration order of setup['cross'].
        for position, crosses in enumerate(self.setup['cross'].values(), 1):
            if connection in crosses or connection_reverse in crosses:
                return position
        # The 1-tuple keeps the formatting valid when connection itself is a tuple.
        raise Exception("Can not find device with such connection: %s in config" % (connection, ))

    def xconnect(self, connection):
        """Wrapper for xconnect method defined in xconnect.py module.

        Args:
            connection(list):  Connection info in format [sw1, port1, sw2, port2]

        """
        id_real_device = self.get_device_id(connection)
        return self[id_real_device].xconnect(connection)

    def xdisconnect(self, connection):
        """Wrapper for xdisconnect method defined in xconnect.py module.

        Args:
            connection(list):  in format [sw1, port1, sw2, port2]

        """
        id_real_device = self.get_device_id(connection)
        return self[id_real_device].xdisconnect(connection)

    def cross_connect(self, conn_list):
        """Wrapper for cross_connect method defined in xconnect.py module.

        Args:
            conn_list(list[list]):  List of connections

        Raises:
            Exception:  conn_list is empty

        Notes:
            The owner device is detected by the first connection and the whole list
            is forwarded to it, the same way cross_disconnect does.

        """
        if not conn_list:
            raise Exception("conn_list is empty")
        id_real_device = self.get_device_id(conn_list[0])
        # Forward every connection; previously only the first one was passed on
        # and the rest of the list was silently dropped.
        return self[id_real_device].cross_connect(conn_list)

    def cross_disconnect(self, disconn_list):
        """Wrapper for cross_disconnect method defined in xconnect.py module.

        Args:
            disconn_list(list[list]):  List of connections

        Raises:
            Exception:  disconn_list is empty

        """
        if not disconn_list:
            raise Exception("disconn_list is empty")
        id_real_device = self.get_device_id(disconn_list[0])
        return self[id_real_device].cross_disconnect(disconn_list)

    def get_connection(self, dev_id, port_no):
        """Get connection for device port.

        Args:
            dev_id(str):  Device ID/autoname/linkname ('tg1')
            port_no(int):  Device port number.

        Raises:
            Exception:  no connection for current port
            ValueError:  port_no is not in the device 'ports' list

        Returns:
            list:  Connection info with the requested device as a source

        """
        # Get device
        device_id = self.env.get_device_id(dev_id)
        dev_obj = self.env.id_map[device_id]

        # Get port_id from port_no (0-based, while setup port ids are 1-based)
        port_id = dev_obj.ports.index(port_no)

        # Check for connection in setup; the first match wins.
        for device in self.setup['cross']:
            for conn in self.setup['cross'][device]:
                is_source = device_id == conn[0] and port_id == conn[1] - 1
                is_destination = device_id == conn[2] and port_id == conn[3] - 1
                if is_source or is_destination:
                    # dev_id has to be source
                    if conn[0] != device_id:
                        return conn[2:] + conn[:2]
                    return conn

        raise Exception("Port {0} on device {1} is not used in current setup.".format(port_no, dev_id))

    def device_port_disconnect(self, dev_id, port_no):
        """Disconnect device port.

        Args:
            dev_id(str):  Device ID/autoname/linkname ('tg1')
            port_no(int):  Device port number.

        """
        # Get connection
        connection = self.get_connection(dev_id, port_no)

        # Emulate port disconnection
        self.cross_disconnect([connection, ])

    def device_port_connect(self, dev_id, port_no):
        """Connect device port.

        Args:
            dev_id(str):  Device ID/autoname/linkname ('tg1')
            port_no(int):  Device port number.

        """
        # Get connection
        connection = self.get_connection(dev_id, port_no)

        # Emulate port connection
        self.cross_connect([connection, ])