Source code for taf.testlib.ui_helpers

# Copyright (c) 2015 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``ui_helpers.py``

`UiHelper class for mixin for switch.ui object`

"""

import time
import configparser
from itertools import chain
import xmlrpc.client

import pytest

from .custom_exceptions import SwitchException, BoundaryError, InvalidCommandError
from .custom_exceptions import ExistsError, NotExistsError, ArgumentError, CustomException
from .custom_exceptions import CLIException, UIException, UICmdException
from testlib import loggers
from testlib import helpers


# create logger for module
mod_logger = loggers.module_logger(name=__name__)


[docs]class UiHelperMixin(object): """Mixing class for switch.ui. """ # Port Helpers
def wait_for_port_value_to_change(self, ports, port_parameter, value, interval=1, timeout=30):
    """Poll the port table until every listed port reports the expected value.

    Args:
        ports(list[int | str]): ports to read from the port table
        port_parameter(str): name of the port table column to inspect
        value(int | str): expected value; when the column holds a list, tuple,
            dict or set, membership of value is checked instead of equality
        interval(int): pause in seconds between reading the table and evaluating it
        timeout(int): overall polling budget in seconds

    Raises:
        Exception: not every port matched before the timeout expired

    Returns:
        bool: True once all ports match

    """
    container_types = (list, tuple, dict, set)

    def _row_matches(row):
        current = row[port_parameter]
        if isinstance(current, container_types):
            return value in current
        return current == value

    deadline = time.time() + timeout
    while time.time() < deadline:
        rows = self.get_table_ports(ports=ports, all_params=True, ip_addr=True)
        # The pause happens between reading the table and checking it.
        time.sleep(interval)
        if all(_row_matches(row) for row in rows):
            # Fixed 3 second delay before reporting success.
            time.sleep(3)
            return True
    raise Exception(
        ("Timeout exceeded: Port {0} does not have value {1} for {2}.").format(
            ports, value, port_parameter))
def wait_until_ops_state(self, port=1, state="Up", timeout=30):
    """Obsoleted function: poll until the port's operationalStatus equals state.

    The original note recommends wait_until_value_is_changed with a proper
    option instead.

    Args:
        port(int | str): port to read from the port table
        state(str): expected operationalStatus value
        timeout(int): polling budget in seconds

    Raises:
        Exception: the state was not reached before the timeout expired

    Returns:
        None

    """
    deadline = time.time() + timeout
    while True:
        if time.time() >= deadline:
            break
        status = self.get_table_ports([port])[0]['operationalStatus']
        if status == state:
            return
        time.sleep(3)
    raise Exception(
        "Timeout exceeded: Port state wasn't changed during timeout %s into %s value" %
        (timeout, state))
def ui_raises(self, method, *args, **values):
    """Call a UI method and fail the test unless it raises an expected error.

    Args:
        method(str): name of the method to look up on this object and call
        *args: positional arguments forwarded to the method
        **values: keyword arguments forwarded to the method

    Returns:
        None

    """
    try:
        # The attribute lookup is inside the try block on purpose: an unknown
        # method name raises AttributeError, which counts as an expected error.
        bound_method = getattr(self, method)
        bound_method(*args, **values)
    except (AttributeError, BoundaryError, AssertionError, InvalidCommandError,
            NotExistsError, ExistsError, ArgumentError, xmlrpc.client.Fault,
            TypeError, UICmdException, CLIException, UIException):
        return
    pytest.fail("Incorrect command has been executed.")
def compose_unique_mac_addr(self, pckt_type='unicast', prefix=None, ports=None):
    """Create a unique MAC address for each of the given ports.

    Each address is the concatenation of a two-octet prefix, the last three
    octets of the switch IP address and the port number. When no prefix is
    given, it is built from a packet-type specific octet followed by the first
    octet of the switch IP address.

    Args:
        pckt_type(str): type of packet: 'unicast', 'multicast' or 'broadcast'
        prefix(str): custom prefix consisting of two octets, e.g. '01:80';
            ignored if pckt_type == 'broadcast'
        ports(list): list of port names or numbers; None is treated as empty

    Raises:
        KeyError: prefix is None and pckt_type is not a known packet type

    Returns:
        dict: map of port ids (as passed in) to MAC address strings

    """
    type_prefix = {'unicast': 0x0, 'multicast': 0x1, 'broadcast': 0xff}
    if not ports:
        return {}

    if pckt_type == 'broadcast':
        # Broadcast never needs the switch address.
        return {port_id: 'ff:ff:ff:ff:ff:ff' for port_id in ports}

    # Split the address unconditionally: the tail octets are needed even when
    # the caller supplies a custom prefix.
    ip_split = self.switch.ipaddr.split('.')
    if prefix is None:
        prefix = '{oct_a:02x}:{oct_b:02x}'.format(oct_a=type_prefix[pckt_type],
                                                  oct_b=int(ip_split[0]))

    port_mac = {}
    for port_id in ports:
        # Port names such as 'sw0p12' contribute only their numeric part.
        port_number = str(port_id).replace('sw0p', '')
        tail = ['{:02x}'.format(int(x)) for x in ip_split[1:4] + [port_number]]
        port_mac[port_id] = ':'.join([prefix] + tail)
    return port_mac
# FDB Helpers
def delete_static_macs_from_port(self, port):
    """Delete the static FDB entries of a port that carry a VLAN id.

    Args:
        port(str | int): port whose static MAC entries are removed

    Returns:
        None

    """
    static_entries = self.get_table_fdb(table="static")
    for row in static_entries:
        if row['portId'] != port:
            continue
        # Rows without a 'vlanId' key are left untouched.
        if 'vlanId' not in row:
            continue
        self.delete_static_mac(port=port, mac=row['macAddress'], vlan=row['vlanId'])
# LAG Helpers
def wait_for_port_status(self, lag_id, state, value, interval):
    """Poll the port table until a LAG/port column reaches the expected value.

    Args:
        lag_id(int | str): LAG/port id to read from the port table
        state(str): name of the port table column to watch
        value(int | str): expected value of that column
        interval(int): polling budget in seconds (used as the timeout)

    Raises:
        SwitchException: the value was not reached before the budget expired

    Returns:
        None

    """
    deadline = time.time() + interval
    while True:
        if not time.time() < deadline:
            break
        rows = self.get_table_ports(ports=[lag_id], all_params=True)
        current = rows[0][state]
        if current == value:
            return
        time.sleep(1)
    raise SwitchException(
        "Port {} is not changed to {} during timeout.".format(state, value))
def clear_lag_table(self):
    """Remove every port from its LAG, then delete all LAGs.

    Both tables are read before anything is deleted; the LAG ids passed to
    delete_lags come from the LAG table as it was read at the start.

    Returns:
        None

    """
    lags = self.get_table_lags()
    memberships = self.get_table_ports2lag()
    for membership in memberships:
        self.delete_lag_ports(ports=[membership['portId']], lag=membership['lagId'])
    self.delete_lags([lag['lagId'] for lag in lags])
def is_lag_added(self, lag_id):
    """Tell whether the LAG table contains a row with the given LAG id.

    Args:
        lag_id(str | int): id of the LAG to look for

    Returns:
        bool: True if a row with a matching 'lagId' exists

    """
    for row in self.get_table_lags():
        if row['lagId'] == lag_id:
            return True
    return False
def is_port_added_to_lag(self, port, lag_id):
    """Tell whether the Ports2LAG table binds the port to the given LAG.

    Args:
        port(int): port id
        lag_id(str | int): id of the LAG

    Returns:
        bool: True if a row matches both 'portId' and 'lagId'

    """
    # Values are compared as given; no type conversion is applied here.
    for row in self.get_table_ports2lag():
        if row['portId'] == port and row['lagId'] == lag_id:
            return True
    return False
def set_admin_mode_for_slave_ports(self, admin_mode="Down"):
    """Set adminMode on the logical (slave) ports of every 10G master port.

    For each master port whose speed is 10000, wait until at least one of the
    port ids master+1 .. master+3 shows up in the port table, then apply
    admin_mode to all three of them. Master ports with another speed are
    skipped.

    Args:
        admin_mode(str): adminMode value to apply to the slave ports

    Returns:
        bool: True; the test is failed through pytest.fail if the slave ports
            do not appear in time

    Examples::

        assert ui_helpers.set_admin_mode_for_slave_ports(admin_mode="Up")

    """
    timeout = 10
    # A single deadline is shared by all master ports.
    end_time = time.time() + timeout
    # Temporary workaround according to ONS-28780.
    slave_ports_count = 4
    for port in self.switch.hw.master_ports:
        port_info = self.get_table_ports([port, ])
        if port_info[0]["speed"] != 10000:
            continue
        while True:
            if time.time() >= end_time:
                pytest.fail(
                    "Slave ports are not available during timeout %s seconds" % timeout)
            # Need to wait until entry is appeared in Ports table.
            time.sleep(1)
            # Stop waiting as soon as any slave port lookup is non-empty.
            if any(self.get_table_ports([port + i]) for i in range(1, slave_ports_count)):
                break
        for i in range(1, slave_ports_count):
            # Honour the requested mode instead of always forcing "Down".
            self.modify_ports([port + i, ], adminMode=admin_mode)
    return True
def wait_for_state_lag_state(self, lag=None, port=1, state="Selected", timeout=30):
    """Poll the LAG local ports table until the port's 'selected' field equals state.

    Returns immediately, without error, if the table has no row for the port.

    Args:
        lag(int | str): LAG passed to get_table_lags_local_ports
        port(int | str): port id to look for in the returned rows
        state(str): expected value of the 'selected' field
        timeout(int): polling budget in seconds

    Raises:
        Exception: the state was not reached before the timeout expired

    Returns:
        None

    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        port_rows = [entry for entry in self.get_table_lags_local_ports(lag)
                     if entry["portId"] == port]
        if not port_rows or port_rows[0]['selected'] == state:
            return
        time.sleep(0.3)
    raise Exception(
        "Timeout exceeded: Port state wasn't changed during timeout %s into %s value" %
        (timeout, state))
