# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""``test_tg.py``
`Traffic generator's unittests`
"""
import time
import copy
import random
import itertools
from collections import namedtuple
import pytest
from testlib.custom_exceptions import PypackerException
from .packet_constants import (PACKET_DEFINITION, PACKET_DEFS,
ARP, DOT1Q, IP6, QINQ, STP, RSTP,
MSTP, MSTI_BPDU, LLDP, PAUSE, PFC,
ETH_IP_ICMP, ETH_IP_UDP, ETH_IP_TCP,
ETH_IP_IGMP, DOT1Q_PRIO_1, DOT1Q_PRIO_2,
DOT1Q_IP_UDP, DOT1Q_IP_ICMP, DOT1Q_IP_TCP,
DOT1Q_ARP, DOT1Q_IP6, ETHER_TYPE_EFC,
ETHER_TYPE_PBRIDGE, ETHER_TYPE_IP,
ETHER_TYPE_8021Q, ETHER_TYPE_TUNNELING,
SRC_MAC, DST_MAC, BROADCAT_MAC, IP_SRC, IP_DST,
DOT1Q_DEFAULT_CFI, VLAN_1, PAUSE_CODE,
PFC_CODE, PAUSE_TIME, PFC_LS, PFC_TIME,
PFC_MS, IP_PROTO_IP, LLDP_DCBX, ETHER_TYPE_LLDP,
LLDP_DST_MAC, LLDP_DCBX_APP_PRIO, LLDP_CHASSIS_ID_TLV,
LLDP_PORT_ID_TLV, LLDP_TTL_TLV, LLDP_END_TLV,
LLDP_CHASSIS_ID_TLV_TYPE, LLDP_MAC_SUBTYPE,
LLDP_PORT_ID_TLV_TYPE, LLDP_PORT_ID_INTERFACE_SUBTYPE,
LLDP_PORT_ID_INTERFACE_VALUE, LLDP_TTL_TLV_TYPE,
LLDP_TTL_SECONDS, LLDP_SYS_CAPAB_TLV, LLDP_MAN_ADDR_TLV,
LACP_DST_MAC, LACP)
[docs]@pytest.mark.unittests
class TestTGs(object):
[docs] def verify_packets_data(self, initial_packet_def, received_packet_def):
""" Check 2 packet definitions """
initial_packet_layers = [layer for p in initial_packet_def for layer in p]
received_packet_layers = [layer for p in received_packet_def for layer in p]
assert initial_packet_layers == received_packet_layers, \
"Sent packet layers {0} do not match with received {1}".format(initial_packet_layers, received_packet_layers)
layer_param = namedtuple("layer_param", ("layer", "field", "value"))
initial_packet_params = (layer_param(layer, field, value) for p in initial_packet_def for layer in p
for (field, value) in p[layer].items())
received_packet_params = [layer_param(layer, field, value) for p in received_packet_def for layer in p
for (field, value) in p[layer].items()]
for init_param in initial_packet_params:
if not isinstance(init_param.value, list):
assert init_param in received_packet_params, \
"Field '{packet.field}' with value {packet.value} from " \
"layer '{packet.layer}' is not found in received packet".format(packet=init_param)
continue
for received_param in received_packet_params:
if init_param.layer == received_param.layer and init_param.field == received_param.field:
for initial_value, received_value in itertools.zip_longest(init_param.value, received_param.value):
if not isinstance(initial_value, dict):
assert initial_value == received_value
elif len(initial_value.values()) > 1:
assert initial_value == received_value
else:
self.verify_packets_data((initial_value,), (received_value,))
[docs] def test_stream(self, tg):
"""Verify that send stream send exact packets count.
"""
iface = tg.ports[0]
packet_count = 100
src_mac = PACKET_DEFINITION[0]["Ethernet"]["src"]
stream_id = tg.set_stream(PACKET_DEFINITION, count=packet_count,
iface=iface, adjust_size=True, required_size=1450)
tg.start_sniff([iface], sniffing_time=2, src_filter=src_mac)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
[docs] def test_single_packet(self, tg):
"""Single packet.
"""
time_stamp = time.time()
stream_id = tg.set_stream(PACKET_DEFINITION, count=1, iface=tg.ports[0])
print("Stream set time %2.6fs." % (time.time() - time_stamp))
time_stamp = time.time()
tg.send_stream(stream_id)
print("Packet send time %2.6fs." % (time.time() - time_stamp))
[docs] def test_single_stream(self, tg):
"""Single stream.
"""
stream_id = tg.set_stream(PACKET_DEFINITION, count=5, inter=1, iface=tg.ports[0], adjust_size=True)
time_stamp = time.time()
tg.start_streams([stream_id])
print("Time to start stream %2.6fs." % (time.time() - time_stamp))
tg.stop_streams([stream_id])
[docs] def test_multistreams_and_multifaces(self, tg):
"""Multiple streams and multiple ifaces.
"""
stream_list = []
for packet_definition, port in zip(PACKET_DEFS, tg.ports):
stream_id = tg.set_stream(packet_definition, count=25, inter=0.5, iface=port, adjust_size=True)
stream_list.append(stream_id)
time_stamp = time.time()
tg.start_streams(stream_list)
print("Time to start stream %2.6fs." % (time.time() - time_stamp))
tg.stop_streams(stream_list)
[docs] def test_multistreams_on_single_iface(self, tg):
"""Multiple streams and one iface.
"""
stream_list = []
for packet_definition in PACKET_DEFS:
stream_id = tg.set_stream(packet_definition, count=25, inter=0.5, iface=tg.ports[0], adjust_size=True)
stream_list.append(stream_id)
time_stamp = time.time()
tg.start_streams(stream_list)
print("Time to start stream %2.6fs." % (time.time() - time_stamp))
tg.stop_streams(stream_list)
[docs] def test_multistreams_and_one(self, tg):
"""Multiple streams and one on same iface.
"""
stream_list = []
for packet_definition in PACKET_DEFS[:2]:
stream_id = tg.set_stream(packet_definition, count=3, inter=2, iface=tg.ports[0], adjust_size=True)
stream_list.append(stream_id)
tg.start_streams(stream_list)
tg.stop_streams(stream_list)
stream_id = tg.set_stream(PACKET_DEFS[2], count=2, inter=1, iface=tg.ports[0], adjust_size=True)
tg.send_stream(stream_id)
[docs] def test_exact_packets_delivery(self, tg):
"""Verify that send stream send exact packets count.
"""
iface = tg.ports[0]
packet_count = 1000
src_mac = PACKET_DEFINITION[0]["Ethernet"]["src"]
stream_id = tg.set_stream(PACKET_DEFINITION, count=packet_count, iface=iface,
adjust_size=True, required_size=200, inter=0.005)
tg.start_sniff([iface], sniffing_time=10, filter_layer="IP", src_filter=src_mac)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
[docs] def test_start_stop_parallel_and_independent_set_quantity_streams(self, tg):
"""Verify parallel and independent set quantity of streams.
"""
iface = tg.ports[0]
packet_count = 11
stream_id_1 = tg.set_stream(PACKET_DEFS[0], count=packet_count - 1, iface=iface)
stream_id_2 = tg.set_stream(PACKET_DEFS[1], count=packet_count - 10, iface=iface)
tg.start_sniff([iface], sniffing_time=3, filter_layer="IP", dst_filter=BROADCAT_MAC)
tg.send_stream(stream_id_1)
tg.send_stream(stream_id_2)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
[docs] def test_start_stop_parallel_and_independent_continuous_streams(self, tg):
"""Verify parallel and independent streams starts and stops.
"""
iface = tg.ports[0]
# Packet count per stream equals 1 by default
expected_count = 2
stream_id_1 = tg.set_stream(PACKET_DEFS[0], iface=iface)
stream_id_2 = tg.set_stream(PACKET_DEFS[1], iface=iface)
tg.start_sniff([iface], sniffing_time=3, filter_layer="IP", dst_filter=BROADCAT_MAC)
tg.start_streams([stream_id_1])
tg.start_streams([stream_id_2])
tg.stop_streams([stream_id_1])
tg.stop_streams([stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == expected_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), expected_count)
[docs] def test_streams_corruption_1(self, tg):
"""Verify that set_stream does not corrupt already started streams.
"""
iface = tg.ports[0]
packet_count = 10
stream_id_1 = tg.set_stream(PACKET_DEFS[0], count=packet_count, inter=0.1, iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="IP", dst_filter=BROADCAT_MAC)
tg.start_streams([stream_id_1])
tg.set_stream(PACKET_DEFS[1], count=1, iface=iface)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
for packet in packets:
assert tg.get_packet_field(packet, "Ethernet", "src") != PACKET_DEFS[1][0]['Ethernet']['src']
[docs] def test_streams_corruption_2(self, tg):
"""Verify that set_stream does not corrupt already started streams.
"""
iface = tg.ports[0]
packet_count = 10
stream_id_1 = tg.set_stream(PACKET_DEFS[0], count=packet_count, inter=1, iface=iface)
tg.start_sniff([iface], sniffing_time=10, dst_filter=BROADCAT_MAC)
tg.start_streams([stream_id_1])
stream_id_2 = tg.set_stream(PACKET_DEFS[1], count=1, iface=iface)
tg.start_streams([stream_id_2])
data = tg.stop_sniff([iface])
tg.stop_streams([stream_id_1, stream_id_2])
packets = data.get(iface, [])
assert len(packets) >= packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
[docs] def test_stop_all_streams(self, tg):
"""Verify that stop_streams stop all streams by default.
"""
iface = tg.ports[0]
stream_id_1 = tg.set_stream(PACKET_DEFS[0], count=10, inter=1, iface=iface)
stream_id_2 = tg.set_stream(PACKET_DEFS[1], count=10, inter=1, iface=iface)
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
tg.start_sniff([iface], sniffing_time=5, dst_filter=BROADCAT_MAC)
data = tg.stop_sniff([iface])
assert data[iface] == []
[docs] def test_arp_sniff_pattern(self, tg):
"""Verify ARP sniff pattern.
"""
iface = tg.ports[0]
packet_count = 1
stream_id_1 = tg.set_stream(PACKET_DEFINITION, count=1, iface=iface)
stream_id_2 = tg.set_stream(ARP, count=packet_count, iface=iface)
stream_id_3 = tg.set_stream(DOT1Q_ARP, count=1, iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="ARP")
tg.start_streams([stream_id_1, stream_id_2, stream_id_3])
tg.stop_streams([stream_id_1, stream_id_2, stream_id_3])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
[docs] def test_sniffing_negative(self, tg):
"""Sniff for one packet, but sniff nothing.
"""
iface = tg.ports[0]
stream_id = tg.set_stream(DOT1Q_ARP, count=5, inter=0.02, iface=iface)
tg.start_sniff([iface], sniffing_time=3, packets_count=1, filter_layer="ARP")
tg.start_streams([stream_id])
tg.stop_streams([stream_id])
data = tg.stop_sniff([iface])
assert data[iface] == []
[docs] def test_qinq_packets_sniffer(self, tg):
"""Check QinQ packet send.
"""
iface = tg.ports[0]
packet_count = 1
stream_id_1 = tg.set_stream(QINQ, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=3)
tg.start_streams([stream_id_1])
tg.stop_streams([stream_id_1])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(QINQ, received)
[docs] def test_check_statistics(self, tg):
"""Send 100 packets and check statistics.
"""
iface = tg.ports[0]
src_mac = PACKET_DEFINITION[0]["Ethernet"]["src"]
stream_id_1 = tg.set_stream(PACKET_DEFINITION, count=100, iface=iface)
tg.clear_statistics([iface])
tg.start_sniff([iface], sniffing_time=5, filter_layer="IP", src_filter=src_mac)
tg.send_stream(stream_id_1)
tg.stop_sniff([iface])
end_receive_statistics = tg.get_received_frames_count(iface)
end_sent_statistics = tg.get_sent_frames_count(iface)
assert end_receive_statistics == 100
assert end_sent_statistics == 100
[docs] def test_incremented_streams(self, tg):
"""Send incremented streams.
"""
iface = tg.ports[0]
packet1 = ({"Ethernet": {'src': "00:00:00:00:00:04", 'dst': "00:00:00:00:00:02"}}, {"IP": {}})
packet2 = ({"Ethernet": {'src': "00:00:00:00:00:05", 'dst': "00:00:00:00:00:02"}}, {"IP": {}})
packet3 = ({"Ethernet": {'src': "00:00:00:00:00:06", 'dst': "00:00:00:00:00:02"}},)
packet4 = ({"Ethernet": {'src': "00:00:00:00:00:07", 'dst': "00:00:00:00:00:02"}}, {"IP": {}})
packet5 = ({"Ethernet": {'src': "00:00:00:00:00:08", 'dst': "00:00:00:00:00:02"}}, {"IP": {}})
packet6 = ({"Ethernet": {'src': "00:00:00:00:00:09", 'dst': "00:00:00:00:00:02"}}, {"IP": {}})
packet7 = ({"Ethernet": {'src': "00:00:00:00:00:0a", 'dst': "00:00:00:00:00:02"}}, {"IP": {}})
packet8 = ({"Ethernet": {'src': "00:00:00:00:00:0b", 'dst': "00:00:00:00:00:02"}}, {"IP": {}})
packet9 = ({"Ethernet": {'src': "00:00:00:00:00:0c", 'dst': "00:00:00:00:00:02"}}, {"IP": {}})
stream1 = tg.set_stream(packet1, count=20, sa_increment=(2, 20), iface=iface)
stream2 = tg.set_stream(packet2, count=15, da_increment=(3, 15), iface=iface)
stream3 = tg.set_stream(packet3, count=300, sa_increment=(2, 20), da_increment=(3, 15), iface=iface)
stream4 = tg.set_stream(packet4, count=1, sa_increment=(2, 20), continuous=True, iface=iface)
stream5 = tg.set_stream(packet5, count=1, da_increment=(2, 20), continuous=True, iface=iface)
stream6 = tg.set_stream(packet6, count=20, sa_increment=(-2, 20), iface=iface)
stream7 = tg.set_stream(packet7, count=15, da_increment=(-3, 15), iface=iface)
stream8 = tg.set_stream(packet8, count=1, sa_increment=(-2, 20), continuous=True, iface=iface)
stream9 = tg.set_stream(packet9, count=1, da_increment=(-3, 15), continuous=True, iface=iface)
tg.clear_statistics([iface])
tg.start_sniff([iface], sniffing_time=5, dst_filter="00:00:00:00:00:02")
tg.start_streams([stream1])
tg.stop_sniff([iface])
tg.stop_streams([stream1])
end_receive_statistics = tg.get_received_frames_count(iface)
end_sent_statistics = tg.get_sent_frames_count(iface)
assert end_receive_statistics == 20
assert end_sent_statistics == 20
tg.clear_statistics([iface])
tg.start_sniff([iface], sniffing_time=5, src_filter="00:00:00:00:00:05")
tg.start_streams([stream2])
tg.stop_sniff([iface])
tg.stop_streams([stream2])
end_receive_statistics = tg.get_received_frames_count(iface)
end_sent_statistics = tg.get_sent_frames_count(iface)
assert end_receive_statistics == 15
assert end_sent_statistics == 15
tg.clear_statistics([iface])
tg.start_sniff([iface], sniffing_time=5, filter_layer="notIP")
tg.start_streams([stream3])
tg.stop_sniff([iface])
tg.stop_streams([stream3])
end_receive_statistics = tg.get_received_frames_count(iface)
end_sent_statistics = tg.get_sent_frames_count(iface)
assert end_receive_statistics >= 300
assert end_sent_statistics == 300
tg.clear_statistics([iface])
tg.start_sniff([iface], sniffing_time=5)
tg.start_streams([stream4])
middle_receive_statistics = tg.get_received_frames_count(iface)
middle_sent_statistics = tg.get_sent_frames_count(iface)
assert middle_receive_statistics > 0
assert middle_sent_statistics > 0
tg.stop_sniff([iface])
tg.stop_streams([stream4])
end_receive_statistics = tg.get_received_frames_count(iface)
end_sent_statistics = tg.get_sent_frames_count(iface)
assert end_receive_statistics > middle_receive_statistics
assert end_sent_statistics > middle_sent_statistics
tg.clear_statistics([iface])
tg.start_sniff([iface], sniffing_time=5)
tg.start_streams([stream5])
middle_receive_statistics = tg.get_received_frames_count(iface)
middle_sent_statistics = tg.get_sent_frames_count(iface)
assert middle_receive_statistics > 0
assert middle_sent_statistics > 0
tg.stop_sniff([iface])
tg.stop_streams([stream5])
end_receive_statistics = tg.get_received_frames_count(iface)
end_sent_statistics = tg.get_sent_frames_count(iface)
assert end_receive_statistics > middle_receive_statistics
assert end_sent_statistics > middle_sent_statistics
tg.clear_statistics([iface])
tg.start_sniff([iface], sniffing_time=5, dst_filter="00:00:00:00:00:02")
tg.start_streams([stream6])
tg.stop_sniff([iface])
tg.stop_streams([stream6])
end_receive_statistics = tg.get_received_frames_count(iface)
end_sent_statistics = tg.get_sent_frames_count(iface)
assert end_receive_statistics == 20
assert end_sent_statistics == 20
tg.clear_statistics([iface])
tg.start_sniff([iface], sniffing_time=5, src_filter="00:00:00:00:00:0a")
tg.start_streams([stream7])
tg.stop_sniff([iface])
tg.stop_streams([stream7])
end_receive_statistics = tg.get_received_frames_count(iface)
end_sent_statistics = tg.get_sent_frames_count(iface)
assert end_receive_statistics == 15
assert end_sent_statistics == 15
tg.clear_statistics([iface])
tg.start_sniff([iface], sniffing_time=5)
tg.start_streams([stream8])
middle_receive_statistics = tg.get_received_frames_count(iface)
middle_sent_statistics = tg.get_sent_frames_count(iface)
assert middle_receive_statistics > 0
assert middle_sent_statistics > 0
tg.stop_sniff([iface])
tg.stop_streams([stream8])
end_receive_statistics = tg.get_received_frames_count(iface)
end_sent_statistics = tg.get_sent_frames_count(iface)
assert end_receive_statistics > middle_receive_statistics
assert end_sent_statistics > middle_sent_statistics
tg.clear_statistics([iface])
tg.start_sniff([iface], sniffing_time=5)
tg.start_streams([stream9])
time.sleep(1)
middle_receive_statistics = tg.get_received_frames_count(iface)
middle_sent_statistics = tg.get_sent_frames_count(iface)
assert middle_receive_statistics > 0
assert middle_sent_statistics > 0
tg.stop_sniff([iface])
tg.stop_streams([stream9])
end_receive_statistics = tg.get_received_frames_count(iface)
end_sent_statistics = tg.get_sent_frames_count(iface)
assert end_receive_statistics > middle_receive_statistics
assert end_sent_statistics > middle_sent_statistics
[docs] def test_packet_fragmentation(self, tg):
"""Check packet fragmentation.
"""
ix_iface = tg.ports[0]
packet_count = 2
stream_id = tg.set_stream(ETH_IP_ICMP, count=1, iface=ix_iface, required_size=200, fragsize=110)
tg.start_sniff([ix_iface], sniffing_time=5, filter_layer="ICMP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([ix_iface])
packets = data.get(ix_iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
[docs] def test_sa_incrementation_1(self, tg):
"""Check SA incrementation. Count == Increment count.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(ETH_IP_ICMP, count=packet_count, sa_increment=(1, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="ICMP", dst_filter=DST_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different src macs
src_mac_set = set(tg.get_packet_field(packet, "Ethernet", "src") for packet in packets)
assert len(src_mac_set) == packet_count
[docs] def test_sa_incrementation_2(self, tg):
"""Check SA incrementation. Count > Increment count.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_ICMP, count=packet_count, sa_increment=(1, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, packets_count=10, filter_layer="ICMP", dst_filter=DST_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different src macs
src_mac_set = set(tg.get_packet_field(packet, "Ethernet", "src") for packet in packets)
assert len(src_mac_set) == packet_count // 2
[docs] def test_da_incrementation_1(self, tg):
"""Check DA incrementation. Count == Increment count.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(ETH_IP_ICMP, count=packet_count, da_increment=(1, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="ICMP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different dst macs
dst_mac_set = set(tg.get_packet_field(packet, "Ethernet", "dst") for packet in packets)
assert len(dst_mac_set) == packet_count
[docs] def test_da_incrementation_2(self, tg):
"""Check DA incrementation. Count > Increment count.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_ICMP, count=packet_count, da_increment=(1, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="ICMP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different dst macs
dst_mac_set = set(tg.get_packet_field(packet, "Ethernet", "dst") for packet in packets)
assert len(dst_mac_set) == packet_count // 2
[docs] def test_sa_incrementation_and_packet_fragmentation(self, tg):
"""Check SA incrementation + packet fragmentation. Count == Increment count.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(ETH_IP_ICMP, iface=iface,
count=packet_count, sa_increment=(1, 5),
required_size=200, fragsize=110)
tg.start_sniff([iface], sniffing_time=5, filter_layer="ICMP", dst_filter=DST_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that sniffed count == (count of packets) * (number of fragments per packet)
assert len(packets) == packet_count * 2, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count * 2)
# Verify that all packets with different src macs
src_mac_set = set(tg.get_packet_field(packet, "Ethernet", "src") for packet in packets)
assert len(src_mac_set) == 5
[docs] def test_packet_random_size_1(self, tg):
"""Check packet random size setting. Count=1.
"""
iface = tg.ports[0]
packet_count = 1
stream_id = tg.set_stream(ETH_IP_ICMP, iface=iface,
count=packet_count, required_size=("Random", 100, 1500))
tg.start_sniff([iface], sniffing_time=3, filter_layer="ICMP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that length of packet is random value between 100 and 1500 bytes
packet_length = len(packets[0])
assert 100 <= packet_length <= 1500
[docs] def test_packet_random_size_2(self, tg):
"""Check packet random size setting. Count=5.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(ETH_IP_ICMP, iface=iface,
count=packet_count, required_size=("Random", 1000, 1500))
tg.start_sniff([iface], sniffing_time=5, filter_layer="ICMP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
size_set = set([len(packet) for packet in packets])
# Verify that length of packet is random value between 1000 and 1500 bytes
assert all([1000 <= size <= 1500 for size in size_set])
[docs] def test_packet_size_incrementing_1(self, tg):
"""Check packet size incrementing. Count=1, increment count=5.
"""
iface = tg.ports[0]
packet_count = 1
start_size = 70
stream_id = tg.set_stream(ETH_IP_ICMP, iface=iface, count=packet_count,
required_size=("Increment", 2, start_size, 78))
tg.start_sniff([iface], sniffing_time=3, filter_layer="ICMP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
assert len(packets[0]) == start_size
[docs] def test_packet_size_incrementing_2(self, tg):
"""Check packet size incrementing. Count=5, increment count=5.
"""
iface = tg.ports[0]
packet_count = 5
strat_size = 70
step = 2
end_size = 78
expected_size_set = set(range(strat_size, end_size + 1, step))
stream_id = tg.set_stream(ETH_IP_ICMP, iface=iface, count=packet_count,
required_size=("Increment", step, strat_size, end_size))
tg.start_sniff([iface], sniffing_time=5, filter_layer="ICMP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different size
size_set = set([len(packet) for packet in packets])
assert len(size_set) == packet_count
# Verify that length of packet is from (70,72,74,76,78)
assert size_set == expected_size_set
[docs] def test_packet_size_decrementing(self, tg):
"""Check packet size decrementing. Count=9, decrement count=9.
"""
iface = tg.ports[0]
packet_count = 5
start_size = 70
step = -1
end_size = 78
# Ixia starts count from max frame size when step is negative
expected_size_set = sorted(range(end_size, start_size - 1, step))[packet_count - 1:]
stream_id = tg.set_stream(ETH_IP_ICMP, iface=iface, count=packet_count,
required_size=("Increment", step, start_size, end_size))
tg.start_sniff([iface], sniffing_time=5, filter_layer="ICMP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different size
size_set = set([len(packet) for packet in packets])
assert len(size_set) == packet_count
# Verify that length of packet is from (70,71,72,73,74)
assert sorted(size_set) == expected_size_set
[docs] def test_src_ip_incrementation_dot1q_disabled_1(self, tg):
"""Check source_ip incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(ETH_IP_ICMP, count=packet_count, sip_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="ICMP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different src ip
src_ip_set = set(tg.get_packet_field(packet, "IP", "src") for packet in packets)
assert len(src_ip_set) == packet_count
[docs] def test_src_ip_incrementation_dot1q_disabled_2(self, tg):
"""Check source_ip incrementation. Count = 2*Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_ICMP, count=packet_count, sip_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="ICMP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different src ip
src_ip_set = set(tg.get_packet_field(packet, "IP", "src") for packet in packets)
assert len(src_ip_set) == packet_count // 2
[docs] def test_src_ip_incrementation_dot1q_enabled_1(self, tg):
"""Check source_ip incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP_ICMP, count=packet_count, sip_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.ICMP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different src ip
src_ip_set = set(tg.get_packet_field(packet, "IP", "src") for packet in packets)
assert len(src_ip_set) == packet_count
[docs] def test_src_ip_incrementation_dot1q_enabled_2(self, tg):
"""Check source_ip incrementation. Count = 2*Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP_ICMP, count=packet_count, sip_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.ICMP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different src ip
src_ip_set = set(tg.get_packet_field(packet, "IP", "src") for packet in packets)
assert len(src_ip_set) == packet_count
[docs] def test_dst_ip_incrementation_dot1q_disabled_1(self, tg):
"""Check destination_ip incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(ETH_IP_ICMP, count=packet_count, dip_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="ICMP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different dst ip
dst_ip_set = set(tg.get_packet_field(packet, "IP", "dst") for packet in packets)
assert len(dst_ip_set) == packet_count
[docs] def test_dst_ip_incrementation_dot1q_disabled_2(self, tg):
"""Check destination_ip incrementation. Count = 2*Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_ICMP, count=packet_count, dip_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="ICMP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different dst ip
dst_ip_set = set(tg.get_packet_field(packet, "IP", "dst") for packet in packets)
assert len(dst_ip_set) == packet_count // 2
[docs] def test_dst_ip_incrementation_dot1q_enabled_1(self, tg):
"""Check destination_ip incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP_ICMP, count=packet_count, dip_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.ICMP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different dst ip
dst_ip_set = set(tg.get_packet_field(packet, "IP", "dst") for packet in packets)
assert len(dst_ip_set) == packet_count
[docs] def test_dst_ip_incrementation_dot1q_enabled_2(self, tg):
"""Check destination_ip incrementation. Count = 2*Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(DOT1Q_IP_ICMP, count=packet_count, dip_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.ICMP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different dst ip
dst_ip_set = set(tg.get_packet_field(packet, "IP", "dst") for packet in packets)
assert len(dst_ip_set) == packet_count // 2
[docs] def test_clear_and_check_statistics(self, tg):
"""Send 100 packets, clear and check statistics.
"""
iface = tg.ports[0]
packet_count = 100
src_mac = PACKET_DEFINITION[0]["Ethernet"]["src"]
tg.clear_statistics([iface])
stream_id_1 = tg.set_stream(PACKET_DEFINITION, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=7, filter_layer="IP", src_filter=src_mac)
tg.start_streams([stream_id_1])
tg.stop_sniff([iface])
tg.stop_streams([stream_id_1])
end_receive_statistics = tg.get_received_frames_count(iface)
end_sent_statistics = tg.get_sent_frames_count(iface)
assert end_receive_statistics == packet_count
assert end_sent_statistics == packet_count
tg.clear_statistics([iface])
end_receive_statistics = tg.get_received_frames_count(iface)
end_sent_statistics = tg.get_sent_frames_count(iface)
assert end_receive_statistics == 0
assert end_sent_statistics == 0
[docs] def test_arp_incrementation_dot1q_disabled_1(self, tg):
"""Check arp incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(ARP, count=packet_count, arp_sa_increment=(3, 5), arp_sip_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="ARP")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different src ip, src mac and hwsrc
src_ip_set = set(tg.get_packet_field(packet, "ARP", "spa") for packet in packets)
src_mac_set = set(tg.get_packet_field(packet, "Ethernet", "src") for packet in packets)
hwsrc_set = set(tg.get_packet_field(packet, "ARP", "sha") for packet in packets)
assert len(src_ip_set) == packet_count
assert len(src_mac_set) == packet_count
assert len(hwsrc_set) == packet_count
[docs] def test_arp_incrementation_dot1q_enabled(self, tg):
"""Check arp incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_ARP, count=packet_count, arp_sa_increment=(3, 5),
arp_sip_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.ARP")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different src ip, src mac and hwsrc
src_ip_set = set(tg.get_packet_field(packet, "ARP", "spa") for packet in packets)
src_mac_set = set(tg.get_packet_field(packet, "Ethernet", "src") for packet in packets)
hwsrc_set = set(tg.get_packet_field(packet, "ARP", "sha") for packet in packets)
assert len(src_ip_set) == packet_count
assert len(src_mac_set) == packet_count
assert len(hwsrc_set) == packet_count
[docs] def test_arp_incrementation_dot1q_disabled_2(self, tg):
"""Check arp incrementation. Count == 2*Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ARP, count=packet_count, arp_sa_increment=(3, 5),
arp_sip_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="ARP")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different src ip, src mac and hwsrc
src_ip_set = set(tg.get_packet_field(packet, "ARP", "spa") for packet in packets)
src_mac_set = set(tg.get_packet_field(packet, "Ethernet", "src") for packet in packets)
hwsrc_set = set(tg.get_packet_field(packet, "ARP", "sha") for packet in packets)
assert len(src_ip_set) == packet_count // 2
assert len(src_mac_set) == packet_count // 2
assert len(hwsrc_set) == packet_count // 2
[docs] def test_vlan_incrementation_increment_count_1(self, tg):
"""Check vlan incrementation. Count == Increment count.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP_TCP, count=packet_count, vlan_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.IP")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different src ip, src mac and hwsrc
vlan_set = set(tg.get_packet_field(packet, "S-Dot1Q", "vid") for packet in packets)
assert len(vlan_set) == packet_count
[docs] def test_vlan_incrementation_increment_count_2(self, tg):
"""Check vlan incrementation. Count == 2*Increment count.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(DOT1Q_IP_TCP, count=packet_count, vlan_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.IP")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different src ip, src mac and hwsrc
vlan_set = set(tg.get_packet_field(packet, "S-Dot1Q", "vid") for packet in packets)
assert len(vlan_set) == packet_count // 2
[docs] def test_da_incrementation_continuous_traffic(self, tg):
"""Check DA incrementation. Continuous traffic.
"""
iface = tg.ports[0]
start_mac_val = 1
end_mac_val = 5
min_packet_count = len(range(start_mac_val, end_mac_val + 1))
stream_id = tg.set_stream(ETH_IP_ICMP, continuous=True,
da_increment=(start_mac_val, end_mac_val), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="ICMP", src_filter=SRC_MAC)
tg.start_streams([stream_id])
tg.stop_streams([stream_id])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) >= min_packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), min_packet_count)
# Verify that all packets with 5 different dst macs
dst_mac_set = set(tg.get_packet_field(packet, "Ethernet", "dst") for packet in packets)
assert len(dst_mac_set) == min_packet_count
[docs] def test_sniffed_packets_timestamp(self, tg):
"""Check sniffed packets timestamp.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, inter=0.5, iface=iface)
tg.start_sniff([iface], sniffing_time=10, packets_count=10, filter_layer="IP", src_filter=SRC_MAC)
tg.start_streams([stream_id])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with 10 different timestamps
time_set = set([packet.time for packet in packets])
assert len(time_set) == packet_count
[docs] def test_srcmac_filter(self, tg):
"""Check srcMac filter.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, sa_increment=(1, 2), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="IP", src_filter=SRC_MAC)
tg.start_streams([stream_id])
tg.stop_streams([stream_id])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count // 2, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with specified srcMac are sniffed
src_set = set(tg.get_packet_field(packet, "Ethernet", "src") for packet in packets)
assert len(src_set) == 1
[docs] def test_dstmac_filter(self, tg):
"""Check dstMac filter.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, da_increment=(1, 2), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="IP", dst_filter=DST_MAC)
tg.start_streams([stream_id])
tg.stop_streams([stream_id])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count // 2, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with specified dstMac are sniffed
dst_set = set(tg.get_packet_field(packet, "Ethernet", "dst") for packet in packets)
assert len(dst_set) == 1
[docs] def test_srcmac_and_dstmac_filter(self, tg):
"""Check srcMac and dstMac filter.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, sa_increment=(1, 2), da_increment=(1, 2), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="IP", src_filter=SRC_MAC, dst_filter=DST_MAC)
tg.start_streams([stream_id])
tg.stop_streams([stream_id])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count // 2, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with specified dstMac and srcMac are sniffed
dst_set = set(tg.get_packet_field(packet, "Ethernet", "dst") for packet in packets)
src_set = set(tg.get_packet_field(packet, "Ethernet", "src") for packet in packets)
assert len(dst_set) == 1
assert len(src_set) == 1
[docs] def test_srcmac_and_dstmac_wrong_layer_filter(self, tg):
"""Check srcMac and dstMac filter with wrong filter_layer.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, sa_increment=(1, 2), da_increment=(1, 2), iface=iface)
tg.start_sniff([iface], sniffing_time=7, packets_count=packet_count, filter_layer="ARP",
src_filter=SRC_MAC, dst_filter=DST_MAC)
tg.start_streams([stream_id])
tg.stop_streams([stream_id])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert packets == [], \
"Captured packets count {0} does not match expected {1}".format(len(packets), 0)
[docs] def test_lldp_incrementation_increment_count_1(self, tg):
"""Check lldp incrementation. Count == Increment count.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(LLDP, count=packet_count, sa_increment=(1, 5), lldp_sa_increment=(1, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=3, dst_filter=LLDP_DST_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that LLDP Chassis macs are different
mac_set = set(tg.get_packet_field(packet, "Ethernet", "src") for packet in packets)
lldp_set = set(tg.get_packet_field(packet, "LLDPChassisId", "value") for packet in packets)
assert len(mac_set) == packet_count
assert len(lldp_set) == packet_count
[docs] def test_lldp_incrementation_increment_count_2(self, tg):
"""Check lldp incrementation. Count == 2*Increment count.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(LLDP, count=packet_count, sa_increment=(1, 5), lldp_sa_increment=(1, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=3, dst_filter=LLDP_DST_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that LLDP Chassis macs are different
mac_set = set(tg.get_packet_field(packet, "Ethernet", "src") for packet in packets)
lldp_set = set(tg.get_packet_field(packet, "LLDPChassisId", "value") for packet in packets)
assert len(mac_set) == packet_count // 2
assert len(lldp_set) == packet_count // 2
[docs] def test_lldp_incrementation_continuous_traffic_1(self, tg):
"""Check lldp incrementation. Continuous traffic.
"""
iface = tg.ports[0]
packet_count = 20
stream_id = tg.set_stream(LLDP, continuous=True, sa_increment=(1, 5), lldp_sa_increment=(1, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, packets_count=packet_count, filter_layer="LLDP", dst_filter=LLDP_DST_MAC)
tg.start_streams([stream_id])
time.sleep(5)
tg.stop_streams([stream_id])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) >= packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that LLDP Chassis macs are different
mac_set = set(tg.get_packet_field(packet, "Ethernet", "src") for packet in packets)
lldp_set = set(tg.get_packet_field(packet, "LLDPChassisId", "value") for packet in packets)
assert len(mac_set) == packet_count // 4
assert len(lldp_set) == packet_count // 4
[docs] def test_lldp_incrementation_continuous_traffic_2(self, tg):
"""Check lldp incrementation. Continuous traffic.
"""
iface = tg.ports[0]
packet_count = 20
stream_id = tg.set_stream(LLDP, continuous=True, sa_increment=(1, 0), lldp_sa_increment=(1, 0), iface=iface)
tg.start_sniff([iface], sniffing_time=5, packets_count=packet_count, dst_filter=LLDP_DST_MAC)
tg.start_streams([stream_id])
time.sleep(5)
tg.stop_streams([stream_id])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) >= packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that LLDP Chassis macs are different
mac_set = set(tg.get_packet_field(packet, "Ethernet", "src") for packet in packets)
lldp_set = set(tg.get_packet_field(packet, "LLDPChassisId", "value") for packet in packets)
assert len(mac_set) >= packet_count
assert len(lldp_set) >= packet_count
[docs] def test_src_udp_incrementation_dot1q_disabled_1(self, tg):
"""Check source_udp incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, sudp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="UDP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different src udp
src_udp_set = set(tg.get_packet_field(packet, "UDP", "sport") for packet in packets)
assert len(src_udp_set) == packet_count
[docs] def test_src_tcp_incrementation_dot1q_disabled_1(self, tg):
"""Check source_tcp incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(ETH_IP_TCP, count=packet_count, stcp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="TCP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different src tcp
src_tcp_set = set(tg.get_packet_field(packet, "TCP", "sport") for packet in packets)
assert len(src_tcp_set) == packet_count
[docs] def test_src_udp_incrementation_dot1q_disabled_2(self, tg):
"""Check source_udp incrementation. Count = 2*Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, sudp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="UDP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different src udp
src_udp_set = set(tg.get_packet_field(packet, "UDP", "sport") for packet in packets)
assert len(src_udp_set) == packet_count // 2
[docs] def test_src_tcp_incrementation_dot1q_disabled_2(self, tg):
"""Check source_tcp incrementation. Count = 2*Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_TCP, count=packet_count, stcp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="TCP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different src udp
src_tcp_set = set(tg.get_packet_field(packet, "TCP", "sport") for packet in packets)
assert len(src_tcp_set) == packet_count // 2
[docs] def test_src_udp_incrementation_dot1q_enabled(self, tg):
"""Check source_udp incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, sudp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.IP")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different src udp
src_udp_set = set(tg.get_packet_field(packet, "UDP", "sport") for packet in packets)
assert len(src_udp_set) == packet_count
[docs] def test_src_tcp_incrementation_dot1q_enabled(self, tg):
"""Check source_tcp incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP_TCP, count=packet_count, stcp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.IP")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different src tcp
src_tcp_set = set(tg.get_packet_field(packet, "TCP", "sport") for packet in packets)
assert len(src_tcp_set) == packet_count
[docs] def test_dst_udp_incrementation_dot1q_disabled_1(self, tg):
"""Check destination_udp incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, dudp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="UDP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different dst udp
dst_udp_set = set(tg.get_packet_field(packet, "UDP", "dport") for packet in packets)
assert len(dst_udp_set) == packet_count
[docs] def test_dst_udp_incrementation_dot1q_disabled_2(self, tg):
"""Check destination_udp incrementation. Count = 2*Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, dudp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="UDP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different dst udp
dst_udp_set = set(tg.get_packet_field(packet, "UDP", "dport") for packet in packets)
assert len(dst_udp_set) == packet_count // 2
[docs] def test_dst_udp_incrementation_dot1q_enabled(self, tg):
"""Check destination_udp incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, dudp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.IP")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different dst udp
dst_udp_set = set(tg.get_packet_field(packet, "UDP", "dport") for packet in packets)
assert len(dst_udp_set) == packet_count
[docs] def test_src_udp_and_dst_udp_incrementation_dot1q_disabled_1(self, tg):
"""Check source_udp and destination_udp incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, sudp_increment=(3, 5), dudp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="UDP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different dst and src udp
dst_udp_set = set(tg.get_packet_field(packet, "UDP", "dport") for packet in packets)
src_udp_set = set(tg.get_packet_field(packet, "UDP", "sport") for packet in packets)
assert len(dst_udp_set) == packet_count
assert len(src_udp_set) == packet_count
[docs] def test_src_tcp_and_dst_tcp_incrementation_dot1q_disabled_1(self, tg):
"""Check source_tcp and destination_tcp incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(ETH_IP_TCP, count=packet_count, stcp_increment=(3, 5),
dtcp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="TCP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different dst and src tcp
dst_tcp_set = set(tg.get_packet_field(packet, "TCP", "dport") for packet in packets)
src_tcp_set = set(tg.get_packet_field(packet, "TCP", "sport") for packet in packets)
assert len(dst_tcp_set) == packet_count
assert len(src_tcp_set) == packet_count
[docs] def test_src_udp_and_dst_udp_incrementation_dot1q_disabled_2(self, tg):
"""Check source_udp and destination_udp incrementation. Count = 2*Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, sudp_increment=(3, 5), dudp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="UDP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different dst and src udp
dst_udp_set = set(tg.get_packet_field(packet, "UDP", "dport") for packet in packets)
src_udp_set = set(tg.get_packet_field(packet, "UDP", "sport") for packet in packets)
assert len(dst_udp_set) == packet_count // 2
assert len(src_udp_set) == packet_count // 2
[docs] def test_src_tcp_and_dst_tcp_incrementation_dot1q_disabled_2(self, tg):
"""Check source_tcp and destination_tcp incrementation. Count = 2*Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_TCP, count=packet_count, stcp_increment=(3, 5), dtcp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="TCP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different dst and src tcp
dst_tcp_set = set(tg.get_packet_field(packet, "TCP", "dport") for packet in packets)
src_tcp_set = set(tg.get_packet_field(packet, "TCP", "sport") for packet in packets)
assert len(dst_tcp_set) == packet_count // 2
assert len(src_tcp_set) == packet_count // 2
[docs] def test_src_udp_and_dst_udp_incrementation_dot1q_enabled(self, tg):
"""Check source_udp and destination_udp incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, sudp_increment=(3, 5), dudp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.IP")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different dst and src udp
dst_udp_set = set(tg.get_packet_field(packet, "UDP", "dport") for packet in packets)
src_udp_set = set(tg.get_packet_field(packet, "UDP", "sport") for packet in packets)
assert len(dst_udp_set) == packet_count
assert len(src_udp_set) == packet_count
[docs] def test_src_tcp_and_dst_tcp_incrementation_dot1q_enabled(self, tg):
"""Check source_tcp and destination_tcp incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP_TCP, count=packet_count, stcp_increment=(3, 5), dtcp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.IP")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different dst and src tcp
dst_tcp_set = set(tg.get_packet_field(packet, "TCP", "dport") for packet in packets)
src_tcp_set = set(tg.get_packet_field(packet, "TCP", "sport") for packet in packets)
assert len(dst_tcp_set) == packet_count
assert len(src_tcp_set) == packet_count
[docs] def test_ip_protocol_incrementation_dot1q_disabled(self, tg):
"""Check ip protocol incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, protocol_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="IP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different ip proto
proto_ip_set = set(tg.get_packet_field(packet, "IP", "p") for packet in packets)
assert len(proto_ip_set) == packet_count
[docs] def test_ip_protocol_incrementation_dot1q_disabled_2(self, tg):
"""Check ip protocol incrementation. Count = 2*Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, protocol_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="IP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different ip proto
proto_ip_set = set(tg.get_packet_field(packet, "IP", "p") for packet in packets)
assert len(proto_ip_set) == packet_count // 2
[docs] def test_ip_protocol_incrementation_dot1q_enabled(self, tg):
"""Check destination_udp incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, protocol_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.IP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different ip proto
proto_ip_set = set(tg.get_packet_field(packet, "IP", "p") for packet in packets)
assert len(proto_ip_set) == packet_count
[docs] def test_ip_protocol_and_sip_increment_dot1q_disabled(self, tg):
"""Check ip protocol and sip_increment incrementation. Count = 2*Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, sip_increment=(3, 5), protocol_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="IP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different ip proto and src ip.
proto_ip_set = set(tg.get_packet_field(packet, "IP", "p") for packet in packets)
src_ip_set = set(tg.get_packet_field(packet, "IP", "src") for packet in packets)
assert len(proto_ip_set) == packet_count // 2
assert len(src_ip_set) == packet_count // 2
[docs] def test_ip_protocol_and_sip_increment_dot1q_enabled(self, tg):
"""Check ip protocol and sip_increment incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, sip_increment=(3, 5), protocol_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.IP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different ip proto and src ip.
proto_ip_set = set(tg.get_packet_field(packet, "IP", "p") for packet in packets)
src_ip_set = set(tg.get_packet_field(packet, "IP", "src") for packet in packets)
assert len(proto_ip_set) == packet_count
assert len(src_ip_set) == packet_count
[docs] def test_ether_incrementation_dot1q_disabled_1(self, tg):
"""Check ether type incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, eth_type_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different ether type
eth_type_set = set(tg.get_packet_field(packet, "Ethernet", "type") for packet in packets)
assert len(eth_type_set) == packet_count
[docs] def test_ether_incrementation_dot1q_disabled_2(self, tg):
"""Check ip protocol incrementation. Count = 2*Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, eth_type_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different ether type
eth_type_set = set(tg.get_packet_field(packet, "Ethernet", "type") for packet in packets)
assert len(eth_type_set) == packet_count // 2
[docs] def test_dscp_incrementation_dot1q_disabled_1(self, tg):
"""Check dscp incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, dscp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="IP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different ip dscp
dscp_set = set(tg.get_packet_field(packet, "IP", "tos") for packet in packets)
assert len(dscp_set) == packet_count
[docs] def test_dscp_incrementation_dot1q_disabled_2(self, tg):
"""Check dscp incrementation. Count = 2*Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, dscp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="IP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different ip dscp
dscp_set = set(tg.get_packet_field(packet, "IP", "tos") for packet in packets)
assert len(dscp_set) == packet_count // 2
[docs] def test_ip_dscp_incrementation_dot1q_enabled(self, tg):
"""Check ip dscp incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, dscp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.IP")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different ip dscp
dscp_set = set(tg.get_packet_field(packet, "IP", "tos") for packet in packets)
assert len(dscp_set) == packet_count
[docs] def test_ip_dscp_and_sip_increment_dot1q_disabled_1(self, tg):
"""Check ip dscp and sip_increment incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, sip_increment=(3, 5), dscp_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="IP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different ip dscp and src
dscp_set = set(tg.get_packet_field(packet, "IP", "tos") for packet in packets)
src_ip_set = set(tg.get_packet_field(packet, "IP", "src") for packet in packets)
assert len(dscp_set) == packet_count
assert len(src_ip_set) == packet_count
[docs] def test_ip_dscp_and_sip_increment_dot1q_disabled_2(self, tg):
"""Check ip dscp and sip_increment incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, sip_increment=(3, 15), dip_increment=(3, 10),
dscp_increment=(3, 5), protocol_increment=(3, 30), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="IP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only packets with different ip dscp, src, dst and proto
dscp_set = set(tg.get_packet_field(packet, "IP", "tos") for packet in packets)
src_ip_set = set(tg.get_packet_field(packet, "IP", "src") for packet in packets)
dst_ip_set = set(tg.get_packet_field(packet, "IP", "dst") for packet in packets)
proto_ip_set = set(tg.get_packet_field(packet, "IP", "p") for packet in packets)
assert len(dscp_set) == packet_count
assert len(src_ip_set) == packet_count
assert len(dst_ip_set) == packet_count
assert len(proto_ip_set) == packet_count
[docs] def test_ip_dip_and_sip_increment_udf_dependant(self, tg):
"""Check ip dip and sip_increment incrementation. Dip increment dependant from sip increment.
"""
iface = tg.ports[0]
packet_count = 18
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, sip_increment=(3, 3), dip_increment=(3, 3),
dscp_increment=(3, 3), iface=iface, udf_dependancies={'sip_increment': 'dip_increment'})
tg.start_sniff([iface], sniffing_time=5, filter_layer="IP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only 9 different packets received
dscp_set = set(tg.get_packet_field(packet, "IP", "tos") for packet in packets)
src_ip_set = set(tg.get_packet_field(packet, "IP", "src") for packet in packets)
dst_ip_set = set(tg.get_packet_field(packet, "IP", "dst") for packet in packets)
packet_set = set(packet.bin() for packet in packets)
assert len(dscp_set) == 3
assert len(src_ip_set) == 3
assert len(dst_ip_set) == 3
assert len(packet_set) == 9
[docs] def test_ip_dscp_dip_sip_increment_udf_dependant(self, tg):
"""Check ip dscp, dip and sip_increment incrementation. Dependant increments.
"""
iface = tg.ports[0]
packet_count = 54
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, sip_increment=(3, 3),
dip_increment=(3, 3), dscp_increment=(3, 3), iface=iface,
udf_dependancies={'dip_increment': 'sip_increment', 'dscp_increment': 'dip_increment'})
tg.start_sniff([iface], sniffing_time=5, filter_layer="IP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only 27 different packets received
dscp_set = set(tg.get_packet_field(packet, "IP", "tos") for packet in packets)
src_ip_set = set(tg.get_packet_field(packet, "IP", "src") for packet in packets)
dst_ip_set = set(tg.get_packet_field(packet, "IP", "dst") for packet in packets)
packet_set = set(packet.bin() for packet in packets)
assert len(dscp_set) == 3
assert len(src_ip_set) == 3
assert len(dst_ip_set) == 3
assert len(packet_set) == 27
[docs] def test_ip_dscp_dip_sip_increment_udf_one_dependant(self, tg):
"""Check ip dscp, dip and sip_increment incrementation. Dependant increments form sip.
"""
iface = tg.ports[0]
packet_count = 54
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, sip_increment=(3, 3),
dip_increment=(3, 3), dscp_increment=(3, 3), iface=iface,
udf_dependancies={'dip_increment': 'sip_increment', 'dscp_increment': 'sip_increment'})
tg.start_sniff([iface], sniffing_time=5, filter_layer="IP", src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that only 9 different packets received
dscp_set = set(tg.get_packet_field(packet, "IP", "tos") for packet in packets)
src_ip_set = set(tg.get_packet_field(packet, "IP", "src") for packet in packets)
dst_ip_set = set(tg.get_packet_field(packet, "IP", "dst") for packet in packets)
packet_set = set(packet.bin() for packet in packets)
assert len(dscp_set) == 3
assert len(src_ip_set) == 3
assert len(dst_ip_set) == 3
assert len(packet_set) == 9
[docs] def test_src_ipv6_incrementation_dot1q_disabled_1(self, tg):
"""Check SRC IPv6 incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(IP6, count=packet_count, sipv6_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, src_filter="00:00:0a:00:02:08")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 src address
sipv6_set = set(tg.get_packet_field(packet, "IP6", "src") for packet in packets)
assert len(sipv6_set) == packet_count
[docs] def test_src_ipv6_incrementation_dot1q_disabled_2(self, tg):
"""Check SRC IPv6 incrementation. Count = 2*Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(IP6, count=packet_count, sipv6_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, src_filter="00:00:0a:00:02:08")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 src address
sipv6_set = set(tg.get_packet_field(packet, "IP6", "src") for packet in packets)
assert len(sipv6_set) == packet_count // 2
[docs] def test_src_ipv6_incrementation_dot1q_enabled_1(self, tg):
"""Check SRC IPv6 incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP6, count=packet_count, sipv6_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.IPv6", src_filter="00:00:00:03:02:01")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 src address
sipv6_set = set(tg.get_packet_field(packet, "IP6", "src") for packet in packets)
assert len(sipv6_set) == packet_count
[docs] def test_src_ipv6_incrementation_dot1q_enabled_2(self, tg):
"""Check SRC IPv6 incrementation. Count > Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(DOT1Q_IP6, count=packet_count, sipv6_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.IPv6", src_filter="00:00:00:03:02:01")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 src address
sipv6_set = set(tg.get_packet_field(packet, "IP6", "src") for packet in packets)
assert len(sipv6_set) == packet_count // 2
[docs] def test_src_and_dst_ipv6_incrementation_dot1q_disabled(self, tg):
"""Check SRC and DST IPv6 incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(IP6, count=packet_count, sipv6_increment=(3, 5),
dipv6_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, src_filter="00:00:0a:00:02:08")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 src and dst address
sipv6_set = set(tg.get_packet_field(packet, "IP6", "src") for packet in packets)
dipv6_set = set(tg.get_packet_field(packet, "IP6", "dst") for packet in packets)
assert len(sipv6_set) == packet_count
assert len(dipv6_set) == packet_count
[docs] def test_src_and_dst_ipv6_incrementation_dot1q_enabled(self, tg):
"""Check SRC and DST IPv6 incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP6, count=packet_count, sipv6_increment=(3, 5), dipv6_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.IPv6", src_filter="00:00:00:03:02:01")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 src and dst address
sipv6_set = set(tg.get_packet_field(packet, "IP6", "src") for packet in packets)
dipv6_set = set(tg.get_packet_field(packet, "IP6", "dst") for packet in packets)
assert len(sipv6_set) == packet_count
assert len(dipv6_set) == packet_count
[docs] def test_dst_ipv6_incrementation_dot1q_disabled_1(self, tg):
"""Check DST IPv6 incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(IP6, count=packet_count, dipv6_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, src_filter="00:00:0a:00:02:08")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 dst address
dipv6_set = set(tg.get_packet_field(packet, "IP6", "dst") for packet in packets)
assert len(dipv6_set) == packet_count
[docs] def test_dst_ipv6_incrementation_dot1q_disabled_2(self, tg):
"""Check DST IPv6 incrementation. Count = 2*Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(IP6, count=packet_count, dipv6_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, src_filter="00:00:0a:00:02:08")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 dst address
dipv6_set = set(tg.get_packet_field(packet, "IP6", "dst") for packet in packets)
assert len(dipv6_set) == packet_count // 2
[docs] def test_dst_ipv6_incrementation_dot1q_enabled_1(self, tg):
"""Check DST IPv6 incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP6, count=packet_count, dipv6_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.IPv6", src_filter="00:00:00:03:02:01")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 dst address
dipv6_set = set(tg.get_packet_field(packet, "IP6", "dst") for packet in packets)
assert len(dipv6_set) == packet_count
[docs] def test_dst_ipv6_incrementation_dot1q_enabled_2(self, tg):
"""Check DST IPv6 incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(DOT1Q_IP6, count=packet_count, dipv6_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.IPv6", src_filter="00:00:00:03:02:01")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 dst address
dipv6_set = set(tg.get_packet_field(packet, "IP6", "dst") for packet in packets)
assert len(dipv6_set) == packet_count // 2
[docs] def test_flow_label_ipv6_incrementation_dot1q_disabled_1(self, tg):
"""Check Flow Label IPv6 incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(IP6, count=packet_count, fl_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, src_filter="00:00:0a:00:02:08")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 flow
fl_set = set(tg.get_packet_field(packet, "IP6", "flow") for packet in packets)
assert len(fl_set) == packet_count
[docs] def test_flow_label_ipv6_incrementation_dot1q_disabled_2(self, tg):
"""Check Flow Label incrementation. Count = 2*Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(IP6, count=packet_count, fl_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, src_filter="00:00:0a:00:02:08")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 flow
fl_set = set(tg.get_packet_field(packet, "IP6", "flow") for packet in packets)
assert len(fl_set) == packet_count // 2
[docs] def test_flow_label_ipv6_incrementation_dot1q_enabled(self, tg):
"""Check Flow Label IPv6 incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP6, count=packet_count, fl_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, src_filter="00:00:00:03:02:01")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 flow
fl_set = set(tg.get_packet_field(packet, "IP6", "flow") for packet in packets)
assert len(fl_set) == packet_count
[docs] def test_flow_label_src_ipv6_incrementation(self, tg):
"""Check Flow Label with SRC IPv6 incrementation. Count = 2*Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(IP6, count=packet_count, fl_increment=(3, 5), sipv6_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, src_filter="00:00:0a:00:02:08")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 flow and src address
fl_set = set(tg.get_packet_field(packet, "IP6", "flow") for packet in packets)
sipv6_set = set(tg.get_packet_field(packet, "IP6", "src") for packet in packets)
assert len(fl_set) == packet_count // 2
assert len(sipv6_set) == packet_count // 2
[docs] def test_flow_label_dst_ipv6_incrementation(self, tg):
"""Check Flow Label and DST IPv6 incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP6, count=packet_count, fl_increment=(3, 5), dipv6_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, src_filter="00:00:00:03:02:01")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 flow and dst address
fl_set = set(tg.get_packet_field(packet, "IP6", "flow") for packet in packets)
dipv6_set = set(tg.get_packet_field(packet, "IP6", "dst") for packet in packets)
assert len(fl_set) == packet_count
assert len(dipv6_set) == packet_count
[docs] def test_traffic_class_ipv6_incrementation_dot1q_disabled(self, tg):
"""Check traffic class incrementation. Count == Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(IP6, count=packet_count, tc_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, src_filter="00:00:0a:00:02:08")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 traffic class
tc_set = set(tg.get_packet_field(packet, "IP6", "fc") for packet in packets)
assert len(tc_set) == packet_count
[docs] def test_traffic_class_ipv6_incrementation_dot1q_disabled_2(self, tg):
"""Check traffic class incrementation. Count = 2*Increment count. Dot1Q disabled.
"""
iface = tg.ports[0]
packet_count = 10
stream_id = tg.set_stream(IP6, count=packet_count, tc_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, src_filter="00:00:0a:00:02:08")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 traffic class
tc_set = set(tg.get_packet_field(packet, "IP6", "fc") for packet in packets)
assert len(tc_set) == packet_count // 2
[docs] def test_traffic_class_ipv6_incrementation_dot1q_enabled(self, tg):
"""Check traffic class IPv6 incrementation. Count == Increment count. Dot1Q enabled.
"""
iface = tg.ports[0]
packet_count = 5
stream_id = tg.set_stream(DOT1Q_IP6, count=packet_count, tc_increment=(3, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=5, src_filter="00:00:00:03:02:01")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
# Verify that all packets with different IPv6 traffic class
tc_set = set(tg.get_packet_field(packet, "IP6", "fc") for packet in packets)
assert len(tc_set) == packet_count
[docs] def test_qos_vlan_stat(self, tg):
"""Check Ixia QoS vlan stat reading.
"""
if tg.type != "ixiahl":
pytest.skip("Get Qos Frames count increment isn't supported by Pypacker TG")
iface = tg.ports[0]
packet_count = 52
dst_mac = "00:00:00:00:00:aa"
src_mac = "00:00:00:00:00:bb"
pack_p0 = ({"Ethernet": {"dst": dst_mac, "src": src_mac}},
{"Dot1Q": {"vid": 10, "prio": 0}},
{"IP": {"p": 17}}, {"UDP": {}}, )
pack_p1 = ({"Ethernet": {"dst": dst_mac, "src": src_mac}},
{"Dot1Q": {"vid": 10, "prio": 1}},
{"IP": {"p": 17}}, {"UDP": {}}, )
pack_p2 = ({"Ethernet": {"dst": dst_mac, "src": src_mac}},
{"Dot1Q": {"vid": 10, "prio": 2}},
{"IP": {"p": 17}}, {"UDP": {}}, )
pack_p3 = ({"Ethernet": {"dst": dst_mac, "src": src_mac}},
{"Dot1Q": {"vid": 10, "prio": 3}},
{"IP": {"p": 17}}, {"UDP": {}}, )
pack_p4 = ({"Ethernet": {"dst": dst_mac, "src": src_mac}},
{"Dot1Q": {"vid": 10, "prio": 4}},
{"IP": {"p": 17}}, {"UDP": {}}, )
pack_p5 = ({"Ethernet": {"dst": dst_mac, "src": src_mac}},
{"Dot1Q": {"vid": 10, "prio": 5}},
{"IP": {"p": 17}}, {"UDP": {}}, )
pack_p6 = ({"Ethernet": {"dst": dst_mac, "src": src_mac}},
{"Dot1Q": {"vid": 10, "prio": 6}},
{"IP": {"p": 17}}, {"UDP": {}}, )
pack_p7 = ({"Ethernet": {"dst": dst_mac, "src": src_mac}},
{"Dot1Q": {"vid": 10, "prio": 7}},
{"IP": {"p": 17}}, {"UDP": {}}, )
stream_ids = []
stream_ids.append(tg.set_stream(pack_p0, count=8, iface=iface))
stream_ids.append(tg.set_stream(pack_p1, count=7, iface=iface))
stream_ids.append(tg.set_stream(pack_p2, count=6, iface=iface))
stream_ids.append(tg.set_stream(pack_p3, count=5, iface=iface))
stream_ids.append(tg.set_stream(pack_p4, count=5, iface=iface))
stream_ids.append(tg.set_stream(pack_p5, count=6, iface=iface))
stream_ids.append(tg.set_stream(pack_p6, count=7, iface=iface))
stream_ids.append(tg.set_stream(pack_p7, count=8, iface=iface))
tg.set_qos_stat_type(iface, "VLAN")
tg.clear_statistics([iface])
tg.start_sniff([iface], sniffing_time=5)
tg.start_streams(stream_ids)
data = tg.stop_sniff([iface])
tg.stop_streams(stream_ids)
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
assert tg.get_received_frames_count(iface) == packet_count
assert tg.get_qos_frames_count(iface, 0) == 8
assert tg.get_qos_frames_count(iface, 1) == 7
assert tg.get_qos_frames_count(iface, 2) == 6
assert tg.get_qos_frames_count(iface, 3) == 5
assert tg.get_qos_frames_count(iface, 4) == 5
assert tg.get_qos_frames_count(iface, 5) == 6
assert tg.get_qos_frames_count(iface, 6) == 7
assert tg.get_qos_frames_count(iface, 7) == 8
[docs] def test_qos_iptos_stat(self, tg):
"""Check Ixia QoS IP TOS stat reading.
"""
if tg.type != "ixiahl":
pytest.skip("Get Qos Frames count increment isn't supported by Pypacker TG")
iface = tg.ports[0]
dst_mac = "00:00:00:00:00:55"
src_mac = "00:00:00:00:00:77"
pack_list = []
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0x00}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0x0f}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0x1f}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0x20}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0x30}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0x3f}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0x40}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0x5a}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0x5f}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0x60}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0x71}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0x7f}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0x80}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0x8f}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0x9f}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0xa0}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0xb3}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0xbf}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0xc0}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0xc5}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0xdf}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0xe0}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0xe1}}, {"TCP": {}}))
pack_list.append(({"Ethernet": {"dst": dst_mac, "src": src_mac}}, {"IP": {"tos": 0xff}}, {"TCP": {}}))
packet_count = len(pack_list)
stream_ids = []
for pack in pack_list:
stream_ids.append(tg.set_stream(pack, count=1, iface=iface))
tg.set_qos_stat_type(iface, "IP")
tg.clear_statistics([iface])
tg.start_sniff([iface], sniffing_time=5)
tg.start_streams(stream_ids)
data = tg.stop_sniff([iface])
tg.stop_streams(stream_ids)
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
assert tg.get_received_frames_count(iface) == packet_count
for prio in range(8):
assert tg.get_qos_frames_count(iface, prio) == 3
[docs] def test_get_rate_stat(self, tg):
"""Check transmit rate reading.
"""
if tg.type != "ixiahl":
pytest.skip("Get port txrate increment isn't supported by Pypacker TG.")
iface = tg.ports[0]
stream_id_1 = tg.set_stream(ETH_IP_TCP, continuous=True, inter=0.1, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_TCP, continuous=True, inter=0.05, iface=iface)
tg.start_streams([stream_id_1])
time.sleep(1)
assert 10 * 0.9 <= tg.get_port_txrate(iface) <= 10 * 1.1
assert 10 * 0.9 <= tg.get_port_rxrate(iface) <= 10 * 1.1
tg.stop_streams([stream_id_1])
tg.start_streams([stream_id_2])
time.sleep(1)
assert 20 * 0.95 <= tg.get_port_txrate(iface) <= 20 * 1.05
assert 20 * 0.95 <= tg.get_port_rxrate(iface) <= 20 * 1.05
tg.stop_streams([stream_id_2])
tg.start_streams([stream_id_1, stream_id_2])
time.sleep(1)
assert 30 * 0.96 <= tg.get_port_txrate(iface) <= 30 * 1.04
assert 30 * 0.96 <= tg.get_port_rxrate(iface) <= 30 * 1.04
tg.stop_streams([stream_id_1, stream_id_2])
[docs] def test_check_increment_ip_src(self, tg):
"""Check all fields in incremented packet. IP.src increment.
"""
iface = tg.ports[0]
packet_count = 5
src_mac = DOT1Q_IP_UDP[0]["Ethernet"]["src"]
stream_id = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, sip_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=2, src_filter=src_mac)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(DOT1Q_IP_UDP, received)
[docs] def test_check_increment_ip_dst(self, tg):
"""Check all fields in incremented packet. IP.dst increment.
"""
iface = tg.ports[0]
packet_count = 1
src_mac = DOT1Q_IP_UDP[0]["Ethernet"]["src"]
stream_id = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, dip_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=2, src_filter=src_mac)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(DOT1Q_IP_UDP, received)
[docs] def test_check_increment_ip_dscp(self, tg):
"""Check all fields in incremented packet. IP.tos increment.
"""
iface = tg.ports[0]
packet_count = 1
src_mac = DOT1Q_IP_UDP[0]["Ethernet"]["src"]
stream_id = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, dscp_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=2, src_filter=src_mac)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(DOT1Q_IP_UDP, received)
[docs] def test_check_increment_ip_proto(self, tg):
"""Check all fields in incremented packet. IP.proto increment.
"""
iface = tg.ports[0]
packet_count = 1
src_mac = DOT1Q_IP_UDP[0]["Ethernet"]["src"]
stream_id = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, protocol_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=2, src_filter=src_mac)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(DOT1Q_IP_UDP, received)
[docs] def test_check_increment_arp_hwsrc(self, tg):
"""Check all fields in incremented packet. APR.hwsrc increment.
"""
iface = tg.ports[0]
packet_count = 3
src_mac = DOT1Q_ARP[0]["Ethernet"]["src"]
stream_id = tg.set_stream(DOT1Q_ARP, count=3, arp_sa_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=3, src_filter=src_mac)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == 1, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(DOT1Q_ARP, received)
[docs] def test_check_increment_arp_psrc(self, tg):
"""Check all fields in incremented packet. APR.psrc increment.
"""
iface = tg.ports[0]
packet_count = 1
src_mac = DOT1Q_ARP[0]["Ethernet"]["src"]
stream_id = tg.set_stream(DOT1Q_ARP, count=1, arp_sip_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=3, src_filter=src_mac)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(DOT1Q_ARP, received)
[docs] def test_check_increment_igmp_ip(self, tg):
"""Check all fields in incremented packet. IGMP.ip increment.
"""
iface = tg.ports[0]
packet_count = 1
stream_id = tg.set_stream(ETH_IP_IGMP, count=packet_count, igmp_ip_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=2, src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(ETH_IP_IGMP, received)
packet = copy.deepcopy(ETH_IP_IGMP)
packet[3]["IGMP"]["type"] = 18
stream_id = tg.set_stream(packet, count=packet_count, igmp_ip_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=2, src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(packet, received)
packet = copy.deepcopy(ETH_IP_IGMP)
packet[3]["IGMP"]["type"] = 23
stream_id = tg.set_stream(packet, count=packet_count, igmp_ip_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=2, src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(packet, received)
packet = copy.deepcopy(ETH_IP_IGMP)
packet[3]["IGMP"]["type"] = 34
packet[3]["IGMP"]["maxresp"] = 0
stream_id = tg.set_stream(packet, count=packet_count, igmp_ip_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=2, src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(packet, received)
packet_count = 4
packet = copy.deepcopy(ETH_IP_IGMP)
packet[3]["IGMP"]["type"] = 22
stream_id = tg.set_stream(packet, count=packet_count, igmp_ip_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=2, src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(packet, received)
[docs] def test_check_increment_ip_icmp(self, tg):
"""Check all fields in incremented packet. IP.src increment.
"""
iface = tg.ports[0]
packet_count = 1
stream_id = tg.set_stream(ETH_IP_ICMP, count=packet_count, sip_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=3, src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(ETH_IP_ICMP, received)
[docs] def test_check_increment_udp_sport(self, tg):
"""Check all fields in incremented packet. UDP.sport increment.
"""
iface = tg.ports[0]
packet_count = 1
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, sudp_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=2, src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(ETH_IP_UDP, received)
[docs] def test_check_increment_udp_dport(self, tg):
"""Check all fields in incremented packet. UDP.dport increment.
"""
iface = tg.ports[0]
packet_count = 1
stream_id = tg.set_stream(ETH_IP_UDP, count=packet_count, dudp_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=2, src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(ETH_IP_UDP, received)
[docs] def test_check_increment_dot1q_vlan_single(self, tg):
"""Check all fields in incremented packet. Dot1Q.vlan increment.
"""
iface = tg.ports[0]
packet_count = 1
stream_id = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, vlan_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=2, src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(DOT1Q_IP_UDP, received)
[docs] def test_check_increment_dot1q_vlan_double(self, tg):
"""Check all fields in incremented packet. Dot1Q.vlan increment.
"""
iface = tg.ports[0]
packet_count = 1
stream_id = tg.set_stream(QINQ, count=packet_count, vlan_increment=(2, 5), iface=iface)
tg.start_sniff([iface], sniffing_time=2, src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(QINQ, received)
[docs] def test_stop_sniffing(self, tg):
"""Start continuous stream and stop sniffing.
"""
iface = tg.ports[0]
stream_id_1 = tg.set_stream(PACKET_DEFINITION, continuous=True, iface=iface)
tg.start_sniff([iface])
tg.start_streams([stream_id_1])
tg.stop_streams([stream_id_1])
tg.stop_sniff([iface])
start_receive_statistics = tg.get_received_frames_count(iface)
end_receive_statistics = tg.get_received_frames_count(iface)
assert start_receive_statistics == end_receive_statistics
[docs] def test_packet_with_ipoption(self, tg):
"""Test building packet with IPOption.
"""
iface = tg.ports[0]
packet_count = 1
stream_id = tg.set_stream(ETH_IP_IGMP, count=packet_count, iface=iface, adjust_size=False)
tg.start_sniff([iface], sniffing_time=2)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(ETH_IP_IGMP, received)
[docs] def test_dot1q_arp_filter(self, tg):
"""Check Dot1Q.ARP filter.
"""
iface = tg.ports[0]
packet_count = 1
stream_id_1 = tg.set_stream(DOT1Q_ARP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ARP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="Dot1Q.ARP")
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q.ARP filter layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(DOT1Q_ARP, received)
[docs] def test_dot1q_arp_custom_filter(self, tg):
"""Check Dot1Q.ARP filter.
"""
iface = tg.ports[0]
packet_count = 1
filter_dot1q_arp = (12, "81 00 00 00 08 06", "00 00 FF FF 00 00")
stream_id_1 = tg.set_stream(DOT1Q_ARP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ARP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="Dot1Q.ARP")
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q.ARP filter layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p1 = packets[0]
tg.start_sniff([iface], sniffing_time=5, filter_layer=filter_dot1q_arp)
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q.ARP filter layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p2 = packets[0]
assert p1.bin() == p2.bin()
[docs] def test_not_arp_filter(self, tg):
"""Check notARP filter.
"""
iface = tg.ports[0]
packet_count = 1
stream_id_1 = tg.set_stream(DOT1Q_IP_ICMP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ARP, count=packet_count, iface=iface)
stream_id_3 = tg.set_stream(ETH_IP_UDP, count=packet_count, iface=iface)
stream_id_4 = tg.set_stream(DOT1Q_ARP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=4, filter_layer="notARP",
src_filter=SRC_MAC)
tg.start_streams([stream_id_1, stream_id_2, stream_id_3, stream_id_4])
tg.stop_streams([stream_id_1, stream_id_2, stream_id_3, stream_id_4])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified ARP filter layer are sniffed
assert len(packets) == packet_count * 3, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
[docs] def test_dot1q_filter(self, tg):
"""Check Dot1Q filter.
"""
iface = tg.ports[0]
packet_count = 1
stream_id_1 = tg.set_stream(DOT1Q_ARP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ARP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=4, filter_layer="Dot1Q.ARP")
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q filter layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
assert tg.get_packet_field(packets[0], "Ethernet", "vlan")
[docs] def test_dot1q_custom_filter(self, tg):
"""Check Dot1Q filter.
"""
iface = tg.ports[0]
packet_count = 1
filter_dot1q_arp = (12, "81 00 00 00 08 06", "00 00 FF FF 00 00")
stream_id_1 = tg.set_stream(DOT1Q_ARP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ARP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="Dot1Q.ARP")
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q filter layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p1 = packets[0]
tg.start_sniff([iface], sniffing_time=2, filter_layer=filter_dot1q_arp)
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q filter layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p2 = packets[0]
assert p1.bin() == p2.bin()
[docs] def test_ip_filter(self, tg):
"""Check IP filter.
"""
iface = tg.ports[0]
packet_count = 1
stream_id_1 = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_UDP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="IP", dst_filter=DST_MAC)
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified IP filter layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(ETH_IP_UDP, received)
[docs] def test_ip_custom_filter(self, tg):
"""Check IP filter.
"""
iface = tg.ports[0]
packet_count = 1
filter_ip = (12, "08 00", "00 00")
stream_id_1 = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_UDP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="IP", dst_filter=DST_MAC)
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified IP filter layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p1 = packets[0]
tg.start_sniff([iface], sniffing_time=2, filter_layer=filter_ip, dst_filter=DST_MAC)
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified IP filter layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p2 = packets[0]
assert p1.bin() == p2.bin()
[docs] def test_dot1q_ip_filter(self, tg):
"""Check Dot1Q.IP filter.
"""
iface = tg.ports[0]
packet_count = 1
stream_id_1 = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_UDP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="Dot1Q.IP")
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q.IP filter layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(DOT1Q_IP_UDP, received)
[docs] def test_dot1q_ip_custom_filter(self, tg):
"""Check Dot1Q.IP filter.
"""
iface = tg.ports[0]
packet_count = 1
filter_dot1q_ip = (12, "81 00 00 00 08 00", "00 00 FF FF 00 00")
stream_id_1 = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_UDP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="Dot1Q.IP")
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q.IP filter layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p1 = packets[0]
tg.start_sniff([iface], sniffing_time=2, filter_layer=filter_dot1q_ip)
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q.IP filter layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p2 = packets[0]
assert p1.bin() == p2.bin()
[docs] @pytest.mark.skip("STP is not integrated yet")
def test_stp_filter(self, tg):
"""Check STP filter.
"""
iface = tg.ports[0]
stream_id_1 = tg.set_stream(STP, count=1, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_UDP, count=1, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="STP")
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
# Verify that only packets with specified STP layer are sniffed
assert len(data[iface]) == 1
assert tg.get_packet_layer(data[iface][0], "STP") is not None
[docs] @pytest.mark.skip("STP is not integrated yet")
def test_stp_custom_filter(self, tg):
"""Check STP filter.
"""
iface = tg.ports[0]
stream_id_1 = tg.set_stream(STP, count=1, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_UDP, count=1, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="STP")
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
# Verify that only packets with specified STP layer are sniffed
assert len(data[iface]) == 1
p1 = data[iface][0]
tg.start_sniff([iface], sniffing_time=2, filter_layer=(14, "42 42 03 00 00", "00 00 00 00 00"))
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
# Verify that only packets with specified STP layer are sniffed
assert len(data[iface]) == 1
p2 = data[iface][0]
assert p1.bin() == p2.bin()
[docs] @pytest.mark.skip("STP is not integrated yet")
def test_not_stp_filter(self, tg):
"""Check notSTP filter.
"""
iface = tg.ports[0]
stream_id_1 = tg.set_stream(STP, count=1, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_UDP, count=1, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="notSTP",
src_filter=SRC_MAC)
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
# Verify that only packets with specified not STP layer are sniffed
assert len(data[iface]) == 1
assert tg.get_packet_layer(data[iface][0], "STP") is None
[docs] def test_tcp_filter(self, tg):
"""Check TCP filter.
"""
iface = tg.ports[0]
packet_count = 1
stream_id_1 = tg.set_stream(ETH_IP_UDP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_TCP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="TCP", dst_filter=DST_MAC)
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified TCP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(ETH_IP_TCP, received)
[docs] def test_tcp_custom_filter(self, tg):
"""Check TCP filter.
"""
iface = tg.ports[0]
packet_count = 1
filter_tcp = (12, "08 00 00 00 00 00 00 00 00 00 00 06", "00 00 FF FF FF FF FF FF FF FF FF 00")
stream_id_1 = tg.set_stream(ETH_IP_UDP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_TCP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="TCP", dst_filter=DST_MAC)
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified TCP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p1 = packets[0]
tg.start_sniff([iface], sniffing_time=2, filter_layer=filter_tcp, dst_filter=DST_MAC)
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified TCP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p2 = packets[0]
assert p1.bin() == p2.bin()
[docs] def test_dot1q_tcp_filter(self, tg):
"""Check Dot1Q.TCP filter.
"""
iface = tg.ports[0]
packet_count = 1
stream_id_1 = tg.set_stream(DOT1Q_IP_TCP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_TCP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="Dot1Q.TCP")
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q.TCP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(DOT1Q_IP_TCP, received)
[docs] def test_dot1q_tcp_custom_filter(self, tg):
"""Check Dot1Q.TCP filter.
"""
iface = tg.ports[0]
packet_count = 1
filter_dot1q_tcp = (12, "81 00 00 00 08 00 00 00 00 00 00 00 00 00 00 06",
"00 00 FF FF 00 00 FF FF FF FF FF FF FF FF FF 00")
stream_id_1 = tg.set_stream(DOT1Q_IP_TCP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_TCP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="Dot1Q.TCP")
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q.TCP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p1 = packets[0]
tg.start_sniff([iface], sniffing_time=2, filter_layer=filter_dot1q_tcp)
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q.TCP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p2 = packets[0]
assert p1.bin() == p2.bin()
[docs] def test_udp_filter(self, tg):
"""Check UDP filter.
"""
iface = tg.ports[0]
packet_count = 1
stream_id_1 = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_UDP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="UDP", src_filter=SRC_MAC)
tg.start_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified UDP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(ETH_IP_UDP, received)
[docs] def test_udp_custom_filter(self, tg):
"""Check UDP filter.
"""
iface = tg.ports[0]
packet_count = 1
filter_udp = (12, "08 00 00 00 00 00 00 00 00 00 00 11",
"00 00 FF FF FF FF FF FF FF FF FF 00")
stream_id_1 = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_UDP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="UDP", src_filter=SRC_MAC)
tg.start_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified UDP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p1 = packets[0]
tg.start_sniff([iface], sniffing_time=2, filter_layer=filter_udp, src_filter=SRC_MAC)
tg.start_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified UDP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p2 = packets[0]
assert p1.bin() == p2.bin()
[docs] def test_dot1q_udp_filter(self, tg):
"""Check Dot1Q.UDP filter.
"""
iface = tg.ports[0]
packet_count = 1
stream_id_1 = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_UDP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="Dot1Q.UDP")
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q.UDP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(DOT1Q_IP_UDP, received)
[docs] def test_dot1q_udp_custom_filter(self, tg):
"""Check Dot1Q.UDP filter.
"""
iface = tg.ports[0]
packet_count = 1
filter_dot1q_udp = (12, "81 00 00 00 08 00 00 00 00 00 00 00 00 00 00 11",
"00 00 FF FF 00 00 FF FF FF FF FF FF FF FF FF 00")
stream_id_1 = tg.set_stream(DOT1Q_IP_UDP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_UDP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="Dot1Q.UDP")
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q.UDP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p1 = packets[0]
tg.start_sniff([iface], sniffing_time=2, filter_layer=filter_dot1q_udp)
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q.UDP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p2 = packets[0]
assert p1.bin() == p2.bin()
[docs] def test_icmp_filter(self, tg):
"""Check ICMP filter.
"""
iface = tg.ports[0]
packet_count = 1
stream_id_1 = tg.set_stream(DOT1Q_IP_ICMP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_ICMP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="ICMP")
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified ICMP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(ETH_IP_ICMP, received)
[docs] def test_icmp_custom_filter(self, tg):
"""Check ICMP filter.
"""
iface = tg.ports[0]
packet_count = 1
filter_icmp = (12, "08 00 00 00 00 00 00 00 00 00 00 01",
"00 00 FF FF FF FF FF FF FF FF FF 00")
stream_id_1 = tg.set_stream(DOT1Q_IP_ICMP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_ICMP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="ICMP")
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified ICMP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p1 = packets[0]
tg.start_sniff([iface], sniffing_time=2, filter_layer=filter_icmp)
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified ICMP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p2 = packets[0]
assert p1.bin() == p2.bin()
[docs] def test_dot1q_icmp_filter(self, tg):
"""Check Dot1Q.ICMP filter.
"""
iface = tg.ports[0]
packet_count = 1
stream_id_1 = tg.set_stream(DOT1Q_IP_ICMP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_ICMP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="Dot1Q.ICMP")
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q.ICMP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(DOT1Q_IP_ICMP, received)
[docs] def test_dot1q_icmp_custom_filter(self, tg):
"""Check Dot1Q.ICMP filter.
"""
iface = tg.ports[0]
packet_count = 1
filter_dot1q_icmp = (12, "81 00 00 00 08 00 00 00 00 00 00 00 00 00 00 01",
"00 00 FF FF 00 00 FF FF FF FF FF FF FF FF FF 00")
stream_id_1 = tg.set_stream(DOT1Q_IP_ICMP, count=packet_count, iface=iface)
stream_id_2 = tg.set_stream(ETH_IP_ICMP, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, filter_layer="Dot1Q.ICMP")
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q.ICMP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p1 = packets[0]
tg.start_sniff([iface], sniffing_time=2, filter_layer=filter_dot1q_icmp)
tg.start_streams([stream_id_1, stream_id_2])
tg.stop_streams([stream_id_1, stream_id_2])
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
# Verify that only packets with specified Dot1Q.ICMP layer are sniffed
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
p2 = packets[0]
assert p1.bin() == p2.bin()
[docs] @pytest.mark.skip("BGP is not integrated yet")
def test_build_bgp_packet_simple(self, tg):
"""Check building BGP packet.
"""
iface = tg.ports[0]
src_mac = '00:00:00:00:00:cc'
dst_mac = '00:00:00:00:00:99'
bgp_open = ({"Ether": {"src": src_mac, "dst": dst_mac, "type": 0x8100}},
{"Dot1Q": {"vid": 7}},
{"IP": {"dst": "10.0.0.1", "src": "10.0.0.2", "tos": 6}},
{"TCP": {"sport": 179, "dport": 47330, "seq": 305, "ack": 887850408, "flags": 0x18}},
{"BGPHeader": {"type": 2}},
{"BGPUpdate": {"withdrawn_len": 0, "withdrawn": [], "nlri": [(24, '20.1.1.0')],
"total_path": [{"BGPPathAttribute": {"type": 1, "origin": 1}},
{"BGPPathAttribute": {"type": 2, "aspath": []}}]}}, )
stream_id = tg.set_stream(bgp_open, count=1, iface=iface)
tg.start_sniff([iface], sniffing_time=4, filter_layer="Dot1Q.TCP")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
assert iface in data and data[iface]
# filter our packets from data
data[iface] = [x for x in data[iface] if x.get_lfield("Ether", "dst") == dst_mac and x.get_lfield("Ether", "src") == src_mac]
assert len(data[iface]) == 1
assert data[iface][0].get_lfield("BGPHeader", "type") is not None
assert tg.packet_dictionary(data[iface][0])
assert data[iface][0].get_lcount("BGPPathAttribute") == 2
[docs] @pytest.mark.skip("BGP is not integrated yet")
def test_build_bgp_packet_as_path(self, tg):
"""Check building BGP packet with multiple as_path.
"""
iface = tg.ports[0]
src_mac = '00:00:00:00:00:aa'
dst_mac = '00:00:00:00:00:bb'
bgp_update = ({'Ether': {'src': src_mac, 'dst': dst_mac, 'type': 2048}},
{'IP': {'frag': 0, 'src': '192.168.0.15', 'proto': 6, 'tos': 0, 'dst': '192.168.0.33',
'ttl': 64, 'id': 18669, 'version': 4, 'flags': 2, 'ihl': 5, 'chksum': 28554}},
{'TCP': {'reserved': 0, 'seq': 3593706898, 'ack': 2051072118, 'dataofs': 5,
'dport': 179, 'window': 32120, 'flags': 24, 'chksum': 21876,
'urgptr': 0, 'sport': 2124, 'options': []}},
{'BGPHeader': {'type': 4, 'len': 19}},
{'BGPHeader': {'type': 2, 'len': 98}},
{'BGPUpdate': {'nlri': (16, "172.16.0.0"), 'tp_len': 72, 'withdrawn_len': 0,
'total_path': [
{'BGPPathAttribute': {'origin': 2, 'attr_len': 1, 'flags': 64, 'type': 1}},
{'BGPPathAttribute': {'flags': 64, 'type': 2, 'attr_len': 10, 'aspath': [
{'BGPASPath': {'ases': [500, 500], 'type': 1, 'len': 2}},
{'BGPASPath': {'ases': [65211], 'type': 2, 'len': 1}}]}},
{'BGPPathAttribute': {'attr_len': 4, 'nexthop': '192.168.0.15', 'flags': 64, 'type': 3}},
{'BGPPathAttribute': {'attr_len': 4, 'localpref': 100, 'flags': 64, 'type': 5}},
{'BGPPathAttribute': {'attr_len': 0, 'value': '', 'flags': 64, 'type': 6}},
{'BGPPathAttribute': {'attr_len': 6, 'aggas': 65210, 'aggorigin': '192.168.0.10',
'flags': 192, 'type': 7}},
{'BGPPathAttribute': {'attr_len': 12, 'flags': 192, 'type': 8,
'communities': [4273930241, 51773444, 22282490]}},
{'BGPPathAttribute': {'attr_len': 4, 'oi': '192.168.0.15',
'flags': 128, 'type': 9}},
{'BGPPathAttribute': {'type': 10, 'flags': 128, 'value': '\xc0\xa8\x00\xfa',
'attr_len': 4}}], }},
{'BGPHeader': {'type': 2, 'len': 99}},
{'BGPUpdate': {'withdrawn': [], 'nlri': (22, '192.168.4.0'), 'tp_len': 72,
'withdrawn_len': 0, 'total_path': [
{'BGPPathAttribute': {'origin': 0, 'attr_len': 1, 'flags': 64, 'type': 1}},
{'BGPPathAttribute': {'flags': 64, 'type': 2, 'attr_len': 10, 'aspath': [
{'BGPASPath': {'ases': [500, 500], 'type': 1, 'len': 2}},
{'BGPASPath': {'ases': [65211], 'type': 2, 'len': 1}}]}},
{'BGPPathAttribute': {'attr_len': 4, 'nexthop': '192.168.0.15',
'flags': 64, 'type': 3}},
{'BGPPathAttribute': {'attr_len': 4, 'localpref': 100, 'flags': 64, 'type': 5}},
{'BGPPathAttribute': {'attr_len': 0, 'value': '', 'flags': 64, 'type': 6}},
{'BGPPathAttribute': {'attr_len': 6, 'flags': 192, 'type': 7,
'aggas': 65210, 'aggorigin': "192.168.0.10"}},
{'BGPPathAttribute': {'attr_len': 12, 'flags': 192, 'type': 8,
'communities': [4273930241, 51773444, 22282490]}},
{'BGPPathAttribute': {'attr_len': 4, 'oi': '192.168.0.15',
'flags': 128, 'type': 9}},
{'BGPPathAttribute': {'type': 10, 'flags': 128, 'value': '\xc0\xa8\x00\xfa',
'attr_len': 4}}]}})
stream_id = tg.set_stream(bgp_update, count=1, iface=iface)
tg.start_sniff([iface], sniffing_time=4, filter_layer="TCP")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
assert iface in data and data[iface]
# filter our packets from data
data[iface] = [x for x in data[iface] if x.get_lfield("Ether", "dst") == dst_mac and x.get_lfield("Ether", "src") == src_mac]
assert len(data[iface]) == 1
assert data[iface][0].get_lfield("BGPHeader", "type") is not None
assert data[iface][0].get_lfield("BGPUpdate", "total_path") is not None
assert data[iface][0].get_lfield("BGPPathAttribute", "type") is not None
[docs] @pytest.mark.skip("BGP is not integrated yet")
def test_build_bgp_notification_packet(self, tg):
"""Check building BGPNotification packet.
"""
iface = tg.ports[0]
src_mac = '00:00:00:00:00:cc'
dst_mac = '00:00:00:00:00:99'
bgp_open = ({"Ether": {"src": src_mac, "dst": dst_mac, "type": 0x8100}},
{"Dot1Q": {"vid": 7}},
{"IP": {"dst": "10.0.0.1", "src": "10.0.0.2", "tos": 6}},
{"TCP": {"sport": 179, "dport": 47330, "seq": 305, "ack": 887850408, "flags": 0x18}},
{"BGPHeader": {"type": 3}},
{"BGPNotification": {"ErrorCode": 6, "ErrorSubCode": 1, "Data": '\x00\x01\x01\x00\x00\x00\x02'}},
)
stream_id = tg.set_stream(bgp_open, count=1, iface=iface)
tg.start_sniff([iface], sniffing_time=4, filter_layer="Dot1Q.TCP")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
assert iface in data and data[iface]
# filter our packets from data
data[iface] = [x for x in data[iface] if x.get_lfield("Ether", "dst") == dst_mac and x.get_lfield("Ether", "src") == src_mac]
assert len(data[iface]) == 1
assert data[iface][0].get_lfield("BGPHeader", "type") is not None
assert tg.packet_dictionary(data[iface][0])
assert data[iface][0].get_lcount("BGPNotification") == 1
[docs] @pytest.mark.skip("STP is not integrated yet")
def test_xstp_build_capture(self, tg):
"""Check stp/rstp/mstp build and detection.
"""
iface = tg.ports[0]
pack_rstp_2 = ({"Dot3": {"src": "00:00:00:11:11:11", "dst": DST_MAC}},
{"LLC": {"dsap": 66, "ssap": 66, "ctrl": 3}},
{"STP": {"proto": 0, "version": 2, "v1len": 0}})
pack_mstp_2 = MSTP + MSTI_BPDU
stream_id_1 = tg.set_stream(STP, count=2, inter=0.1, iface=iface, adjust_size=False)
stream_id_2 = tg.set_stream(RSTP, count=2, inter=0.1, iface=iface, adjust_size=False)
stream_id_3 = tg.set_stream(pack_rstp_2, count=2, inter=0.1, iface=iface, adjust_size=False)
stream_id_4 = tg.set_stream(RSTP, count=2, inter=0.1, iface=iface, adjust_size=False)
stream_id_5 = tg.set_stream(pack_mstp_2, count=2, inter=0.1, iface=iface, adjust_size=False)
tg.start_sniff([iface], sniffing_time=20, filter_layer="STP", src_filter="00:00:00:11:11:11")
tg.start_streams([stream_id_1, stream_id_2, stream_id_3, stream_id_4, stream_id_5])
data = tg.stop_sniff([iface])
tg.stop_streams([stream_id_1, stream_id_2, stream_id_3, stream_id_4, stream_id_5])
assert iface in data and data[iface]
assert len(data[iface]) == 10
# Verify that we captured 2 ConfBPDUs
assert len([x for x in data[iface] if x.get_lfield("STP", "version") == 0]) == 2
# Verify that we captured 4 RST BPDUs and all of them have v1len field
assert len([x for x in data[iface] if x.get_lfield("STP", "version") == 2 and x.get_lfield("STP", "v1len") == 0]) == 4
# Verify that we captured 4 MST BPDUs
assert len([x for x in data[iface] if x.get_lfield("STP", "version") == 3]) == 4
# Verify that 2 MST BPDUs have MSTI Configuration messages
assert len([x for x in data[iface] if x.get_lfield("STP", "version") == 3 and x.get_lfield("STP", "v3len") > 64 and x.get_lfield("MstiConfigMsg", "rootmac")]) == 2
[docs] def test_double_tagged_packet_1(self, tg):
"""Verify that pypacker can recognize QinQ packets type 0x9100.
"""
iface = tg.ports[0]
packet_count = 1
src_mac = QINQ[0]["Ethernet"]["src"]
stream_id = tg.set_stream(QINQ, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, src_filter=src_mac)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
packet = packets[0]
# Verify ether type
assert tg.get_packet_field(packet, "Ethernet", "type") == ETHER_TYPE_IP
assert tg.get_packet_field(packet, "S-Dot1Q", "type") == ETHER_TYPE_TUNNELING
assert tg.get_packet_field(packet, "C-Dot1Q", "type") == ETHER_TYPE_8021Q
# Verify that we have 2 Dot1Q layers with different prio.
assert tg.get_packet_field(packet, "S-Dot1Q", "prio") == DOT1Q_PRIO_1
assert tg.get_packet_field(packet, "C-Dot1Q", "prio") == DOT1Q_PRIO_2
[docs] def test_double_tagged_packet_2(self, tg):
"""Verify that pypacker can recognize QinQ packets type 0x88A8.
"""
iface = tg.ports[0]
packet_count = 1
packet = copy.deepcopy(QINQ)
packet[1]["Dot1Q"]["type"] = ETHER_TYPE_PBRIDGE
src_mac = packet[0]["Ethernet"]["src"]
stream_id = tg.set_stream(packet, count=packet_count, iface=iface)
tg.start_sniff([iface], sniffing_time=2, src_filter=src_mac)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(packet, received)
[docs] def test_default_ether_type(self, tg):
"""Verify that default Ether type for tagged packets is equal to 0x8100.
"""
iface = tg.ports[0]
packet_count = 1
# Define packet without setting type for Ether layer.
pack = copy.deepcopy(DOT1Q_IP_TCP)
pack[1]["Dot1Q"]["prio"] = DOT1Q_PRIO_2
stream_id = tg.set_stream(pack, count=packet_count, iface=iface, adjust_size=False)
tg.start_sniff([iface], sniffing_time=2, src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
packet = packets[0]
assert tg.get_packet_field(packet, "Ethernet", "src") == SRC_MAC
assert tg.get_packet_field(packet, "Ethernet", "dst") == DST_MAC
assert tg.get_packet_field(packet, "Ethernet", "type") == ETHER_TYPE_IP
assert tg.check_packet_field(packet, "S-Dot1Q", "vid", VLAN_1)
assert tg.check_packet_field(packet, "S-Dot1Q", "type", ETHER_TYPE_8021Q)
assert tg.check_packet_field(packet, "S-Dot1Q", "cfi", DOT1Q_DEFAULT_CFI)
assert tg.check_packet_field(packet, "S-Dot1Q", "prio", DOT1Q_PRIO_2)
[docs] def test_pause_frames_0001(self, tg):
"""Verify that MAC Control Pause frames with opcode 0x0001 are builded and sniffed correctly.
"""
iface = tg.ports[0]
packet_count = 1
stream_id = tg.set_stream(PAUSE, count=packet_count, iface=iface, adjust_size=False)
tg.start_sniff([iface], sniffing_time=2, src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
packet = packets[0]
assert tg.get_packet_field(packet, "Ethernet", "type") == ETHER_TYPE_EFC
assert tg.get_packet_field(packet, "FlowControl", "opcode") == PAUSE_CODE
assert tg.get_packet_field(packet, "Pause", "ptime") == PAUSE_TIME
[docs] def test_pause_frames_0101(self, tg):
"""Verify that MAC Control Pause frames with opcode 0x0101 are builded and sniffed correctly.
"""
iface = tg.ports[0]
packet_count = 1
stream_id = tg.set_stream(PFC, count=packet_count, iface=iface, adjust_size=False)
tg.start_sniff([iface], sniffing_time=2, src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
packet = packets[0]
assert tg.get_packet_field(packet, "Ethernet", "type") == ETHER_TYPE_EFC
assert tg.get_packet_field(packet, "FlowControl", "opcode") == PFC_CODE
assert tg.get_packet_field(packet, "PFC", "ms") == PFC_MS
assert tg.get_packet_field(packet, "PFC", "ls_list") == PFC_LS
assert tg.get_packet_field(packet, "PFC", "time_list") == PFC_TIME
[docs] def test_pause_frames_ffff(self, tg):
"""Verify that MAC Control Pause frames with unknown are builded and sniffed correctly.
"""
iface = tg.ports[0]
packet_count = 1
opcode = 0xffff
pack = copy.deepcopy(PAUSE)
pack[1]["FlowControl"]["opcode"] = opcode
stream_id = tg.set_stream(pack, count=packet_count, iface=iface, adjust_size=False)
tg.start_sniff([iface], sniffing_time=3)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(pack, received)
[docs] def test_lldp_build_capture(self, tg):
"""Verify that LLDP packets are builded and sniffed correctly.
"""
iface = tg.ports[0]
packet_count = 1
lldp_managment_with_oid = copy.deepcopy(LLDP_MAN_ADDR_TLV)
lldp_managment_with_oid["LLDPManagementAddress"]["oid"] = b"1.3.6.1.4.1.731.3.2.30.1.1.7"
lldp_org_spec_tlv_1 = {"LLDPOrgSpecGeneric": {"subtype": 3, "tlv_type": 127, "oui": 4623,
"value": b"\x01\x00\x00\x00\x00"},
}
lldp_org_spec_tlv_2 = {"LLDPOrgSpecGeneric": {"subtype": 10, "tlv_type": 127, "oui": 4795,
"value": b"System Product Name"},
}
lldp_pack = copy.deepcopy(LLDP)
lldp_pack[1]['LLDP']['tlvlist'][-1:-1] = [lldp_managment_with_oid,
lldp_org_spec_tlv_1,
lldp_org_spec_tlv_2,
]
stream_id = tg.set_stream(lldp_pack, count=packet_count, iface=iface, adjust_size=False)
tg.start_sniff([iface], sniffing_time=2, dst_filter=LLDP_DST_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(lldp_pack, received)
[docs] def test_lldp_dcbx(self, tg):
"""Verify that DCBX packets are built and captured correctly.
"""
iface = tg.ports[0]
packet_count = 1
stream_id = tg.set_stream(LLDP_DCBX, count=packet_count, iface=iface, adjust_size=False)
tg.start_sniff([iface], sniffing_time=2, dst_filter=LLDP_DST_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(LLDP_DCBX, received)
[docs] def test_lldp_dcbx_app_prio_table(self, tg):
"""Verify that DCBX packets with Application Priority Tables are built and captured correctly.
"""
iface = tg.ports[0]
packet_count = 1
stream_id = tg.set_stream(LLDP_DCBX_APP_PRIO, count=packet_count, iface=iface, adjust_size=False)
tg.start_sniff([iface], sniffing_time=2, dst_filter=LLDP_DST_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(LLDP_DCBX_APP_PRIO, received)
[docs] def test_lldp_sys_capabilities(self, tg):
"""Verify that LLDP packets with full System capabilities list are built and captured correctly.
"""
iface = tg.ports[0]
packet_count = 1
# other(1bit)+repeater(2bit)+bridge(3bit)+router(5bit)+telephone(6bit)
capabilities = 55
# other(1bit)+repeater(2bit)+bridge(3bit)+router(5bit)
enabled = 23
lldp_sys_capabilietes = copy.deepcopy(LLDP_SYS_CAPAB_TLV)
lldp_sys_capabilietes["LLDPSystemCapabilities"]["capabilities"] = capabilities
lldp_sys_capabilietes["LLDPSystemCapabilities"]["enabled"] = enabled
lldp_pack = ({"Ethernet": {"dst": LLDP_DST_MAC, "src": SRC_MAC, "type": ETHER_TYPE_LLDP}},
{"LLDP": {'tlvlist': [LLDP_CHASSIS_ID_TLV, LLDP_PORT_ID_TLV,
LLDP_TTL_TLV, lldp_sys_capabilietes, LLDP_END_TLV,
],
},
})
stream_id = tg.set_stream(lldp_pack, count=packet_count, iface=iface, adjust_size=False)
tg.start_sniff([iface], sniffing_time=2, dst_filter=LLDP_DST_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(lldp_pack, received)
[docs] def test_lldp_with_padding(self, tg):
"""Verify that LLDP packets with with padding are built and captured correctly.
"""
iface = tg.ports[0]
packet_count = 1
padding = b"\x00" * 10
lldp_pack = ({"Ethernet": {"dst": LLDP_DST_MAC, "src": SRC_MAC, "type": ETHER_TYPE_LLDP,
"padding": padding}},
{"LLDP": {'tlvlist': [LLDP_CHASSIS_ID_TLV, LLDP_PORT_ID_TLV,
LLDP_TTL_TLV, LLDP_END_TLV,
],
},
})
stream_id = tg.set_stream(lldp_pack, count=packet_count, iface=iface, adjust_size=False)
tg.start_sniff([iface], sniffing_time=2, dst_filter=LLDP_DST_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
packet = packets[0]
# Verify received packet
assert tg.get_packet_field(packet, "Ethernet", "src") == SRC_MAC
assert tg.get_packet_field(packet, "Ethernet", "dst") == LLDP_DST_MAC
assert tg.get_packet_field(packet, "Ethernet", "type") == ETHER_TYPE_LLDP
assert tg.get_packet_field(packet, "LLDP", "tlvlist")
assert tg.get_packet_field(packet, "LLDPChassisId", "tlv_type") == LLDP_CHASSIS_ID_TLV_TYPE
assert tg.get_packet_field(packet, "LLDPChassisId", "subtype") == LLDP_MAC_SUBTYPE
assert tg.get_packet_field(packet, "LLDPChassisId", "value") == SRC_MAC
assert tg.get_packet_field(packet, "LLDPPortId", "tlv_type") == LLDP_PORT_ID_TLV_TYPE
assert tg.get_packet_field(packet, "LLDPPortId", "subtype") == LLDP_PORT_ID_INTERFACE_SUBTYPE
assert tg.get_packet_field(packet, "LLDPPortId", "value") == LLDP_PORT_ID_INTERFACE_VALUE
assert tg.get_packet_field(packet, "LLDPTTL", "tlv_type") == LLDP_TTL_TLV_TYPE
assert tg.get_packet_field(packet, "LLDPTTL", "seconds") == LLDP_TTL_SECONDS
assert tg.get_packet_field(packet, "LLDPDUEnd", "tlv_type") == 0
assert tg.get_packet_field(packet, "LLDPDUEnd", "tlv_len") == 0
[docs] def test_lacp_layers(self, tg):
"""Verify that LACP packets are built and captured correctly.
"""
iface = tg.ports[0]
packet_count = 1
stream_id = tg.set_stream(LACP, count=packet_count, iface=iface, adjust_size=False)
tg.start_sniff([iface], sniffing_time=2, dst_filter=LACP_DST_MAC, src_filter=SRC_MAC)
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
received = tg.packet_dictionary(packets[0])
# Verify received packet is equal to sent packet
self.verify_packets_data(LACP, received)
[docs] def test_pproc_packet_fragmentation_1(self, tg):
"""Check packet fragmentation.
"""
fragments = tg.packet_fragment(ETH_IP_ICMP, required_size=200, fragsize=110)
assert len(fragments) == 2
[docs] def test_pproc_packet_fragmentation_2(self, tg):
"""Check packet fragmentation. fragsize is None.
"""
fragments = tg.packet_fragment(ETH_IP_ICMP, required_size=200)
assert len(fragments) == 1
[docs] def test_pproc_packet_dictionary(self, tg):
"""Check packet dictionary. Fragsize is None.
"""
fragments = tg.packet_fragment(DOT1Q_IP_UDP, adjust_size=False, required_size=200)
assert len(fragments) == 1
pac = tg.packet_dictionary(fragments[0])
assert pac == DOT1Q_IP_UDP
fragments = tg.packet_fragment(DOT1Q_IP_TCP, adjust_size=False, required_size=200)
assert len(fragments) == 1
pac = tg.packet_dictionary(fragments[0])
assert pac == DOT1Q_IP_TCP
fragments = tg.packet_fragment(DOT1Q_IP_ICMP, adjust_size=False, required_size=200)
assert len(fragments) == 1
pac = tg.packet_dictionary(fragments[0])
assert pac == DOT1Q_IP_ICMP
fragments = tg.packet_fragment(DOT1Q_ARP, adjust_size=False, required_size=200)
assert len(fragments) == 1
pac = tg.packet_dictionary(fragments[0])
assert pac == DOT1Q_ARP
fragments = tg.packet_fragment(ETH_IP_ICMP, adjust_size=False, required_size=200)
assert len(fragments) == 1
pac = tg.packet_dictionary(fragments[0])
assert pac == ETH_IP_ICMP
fragments = tg.packet_fragment(ETH_IP_UDP, adjust_size=False, required_size=200)
assert len(fragments) == 1
pac = tg.packet_dictionary(fragments[0])
assert pac == ETH_IP_UDP
fragments = tg.packet_fragment(ETH_IP_TCP, adjust_size=False, required_size=200)
assert len(fragments) == 1
pac = tg.packet_dictionary(fragments[0])
assert pac == ETH_IP_TCP
fragments = tg.packet_fragment(ARP, adjust_size=False, required_size=200)
assert len(fragments) == 1
pac = tg.packet_dictionary(fragments[0])
assert pac == ARP
fragments = tg.packet_fragment(DOT1Q, adjust_size=False, required_size=200)
assert len(fragments) == 1
pac = tg.packet_dictionary(fragments[0])
assert pac == DOT1Q
fragments = tg.packet_fragment(QINQ, adjust_size=False, required_size=200)
assert len(fragments) == 1
pac = tg.packet_dictionary(fragments[0])
assert pac == QINQ
fragments = tg.packet_fragment(STP, adjust_size=False, required_size=200)
assert len(fragments) == 1
pac = tg.packet_dictionary(fragments[0])
assert pac == STP
[docs] @pytest.mark.skip("DHCP is not integrated yet")
def test_dhcp_ip_incrementation(self, tg):
"""Check dhcp ip incrementation. Count == Increment count.
"""
iface = tg.ports[0]
dhcp_request = ({"Ether": {"dst": BROADCAT_MAC, "src": '00:00:10:00:01:02'}},
{"IP": {"src": "0.0.0.0", "dst": "255.255.255.255", "ttl": 128}},
{"UDP": {"sport": 68, "dport": 67}},
{"BOOTP": {"chaddr": '00:00:10:00:01:02', "op": 1, "hops": 0, "siaddr": '10.0.3.3'}},
{"DHCP": {"options": [("message-type", "request"), "end"]}})
stream_id = tg.set_stream(dhcp_request, count=5, dhcp_si_increment=(2, 5), required_size=346, iface=iface)
tg.start_sniff([iface], sniffing_time=5, filter_layer="IP", src_filter="00:00:10:00:01:02")
tg.send_stream(stream_id)
data = tg.stop_sniff([iface])
assert iface in list(data.keys())
# Verify that sniffed count == count
assert len(data[iface]) == 5
# Verify that all packets with different src ip
src_ip_set = set()
for packet in data[iface]:
src_ip_set.add(tg.get_packet_field(packet, "BOOTP", "siaddr"))
assert len(src_ip_set) == 5
[docs] @pytest.mark.parametrize("padding_size", [26, 1476])
def test_send_sniff_max_min_packets(self, tg, padding_size):
"""Verify sending and sniffing of packets with minimal and maximal size.
"""
# 26(padding) + 20(ip header) + 14(ether header) + 4(crc) = 64
# 1476(padding) + 20(ip header) + 14(ether header) + 4(crc) = 1514
iface = tg.ports[0]
packet_count = 1
padding = b""
eth_len, ip_len, crc = 14, 20, 4
packet_size = padding_size + ip_len + eth_len + crc
for _ in range(padding_size):
padding += chr(random.randint(0, 16)).encode()
# send packet, sniff pkt stream
pkt = ({"Ethernet": {"src": SRC_MAC, "dst": DST_MAC, "padding": padding}},
{"IP": {"src": IP_SRC, "dst": IP_DST, "p": IP_PROTO_IP}})
tg.start_sniff([iface], sniffing_time=3,
filter_layer="IP", src_filter=SRC_MAC, dst_filter=DST_MAC)
tg.send_stream(tg.set_stream(pkt, count=packet_count, iface=iface, required_size=packet_size))
data = tg.stop_sniff([iface])
packets = data.get(iface, [])
assert len(packets) == packet_count, \
"Captured packets count {0} does not match expected {1}".format(len(packets), packet_count)
packet = packets[0]
# Expected size == Actual size
assert packet_size == len(packet), \
"Expected size: {0}\nActual size: {1}".format(packet_size, len(packet))
# Get padding and compare to original
received_padding = tg.get_packet_field(packet, "Ethernet", "padding")
assert padding_size + crc == len(received_padding)
[docs] def test_incrementation_negative_1(self, tg):
"""Verify that method set_stream returns Error message when layer is not defined in packet(1).
"""
if tg.type == 'ixiahl':
pytest.skip("This behavior isn't supported by IXIA TG")
iface = tg.ports[0]
packet = ({"Ethernet": {"src": SRC_MAC, "dst": DST_MAC, "type": ETHER_TYPE_IP}},
{"TCP": {}})
exception_message = []
kwargs = [{"sipv6_increment": (3, 5)},
{"dipv6_increment": (3, 5)},
{"sudp_increment": (3, 5)},
{"dudp_increment": (3, 5)},
{"fl_increment": (3, 5)},
{"vlan_increment": (3, 5)},
{"igmp_ip_increment": (1, 5)},
{"arp_sip_increment": (2, 5)},
{"dscp_increment": (1, 5)},
{"protocol_increment": (2, 5)}]
for kwarg in kwargs:
with pytest.raises(PypackerException) as excepinfo:
tg.set_stream(packet, count=5, iface=iface, **kwarg)
exception_message.append(excepinfo.value.parameter)
# verify expected result
result = ["Layer UDP is not defined", "Layer IP6 is not defined",
"VLAN tag is not defined", "Layer IGMP is not defined",
"Layer ARP is not defined", "Layer IP is not defined"]
assert len(exception_message) == len(kwargs)
assert set(exception_message) == set(result)
[docs] def test_incrementation_negative_2(self, tg):
"""Verify that method set_stream returns Error message when when layer is not defined in packet(2).
"""
if tg.type == 'ixiahl':
pytest.skip("This behavior isn't supported by IXIA TG")
iface = tg.ports[0]
packet = ({"IP": {"src": IP_SRC, "dst": IP_DST}}, {"TCP": {}})
exception_message = []
kwargs = [{"sa_increment": (3, 5)},
{"da_increment": (3, 5)},
{"arp_sa_increment": (3, 5)},
{"eth_type_increment": (3, 5)}]
for kwarg in kwargs:
with pytest.raises(PypackerException) as excepinfo:
tg.set_stream(packet, count=5, iface=iface, **kwarg)
exception_message.append(excepinfo.value.parameter)
# verify expected result
result = ["Layer Ethernet is not defined", "Layer Ethernet is not defined",
"Layer Ethernet is not defined", "Layer Ethernet is not defined"]
assert len(exception_message) == len(kwargs)
assert set(exception_message) == set(result)
[docs] def test_send_stream_several_times(self, tg):
"""Send stream several times and check statistics.
"""
iface = tg.ports[0]
packet_count = 10000
stream_id_1 = tg.set_stream(PACKET_DEFINITION, count=packet_count, rate=0.01, iface=iface)
tg.clear_statistics([iface])
tg.send_stream(stream_id_1)
end_sent_statistics = tg.get_sent_frames_count(iface)
assert end_sent_statistics == packet_count
# Send stream again and verify all packets were sent
tg.send_stream(stream_id_1)
end_sent_statistics = tg.get_sent_frames_count(iface)
assert end_sent_statistics == 2 * packet_count
[docs] def test_send_several_streams(self, tg):
"""Send several streams.
"""
iface = tg.ports[0]
packets_count = 20
src_mac = PACKET_DEFINITION[0]["Ethernet"]["src"]
tg.clear_statistics([iface])
stream_1 = tg.set_stream(PACKET_DEFINITION, count=packets_count // 2, inter=0.1, iface=iface)
stream_2 = tg.set_stream(PACKET_DEFINITION, count=packets_count // 2, inter=0.1, iface=iface)
tg.start_sniff([iface], sniffing_time=4, src_filter=src_mac)
tg.send_stream(stream_1)
tg.send_stream(stream_2)
data = tg.stop_sniff([iface])
assert iface in data
assert len(data[iface]) == packets_count
sent_statistics = tg.get_sent_frames_count(iface)
assert sent_statistics == packets_count