# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""``ovshelpers.py``
`Helpers functions for OVS test suites`
"""
import time
from . import loggers
# create logger for module
mod_logger = loggers.module_logger(name=__name__)
[docs]def set_ovs_config_with_controller(ports, switch, controller):
"""Set OVS configuration on Switch.
Args:
ports(dict): Links information
switch(SwitchGeneral): switch
controller(OvsControllerGeneralMixin): Controller
Returns:
None
"""
switch.ui.create_ovs_bridge(bridge_name="spp0")
controller_ip = "tcp:%s:%s" % (controller.ipaddr, controller.cport, )
switch.ui.create_ovs_bridge_controller("spp0", controller_ip)
for port in ports:
if port[0] != "tg1":
for link_id in ports[port]:
try:
switch.ui.get_table_ovs_ports()[ports[port][link_id]]
except Exception:
switch.ui.create_ovs_port(int(ports[port][link_id]), bridge_name="spp0")
# Wait until configuration is applied and connection to Controller is established
time.sleep(6)
[docs]def set_ovs_complex_config_with_controller(ports, switch, sw_id, controller):
"""Set OVS configuration on Switch.
Args:
ports(dict): Links information
switch(SwitchGeneral): switch
sw_id(int): id of switch in complex setup on which OVS is configured
controller(OvsControllerGeneralMixin): Controller
Returns:
None
"""
switch.ui.create_ovs_bridge(bridge_name="spp0")
controller_ip = "tcp:%s:%s" % (controller.ipaddr, str(controller.cport), )
switch.ui.create_ovs_bridge_controller("spp0", controller_ip)
for port in ports:
if port[0] != "tg1" and (port[0] == "sw%d" % (sw_id, )):
for link_id in ports[port]:
try:
switch.ui.get_table_ovs_ports()[ports[port][link_id]]
except Exception:
switch.ui.create_ovs_port(int(ports[port][link_id]), bridge_name="spp0")
# Wait until configuration is applied and connection to Controller is established
time.sleep(6)
[docs]def add_flow_via_controller(qualifiers, action, controller, name=None, dpid=None):
"""Add flow to OVS bridge via Controller.
Args:
qualifiers(str): flow qualifiers
action(str): flow actions
controller(OvsControllerGeneralMixin): Controller
name(str): flow name
dpid(bool): Get process ID
Returns:
None
"""
flow_command = "flow_add"
flow = "%s %s" % (qualifiers, action, )
if not name:
name = 'flow1'
if controller.name == 'nox':
controller.setprop(flow_command, [flow_command, flow])
elif controller.name == 'floodlight':
if not dpid:
dpid = controller.getprop('get_dpid', '')
mod_logger.info('Switch dpid: %s' % (dpid, ))
controller.setprop(flow_command, [dpid, flow, name])
elif controller.name == 'oftest':
controller.setprop(flow_command, [flow])
[docs]def delete_flow_via_controller(qualifiers, controller, name=None):
"""Delete flow from OVS bridge via Controller.
Args:
qualifiers(str): flow qualifiers
controller(OvsControllerGeneralMixin): Controller
name(str): flow name
Returns:
None
"""
flow_command = "flow_delete"
flow = qualifiers
if not name:
name = 'flow1'
if controller.name == 'nox':
controller.setprop(flow_command, [flow_command, flow])
elif controller.name == 'floodlight':
controller.setprop(flow_command, ['flow_delete', name])
elif controller.name == 'oftest':
controller.setprop(flow_command, [flow])
[docs]def create_packet_definition(packet_to_send):
"""Create packet definition to send.
Args:
packet_to_send(dict): dictionary with specified packet type and fields with values
Returns:
tuple(dict): packet_definition
"""
source_mac = "00:00:00:00:00:01"
destination_mac = "00:00:00:00:00:02"
source_ip = "10.10.10.1"
destination_ip = "10.10.10.2"
source_ip6 = 'fe80::214:f2ff:fe07:af0'
destination_ip6 = 'ff02::1'
sport = 1
dport = 2
tos = 4
if packet_to_send["type"] == "ip":
packet_definition = ({"Ether": {"dst": destination_mac, "src": source_mac, "type": 0x0800}},
{"IP": {"dst": destination_ip, "src": source_ip, "tos": tos}},
{"TCP": {}})
elif packet_to_send["type"] == "tagged_ip":
packet_definition = ({"Ether": {"dst": destination_mac, "src": source_mac, "type": 0x8100}},
{"Dot1Q": {"vlan": packet_to_send["vlan"],
"prio": packet_to_send["priority"]}},
{"IP": {"dst": destination_ip, "src": source_ip, "tos": tos}})
elif packet_to_send["type"] == "tcp":
packet_definition = ({"Ether": {"dst": destination_mac, "src": source_mac, "type": 0x0800}},
{"IP": {"dst": destination_ip, "src": source_ip, "tos": tos}},
{"TCP": {"sport": sport, "dport": dport}})
elif packet_to_send["type"] == "udp":
packet_definition = ({"Ether": {"dst": destination_mac, "src": source_mac, "type": 0x0800}},
{"IP": {"dst": destination_ip, "src": source_ip, "tos": tos}},
{"UDP": {"sport": sport, "dport": dport}})
elif packet_to_send["type"] == "double_tagged_ip":
packet_definition = ({"Ether": {"dst": destination_mac, "src": source_mac, "type": 0x8100}},
{"Dot1Q": {"vlan": packet_to_send["outer_vlan"], "type": 0x8100,
"prio": packet_to_send["outer_priority"]}},
{"Dot1Q": {"vlan": packet_to_send["inner_vlan"], "type": 0x0800,
"prio": packet_to_send["inner_priority"]}},
{"IP": {"dst": destination_ip, "src": source_ip, "tos": tos}})
elif packet_to_send["type"] == "arp":
packet_definition = (
{"Ether": {"src": source_mac, "dst": 'FF:FF:FF:FF:FF:FF', "type": 0x0806}},
{"ARP": {"op": 1, "hwsrc": source_mac,
"psrc": source_ip, "pdst": destination_ip}},)
elif packet_to_send["type"] == "arp_reply_tagged":
packet_definition = ({"Ether": {"src": source_mac, "dst": destination_mac, "type": 0x8100}},
{"Dot1Q": {"vlan": 2}},
{"ARP": {"op": 2, "hwsrc": source_mac, "hwdst": destination_mac,
"pdst": destination_ip, "psrc": source_ip}}, )
elif packet_to_send["type"] == "icmp":
packet_definition = ({"Ether": {"dst": destination_mac, "src": source_mac, "type": 0x0800}},
{"IP": {"dst": destination_ip, "src": source_ip, "proto": 1}},
{"ICMP": {"type": 8, "code": 0}})
elif packet_to_send["type"] == "ipv6":
packet_definition = ({"Ether": {"dst": destination_mac, "src": source_mac, "type": 0x86dd}},
{"IPv6": {"dst": destination_ip6, "src": source_ip6, "version": 6,
"hlim": 255, "plen": 64, "tc": 225}})
elif packet_to_send["type"] == "tcp6":
packet_definition = ({"Ether": {"dst": destination_mac, "src": source_mac, "type": 0x86dd}},
{"IPv6": {"dst": destination_ip6, "src": source_ip6, "version": 6,
"hlim": 255, "tc": 224, "nh": 6}},
{"TCP": {"sport": sport, "dport": dport}})
elif packet_to_send["type"] == "udp6":
packet_definition = ({"Ether": {"dst": destination_mac, "src": source_mac, "type": 0x86dd}},
{"IPv6": {"dst": destination_ip6, "src": source_ip6, "version": 6,
"hlim": 255, "tc": 224, "nh": 17}},
{"UDP": {"sport": sport, "dport": dport}})
elif packet_to_send["type"] == "icmp6":
packet_definition = ({"Ether": {"dst": destination_mac, "src": source_mac, "type": 0x86dd}},
{"IPv6": {"dst": destination_ip6, "src": source_ip6, "version": 6,
"hlim": 255, "tc": 224, "nh": 1}},
{"ICMP": {"type": 8, "code": 0}})
return packet_definition
[docs]def set_ovs_test_preconditions(ports, test_preconditions, switch, controller):
"""Set test preconditions, e.g. flows.
Args:
ports(dict): ports
test_preconditions(tuple): test preconditions
switch(SwitchGeneral): switch
controller(OvsControllerGeneralMixin): Controller
Returns:
None
"""
if isinstance(test_preconditions, tuple):
for i, v in enumerate(test_preconditions):
name = "flow%d" % (i + 1, )
# configure actions string (if output port is in actions)
if "output" in test_preconditions[i][1]:
action = test_preconditions[i][1].split(",")
for j, k in enumerate(action):
if "output" in action[j]:
port_id = int(action[j][action[j].find(":") + 1:])
action[j] = "output:%d" % (int(ports[('sw1', 'tg1')][port_id], ))
action_string = ",".join(action)
test_preconditions[i][1] = action_string
if "enqueue" in test_preconditions[i][1]:
action = test_preconditions[i][1].split(",")
for j, l in enumerate(action):
if "enqueue" in action[j]:
port_id = int(action[j].split(":")[1])
action[j] = "enqueue:%d:%s" % (
int(ports[('sw1', 'tg1')][port_id]), action[j].split(":")[2], )
action_string = ",".join(action)
test_preconditions[i][1] = action_string
if "in_port" in test_preconditions[i][0]:
qualifiers = test_preconditions[i][0].split(",")
for j, g in enumerate(qualifiers):
if "in_port" in qualifiers[j]:
port_id = int(qualifiers[j][qualifiers[j].find("=") + 1:])
qualifiers[j] = "in_port=%d" % (int(ports[('sw1', 'tg1')][port_id], ))
qualifiers_string = ",".join(qualifiers)
test_preconditions[i][0] = qualifiers_string
add_flow_via_controller(test_preconditions[i][0], test_preconditions[i][1],
controller, name)
else:
add_flow_via_controller(test_preconditions[i][0], test_preconditions[i][1],
controller, name)
# Wait until Controller sends flow to Switch"
time.sleep(5)
[docs]def set_ovs_complex_test_preconditions(ports, test_preconditions, switch, controller, dpid=None):
"""Set test preconditions, e.g. flows.
Args:
ports(dict): ports
test_preconditions(tuple): test preconditions
switch(SwitchGeneral): switch
controller(OvsControllerGeneralMixin): Controller
dpid(bool): Get process ID
Returns:
None
"""
if isinstance(test_preconditions, tuple):
for i, v in enumerate(test_preconditions):
name = "flow%d" % (i + 1, )
# configure actions string (if output port is in actions)
if "output" in test_preconditions[i][1]:
action = test_preconditions[i][1].split(",")
for j, k in enumerate(action):
if "output" in action[j]:
action[j] = "output:%d" % (int(
ports[(action[j].split(":")[1], action[j].split(":")[2])][
int(action[j].split(":")[3])]), )
action_string = ",".join(action)
test_preconditions[i][1] = action_string
if "enqueue" in test_preconditions[i][1]:
action = test_preconditions[i][1].split(",")
for j, l in enumerate(action):
if "enqueue" in action[j]:
action[j] = "enqueue:%d:%s" % \
(int(ports[(action[j].split(":")[1], action[j].split(":")[2])][
int(action[j].split(":")[3])]),
action[j].split(":")[4])
action_string = ",".join(action)
test_preconditions[i][1] = action_string
if "in_port" in test_preconditions[i][0]:
qualifiers = test_preconditions[i][0].split(",")
for j, g in enumerate(qualifiers):
if "in_port" in qualifiers[j]:
port_string = qualifiers[j].split("=")[1]
qualifiers[j] = "in_port=%d" % (int(
ports[(port_string.split(":")[0], port_string.split(":")[1])][
int(port_string.split(":")[2])]), )
qualifiers_string = ",".join(qualifiers)
test_preconditions[i][0] = qualifiers_string
add_flow_via_controller(test_preconditions[i][0], test_preconditions[i][1],
controller, name=name, dpid=dpid)
else:
add_flow_via_controller(test_preconditions[i][0], test_preconditions[i][1],
controller, name=name, dpid=dpid)
# Wait until Controller sends flow to Switch"
time.sleep(5)