# Source code for taf.testlib.Ixia.LACP

# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``LACP.py``

`IxNetwork LACP protocol emulation functionality`

Note:
    TCL procedures::

        ::ixia::emulation_lacp_control
        ::ixia::emulation_lacp_info
        ::ixia::emulation_lacp_link_config

"""

import copy


class LACP(object):
    """IxNet LACP configuration wrapper.

    Bookkeeping is kept in ``self.lacp_dict``, keyed by the ``port`` argument
    passed to the public methods. Each entry may contain:

    * ``'port_handle'``  - name of the Tcl variable holding the port handle(s);
    * ``'link_handler'`` - dict mapping a link name to a Tcl variable name
      (only read by this class; it has to be populated elsewhere);
    * ``'lacp_control'`` / ``'lacp_info'`` - names of the Tcl variables that
      store the return code of the last control/info call;
    * ``'info'``         - statistics parsed by :meth:`info` (plain data, not
      Tcl variable names).

    """

    def __init__(self, ixia):
        """LACP class initialization.

        Args:
            ixia(IxiaHLTMixin):  Ixia traffic generator

        """
        self.ixia = ixia
        self.lacp_dict = {}

    @staticmethod
    def _port_parts(port):
        """Return the components of a single port as a list of strings.

        Accepts either a sequence such as ``(1, 2, 3)`` or a string such as
        ``"1/2/3"``; both yield ``['1', '2', '3']``.

        """
        if isinstance(port, str):
            return port.split("/")
        return [str(part) for part in port]

    def _ensure_port_handle(self, port):
        """Create the Tcl port handle variable for ``port`` once and return its lacp_dict entry."""
        port_cfg = self.lacp_dict.setdefault(port, {})
        if 'port_handle' not in port_cfg:
            if isinstance(port[0], tuple):
                # Several ports: build a Tcl list variable with one "c/l/p" item per port.
                members = [self._port_parts(port_item) for port_item in port]
                suffix = "_".join("_".join(parts) for parts in members)
                port_handler_name = "port_handler_{0}".format(suffix)
                self.ixia.set_var(**{port_handler_name: "[list]"})
                for parts in members:
                    self.ixia.tcl("lappend {0} {1}".format(port_handler_name, "/".join(parts)))
            else:
                # Single port: plain Tcl variable holding "chassis/card/port".
                parts = self._port_parts(port)
                port_handler_name = "port_handler_{0}".format("_".join(parts))
                self.ixia.set_var(**{port_handler_name: "/".join(parts)})
            port_cfg['port_handle'] = port_handler_name
        return port_cfg

    @staticmethod
    def _set_target(port_cfg, link_handler_id, kwargs):
        """Point ``kwargs`` at a specific LACP link or, by default, at the port handle."""
        if link_handler_id is not None:
            kwargs["handle"] = "${0}".format(port_cfg['link_handler'][link_handler_id])
        else:
            kwargs["port_handle"] = "${0}".format(port_cfg['port_handle'])

    def _check_return_code(self):
        """Raise AssertionError if the last HLT call did not report an empty return code."""
        return_code = self.ixia.check_return_code()
        # Raised explicitly (not via ``assert``) so the check survives ``python -O``.
        if return_code != "":
            raise AssertionError("LACP HLT call failed: {0}".format(return_code))

    def control(self, port, link_handler_id=None, **kwargs):
        """Start/stop/restart protocol, start/stop sending PDU, send Marker Request, update link parameters after the link has been modified.

        Args:
            port(tuple(int)):  TG port in format tuple(chassisID, cardId, portId)
            link_handler_id(str):  LACP link name

        Raises:
            AssertionError:  error in executing tcl code
            KeyError:  link_handler_id is not registered for the port

        Returns:
            None

        Note:
            See description of keyword arguments in ixia_lacp_api.tcl

            Full path: /opt/ixos/lib/hltapi/library/ixia_lacp_api.tcl

            ``mode`` defaults to ``"start"`` when not given.

        """
        port_cfg = self._ensure_port_handle(port)
        cfg_name = "lacp_control_{0}".format(port_cfg['port_handle'].replace("port_handler_", ""))

        kwargs.setdefault("mode", "start")
        self._set_target(port_cfg, link_handler_id, kwargs)

        self.ixia.ixia_emulation_lacp_control(**kwargs)
        self._check_return_code()
        # Keep the HLT result in a dedicated Tcl variable so cleanup() can unset it.
        self.ixia.set_var(**{cfg_name: "$return_code"})
        port_cfg['lacp_control'] = cfg_name

    def info(self, port, link_handler_id=None, **kwargs):
        """Command to retrieve LACP statistics.

        Args:
            port(tuple(int)):  TG port in format tuple(chassisID, cardId, portId)
            link_handler_id(str):  LACP link name

        Raises:
            AssertionError:  error in executing tcl code
            KeyError:  link_handler_id is not registered for the port

        Returns:
            dict:  LACP statistics, ``{"chassis/card/port": {stat_name: value}}``

        Note:
            See description of keyword arguments in ixia_lacp_api.tcl

            Full path: /opt/ixos/lib/hltapi/library/ixia_lacp_api.tcl

            ``mode`` defaults to ``"aggregate_stats"`` when not given.

        """
        port_cfg = self._ensure_port_handle(port)
        cfg_name = "lacp_info_{0}".format(port_cfg['port_handle'].replace("port_handler_", ""))

        kwargs.setdefault("mode", "aggregate_stats")
        self._set_target(port_cfg, link_handler_id, kwargs)

        self.ixia.ixia_emulation_lacp_info(**kwargs)
        self._check_return_code()
        self.ixia.set_var(**{cfg_name: "$return_code"})
        self.ixia.puts("$return_code")
        port_cfg["lacp_info"] = cfg_name

        # Create lacp info dictionary
        stats = port_cfg["info"] = {}
        port_list = port if isinstance(port[0], tuple) else (port, )
        for port_item in port_list:
            p_item = "/".join(self._port_parts(port_item))
            port_stats = stats[p_item] = {}
            # split() without a separator tolerates an empty key list and repeated whitespace.
            key_names = self.ixia.tcl("keylkeys {0} {1}".format(cfg_name, p_item)).split()
            for key_item in key_names:
                # Each key is a statistic name stored under "<port>.<key>" in the keyed list.
                port_stats[key_item] = self.ixia.tcl(
                    "lindex [keylget {0} {1}.{2}] 0".format(cfg_name, p_item, key_item))

        # Hand out a copy so callers cannot alter the cached statistics.
        return copy.deepcopy(stats)

    def cleanup(self):
        """Clean all TCL variables and lacp_dict.

        Returns:
            None

        """
        for port_cfg in self.lacp_dict.values():
            for key, value in port_cfg.items():
                if key == "info":
                    # Parsed statistics, not Tcl variable names: nothing to unset.
                    continue
                if isinstance(value, dict):
                    for var_name in value.values():
                        self.ixia.tcl("unset {0}".format(var_name))
                else:
                    self.ixia.tcl("unset {0}".format(value))
        self.lacp_dict = {}