# UFD Helpers
def build_and_create_ufd_network_file(self, port_type, ports, bind_carrier=''):
    """Create a network file for each uplink/downlink port of a UFD group.

    Args:
        port_type(str): type of interface (uplink/downlink)
        ports(list[int | str]): ports to assign to UFD group; integers are
            translated to names through self.port_map
        bind_carrier(str | list[int | str]): uplink ports bound to the given
            downlink ports; a string is used as-is, list items that are
            integers are translated through self.port_map

    Returns:
        None

    """
    # Get the Config Parser Instance
    config = configparser.RawConfigParser()
    # Preserve the Key Name Original Case
    config.optionxform = str
    # Add sections for config parser
    config.add_section('Match')
    config.add_section('Network')

    # A plain string must not be iterated character by character.
    if isinstance(bind_carrier, str):
        carriers = [bind_carrier] if bind_carrier else []
    else:
        carriers = bind_carrier
    bind_port_names = ' '.join(
        self.port_map[port] if isinstance(port, int) else port for port in carriers)

    for port in ports:
        port_name = self.port_map[port] if isinstance(port, int) else port
        # Set Match Section and Port name
        config.set('Match', 'Name', port_name)
        # Set Network Section and BindCarrier in case of Network Filetype downlink.
        # Compare by value: identity of string objects is not guaranteed.
        if port_type == 'downlink':
            config.set('Network', 'BindCarrier', bind_port_names)
        self.create_ufd_network_file(
            port_name=port_name, config_parser_instance=config)
