Source code for taf.testlib.multiple_tg

# Copyright (c) 2015 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``pytest_onsenv.py``

`Multiple traffic generator specific functionality`

"""
from collections import namedtuple

from . import loggers
from .tg_template import GenericTG
from .packet_processor import PacketProcessor


DEFAULT_SPEED = 10000


Port = namedtuple('Port', 'tg, port')


[docs]class MultipleTG(PacketProcessor, GenericTG): """Class for general TG instance combined with multiple different TGs. """ class_logger = loggers.ClassLogger()
[docs] def __init__(self, traffic_generators, config, opts): """Initialize RemoteMultiHostTG class. Args: traffic_generators(dict): Dictionary with TG instances in format {id:tg_instance} config(dict): Configuration information. opts(OptionParser): py.test config.option object which contains all py.test cli options. """ super(MultipleTG, self).__init__(config, opts) # TG instances self.tgs = {x.id: x for x in traffic_generators.values()} # Get ports and port lists # For ports use namedtuple(tg.id, port.id) self.ports, self.port_list = self._get_speed_ports() self.streams = [] # Indicates if TG object supports high level protocol emulation (can emulate dialogs). self.is_protocol_emulation_present = all(x.is_protocol_emulation_present for x in self.tgs.values())
[docs] def _get_speed_ports(self): """Get ports with speed from TG instances. Returns: tuple(list[tuple], list[tuple, int]): Tuple with list of ports used in real config and list of port/speed values """ ports = [] ports_list = [] if any(x.port_list for x in self.tgs.values()): for tg in self.tgs.values(): if tg.port_list: ports_list.extend([[Port(tg.id, _port[0]), _port[1]] for _port in tg.port_list]) else: ports_list.extend([[Port(tg.id, _port), DEFAULT_SPEED] for _port in tg.ports]) ports = [_port[0] for _port in ports_list] else: ports = [Port(x.id, port) for x in self.tgs.values() for port in x.ports] return ports, ports_list
[docs] def get_tg_port_map(self, ifaces): """Return ports related to specific TG. Args: ifaces(list(tuple)): list of interfaces in format (tg_id, port_id) Returns: dict: dictionary in format {'host id': [port ids]} """ iface_map = {} for iface in ifaces: iface_map.setdefault(iface.tg, []).append(iface.port) return iface_map
[docs] def get_port_id(self, tg_id, port_id): """Return port's sequence number in list of ports. Args: tg_id(int): TG instance ID port_id(int): TG instance port's sequence number Raises: ValueError: in case expected port is not in list of ports Returns: int: Port sequence number in list of ports starting from 1 """ port_name = self.tgs[tg_id].ports[port_id - 1] return self.ports.index(Port(tg_id, port_name)) + 1
[docs] def start(self, wait_on=True): """Start TG instances. """ for tg in self.tgs.values(): tg.start() self.status = all(x.status for x in self.tgs.values()) # pylint: disable=attribute-defined-outside-init
[docs] def stop(self): """Shutdown TG instances. """ for tg in self.tgs.values(): tg.stop()
[docs] def create(self): """Start TG instances or get running ones. """ for tg in self.tgs.values(): tg.create()
[docs] def destroy(self): """Stop or release TG instances. """ for tg in self.tgs.values(): tg.destroy()
[docs] def cleanup(self, *args, **kwargs): """Cleanup TG instances. """ self.streams = [] for tg in self.tgs.values(): tg.cleanup()
[docs] def check(self): """Check TG instances. """ for tg in self.tgs.values(): tg.check()
[docs] def sanitize(self): """Perform any necessary operations to leave environment in normal state. """ self.streams = [] for tg in self.tgs.values(): tg.sanitize()
[docs] def stop_sniff(self, *args, **kwargs): """Collect sniffed data from all TG instances. """ iface_map = self.get_tg_port_map(*args) data_hosts = {} for tg, ifaces in iface_map.items(): data_hosts[tg] = self.tgs[tg].stop_sniff(ifaces, **kwargs) data = {} for tg, ifaces in data_hosts.items(): for iface in ifaces: data['{} {}'.format(tg, iface)] = data_hosts[tg][iface] return data
[docs] def connect_port(self, iface): """Simulate port link connecting (set it to admin up etc). Args: iface(str): Interface to connect. Raises: NotImplementedError: not implemented Returns: None or raise and exception. """ self.tgs[iface.tg].connect_port(iface.port)
[docs] def disconnect_port(self, iface): """Simulate port link disconnecting (set it to admin down etc). Args: iface(str): Interface to disconnect. Raises: NotImplementedError: not implemented Returns: None or raise and exception. """ self.tgs[iface.tg].disconnect_port(iface.port)
[docs] def clear_streams(self): """Stop and remove all streams. """ self.streams = [] for tg in self.tgs.values(): tg.clear_streams()
[docs] def set_stream(self, *args, **kwargs): """Set traffic stream with specified parameters on specified TG port. Returns: int: stream id Notes: It's not expected to configure a lot of incrementation options. Different traffic generator could have different limitations for these options. Examples:: stream_id_1 = tg.set_stream(pack_ip, count=100, iface=iface) stream_id_2 = tg.set_stream(pack_ip, continuous=True, inter=0.1, iface=iface) stream_id_3 = tg.set_stream(pack_ip_udp, count=5, protocol_increment=(3, 5), iface=iface) stream_id_4 = tg.set_stream(pack_ip_udp, count=18, sip_increment=(3, 3), dip_increment=(3, 3), iface=iface, udf_dependancies={'sip_increment': 'dip_increment'}) """ tg, kwargs['iface'] = kwargs['iface'] stream_id = self.tgs[tg].set_stream(*args, **kwargs) tg_stream_id = Port(tg, stream_id) self.streams.append(tg_stream_id) return tg_stream_id
[docs] def send_stream(self, stream_id, **kwargs): """Sends the stream created by 'set_stream' method. Args: stream_id(int): ID of the stream to be send. Returns: float: timestamp. """ tg, stream = stream_id self.tgs[tg].send_stream(stream, **kwargs)
[docs] def start_streams(self, stream_list): """Enable and start streams from the list simultaneously. Args: stream_list(list[int]): List of stream IDs. Returns: None """ stream_map = self.get_tg_port_map(stream_list) for tg, streams in stream_map.items(): self.tgs[tg].start_streams(streams)
[docs] def stop_streams(self, stream_list=None): """ Disable streams from the list. Args: stream_list(list[int]): Stream IDs to stop. In case stream_list is not set all running streams will be stopped. Returns: None """ stream_map = self.get_tg_port_map(stream_list) for tg, streams in stream_map.items(): self.tgs[tg].stop_streams(streams)
[docs] def start_sniff(self, ifaces, **kwargs): """Starts sniffing on specified interfaces. Args: ifaces(list): List of TG interfaces for capturing. kwargs(dict): Possible parameters to configure. Returns: None Notes: This method introduces additional 1.5 seconds timeout after capture enabling. It's required by Ixia sniffer to wait until capturing is started. Examples:: env.tg[1].start_sniff(['eth0', ], filter_layer='IP', src_filter='00:00:00:01:01:01', dst_filter='00:00:00:22:22:22') """ iface_map = self.get_tg_port_map(ifaces) for tg, ports in iface_map.items(): self.tgs[tg].start_sniff(ports, **kwargs)
[docs] def clear_statistics(self, sniff_port_list): """Clearing statistics on TG ports. """ iface_map = self.get_tg_port_map(sniff_port_list) for tg, ports in iface_map.items(): self.tgs[tg].clear_statistics(ports)
[docs] def get_received_frames_count(self, iface): """Read statistics - framesReceived. """ return self.tgs[iface.tg].get_received_frames_count(iface.port)
[docs] def get_filtered_frames_count(self, iface): """Read statistics - filtered frames received. """ return self.tgs[iface.tg].get_filtered_frames_count(iface.port)
[docs] def get_uds_3_frames_count(self, iface): """Read statistics - UDS3 - Capture Trigger (UDS3) - count of non-filtered received packets (valid and invalid). """ return self.tgs[iface.tg].get_uds_3_frames_count(iface.port)
[docs] def clear_received_statistics(self, iface): """Clear statistics. """ return self.tgs[iface.tg].clear_received_statistics(iface.port)
[docs] def get_sent_frames_count(self, iface): """Read statistics - framesSent. """ return self.tgs[iface.tg].get_sent_frames_count(iface.port)
[docs] def get_port_txrate(self, iface): """Get port Tx rate. """ return self.tgs[iface.tg].get_port_txrate(iface.port)
[docs] def get_port_rxrate(self, iface): """Get port Rx rate. """ return self.tgs[iface.tg].get_port_rxrate(iface.port)
[docs] def get_port_qos_rxrate(self, iface, qos): """Get ports Rx rate for specific qos. """ return self.tgs[iface.tg].get_port_qos_rxrate(iface.port, qos)
[docs] def get_qos_frames_count(self, iface, prio): """Get QoS packets count. """ return self.tgs[iface.tg].get_qos_frames_count(iface.port, prio)
[docs] def set_qos_stat_type(self, iface, ptype): """Set QoS stats type. """ return self.tgs[iface.tg].set_qos_stat_type(iface.port, ptype)
[docs] def set_flow_control(self, iface, mode): """Set Flow Control. """ return self.tgs[iface.tg].set_flow_control(iface.port, mode)
[docs] def get_os_mtu(self, iface=None): """Get MTU value in host OS. Args: iface(str): Interface for getting MTU in host OS Returns: int: Original MTU value Examples:: env.tg[1].get_os_mtu(iface=ports[('tg1', 'sw1')][1]) """ return self.tgs[iface.tg].get_os_mtu(iface.port)
[docs] def set_os_mtu(self, iface=None, mtu=None): """Set MTU value in host OS. Args: iface(str): Interface for changing MTU in host OS mtu(int): New MTU value Returns: int: Original MTU value Examples:: env.tg[1].set_os_mtu(iface=ports[('tg1', 'sw1')][1], mtu=1650) """ return self.tgs[iface.tg].set_os_mtu(iface.port, mtu)