# Source code for unittests.traffic_generator.test_pp

# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``test_pp.py``

`Packet Processor's unittests`

"""

import pytest

from testlib.custom_exceptions import PypackerException
from .packet_constants import (ETH_IP_ICMP, DOT1Q_IP_ICMP,
                               IP_TCP, PACKET_DEFS, IP_FLAGS,
                               IP4_VERSION_FIELD, IP_TTL, IP_PROTO_TCP)


class TestPacketProcessor(object):
    """Unit tests for the traffic generator's packet processor helpers.

    Every test receives the ``tg`` fixture and exercises packet building,
    field access and reassembling of fragmented packets through it.

    """

    @staticmethod
    def _send_and_sniff(tg, iface, stream_ids, sniffing_time=5):
        """Sniff Dot1Q.ICMP traffic on ``iface`` while sending the given streams.

        Keeping the start/send/stop sequence in one place guarantees that
        every test starts the sniffer before sending.

        Args:
            tg: Traffic generator fixture.
            iface: Port used both for sending and for sniffing.
            stream_ids: Stream IDs to send, in order.
            sniffing_time: Value passed to ``tg.start_sniff`` as ``sniffing_time``.

        Returns:
            The value returned by ``tg.stop_sniff([iface])``; the tests index
            it by ``iface``.

        """
        tg.start_sniff([iface], sniffing_time=sniffing_time, filter_layer='Dot1Q.ICMP')
        for stream_id in stream_ids:
            tg.send_stream(stream_id)
        return tg.stop_sniff([iface])

    def test_check_packet_field_1(self, tg):
        """Check that every field of the built packet matches its definition.

        """
        packet = tg._build_pypacker_packet(ETH_IP_ICMP)
        # Flatten the packet definition into (layer, field, value) triples.
        verification_params = ((layer, field, value)
                               for p in ETH_IP_ICMP
                               for layer in p
                               for (field, value) in p[layer].items())
        for layer, field, value in verification_params:
            assert tg.check_packet_field(packet=packet, layer=layer, field=field, value=value)

    def test_packet_with_empty_layer(self, tg):
        """Test building packet with empty layer.

        """
        packet = tg._build_pypacker_packet(PACKET_DEFS[2])
        assert tg.get_packet_layer(packet, "IP") is not None

    def test_pproc_packet_field_setting(self, tg):
        """Check packet field setting.

        """
        fragments = tg.packet_fragment(ETH_IP_ICMP, required_size=200)
        assert len(fragments) == 1

        src = '12:34:56:78:98:76'
        fragments[0].set_field('Ether', 'src', src)
        assert src == tg.get_packet_field(fragments[0], 'Ether', 'src')

        load = 'abcdefg'
        fragments[0].set_field('Raw', 'load', load)
        assert load == tg.get_packet_field(fragments[0], 'Raw', 'load')

    def test_assembling_dot1q_icmp_packet_1(self, tg):
        """Check assembling of Dot1Q.ICMP fragmented packet.

        """
        iface = tg.ports[0]
        stream_id_1 = tg.set_stream(DOT1Q_IP_ICMP, count=1, required_size=1000,
                                    fragsize=200, iface=iface)

        data = self._send_and_sniff(tg, iface, [stream_id_1])
        data_1 = tg.assemble_fragmented_packets(data[iface])
        # Verify that fragmented packet is assembled
        assert len(data_1) == 1

        # Send assembled packet
        pac = data_1[0]
        tg.streams[stream_id_1]['packet'] = pac
        data = self._send_and_sniff(tg, iface, [stream_id_1])
        # Verify that assembled packet has len = 996
        assert len(data[iface][0]) == 996

    def test_assembling_dot1q_icmp_packet_2(self, tg):
        """Check assembling of Dot1Q.ICMP fragmented packet. Overlapped fragments.

        """
        iface = tg.ports[0]
        stream_id_1 = tg.set_stream(DOT1Q_IP_ICMP, count=1, required_size=1000,
                                    fragsize=200, iface=iface)
        # Shift the third fragment back by one offset unit.
        offset = tg.get_packet_field(tg.streams[stream_id_1]['packet'][2], "IP", "offset")
        tg.streams[stream_id_1]['packet'][2].ip.offset = offset - 1

        data = self._send_and_sniff(tg, iface, [stream_id_1])
        data_1 = tg.assemble_fragmented_packets(data[iface])
        # The fragments must not be merged: 5 packets are expected back
        assert len(data_1) == 5

    def test_assembling_dot1q_icmp_packet_3(self, tg):
        """Check assembling of Dot1Q.ICMP fragmented packet. Missed fragment 1.

        """
        iface = tg.ports[0]
        stream_id_1 = tg.set_stream(DOT1Q_IP_ICMP, count=1, required_size=1000,
                                    fragsize=200, iface=iface)
        tg.streams[stream_id_1]['packet'].pop(0)

        data = self._send_and_sniff(tg, iface, [stream_id_1])
        data_1 = tg.assemble_fragmented_packets(data[iface])
        # The remaining fragments must not be merged: 4 packets are expected back
        assert len(data_1) == 4

    def test_assembling_dot1q_icmp_packet_4(self, tg):
        """Check assembling of Dot1Q.ICMP fragmented packet. Wrong frag in Fragment 1.

        """
        iface = tg.ports[0]
        stream_id_1 = tg.set_stream(DOT1Q_IP_ICMP, count=1, required_size=1000,
                                    fragsize=200, iface=iface)
        tg.streams[stream_id_1]['packet'][0].ip.offset = 1

        data = self._send_and_sniff(tg, iface, [stream_id_1])
        data_1 = tg.assemble_fragmented_packets(data[iface])
        # The fragments must not be merged: 5 packets are expected back
        assert len(data_1) == 5

    def test_assembling_dot1q_icmp_packet_5(self, tg):
        """Check assembling of Dot1Q.ICMP fragmented packet. Wrong flags in Fragment 1.

        """
        iface = tg.ports[0]
        stream_id_1 = tg.set_stream(DOT1Q_IP_ICMP, count=1, required_size=1000,
                                    fragsize=200, iface=iface)
        # NOTE(review): the docstring says "wrong flags" but the code changes
        # ip.offset - confirm which field this scenario is meant to corrupt.
        tg.streams[stream_id_1]['packet'][0].ip.offset = 0

        data = self._send_and_sniff(tg, iface, [stream_id_1])
        data_1 = tg.assemble_fragmented_packets(data[iface])
        # The fragments must not be merged: 5 packets are expected back
        assert len(data_1) == 5

    def test_assembling_dot1q_icmp_packet_6(self, tg):
        """Check assembling of Dot1Q.ICMP fragmented packet. Missed last fragment.

        """
        iface = tg.ports[0]
        stream_id_1 = tg.set_stream(DOT1Q_IP_ICMP, count=1, required_size=1000,
                                    fragsize=200, iface=iface)
        tg.streams[stream_id_1]['packet'].pop(4)

        data = self._send_and_sniff(tg, iface, [stream_id_1])
        data_1 = tg.assemble_fragmented_packets(data[iface])
        # The remaining fragments must not be merged: 4 packets are expected back
        assert len(data_1) == 4

    def test_assembling_dot1q_icmp_packet_7(self, tg):
        """Check assembling of Dot1Q.ICMP fragmented packet. Wrong frag in last fragment.

        """
        iface = tg.ports[0]
        stream_id_1 = tg.set_stream(DOT1Q_IP_ICMP, count=1, required_size=1000,
                                    fragsize=200, iface=iface)
        tg.streams[stream_id_1]['packet'][4].ip.offset = 0

        data = self._send_and_sniff(tg, iface, [stream_id_1])
        data_1 = tg.assemble_fragmented_packets(data[iface])
        # The fragments must not be merged: 5 packets are expected back
        assert len(data_1) == 5

    def test_assembling_dot1q_icmp_packet_8(self, tg):
        """Check assembling of Dot1Q.ICMP fragmented packet. Wrong flags in last fragment.

        """
        iface = tg.ports[0]
        stream_id_1 = tg.set_stream(DOT1Q_IP_ICMP, count=1, required_size=1000,
                                    fragsize=200, iface=iface)
        # NOTE(review): the docstring says "wrong flags" but the code changes
        # ip.offset - confirm which field this scenario is meant to corrupt.
        tg.streams[stream_id_1]['packet'][4].ip.offset = 1

        data = self._send_and_sniff(tg, iface, [stream_id_1])
        data_1 = tg.assemble_fragmented_packets(data[iface])
        # The fragments must not be merged: 5 packets are expected back
        assert len(data_1) == 5

    def test_assembling_dot1q_icmp_packet_9(self, tg):
        """Check assembling of Dot1Q.ICMP fragmented packet. Missed fragment after 2.

        """
        iface = tg.ports[0]
        stream_id_1 = tg.set_stream(DOT1Q_IP_ICMP, count=1, required_size=1000,
                                    fragsize=200, iface=iface)
        tg.streams[stream_id_1]['packet'].pop(2)

        data = self._send_and_sniff(tg, iface, [stream_id_1])
        data_1 = tg.assemble_fragmented_packets(data[iface])
        # The remaining fragments must not be merged: 4 packets are expected back
        assert len(data_1) == 4

    def test_assembling_dot1q_icmp_packet_10(self, tg):
        """Check assembling of Dot1Q.ICMP fragmented packet. Duplicate fragment 2.

        """
        iface = tg.ports[0]
        stream_id_1 = tg.set_stream(DOT1Q_IP_ICMP, count=1, required_size=1000,
                                    fragsize=200, iface=iface)
        # Append the same fragment object once more, with its time bumped by 1.
        pack_2 = tg.streams[stream_id_1]['packet'][2]
        pack_2.time += 1
        tg.streams[stream_id_1]['packet'].append(pack_2)

        data = self._send_and_sniff(tg, iface, [stream_id_1])
        data_1 = tg.assemble_fragmented_packets(data[iface])
        # Verify that fragmented packet is assembled
        assert len(data_1) == 1

        # Send assembled packet
        pac = data_1[0]
        tg.streams[stream_id_1]['packet'] = pac
        data = self._send_and_sniff(tg, iface, [stream_id_1])
        # Verify that assembled packet has len = 996
        assert len(data[iface][0]) == 996

    def test_assembling_dot1q_icmp_packet_11(self, tg):
        """Check assembling of Dot1Q.ICMP fragmented packet. Two packets.

        """
        iface = tg.ports[0]
        stream_id_1 = tg.set_stream(DOT1Q_IP_ICMP, count=1, required_size=1000,
                                    fragsize=200, iface=iface)
        stream_id_2 = tg.set_stream(DOT1Q_IP_ICMP, count=1, required_size=1200,
                                    fragsize=100, iface=iface)
        # Give the second packet's fragments their own IP id.
        for pac in tg.streams[stream_id_2]['packet']:
            pac.ip.id = 2

        data = self._send_and_sniff(tg, iface, [stream_id_1, stream_id_2])
        data_1 = tg.assemble_fragmented_packets(data[iface])
        # Verify that both fragmented packets are assembled
        assert len(data_1) == 2

        # Send assembled packets
        pac_1 = data_1[0]
        pac_2 = data_1[1]
        tg.streams[stream_id_1]['packet'] = pac_1
        tg.streams[stream_id_2]['packet'] = pac_2
        data = self._send_and_sniff(tg, iface, [stream_id_1, stream_id_2])
        assert len(data[iface][0]) == 996
        assert len(data[iface][1]) == 1196

    def test_assembling_dot1q_icmp_packet_12(self, tg):
        """Check assembling of Dot1Q.ICMP fragmented packet. Four packets (fragmented and not).

        """
        iface = tg.ports[0]
        stream_id_1 = tg.set_stream(DOT1Q_IP_ICMP, count=1, required_size=1000,
                                    fragsize=200, iface=iface)
        stream_id_2 = tg.set_stream(DOT1Q_IP_ICMP, count=1, required_size=1200,
                                    fragsize=100, iface=iface)
        stream_id_3 = tg.set_stream(DOT1Q_IP_ICMP, count=1, required_size=500, iface=iface)
        stream_id_4 = tg.set_stream(DOT1Q_IP_ICMP, count=1, required_size=800, iface=iface)
        # Give every packet after the first its own IP id.
        for pac in tg.streams[stream_id_2]['packet']:
            pac.ip.id = 2
        tg.streams[stream_id_3]['packet'].ip.id = 3
        tg.streams[stream_id_4]['packet'].ip.id = 4

        stream_ids = [stream_id_1, stream_id_2, stream_id_3, stream_id_4]
        data = self._send_and_sniff(tg, iface, stream_ids, sniffing_time=8)
        data_1 = tg.assemble_fragmented_packets(data[iface])
        # Verify that four packets are returned after assembling
        assert len(data_1) == 4

        # Send assembled packets
        pac_1 = data_1[0]
        pac_2 = data_1[1]
        pac_3 = data_1[2]
        pac_4 = data_1[3]
        tg.streams[stream_id_1]['packet'] = pac_1
        tg.streams[stream_id_2]['packet'] = pac_2
        tg.streams[stream_id_3]['packet'] = pac_3
        tg.streams[stream_id_4]['packet'] = pac_4
        data = self._send_and_sniff(tg, iface, stream_ids, sniffing_time=8)
        assert len(data[iface][0]) == 996
        assert len(data[iface][1]) == 1196
        assert len(data[iface][2]) == 496
        assert len(data[iface][3]) == 796

    def test_get_packet_field_negative_1(self, tg):
        """Verify that method get_packet_field returns Error message when layer is not defined in packet(1).

        """
        packet_pypacker = tg._build_pypacker_packet(IP_TCP)
        with pytest.raises(PypackerException) as excepinfo:
            tg.get_packet_field(packet=packet_pypacker, layer="Ethernet")
        exception_message = excepinfo.value.parameter
        # verify expected result
        result = "Layer Ethernet is not defined"
        assert exception_message == result

    def test_get_packet_field_negative_2(self, tg):
        """Verify that method get_packet_field returns Error message when layer is not defined in packet(2).

        """
        packet_pypacker = tg._build_pypacker_packet(IP_TCP)
        with pytest.raises(PypackerException) as excepinfo:
            tg.get_packet_field(packet=packet_pypacker, layer="Ip")
        exception_message = excepinfo.value.parameter
        # verify expected result
        result = "Layer Ip is not defined"
        assert exception_message == result

    def test_get_packet_field_negative_3(self, tg):
        """Verify that method get_packet_field returns Error message when field is not defined in packet(1).

        """
        packet_pypacker = tg._build_pypacker_packet(IP_TCP)
        with pytest.raises(PypackerException) as excepinfo:
            tg.get_packet_field(packet=packet_pypacker, layer="IP", field="Src")
        exception_message = excepinfo.value.parameter
        # verify expected result
        result = "Field Src is not defined in IP"
        assert exception_message == result

    def test_get_packet_field_4(self, tg):
        """Verify that method get_packet_field returns correct value(4).

        """
        packet_pypacker = tg._build_pypacker_packet(IP_TCP)
        flag = tg.get_packet_field(packet=packet_pypacker, layer="IP", field="flags")
        # verify expected result
        assert flag == IP_FLAGS

    def test_get_packet_field_1(self, tg):
        """Verify that method get_packet_field returns correct value(1).

        """
        packet_pypacker = tg._build_pypacker_packet(IP_TCP)
        ttl = tg.get_packet_field(packet=packet_pypacker, layer="IP", field="ttl")
        # verify expected result
        assert ttl == IP_TTL

    def test_get_packet_field_2(self, tg):
        """Verify that method get_packet_field returns correct value(2).

        """
        packet_pypacker = tg._build_pypacker_packet(IP_TCP)
        proto = tg.get_packet_field(packet=packet_pypacker, layer="IP", field="p")
        # verify expected result
        assert proto == IP_PROTO_TCP

    def test_get_packet_field_3(self, tg):
        """Verify that method get_packet_field returns correct value(3).

        """
        packet_pypacker = tg._build_pypacker_packet(IP_TCP)
        version = tg.get_packet_field(packet=packet_pypacker, layer="IP", field="v")
        # verify expected result
        assert version == IP4_VERSION_FIELD

    @pytest.mark.skip("Pypacker does not support")
    def test_set_field(self, tg):
        """Verify that method set_field sets correct field value.

        """
        packet = ({"IP": {"src": "20.0.10.2", "dst": "10.10.10.1", "flags": 0}},
                  {"TCP": {"flags": 1}})
        packet_pypacker = tg._build_pypacker_packet(packet)
        packet_pypacker.set_field('IP', 'src', '10.10.10.10')
        assert tg.get_packet_field(packet=packet_pypacker, layer="IP", field="src") == '10.10.10.10'

        # negative scenarios: not existing layer
        with pytest.raises(AttributeError):
            packet_pypacker.set_field('Ether', 'src', '11:11:11:11:11:11')
        # NOTE(review): this passes the raw definition tuple, not the built
        # packet_pypacker - confirm which one get_packet_layer should receive.
        assert tg.get_packet_layer(packet, "Ether") is None

        # negative scenarios: wrong field value
        with pytest.raises(Exception):
            packet_pypacker.set_field('IP', 'src', '11:11:11:11:11:11')
        assert tg.get_packet_field(packet=packet_pypacker, layer="IP", field="src") == '10.10.10.10'

        # negative scenarios: wrong field name
        packet_pypacker.set_field('IP', 'hwsrc', '11:11:11:11:11:11')
        with pytest.raises(PypackerException):
            tg.get_packet_field(packet=packet_pypacker, layer="IP", field="hwsrc")

    @pytest.mark.skip("Pypacker does not support")
    def test_get_lcount(self, tg):
        """Verify that method get_lcount returns correct count of layers.

        """
        packet = ({"Dot1Q": {"vlan": 1}},
                  {"Dot1Q": {"vlan": 2}},
                  {"IP": {"src": "20.0.10.2", "dst": "10.10.10.1", "flags": 0}},
                  {"TCP": {"flags": 1}})
        packet_pypacker = tg._build_pypacker_packet(packet)
        assert packet_pypacker.get_lcount('Dot1Q') == 2
        assert packet_pypacker.get_lcount('IP') == 1
        assert packet_pypacker.get_lcount('Ether') == 0
        # negative scenarios: not existing layer
        assert packet_pypacker.get_lcount('dot') == 0

    @pytest.mark.skip("Pypacker does not support")
    def test_get_lfield(self, tg):
        """Verify that method get_lfield returns correct value.

        """
        packet = ({"Dot1Q": {"vlan": 1}},
                  {"Dot1Q": {"vlan": 2}},
                  {"IP": {"src": "20.0.10.2", "dst": "10.10.10.1", "flags": 0}},
                  {"TCP": {"flags": 1}})
        packet_pypacker = tg._build_pypacker_packet(packet)
        assert packet_pypacker.get_lfield('Dot1Q', 'vlan') == 1
        assert packet_pypacker.get_lfield('Dot1Q', 'vlan', l_id=2) == 2
        assert packet_pypacker.get_lfield('IP', 'src') == '20.0.10.2'
        assert packet_pypacker.get_lfield('Ether', 'src') is None
        # negative scenarios: not existing layer
        assert packet_pypacker.get_lfield('dot', 'src') is None
        # negative scenarios: not existing field
        assert packet_pypacker.get_lfield('IP', 'vlan') is None

    @pytest.mark.skip("Pypacker does not support")
    def test_rechecksum(self, tg):
        """Verify that method rechecksum returns correct value.

        """
        packet = ({"Dot1Q": {"vlan": 1}},
                  {"Dot1Q": {"vlan": 2}},
                  {"IP": {"src": "20.0.10.2", "dst": "10.10.10.1", "flags": 0}},
                  {"TCP": {"flags": 1}})
        packet_pypacker = tg._build_pypacker_packet(packet)
        ip_chksum = packet_pypacker.get_lfield('IP', 'chksum')
        tcp_chksum = packet_pypacker.get_lfield('TCP', 'chksum')
        assert packet_pypacker.rechecksum('IP') == ip_chksum
        assert packet_pypacker.rechecksum('TCP') == tcp_chksum
        # negative scenarios: not existing layer
        assert packet_pypacker.rechecksum('dot') is None