Source code for taf.testlib.tg_helpers

# Copyright (c) 2016 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``tg_helpers.py``

`TGHelperMixin class for mixin for tg object`

"""
import re
import time
from copy import deepcopy

import pypacker
from netaddr import IPAddress, EUI, mac_unix_expanded, mac_cisco

from .custom_exceptions import TGException, UIException
from . import loggers


[docs]class TGHelperMixin(object): """Mixin class for switch.tg """ # Create logger for the class class_logger = loggers.module_logger(name=__name__)
[docs] def emulate_isis_neighbor_on_port(self, tg_port, mac_address=None, system_id=None, mtu=None, ipv4_address=None, ipv6_address=None, hostname=None, isis_emul_hosts=1, ipgap=1, wait_for_packets=10): """Emulation of IS-IS router on specified TG's port. Args: tg_port(str): Name of TG port for emulation mac_address(str): Source MAC address for IS-IS PDUs system_id(str): System ID for IS-IS PDUs in format "xxxx.xxxx.xxxx" ipv4_address(str): IPv4 address to announce as management IPv4 address ipv6_address(str): IPv6 address for announcement hostname(str): Hostname for announcement isis_emul_hosts(int): number of ISIS Hosts to emulate ipgap(int): rate of ISIS LSP ID's packets to emulate wait_for_packets(int): Time in seconds for sniffing packets Returns: dict: IS-IS neighbor parameters that are propagated in IS-IS packets e.g. MAC address or area ID. Notes: The second IS-IS neighbor emulation on other port leads to link state changes and updates (LSPs) sent to already existing neighbors. LSPs require confirmations. The correct way seems to be running an interactive sniffer in separate thread and confirm every received LSP by an appropriate PSNP/CSNP. But current IXIA TAF implementation doesn't support threading, so for now, in terms of compatibility, this approach isn't implemented. """ MAX_ISISLSP_NEIGHBOR_TLV = 22 # max number of neighbor entries in LSP packet self.class_logger.info("Start IS-IS neighbor emulation on TG port {}".format(tg_port)) # Sniff IS-IS Hello packets self.start_sniff([tg_port], packets_count=1, filter_layer="IS-IS_IIH_P2P", sniffing_time=wait_for_packets) # Get sniffed packets, find Hello packets sniffed = self.stop_sniff([tg_port]) if not sniffed[tg_port]: raise TGException("No IS-IS Hellos were sniffed " "during {} second.".format(wait_for_packets)) self.class_logger.debug("Received IS-IS Hello packet from neighbor \"{}\" on \"{}\" " "port".format(sniffed[tg_port][0].sourceid, tg_port)) # Get values from received IS-IS Hello to create an appropriate IS-IS packets neighbor_id = sniffed[tg_port][0].sourceid + ".00" area_id = sniffed[tg_port][0].getlayer(pypacker.ISIS_AreaEntry).areaid # pylint: disable=no-member # Prepare other required variables for IS-IS packets mac_address = mac_address if mac_address else str(pypacker.RandMAC()) # pylint: disable=no-member system_id = system_id if system_id else str(EUI(mac_address, dialect=mac_cisco)) all_is_mac_address = "09:00:2b:00:00:05" ipv4_address = IPAddress(ipv4_address) if ipv4_address else IPAddress(str(pypacker.RandIP())) # pylint: disable=no-member ipv6_address = ipv6_address if ipv6_address else str(EUI(mac_address).ipv6_link_local()) hostname = hostname if hostname else "isis_host_on_{}".format(tg_port) # Count required amount of padding TLVs in accordance to current port's MTU tg_port_mtu = mtu if mtu else self.get_os_mtu(iface=tg_port) # Get host mtu take min of both values ... max_pad_tlv_len = 257 isis_headers_len = 58 max_sized_padding_count = (tg_port_mtu - isis_headers_len) // max_pad_tlv_len rest_padding = (tg_port_mtu - isis_headers_len) % max_pad_tlv_len - 2 # Prepare basic TLVs for IS-IS Hello packet tlvs = [{"ISIS_AreaTlv": {"areas": {"ISIS_AreaEntry": {"areaid": area_id}}}}, {"ISIS_ProtocolsSupportedTlv": {"nlpids": ["IPv6"]}}, {"ISIS_Ipv6InterfaceAddressTlv": {"addresses": [ipv6_address]}}] # Prepare Padding TLVs tlvs.extend([{"ISIS_PaddingTlv": {"len": 255, "padding": "\x00" * 255}}] * max_sized_padding_count) tlvs.append({"ISIS_PaddingTlv": {"len": rest_padding, "padding": "\x00" * rest_padding}}) # Prepare IS-IS Hello packet isis_hello = ({"Dot3": {"dst": all_is_mac_address, "src": mac_address, 'len': tg_port_mtu}}, {"LLC": {"dsap": 254, "ssap": 254, "ctrl": 3}}, {"ISIS_CommonHdr": {}}, {"ISIS_P2P_Hello": {"circuittype": "L1", "sourceid": system_id, "localcircuitid": 10, "tlvs": tlvs}}) # Prepare base IS-IS LSP packet isis_lsp_host = ( {"Dot3": {"dst": all_is_mac_address, "src": mac_address}}, {"LLC": {"dsap": 254, "ssap": 254, "ctrl": 3}}, {"ISIS_CommonHdr": {}}, {"ISIS_L1_LSP": {"lspid": system_id + '.00-00', "lifetime": 1200, "seqnum": 1, "typeblock": "L1", "tlvs": [ {"ISIS_AreaTlv": { "areas": { "ISIS_AreaEntry": {"areaid": area_id}}}}, {"ISIS_ProtocolsSupportedTlv": {"nlpids": ["IPv6"]}}, {"ISIS_DynamicHostnameTlv": {"hostname": hostname.encode('hex')}}, {"ISIS_IpInterfaceAddressTlv": {"len": 4, "addresses": [str(ipv4_address)]}}, {"ISIS_GenericTlv": {"type": 134, "val": str(ipv4_address.packed)}}, {"ISIS_ExtendedIsReachabilityTlv": { "neighbours": [{ "ISIS_ExtendedIsNeighbourEntry": { "metric": 10, "neighbourid": neighbor_id}}]}}]}}) self.class_logger.debug('Send IS-IS Hello messages from "{}" port ' 'with system ID "{}"'.format(tg_port, system_id)) # Set stream with hello packets to send them continuously every 5 seconds hello_stream = self.set_stream(isis_hello, inter=5, continuous=True, adjust_size=False, iface=tg_port) # Start sniff to get IS-IS LSP packets after IS-IS Hello is sent self.start_sniff([tg_port], filter_layer="IS-IS_LSP1", sniffing_time=wait_for_packets) # Send stream with IS-IS Hello Packets self.start_streams([hello_stream]) # Get sniffed packets data = self.stop_sniff([tg_port]) # If IS-IS packets are sniffed, get LSP entries from them for confirmation using CSNP lsp_dict = {} for lsp_packet in data[tg_port]: lsp_entry = {'ISIS_LspEntry': {'lspid': lsp_packet.lspid, 'lifetime': lsp_packet.lifetime, 'seqnum': lsp_packet.seqnum, 'checksum': lsp_packet.checksum}} entry_in_dict = lsp_dict.setdefault(lsp_packet.lspid, lsp_entry) if lsp_entry['ISIS_LspEntry']['seqnum'] > entry_in_dict['ISIS_LspEntry']['seqnum']: lsp_dict[lsp_packet.lspid] = lsp_entry # Raise exception if no IS-IS LSPs were received if not lsp_dict: raise TGException("No LSP was received from " "the device during {} seconds.".format(wait_for_packets)) self.class_logger.debug("Clarify received IS-IS LSPs by IS-IS CSNP.") # Prepare IS-IS CSNP packet for LSPs confirmation and set stream isis_csnp = ({"Dot3": {"dst": all_is_mac_address, "src": mac_address}}, {"LLC": {"dsap": 254, "ssap": 254, "ctrl": 3}}, {"ISIS_CommonHdr": {}}, {"ISIS_L1_CSNP": {"sourceid": system_id + "-00", "tlvs": [{"ISIS_LspEntryTlv": {"entries": list(lsp_dict.values())}}]}}) csnp_stream = self.set_stream(isis_csnp, count=1, adjust_size=False, iface=tg_port) # Send CSNP and LSP streams self.send_stream(csnp_stream) # Prepare ISIS LSP's from emulated hosts host_streams = [] neigh_id_n = [] for ix in range(1, isis_emul_hosts): if ix % MAX_ISISLSP_NEIGHBOR_TLV == 1: neigh_id_n = [] isis_lsp_host1 = deepcopy(isis_lsp_host) isis_lsp_host1[3]["ISIS_L1_LSP"]["lspid"] = "{0}.00-{1:02}".format( EUI(mac_address, dialect=mac_cisco), (ix - 1) / MAX_ISISLSP_NEIGHBOR_TLV) neigh_id_n.append( {"ISIS_ExtendedIsNeighbourEntry": { "metric": 11, "neighbourid": str(EUI((EUI(mac_address).value + ix), dialect=mac_cisco)) + ".00"}}) if ix % MAX_ISISLSP_NEIGHBOR_TLV == 0 or ix == isis_emul_hosts - 1: isis_lsp_host1[3]["ISIS_L1_LSP"]["tlvs"][5]["ISIS_ExtendedIsReachabilityTlv"]["neighbours"].extend(neigh_id_n) host_streams.append( self.set_stream(isis_lsp_host1, count=1, adjust_size=False, iface=tg_port)) isis_lsp_ep = deepcopy(isis_lsp_host) lsp_tlv = isis_lsp_ep[3]["ISIS_L1_LSP"]["tlvs"] lsp_tlv[2]["ISIS_DynamicHostnameTlv"]["hostname"] = ("isis_host_{}".format(1)).encode('hex') lsp_tlv[3]["ISIS_IpInterfaceAddressTlv"]["addresses"] = str(ipv4_address + 1) lsp_tlv[4]["ISIS_GenericTlv"]["val"] = (ipv4_address + 1).packed lsp_tlv[5]["ISIS_ExtendedIsReachabilityTlv"]["neighbours"][0]["ISIS_ExtendedIsNeighbourEntry"]["neighbourid"] = system_id + ".00" isis_lsp_ep_stream = self.set_stream(isis_lsp_ep, count=isis_emul_hosts - 1, inter=ipgap, adjust_size=False, iface=tg_port, isis_lspid_increment=(1, isis_emul_hosts - 1)) for stream in host_streams: self.send_stream(stream) self.send_stream(isis_lsp_ep_stream) self.class_logger.info('IS-IS neighbor with system ID "{}" emulated ' 'on port "{}"'.format(system_id, tg_port)) return_values = {'area_id': area_id, 'mac_address': mac_address, 'system_id': system_id, 'ipv4_address': ipv4_address, 'ipv6_address': ipv6_address, 'hostname': hostname} return return_values
[docs] def isis_packets_sending(self, env, tg_port, switch_id_port, mac_address=None, ipgap=1.0, isis_nodes_cnt=5, mtu=1500): """Emulation of IS-IS DCRP node on specified TG's port. Args: env(Environment): environment data tg_port(str): Name of TG port for emulation switch_id_port(tuple(str, str)): Switch id and port id to set port UP mac_address(str): Source MAC address for IS-IS PDUs ipgap(int): rate of ISIS LSP ID's packets to emulate isis_nodes_cnt(int): Number of ISIS DCRP nodes to emulate mtu(int): MTU of Returns: dict: IS-IS neighbor parameters that are propagated in IS-IS packets e.g. MAC address or area ID. Notes: The second IS-IS neighbor emulation on other port leads to link state changes and updates (LSPs) sent to already existing neighbors. LSPs require confirmations. The correct way seems to be running an interactive sniffer in separate thread and confirm every received LSP by an appropriate PSNP/CSNP. But current IXIA TAF implementation doesn't support threading, so for now, in terms of compatibility, this approach isn't implemented. """ cmd = "match -f 5555 -p 30001 get_rules table" sw_neigh = {} if mac_address is None: mac_address = "02:aa:aa:aa:10:00" try: switch_id = switch_id_port[0] sw_port_id = int(switch_id_port[1]) except KeyError: raise UIException("Cannot get switch port id") sw_instance = [switch for switch in env.switch[1].node.values() if switch.id == switch_id] if not sw_instance: raise UIException("Not found switch id and port pair connection in configuration") # prepare ports to emulate ISIS neighbors env.tg[1].connect_port(tg_port) if sw_port_id not in env.switch[1].mesh_ports[switch_id]: raise UIException("Port {} is not configured as mesh for " "switch {}".format(sw_port_id, sw_instance[0].name)) sw_instance[0].ui.modify_ports(ports=[sw_port_id], adminMode='Up') sw_instance[0].ui.wait_for_port_value_to_change([sw_port_id], 'operationalStatus', "Up") # number ISIS nodes to emulate nodes_cnt = isis_nodes_cnt - len(list(env.switch[1].node.values())) self.emulate_isis_neighbor_on_port(tg_port, mac_address=mac_address, mtu=mtu, isis_emul_hosts=nodes_cnt, ipgap=ipgap) # wait till HW tables 4, 5, 6 are populated time.sleep(5) for switch in env.switch[1].node.values(): for table in [4, 5, 6]: output = switch.ui.cli_send_command('{} {}'.format(cmd, table)).stdout sw_neigh[switch] = {table: len(re.findall(r'^table\s+:\s+{}'.format(table), output, re.MULTILINE))} return sw_neigh
[docs] def table_9_test_preparation(self, env, tg_port, sw_port, packet_num, ipgap, offset=0, arp_packet=None): """Prepare ports, packets for table 9 related tests. """ # Configure ARP packet and stream if arp_packet: srcmac = arp_packet[0]['Ether']['src'] arp_packet[0]['Ether']['src'] = str(EUI(EUI(srcmac).value + offset, dialect=mac_unix_expanded)) srcip = arp_packet[1]['ARP']['psrc'] arp_packet[1]['ARP']['psrc'] = str(IPAddress(srcip) + offset) else: raise UIException("ARP packet not supplied") # set admin status of host and switch to Up, wait till operational status is up sw_port_id = int(sw_port.split()[1]) sw_instances = [switch for switch in env.switch[1].node.values() if switch.id != sw_port.split()[0]] sw_instance = [switch for switch in env.switch[1].node.values() if switch.id == sw_port.split()[0]] if not sw_instance: raise UIException("Not found switch id and port pair connection in configuration") env.tg[1].connect_port(tg_port) sw_instance[0].ui.modify_ports(ports=[sw_port_id], adminMode='Up') sw_instance[0].ui.wait_for_port_value_to_change([sw_port_id], 'operationalStatus', "Up") # Prepare and send stream arp_stream = env.tg[1].set_stream(arp_packet, count=packet_num, inter=ipgap, iface=tg_port, arp_sa_increment=(1, packet_num + offset), arp_sip_increment=(1, packet_num + offset)) env.tg[1].start_streams([arp_stream]) time.sleep(ipgap * packet_num) env.tg[1].stop_streams([arp_stream]) return sw_instances
[docs] def arp_packets_sending(self, env, tg_port, sw_port, packet_num, ipgap, arp_packet=None, offset=0, retry=False): cmd = "match -f 5555 -p 30001 get_rules table 9" sw_t9 = {} sw_instances = self.table_9_test_preparation(env, tg_port, sw_port, ipgap=ipgap, packet_num=packet_num, arp_packet=arp_packet, offset=offset) # wait for table 9 to be populated time.sleep(5) # check table 9 entries assert sw_instances, "No end point ports found connected to the host" for switch in sw_instances: output = switch.ui.cli_send_command(cmd).stdout sw_t9[switch] = len(re.findall(r'^table\s+:\s+9', output, re.MULTILINE)) if not retry: return sw_t9 failed = False for switch, count in sw_t9.items(): if count != packet_num: failed = True break if not failed: return sw_t9 # in case not all entries are learned fast, try add them again arp_packet = deepcopy(arp_packet) srcmac = arp_packet[0]['Ether']['src'] srcip = arp_packet[1]['ARP']['psrc'] count = 2 while failed and count: count -= 1 self.class_logger.info("Adding not learned MAC addresses to the table 9") sw_instances = [switch for switch in env.switch[1].node.values() if switch.id != sw_port.split()[0]] for switch in sw_instances: output = switch.ui.cli_send_command("match -f 5555 -p 30001 get_rules table 9").stdout learned = re.findall(r'ethernet.dst_mac\s=\s((?:[\w]{2}:){5}[\w]{2})', output, re.MULTILINE) to_add = [el for el in [str(EUI(EUI(srcmac).value + ix, dialect=mac_unix_expanded)) for ix in range(packet_num)] if el not in learned] self.class_logger.info("{} entries are missing for switch {}".format(len(to_add), switch.name)) for mac in to_add: diff = EUI(mac).value - EUI(srcmac).value arp_packet[0]['Ether']['src'] = str(EUI(EUI(srcmac).value + diff, dialect=mac_unix_expanded)) arp_packet[1]['ARP']['psrc'] = str(IPAddress(IPAddress(srcip).value + diff)) # Prepare and send stream arp_stream = env.tg[1].set_stream(arp_packet, count=1, iface=tg_port) env.tg[1].send_stream(arp_stream) self.class_logger.info("Verify table#9 entries") for switch in sw_instances: output = switch.ui.cli_send_command(cmd).stdout if packet_num == len(re.findall(r'^table\s+:\s+9', output, re.MULTILINE)): failed = False else: failed = True time.sleep(3) break for switch in sw_instances: output = switch.ui.cli_send_command(cmd).stdout sw_t9[switch] = len(re.findall(r'^table\s+:\s+9', output, re.MULTILINE)) return sw_t9
[docs] def check_traffic(self, env, packets, ports, tg_indexes, rate=10, bi_dir=True, time_run=30): """Send traffic: packet_1, packet_2 between port_1 and port_2, with tg indexes: ix_1, ix_2. """ packet_1, packet_2 = packets port_1, port_2 = ports ix_1, ix_2 = tg_indexes mac_1 = packet_1[0]['Ether']['src'] mac_2 = packet_2[0]['Ether']['src'] stream_1 = env.tg[ix_1].set_stream(packet_1, rate=rate, iface=port_1, continuous=True) env.tg[ix_1].clear_statistics([port_1, ]) stream_2 = env.tg[ix_2].set_stream(packet_2, rate=rate, iface=port_2, continuous=True) env.tg[ix_2].clear_statistics([port_2, ]) self.class_logger.info("Start sending traffic from both end points") env.tg[ix_2].start_sniff([port_2, ], src_filter=mac_1, dst_filter=mac_2) if bi_dir: env.tg[ix_1].start_sniff([port_1, ], src_filter=mac_2, dst_filter=mac_1) env.tg[ix_1].start_streams([stream_1, ]) if bi_dir: env.tg[ix_2].start_streams([stream_2, ]) time.sleep(time_run) env.tg[ix_1].stop_streams([stream_1, ]) if bi_dir: env.tg[ix_2].stop_streams([stream_2, ]) time.sleep(4) env.tg[ix_2].stop_sniff([port_2, ], drop_packets=True) if bi_dir: env.tg[ix_1].stop_sniff([port_1, ], drop_packets=True) time.sleep(4) self.class_logger.info("Stop sending traffic check statistic") sent_count_stream_1 = env.tg[ix_1].get_sent_frames_count(port_1, ) get_count_stream_1 = env.tg[ix_2].get_filtered_frames_count(port_2, ) self.class_logger.info("Received packets for stream#1: {}".format(get_count_stream_1)) assert sent_count_stream_1 == get_count_stream_1, "Packets are lost for stream #1" if bi_dir: sent_count_stream_2 = env.tg[ix_2].get_sent_frames_count(port_2, ) get_count_stream_2 = env.tg[ix_1].get_filtered_frames_count(port_1, ) self.class_logger.info("Received packets for stream#2: {}".format(get_count_stream_2)) assert sent_count_stream_2 == get_count_stream_2, "Packets are lost for stream #2"