def start_traffic_ufd(self, tg_instance, port_list):
    """Create one continuous Ether/IP/UDP stream per port and start them all.

    Statistics are cleared after the streams are created and before they are
    started.

    Args:
        tg_instance(instance object): TG instance object
        port_list(list[dict]): one dict per stream with the keys 'dst_mac',
            'src_mac' and 'iface'

    Returns:
        list[int]: list of stream ids generated

    """
    def _packet_for(port):
        ether = {"dst": port["dst_mac"], "src": port['src_mac'], "type": 0x0800}
        return ({"Ether": ether}, {"IP": {}}, {"UDP": {}})

    stream_ids = [
        tg_instance.set_stream(_packet_for(port), continuous=True, iface=port['iface'])
        for port in port_list
    ]
    # Clear statistics table
    self.clear_statistics()
    # Start streams
    tg_instance.start_streams(stream_ids)
    return stream_ids
def get_and_validate_statistics_ufd(self, ingress_ports, egress_ports, tg_instance,
                                    sniff_params, time_out=30):
    """Sample port counters around a wait and validate counters and sniffed packets.

    Args:
        ingress_ports(list[tuple(int, function)]): (port, check) pairs; the port's
            'cntRxUcstPktsIPv4' counter is read and check(start, end) must be truthy
        egress_ports(list[tuple(int, function)]): (port, check) pairs; the port's
            'cntTxUcstPkts' counter is read and check(start, end) must be truthy
        tg_instance(instance object): traffic generator instance object
        sniff_params(dict{(int, int, int): list[tuple(str, str, bool)]}): per TG port,
            tuples of (source MAC, destination MAC, expected result) handed to
            helpers.is_packet_received
        time_out(int): seconds to sleep between the two counter samples

    Raises:
        AssertionError: a counter check returned a falsy value

    Returns:
        None

    """
    def _read_counters():
        # Ingress counters are read first, then egress.
        rx_counters = [int(self.get_table_statistics(entry[0], 'cntRxUcstPktsIPv4'))
                       for entry in ingress_ports]
        tx_counters = [int(self.get_table_statistics(entry[0], 'cntTxUcstPkts'))
                       for entry in egress_ports]
        return rx_counters + tx_counters

    # Start packet sniffing
    sniff_ports = list(sniff_params.keys())
    tg_instance.start_sniff(sniff_ports)

    counters_before = _read_counters()
    # Wait for time_out seconds for the packets to reach and counters to get updated
    time.sleep(time_out)
    counters_after = _read_counters()

    # Stop packet sniffing
    data = tg_instance.stop_sniff(sniff_ports)

    # Validate the counters
    checked_ports = chain(ingress_ports, egress_ports)
    for port, start_stat, end_stat in zip(checked_ports, counters_before, counters_after):
        assert port[1](start_stat, end_stat), "Counter mismatch occured"

    # Validate the packet
    for tg_port, expectations in sniff_params.items():
        for param in expectations:
            helpers.is_packet_received(
                data=data, iface_1=tg_port,
                layer_1="Ether", field_1="dst", value_1=param[1],
                layer_2="Ether", field_2="src", value_2=param[0],
                tg_instance=tg_instance, result=param[2])
