Source code for taf.testlib.packet_processor

# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""``packet_processor.py``

`Packet processor specific functionality`

"""

import struct
import codecs
import operator
from functools import reduce
from contextlib import suppress
from collections import OrderedDict

import pytest
import pypacker
from pypacker.layer3 import ip
from pypacker.layer12 import ethernet

from . import loggers
from .custom_exceptions import PypackerException


[docs]class PacketProcessor(object): """This class contents only one method to build packet from tuple of dictionaries. """ class_logger = loggers.ClassLogger() inner_classes = {"Echo", "Unreach", "Redirect", "Error", "TooBig", "TimeExceed", "ParamProb"} lldp_lacp_tlvs = {"LLDPGeneric", "LLDPDUEnd", "LLDPChassisId", "LLDPPortId", "LLDPTTL", "LLDPPortDescription", "LLDPSystemName", "LLDPSystemDescription", "LLDPSystemCapabilities", "LLDPManagementAddress", "LLDPOrgSpecGeneric", "LLDPDot1PortVlanId", "DCBXCongestionNotification", "DCBXConfiguration", "DCBXRecommendation", "DCBXPriorityBasedFlowControlConfiguration", "DCBXApplicationPriority", "DCBXApplicationPriorityTable", "LACPActorInfoTlv", "LACPPartnerInfoTlv", "LACPCollectorInfoTlv", "LACPTerminatorTlv", "LACPReserved", } flt_patterns = { "ARP": {'ptrn1': ["08 06", "00 00", "12"], 'mt1': "matchUser", 'cfp': "pattern1", 'lfilter': lambda x: getattr(x, 'arp', None) and not x.vlan}, "notARP": {'ptrn1': ["08 06", "00 00", "12"], 'mt1': "matchUser", 'cfp': "notPattern1", 'lfilter': lambda x: not getattr(x, 'arp', None) or x.vlan}, "Dot1Q.ARP": {'ptrn1': ["81 00 00 00 08 06", "00 00 FF FF 00 00", "12"], 'mt1': "matchUser", 'cfp': "pattern1", 'lfilter': lambda x: getattr(x, 'arp', None) and x.vlan}, "Dot1Q": {'ptrn1': None, 'mt1': "matchVlan", 'cfp': "pattern1", 'lfilter': lambda x: x.vlan != []}, "IP": {'ptrn1': ["08 00", "00 00", "12"], 'mt1': "matchUser", 'cfp': "pattern1", 'lfilter': lambda x: getattr(x, 'ip', None) and not x.vlan}, "IPv6": {'ptrn1': ["86 dd", "00 00", "12"], 'mt1': "matchUser", 'cfp': "pattern1", 'lfilter': lambda x: getattr(x, 'ip6', None) and not x.vlan}, "notIP": {'ptrn1': ["08 00", "00 00", "12"], 'mt1': "matchUser", 'cfp': "notPattern1", 'lfilter': lambda x: not getattr(x, 'ip', None) or x.vlan}, "Dot1Q.IP": {'ptrn1': ["81 00 00 00 08 00", "00 00 FF FF 00 00", "12"], 'mt1': "matchUser", 'cfp': "pattern1", 'lfilter': lambda x: getattr(x, 'ip', None) and x.vlan}, "Dot1Q.IPv6": {'ptrn1': ["81 00 00 00 86 dd", "00 00 FF FF 00 00", "12"], 'mt1': "matchUser", 'cfp': "pattern1", 'lfilter': lambda x: getattr(x, 'ip6', None) and x.vlan}, "STP": {'ptrn1': ["42 42 03 00 00", "00 00 00 00 00", "14"], 'mt1': "matchUser", 'cfp': "pattern1"}, "LLDP": {'ptrn1': ["01 80 c2 00 00 0e 00 00 00 00 00 00 88 cc", "00 00 00 00 00 00 FF FF FF FF FF FF 00 00", "0"], 'mt1': "matchUser", 'cfp': "pattern1", 'lfilter': lambda x: getattr(x, 'lldp', None) and not x.vlan}, "notSTP": {'ptrn1': ["42 42 03 00 00", "00 00 00 00 00", "14"], 'mt1': "matchUser", 'cfp': "notPattern1"}, "TCP": {'ptrn1': ["08 00 00 00 00 00 00 00 00 00 00 06", "00 00 FF FF FF FF FF FF FF FF FF 00", "12"], 'mt1': "matchUser", 'cfp': "pattern1", 'lfilter': lambda x: getattr(x, 'ip', None) and getattr(x.ip, 'tcp', None) and not x.vlan}, "Dot1Q.TCP": {'ptrn1': ["81 00 00 00 08 00 00 00 00 00 00 00 00 00 00 06", "00 00 FF FF 00 00 FF FF FF FF FF FF FF FF FF 00", "12"], 'mt1': "matchUser", 'cfp': "pattern1", 'lfilter': lambda x: getattr(x, 'ip', None) and getattr(x.ip, 'tcp', None) and x.vlan}, "UDP": {'ptrn1': ["08 00 00 00 00 00 00 00 00 00 00 11", "00 00 FF FF FF FF FF FF FF FF FF 00", "12"], 'mt1': "matchUser", 'cfp': "pattern1", 'lfilter': lambda x: getattr(x, 'ip', None) and getattr(x.ip, 'udp', None) and not x.vlan}, "notUDP": {'ptrn1': ["08 00 00 00 00 00 00 00 00 00 00 11", "00 00 FF FF FF FF FF FF FF FF FF 00", "12"], 'mt1': "matchUser", 'cfp': "notPattern1", 'lfilter': lambda x: getattr(x, 'ip', None) and not getattr(x.ip, 'udp', None) and x.vlan}, "Dot1Q.UDP": {'ptrn1': ["81 00 00 00 08 00 00 00 00 00 00 00 00 00 00 11", "00 00 FF FF 00 00 FF FF FF FF FF FF FF FF FF 00", "12"], 'mt1': "matchUser", 'cfp': "pattern1", 'lfilter': lambda x: getattr(x, 'ip', None) and getattr(x.ip, 'udp', None) and x.vlan}, "ICMP": {'ptrn1': ["08 00 00 00 00 00 00 00 00 00 00 01", "00 00 FF FF FF FF FF FF FF FF FF 00", "12"], 'mt1': "matchUser", 'cfp': "pattern1", 'lfilter': lambda x: getattr(x, 'ip', None) and getattr(x.ip, 'icmp', None) and not x.vlan}, "ICMPv6": {'ptrn1': ["86 dd 00 00 00 00 00 00 00 00 00 01", "00 00 FF FF FF FF FF FF FF FF FF 00", "12"], 'mt1': "matchUser", 'cfp': "pattern1", 'lfilter': lambda x: getattr(x, 'ip6', None) and getattr(x.ip, 'icmp6', None) and not x.vlan}, "Dot1Q.ICMP": {'ptrn1': ["81 00 00 00 08 00 00 00 00 00 00 00 00 00 00 01", "00 00 FF FF 00 00 FF FF FF FF FF FF FF FF FF 00", "12"], 'mt1': "matchUser", 'cfp': "pattern1", 'lfilter': lambda x: getattr(x, 'ip', None) and getattr(x.ip, 'icmp', None) and x.vlan}, "Dot1Q.ICMPv6": {'ptrn1': ["81 00 00 00 86 dd 00 00 00 00 00 00 00 00 00 01", "00 00 FF FF 00 00 FF FF FF FF FF FF FF FF FF 00", "12"], 'mt1': "matchUser", 'cfp': "pattern1", 'lfilter': lambda x: getattr(x, 'ip6', None) and getattr(x.ip, 'icmp6', None) and x.vlan}, "PAUSE": {'ptrn1': ["88 08", "00 00", "12"], 'mt1': "matchUser", 'cfp': "pattern1"}, "BOOTP": {'ptrn1': ["08 00", "00 00", "42"], 'mt1': "matchUser", 'cfp': "pattern1"}, }
[docs] def _build_trex_packet(self, packet_definition, adjust_size=True, required_size=64): """Builds trex packet based on provided packet definition. Args: packet_definition(tuple(dict)): Packet representation (tuple of dictionaries of dictionaries) adjust_size(bool): If set to True packet size will be increased to 60 bytes (CRC not included) required_size(int): Size (in bytes) of the result packet Returns: trex.Packet: trex.packet or list of pypacker packets (if fragsize defined) Examples:: packet_definition = ({"Ether": {"dst": "00:80:C2:00:00:00", "src": "00:00:00:00:00:02"}}, {"Dot1Q": {"vlan": 4}}, {"IP": {}}) packet=env.tg[1]._build_trex_packet(packet_definition) """ import trex_stl_lib.api as TApi def _value_repr(value): """Check if value contains layers. """ if isinstance(value, (list, tuple)): return type(value)(list(map(_value_repr, value))) elif isinstance(value, dict): return _trex_layer(value) else: return value def _trex_layer(layer_dict): """Return trex Packet object built according to definition. """ layer_name = list(layer_dict.keys())[0] sl = getattr(TApi, layer_name)() field_list = [_fl.name for _fl in sl.fields_desc] fields = layer_dict[layer_name] list([setattr(sl, f, _value_repr(fields[f])) for f in [f for f in field_list if f in fields]]) return sl # Converting packet_definition to trex.Packet. packet = reduce(lambda a, b: a / b, list(map(_trex_layer, packet_definition))) # Adjust packet size with padding. if adjust_size: packet_length = len(packet) if packet_length < required_size: packet.add_payload(b"\x00" * (required_size - packet_length)) elif packet_length > required_size and required_size < 60: self.class_logger.warning("required_size is less than actual size. Packet will be cut off") packet_string = bytes(packet) packet_string = packet_string[0:required_size] packet = TApi.Ether(packet_string) return packet
[docs] def _get_pypacker_layer(self, layer): """Get Pypacker protocol object. Args: layer(str): Protocol name e.g IP, ICMP.Echo Returns: pypacker.Packet: return Pypacker object Raises: PypackerException: Pypacker library does not support protocol """ if '.' in layer: # Handle inner pypacker class e.g. icmp.ICMP.Echo layer, inner_layer = layer.split('.') pypacker_layer = self._get_pypacker_layer(layer) return getattr(pypacker_layer, inner_layer) elif layer in ["Ethernet", "ARP", "LLC", "STP", "LLDP", "LACP"]: return getattr(getattr(pypacker.layer12, layer.lower()), layer) elif layer in ["IP", "IP6", "ICMP", "IGMP"]: return getattr(getattr(pypacker.layer3, layer.lower()), layer) elif layer in ["TCP", "UDP"]: return getattr(getattr(pypacker.layer4, layer.lower()), layer) elif layer == "FlowControl": return getattr(getattr(pypacker.layer12, "flow_control"), layer) elif layer in ["Pause", "PFC"]: return getattr(getattr(getattr(pypacker.layer12, "flow_control"), "FlowControl"), layer) elif layer.startswith("LLDP") or layer.startswith("DCBX"): return getattr(getattr(pypacker.layer12, "lldp"), layer) elif layer.startswith("LACP"): return getattr(getattr(pypacker.layer12, "lacp"), layer) else: raise PypackerException("Pypacker does not support protocol {0}".format(layer))
[docs] @staticmethod def _get_pypacker_layer_fields(packet): """Get packet fields. Args: packet(pypacker.Packet): pypacker packet Returns: set: set of packet fields """ # Get header fields fields = {field.strip('_') for field in packet._header_field_names} # Get subfields that value is less than 1 byte sub_fields = {f for f, v in packet.__class__.__dict__.items() if isinstance(v, property)} return fields.union(sub_fields)
[docs] def _build_pypacker_packet(self, packet_definition, adjust_size=True, required_size=64): """Builds pypacker packet based on provided packet definition. Args: packet_definition(tuple(dict)): Packet representation (tuple of dictionaries of dictionaries) adjust_size(bool): If set to True packet size will be increased to 64 bytes for Pypacker TG (CRC is included) and to 60 bytes for Ixia TG without CRC. Otherwise Ixia TG will add 4 bytes(CRC), Pypacker TG will not add CRC. required_size(int): Size (in bytes) of the result packet Returns: pypacker.Packet: pypacker.packet or list of pypacker packets (if fragsize defined) Examples:: packet_definition = ({"Ethernet": {"dst": "00:80:C2:00:00:00", "src": "00:00:00:00:00:02"}}, {"Dot1Q": {"vid": 4}}, {"IP": {}}) packet=env.tg[1]._build_pypacker_packet(packet_definition) """ def _pypacker_layer(layer_dict): """Return pypacker Packet object built according to definition. """ layer_name = next(iter(layer_dict)) try: pypacker_layer = self._get_pypacker_layer(layer_name) except PypackerException: # Skip undefined layers e.g. Dot1Q return None packet_layer = pypacker_layer() for field, value in layer_dict[layer_name].items(): if getattr(packet_layer, '{0}_s'.format(field), None): setattr(packet_layer, '{0}_s'.format(field), value) else: setattr(packet_layer, field, value) return packet_layer # Converting packet_definition to pypacker.Packet. packet = reduce(lambda a, b: a + b, map(_pypacker_layer, packet_definition)) # Handle Dot1Q layer and IP options in packet_definition first = operator.itemgetter(0) for layer in packet_definition: with suppress(KeyError): dot1q_definition = layer["Dot1Q"] pypacker_vlan = struct.pack("!H", pypacker.layer12.ethernet.ETH_TYPE_8021Q) + \ struct.pack("!H", dot1q_definition["vlan"]) packet.vlan = pypacker_vlan with suppress(KeyError): opts = layer["IP"]["opts"] packet.ip.opts = [ip.IPOptMulti(**opt) for opt in opts] with suppress(KeyError): tlvlist_def = layer["LLDP"]["tlvlist"] packet.lldp.tlvlist = [] for tlv_dict in tlvlist_def: tlv_type = next(iter(tlv_dict)) packet.lldp.tlvlist.append(_pypacker_layer( {tlv_type: OrderedDict(sorted(tlv_dict[tlv_type].items(), key=first))}, )) with suppress(KeyError): # Inner list of DCBXApplicationPriorityTable apppriotables = tlv_dict['DCBXApplicationPriority']['apppriotable'] app_class_name = "DCBXApplicationPriorityTable" app_tables_list = [] for app_table in apppriotables: app_tables_list.append(_pypacker_layer( {app_class_name: OrderedDict(sorted(app_table[app_class_name].items(), key=first))} )) packet.lldp.tlvlist[-1].apppriotable = app_tables_list with suppress(KeyError): tlvlist_def = layer["LACP"]["tlvlist"] packet.lacp.tlvlist = [] for tlv_dict in tlvlist_def: tlv_type = next(iter(tlv_dict)) packet.lacp.tlvlist.append(_pypacker_layer({tlv_type: OrderedDict(sorted(tlv_dict[tlv_type].items(), key=first))})) # Adjust packet size with padding. if adjust_size: packet_length = len(packet) if packet_length < required_size: loss_bytes = required_size - packet_length try: packet.padding = b"\x00" * (loss_bytes + len(packet.padding)) except AttributeError: packet.upper_layer.body_bytes = b"\x00" * loss_bytes # packet length cannot be less 14 bytes for Ethernet layer due to pypacker behavior elif packet_length > required_size and required_size < 60: self.class_logger.warning("required_size is less than actual size. Packet will be cut off") packet_string = packet.bin() packet_string = packet_string[:required_size] packet = ethernet.Ethernet(packet_string) return packet
[docs] def check_packet_field(self, packet=None, layer=None, field=None, value=None): """Check if specified field is present (for specified layer) and checks if field value matches specified value. Args: packet(pypacker.Packet): Packet to analyze layer(str): Layer to analyze field(str): Field to look for value(str, int): Filed value to compare (may be different types, depending on field) Returns: bool: True or False Examples:: assert check_packet_field(packet=pypacker_packet, layer="Dot1Q", field="prio", value=4) assert check_packet_field(packet=pypacker_packet, layer="Dot1Q", field="type") """ try: packet_value = self.get_packet_field(packet, layer, field) return value is None or packet_value == value except PypackerException: return False
[docs] def get_packet_field(self, packet=None, layer=None, field=None): """Returns field value (for specified layer) from specified packet. Args: packet(pypacker.Packet): Packet to analyze layer(str): Layer to analyze field(str): Field to look for Raises: PypackerException: unknown layer or field Returns: int, str: value (may be different types, depending on field) Examples:: value = get_packet_field(packet=pypacker_packet, layer="Dot1Q", field="vlan") """ tag_id = {"S-Dot1Q": 0, "C-Dot1Q": 1} try: with suppress(KeyError): layer_tag = tag_id[layer] vlan_tags = self.get_packet_field(packet, "Ethernet", "vlan") try: return getattr(vlan_tags[layer_tag], field) except IndexError: raise PypackerException("VLAN tag is not defined") packet_layer = self.get_packet_layer(packet, layer) if packet_layer is None: message = "Layer {0} is not defined".format(layer) raise PypackerException(message) with suppress(AttributeError): return getattr(packet_layer, '{0}_s'.format(field)) return getattr(packet_layer, field) except AttributeError: message = "Field {0} is not defined in {1}".format(field, layer) raise PypackerException(message)
[docs] def get_packet_layer(self, packet=None, layer=None, output_format="pypacker"): """Returns packet layer in pypacker or hex format. Args: packet(pypacker.Packet): Packet to analyze layer(str): Layer to analyze output_format(str): Output format - "pypacker" or "hex" or "bytes_array" Returns: pypacker.Packet, str: pypacker.Packet or str Examples:: packet_definition = ({"Ethernet": {"dst": "00:80:C2:00:00:00", "src": "00:00:00:00:00:02"}}, {"Dot1Q": {"vlan": 4}}, {"IP": {}}) pypacker_packet = _build_pypacker_packet(packet_definition) ip_layer_hex = get_packet_layer(packet=pypacker_packet, layer="IP", output_format="hex") assert ip_layer_hex[-8:] == '7f000001' """ def search_pypacker_layer_in_packet(packet=None, layer=None): """Search and return packet layer Returns: pypacker.Packet, None: pypacker.Packet for success, None otherwise """ if layer not in self.lldp_lacp_tlvs: pypacker_layer = self._get_pypacker_layer(layer) return packet[pypacker_layer] if layer.startswith("LLDP") or layer.startswith("DCBX"): pypacker_layer = self._get_pypacker_layer("LLDP") elif layer.startswith("LACP"): pypacker_layer = self._get_pypacker_layer("LACP") # Shallow copy is required. In case if copy is not specified tlvlist attribute will be empty for tlv in getattr(packet[pypacker_layer], "tlvlist", [])[:]: if tlv.__class__.__name__ == layer: return tlv return None try: layer = search_pypacker_layer_in_packet(packet, layer) if output_format == "pypacker": return layer hex_repr = codecs.encode(layer.bin(), "hex_codec").decode('utf-8') if output_format == "hex": return hex_repr if output_format == "bytes_array": hexbytes = [] for i in range(0, len(hex_repr), 2): hexbytes.append(int(hex_repr[i:i + 2], 16)) return hexbytes except Exception: return None
[docs] def packet_dictionary(self, packet): """Get packet dictionary from pypacker.Packet. Args: packet(pypacker.Packet): pypacker packet Returns: dict: dictionary created from pypacker.Packet Examples:: p = pypacker.Ethernet(dst="ff:ff:ff:ff:ff:ff", src="90:e6:ba:c3:17:13", type=0x806)/ pypacker.ARP(hwtype=0x1, ptype=0x800, hwlen=6, plen=4, op=1, hwsrc="90:e6:ba:c3:17:13", psrc="172.20.20.175", hwdst="00:00:00:00:00:00", pdst="172.20.20.211")/ pypacker.Padding(load='\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') pd = PacketProcessor().packet_dictionary(p) assert pd == ({'Ether': {'src': '90:e6:ba:c3:17:13', 'dst': 'ff:ff:ff:ff:ff:ff', 'type': 2054}}, {'ARP': {'hwdst': '00:00:00:00:00:00', 'ptype': 2048, 'hwtype': 1, 'psrc': '172.20.20.175', 'plen': 4, 'hwlen': 6, 'pdst': '172.20.20.211', 'hwsrc': '90:e6:ba:c3:17:13', 'op': 1}}, {'Padding': {'load': '\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'}}) """ packet_def = [] vlans = [] opts = [] tlvlist = [] for layer in packet: class_name = layer.__class__.__name__ # Handle inner pypacker class e.g. icmp.ICMP.Echo, icmp6.ICMP6.Echo if class_name in self.inner_classes: # get main class ICMP + echo class_name = "{0}.{1}".format(next(iter(layer_dict)), class_name) # NOQA pylint: disable=used-before-assignment fields = self._get_pypacker_layer_fields(layer) layer_dict = {class_name: {}} for field in fields: if field == "vlan" and self.get_packet_field(packet, "Ethernet", "vlan"): for vlan in self.get_packet_field(packet, "Ethernet", "vlan"): vlans.append({"Dot1Q": {f: getattr(vlan, f) for f in self._get_pypacker_layer_fields(vlan)}}) elif field == "opts" and class_name in ["IP", "IP6"]: # Shallow copy is required. In case if copy is not specified opts attribute will be empty pypacker_opts = self.get_packet_field(packet, class_name, "opts")[:] opts_fields = {"len", "type", "body_bytes"} opts.append({f: getattr(opt, f) for opt in pypacker_opts for f in opts_fields}) layer_dict[class_name][field] = opts elif field == "tlvlist" and class_name in ["LLDP", "LACP"]: # Shallow copy is required. In case if copy is not specified tlvlist attribute will be empty for tlv in self.get_packet_field(packet, class_name, field)[:]: tlvlist.append({tlv.__class__.__name__: {f: getattr(tlv, f) for f in self._get_pypacker_layer_fields(tlv)}}) if tlv.__class__.__name__ == "DCBXApplicationPriority": app_tables_list = [] for app_table in tlv.apppriotable: app_tables_list.append( {app_table.__class__.__name__: {f: getattr(app_table, f) for f in self._get_pypacker_layer_fields(app_table)}} ) tlvlist[-1]['DCBXApplicationPriority']['apppriotable'] = app_tables_list layer_dict[class_name][field] = tlvlist elif field != "padding": layer_dict[class_name][field] = self.get_packet_field(layer, class_name, field) packet_def.append(layer_dict) if class_name == "Ethernet" and vlans: packet_def.extend(vlans) return tuple(packet_def)
[docs] def packet_fragment(self, packet, adjust_size=True, required_size=64, fragsize=None): """Method for packet fragmentation. Args: packet(pypacker.Packet): Packet to be fragmented adjust_size(bool): If set to True packet size will be increased to 60 bytes required_size(int): Size (in bytes) of the result packet fragsize(int): length of each fragment Returns: list[pypacker.Packet]: list of pypacker.Packets - fragments of received packet """ pytest.skip("Packet fragmentation is not integrated yet")
[docs] def assemble_fragmented_packets(self, packets): """Method for assembling packets from fragments in packet list. Args: packets(list[pypacker.Packet]): List of fragmened packets Returns: list[pypacker.Packet]: List of assembled packets """ pytest.skip("Packet fragmentation is not integrated yet")