def add_entry_to_fdb_ufd(self, vlan_id, fdb_entries):
    """Add one static MAC to the FDB for every (port, mac) entry.

    Args:
        vlan_id(int): vlan id used for every entry
        fdb_entries(list[tuple(int, str)]): port and MAC pairs to add

    Returns:
        None

    """
    for fdb_entry in fdb_entries:
        port_id, mac_address = fdb_entry[0], fdb_entry[1]
        self.create_static_macs(port=port_id, vlans=[vlan_id], macs=[mac_address])
# VLAN helpers
def is_entry_added_to_vlan_table(self, vlan_id=1):
    """Tell whether the VLAN table contains a row with the given VLAN id.

    Args:
        vlan_id(int): vlan number to look for

    Returns:
        bool: True or False

    Examples::

        is_entry_added_to_vlan_table(vlan_id=vlan_id)

    """
    for vlan_row in self.get_table_vlans():
        if vlan_row['vlanId'] == vlan_id:
            return True
    return False
def is_entry_added_to_ports2vlan_table(self, port_id, vlan_id, tagged='Tagged',
                                       pvid=None, table_ports2vlan=None):
    """Tell whether the Ports2Vlan table holds a matching port/VLAN entry.

    Args:
        port_id(int): port number
        vlan_id(int): vlan number
        tagged(str): expected value of the 'tagged' field
        pvid(bool): expected value of the 'pvid' field; when None the 'pvid'
            field is not inspected at all
        table_ports2vlan(list[dict]): rows to search; when None the table is
            fetched with get_table_ports2vlans

    Returns:
        bool: True or False

    Examples::

        is_entry_added_to_ports2vlan_table(port_id=port_id, vlan_id=vlan_id,
                                           pvid=True, tagged=tagged)

    """
    rows = self.get_table_ports2vlans() if table_ports2vlan is None else table_ports2vlan
    check_pvid = pvid is not None
    for row in rows:
        if row['portId'] != port_id:
            continue
        if row['vlanId'] != vlan_id:
            continue
        # The 'pvid' key is only read when the caller asked for it.
        if check_pvid and row['pvid'] != pvid:
            continue
        if row['tagged'] == tagged:
            return True
    return False
# STP helpers
def wait_until_stp_param(self, mode="STP", port=1, param='rootGuard', value="Disabled",
                         timeout=30, instance=0):
    """Poll the xSTP port table until a port parameter equals the expected value.

    Args:
        mode(str): 'STP' or 'RSTP' to read the RSTP ports table, 'MSTP' to read
            the MSTP ports table
        port(int | str): port to read
        param(str): name of the table column to watch
        value(int | str): expected value of that column
        timeout(int): polling budget in seconds
        instance(int): MSTP instance, passed on only when mode is 'MSTP'

    Raises:
        ValueError: mode is not one of 'STP', 'RSTP', 'MSTP'
        CustomException: the value was not reached before the timeout expired

    Returns:
        None

    """
    # Reject an unknown mode up front; otherwise no table would ever be read
    # and the first comparison would fail on an unbound local.
    if mode not in ('STP', 'RSTP', 'MSTP'):
        raise ValueError("Unsupported xSTP mode: %r" % (mode, ))

    end_time = time.time() + timeout
    while time.time() < end_time:
        if mode == 'MSTP':
            table = self.get_table_mstp_ports([port, ], instance)
        else:
            table = self.get_table_rstp_ports([port, ])
        if table[0][param] == value:
            return
        time.sleep(0.3)
    raise CustomException(
        "Timeout exceeded: Port %s wasn't changed during timeout %s into %s value" %
        (param, timeout, value))