tests: ospf gr helper topotests with scapy.

Testing ospf gr helper using scapy tool.

Signed-off-by: nguggarigoud <nguggarigoud@vmware.com>
This commit is contained in:
nguggarigoud 2021-01-15 17:32:13 +05:30
parent b3eebd46d9
commit cc90defcb6
4 changed files with 1040 additions and 4 deletions

View File

@ -34,6 +34,7 @@ import socket
import subprocess
import ipaddress
import platform
import pytest
try:
# Imports from python2
@ -1601,7 +1602,7 @@ def find_interface_with_greater_ip(topo, router, loopback=True, interface=True):
def write_test_header(tc_name):
""" Display message at beginning of test case"""
"""Display message at beginning of test case"""
count = 20
logger.info("*" * (len(tc_name) + count))
step("START -> Testcase : %s" % tc_name, reset=True)
@ -1609,7 +1610,7 @@ def write_test_header(tc_name):
def write_test_footer(tc_name):
""" Display message at end of test case"""
"""Display message at end of test case"""
count = 21
logger.info("=" * (len(tc_name) + count))
logger.info("Testcase : %s -> PASSED", tc_name)
@ -1892,7 +1893,7 @@ def create_interfaces_cfg(tgen, topo, build=False):
"network",
"priority",
"cost",
"mtu_ignore"
"mtu_ignore",
]
if "ospf" in data:
interface_data += _create_interfaces_ospf_cfg(
@ -4591,3 +4592,64 @@ def verify_ip_nht(tgen, input_dict):
logger.debug("Exiting lib API: verify_ip_nht()")
return False
def scapy_send_raw_packet(
tgen, topo, senderRouter, intf, packet=None, interval=1, count=1
):
"""
Using scapy Raw() method to send BSR raw packet from one FRR
to other
Parameters:
-----------
* `tgen` : Topogen object
* `topo` : json file data
* `senderRouter` : Sender router
* `packet` : packet in raw format
* `interval` : Interval between the packets
* `count` : Number of packets to be sent
returns:
--------
errormsg or True
"""
global CD
result = ""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
sender_interface = intf
rnode = tgen.routers()[senderRouter]
for destLink, data in topo["routers"][senderRouter]["links"].items():
if "type" in data and data["type"] == "loopback":
continue
if not packet:
packet = topo["routers"][senderRouter]["pkt"]["test_packets"][packet][
"data"
]
if interval > 1 or count > 1:
cmd = (
"nohup /usr/bin/python {}/send_bsr_packet.py '{}' '{}' "
"--interval={} --count={} &".format(
CD, packet, sender_interface, interval, count
)
)
else:
cmd = (
"/usr/bin/python {}/send_bsr_packet.py '{}' '{}' "
"--interval={} --count={}".format(
CD, packet, sender_interface, interval, count
)
)
logger.info("Scapy cmd: \n %s", cmd)
result = rnode.run(cmd)
if result == "":
return result
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True

View File

@ -28,6 +28,7 @@ from time import sleep
from lib.topolog import logger
from lib.topotest import frr_unicode
from ipaddress import IPv6Address
import sys
# Import common_config to use commomnly used APIs
from lib.common_config import (
@ -324,8 +325,47 @@ def __create_ospf_global(
cmd = "no {}".format(cmd)
config_data.append(cmd)
# ospf gr information
gr_data = ospf_data.setdefault("graceful-restart", {})
if gr_data:
if "opaque" in gr_data and gr_data["opaque"]:
cmd = "capability opaque"
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
if "helper-only" in gr_data and not gr_data["helper-only"]:
cmd = "graceful-restart helper-only"
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
elif "helper-only" in gr_data and type(gr_data["helper-only"]) is list:
for rtrs in gr_data["helper-only"]:
cmd = "graceful-restart helper-only {}".format(rtrs)
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
if "helper" in gr_data:
if type(gr_data["helper"]) is not list:
gr_data["helper"] = list(gr_data["helper"])
for helper_role in gr_data["helper"]:
cmd = "graceful-restart helper {}".format(helper_role)
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
if "supported-grace-time" in gr_data:
cmd = "graceful-restart helper supported-grace-time {}".format(
gr_data["supported-grace-time"]
)
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
result = create_common_configuration(
tgen, router, config_data, ospf, build, load_config
tgen, router, config_data, "ospf", build, load_config
)
except InvalidCLIError:
@ -2375,3 +2415,66 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config
)
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
@retry(retry_timeout=20)
def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None):
"""
This API is used to vreify gr helper using command
show ip ospf graceful-restart helper
Parameters
----------
* `tgen` : Topogen object
* `topo` : topology descriptions
* 'dut' : router
* 'input_dict' - values to be verified
Usage:
-------
input_dict = {
"helperSupport":"Disabled",
"strictLsaCheck":"Enabled",
"restartSupoort":"Planned and Unplanned Restarts",
"supportedGracePeriod":1800
}
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
if 'ospf' not in topo['routers'][dut]:
errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
dut)
return errormsg
rnode = tgen.routers()[dut]
logger.info("Verifying OSPF GR details on router %s:", dut)
show_ospf_json = run_frr_cmd(rnode, "show ip ospf graceful-restart helper json",
isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
raise ValueError (errormsg)
return errormsg
for ospf_gr, gr_data in input_dict.items():
try:
if input_dict[ospf_gr] == show_ospf_json[ospf_gr]:
logger.info("[DUT: FRR] OSPF GR Helper: %s is %s", ospf_gr,
show_ospf_json[ospf_gr])
result = True
else:
errormsg = ("[DUT: FRR] OSPF GR Helper: {} expected is {}, Found "
"is {}".format(ospf_gr, input_dict[ospf_gr], show_ospf_json[
ospf_gr]))
raise ValueError (errormsg)
return errormsg
except KeyError:
errormsg = ("[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr))
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result

View File

@ -0,0 +1,119 @@
{
"ipv4base": "10.0.0.0",
"ipv4mask": 24,
"link_ip_start": {
"ipv4": "10.0.0.0",
"v4mask": 24
},
"lo_prefix": {
"ipv4": "1.0.",
"v4mask": 32
},
"switches": {
"s1": {
"links": {
"r0": {
"ipv4": "17.1.1.2/24",
"ospf": {
"area": "0.0.0.0",
"hello_interval": 1,
"dead_interval": 40,
"priority": 98
}
},
"r1": {
"ipv4": "17.1.1.1/24",
"ospf": {
"area": "0.0.0.0",
"hello_interval": 1,
"dead_interval": 40,
"priority": 99
}
},
"r2": {
"ipv4": "17.1.1.3/24",
"ospf": {
"area": "0.0.0.0",
"hello_interval": 1,
"dead_interval": 40,
"priority": 0
}
},
"r3": {
"ipv4": "17.1.1.4/24",
"ospf": {
"area": "0.0.0.0",
"hello_interval": 1,
"dead_interval": 40,
"priority": 0
}
}
}
}
},
"routers": {
"r0": {
"links": {
"lo": {
"ipv4": "auto",
"type": "loopback"
}
},
"ospf": {
"router_id": "100.1.1.0",
"neighbors": {
"r1": {},
"r2": {},
"r3": {}
}
}
},
"r1": {
"links": {
"lo": {
"ipv4": "auto",
"type": "loopback"
}
},
"ospf": {
"router_id": "1.1.1.1",
"neighbors": {
"r0": {},
"r2": {},
"r3": {}
}
},
"opq_lsa_hex": "01005e00000570708bd051ef080045c0005cc18b0000015904f711010101e00000050204004801010101000000001e8d0000000000000000000000000001000102090300000001010101800000013bd1002c000100040000070800020001010000000003000411010101"
},
"r2": {
"links": {
"lo": {
"ipv4": "auto",
"type": "loopback"
}
},
"ospf": {
"router_id": "100.1.1.2",
"neighbors": {
"r1": {},
"r0": {}
}
}
},
"r3": {
"links": {
"lo": {
"ipv4": "auto",
"type": "loopback"
}
},
"ospf": {
"router_id": "100.1.1.3",
"neighbors": {
"r0": {},
"r1": {}
}
}
}
}
}

View File

@ -0,0 +1,752 @@
#!/usr/bin/python
#
# Copyright (c) 2021 by VMware, Inc. ("VMware")
# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
# ("NetDEF") in this file.
#
# Permission to use, copy, modify, and/or distribute this software
# for any purpose with or without fee is hereby granted, provided
# that the above copyright notice and this permission notice appear
# in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
# OF THIS SOFTWARE.
#
"""OSPF Basic Functionality Automation."""
import os
import sys
import time
import pytest
import json
from time import sleep
from copy import deepcopy
import ipaddress
# Save the Current Working Directory to find configuration files.
CWD = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(CWD, "../"))
# pylint: disable=C0413
# Import topogen and topotest helpers
from mininet.topo import Topo
from lib.topogen import Topogen, get_topogen
# Import topoJson from lib, to create topology and initial configuration
from lib.common_config import (
start_topology,
write_test_header,
write_test_footer,
reset_config_on_routers,
verify_rib,
create_static_routes,
step,
create_route_maps,
shutdown_bringup_interface,
create_interfaces_cfg,
topo_daemons,
scapy_send_raw_packet
)
from lib.topolog import logger
from lib.topojson import build_topo_from_json, build_config_from_json
from lib.ospf import (
verify_ospf_neighbor,
clear_ospf,
verify_ospf_gr_helper,
create_router_ospf,
verify_ospf_interface,
verify_ospf_database,
)
# Global variables
topo = None
Iters = 5
sw_name = None
intf = None
intf1 = None
pkt = None
# Reading the data from JSON File for topology creation
jsonFile = "{}/ospf_gr_helper.json".format(CWD)
try:
with open(jsonFile, "r") as topoJson:
topo = json.load(topoJson)
except IOError:
assert False, "Could not read file {}".format(jsonFile)
"""
Topology:
Please view in a fixed-width font such as Courier.
Topo : Broadcast Networks
DUT - HR RR
+---+ +---+ +---+ +---+
|R0 + +R1 + +R2 + +R3 |
+-+-+ +-+-+ +-+-+ +-+-+
| | | |
| | | |
--+-----------+--------------+---------------+-----
Ethernet Segment
Testcases:
TC1. Verify by default helper support is disabled for FRR ospf
TC2. OSPF GR on Broadcast : Verify DUT enters Helper mode when neighbor
sends grace lsa, helps RR to restart gracefully (RR = DR)
TC3. OSPF GR on Broadcast : Verify DUT enters Helper mode when neighbor
sends grace lsa, helps RR to restart gracefully (RR = BDR)
TC4. OSPF GR on Broadcast : Verify DUT enters Helper mode when neighbor
sends grace lsa, helps RR to restart gracefully (RR = DRother)
TC5. OSPF GR on P2P : Verify DUT enters Helper mode when neighbor sends
grace lsa, helps RR to restart gracefully.
TC6. Verify all the show commands newly introducted as part of ospf
helper support - Json Key verification wrt to show commands.
TC7. Verify helper when grace lsa is received with different configured
value in process level (higher, lower, grace lsa timer above 1800)
TC8. Verify helper functionality when dut is helping RR and new grace lsa
is received from RR.
"""
class CreateTopo(Topo):
"""
Test topology builder.
* `Topo`: Topology object
"""
def build(self, *_args, **_opts):
"""Build function."""
tgen = get_topogen(self)
# Building topology from json file
build_topo_from_json(tgen, topo)
def setup_module(mod):
"""
Sets up the pytest environment
* `mod`: module name
"""
global topo, intf, intf1, sw_name, pkt
testsuite_run_time = time.asctime(time.localtime(time.time()))
logger.info("Testsuite start time: {}".format(testsuite_run_time))
logger.info("=" * 40)
logger.info("Running setup_module to create topology")
# This function initiates the topology build with Topogen...
tgen = Topogen(CreateTopo, mod.__name__)
# ... and here it calls Mininet initialization functions.
# get list of daemons needs to be started for this suite.
daemons = topo_daemons(tgen, topo)
# Starting topology, create tmp files which are loaded to routers
# to start deamons and then start routers
start_topology(tgen, daemons)
# Creating configuration from JSON
build_config_from_json(tgen, topo)
# Don't run this test if we have any failure.
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
ospf_covergence = verify_ospf_neighbor(tgen, topo, lan=True)
assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format(
ospf_covergence
)
sw_name = topo["switches"].keys()[0]
intf = topo["routers"]["r0"]["links"][sw_name]["interface"]
intf1 = topo["routers"]["r1"]["links"][sw_name]["interface"]
pkt = topo["routers"]["r1"]["opq_lsa_hex"]
logger.info("Running setup_module() done")
def teardown_module():
"""Teardown the pytest environment"""
logger.info("Running teardown_module to delete topology")
tgen = get_topogen()
try:
# Stop toplogy and Remove tmp files
tgen.stop_topology
except OSError:
# OSError exception is raised when mininet tries to stop switch
# though switch is stopped once but mininet tries to stop same
# switch again, where it ended up with exception
pass
def delete_ospf():
"""delete ospf process after each test"""
tgen = get_topogen()
step("Delete ospf process")
for rtr in topo["routers"]:
ospf_del = {rtr: {"ospf": {"delete": True}}}
result = create_router_ospf(tgen, topo, ospf_del)
assert result is True, "Testcase: Failed \n Error: {}".format(result)
# ##################################
# Test cases start here.
# ##################################
def test_ospf_gr_helper_tc1_p0(request):
"""Verify by default helper support is disabled for FRR ospf"""
tc_name = request.node.name
write_test_header(tc_name)
tgen = get_topogen()
# Don't run this test if we have any failure.
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
global topo, intf, intf1, pkt
step("Bring up the base config as per the topology")
reset_config_on_routers(tgen)
ospf_covergence = verify_ospf_neighbor(tgen, topo, lan=True)
assert (
ospf_covergence is True
), "OSPF is not after reset config \n Error:" " {}".format(ospf_covergence)
step("Verify that GR helper route is disabled by default to the in" "the DUT.")
input_dict = {
"helperSupport": "Disabled",
"strictLsaCheck": "Enabled",
"restartSupoort": "Planned and Unplanned Restarts",
"supportedGracePeriod": 1800,
}
dut = "r0"
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Verify that DUT does not enter helper mode upon receiving the " "grace lsa.")
# send grace lsa
scapy_send_raw_packet(tgen, topo, "r1", intf1, pkt)
input_dict = {"activeRestarterCnt": 1}
dut = "r0"
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict, expected=False)
assert (
result is not True
), "Testcase {} : Failed. DUT entered helper role " " \n Error: {}".format(
tc_name, result
)
step("Configure graceful restart in the DUT")
ospf_gr_r0 = {
"r0": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}}
}
result = create_router_ospf(tgen, topo, ospf_gr_r0)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Verify that GR helper route is enabled in the DUT.")
input_dict = {
"helperSupport": "Enabled",
"strictLsaCheck": "Enabled",
"restartSupoort": "Planned and Unplanned Restarts",
"supportedGracePeriod": 1800,
}
dut = "r0"
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
ospf_gr_r1 = {
"r1": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}}
}
result = create_router_ospf(tgen, topo, ospf_gr_r1)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Perform GR in RR.")
step("Verify that DUT does enter helper mode upon receiving" " the grace lsa.")
input_dict = {"activeRestarterCnt": 1}
gracelsa_sent = False
repeat = 0
dut = "r0"
while not gracelsa_sent and repeat < Iters:
gracelsa_sent = scapy_send_raw_packet(tgen, topo, "r1", intf1, pkt)
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
if isinstance(result, str):
repeat += 1
gracelsa_sent = False
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Unconfigure the GR helper command.")
ospf_gr_r0 = {
"r0": {
"ospf": {
"graceful-restart": {"helper-only": [], "opaque": True, "delete": True}
}
}
}
result = create_router_ospf(tgen, topo, ospf_gr_r0)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
input_dict = {"helperSupport": "Disabled"}
dut = "r0"
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Configure gr helper using the router id")
ospf_gr_r0 = {
"r0": {
"ospf": {"graceful-restart": {"helper-only": ["1.1.1.1"], "opaque": True}}
}
}
result = create_router_ospf(tgen, topo, ospf_gr_r0)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Verify that DUT does enter helper mode upon receiving" " the grace lsa.")
input_dict = {"activeRestarterCnt": 1}
gracelsa_sent = False
repeat = 0
dut = "r0"
while not gracelsa_sent and repeat < Iters:
gracelsa_sent = scapy_send_raw_packet(tgen, topo, "r1", intf1, pkt)
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
if isinstance(result, str):
repeat += 1
gracelsa_sent = False
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Un Configure gr helper using the router id")
ospf_gr_r0 = {
"r0": {
"ospf": {
"graceful-restart": {
"helper-only": ["1.1.1.1"],
"opaque": True,
"delete": True,
}
}
}
}
result = create_router_ospf(tgen, topo, ospf_gr_r0)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Verify that GR helper router is disabled in the DUT for" " router id x.x.x.x")
input_dict = {"enabledRouterIds": [{"routerId": "1.1.1.1"}]}
dut = "r0"
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict, expected=False)
assert (
result is not True
), "Testcase {} : Failed, Helper role enabled for RR\n Error: {}".format(
tc_name, result
)
delete_ospf()
write_test_footer(tc_name)
def test_ospf_gr_helper_tc2_p0(request):
"""
OSPF GR on Broadcast : Verify DUT enters Helper mode when neighbor
sends grace lsa, helps RR to restart gracefully (RR = DR)
"""
tc_name = request.node.name
write_test_header(tc_name)
tgen = get_topogen()
# Don't run this test if we have any failure.
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
global topo, intf, intf1, pkt
step("Bring up the base config as per the topology")
step(
"Configure DR priority as 99 in RR , DUT dr priority = 98 "
"& reset ospf process in all the routers"
)
reset_config_on_routers(tgen)
ospf_covergence = verify_ospf_neighbor(tgen, topo, lan=True)
assert (
ospf_covergence is True
), "OSPF is not after reset config \n Error:" " {}".format(ospf_covergence)
ospf_gr_r0 = {
"r0": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}}
}
result = create_router_ospf(tgen, topo, ospf_gr_r0)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
ospf_gr_r1 = {
"r1": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}}
}
result = create_router_ospf(tgen, topo, ospf_gr_r1)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Verify that DUT enters into helper mode.")
input_dict = {"activeRestarterCnt": 1}
gracelsa_sent = False
repeat = 0
dut = "r0"
while not gracelsa_sent and repeat < Iters:
gracelsa_sent = scapy_send_raw_packet(tgen, topo, "r1", intf1, pkt)
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
if isinstance(result, str):
repeat += 1
gracelsa_sent = False
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
delete_ospf()
write_test_footer(tc_name)
def test_ospf_gr_helper_tc3_p1(request):
"""
OSPF GR on Broadcast : Verify DUT enters Helper mode when neighbor
sends grace lsa, helps RR to restart gracefully (RR = BDR)
"""
tc_name = request.node.name
write_test_header(tc_name)
tgen = get_topogen()
# Don't run this test if we have any failure.
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
global topo, intf, intf1, pkt
step("Bring up the base config as per the topology")
step(
"Configure DR priority as 99 in RR , DUT dr priority = 98 "
"& reset ospf process in all the routers"
)
reset_config_on_routers(tgen)
ospf_covergence = verify_ospf_neighbor(tgen, topo, lan=True)
assert (
ospf_covergence is True
), "OSPF is not after reset config \n Error:" " {}".format(ospf_covergence)
step(
"Configure DR pririty 100 on R0 and clear ospf neighbors " "on all the routers."
)
input_dict = {
"r0": {
"links": {
sw_name: {
"interface": topo["routers"]["r0"]["links"][sw_name]["interface"],
"ospf": {"priority": 100},
}
}
}
}
result = create_interfaces_cfg(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
step("Clear ospf neighbours in all routers")
for rtr in topo["routers"]:
clear_ospf(tgen, rtr)
step("Verify that DR election is triggered and R0 is elected as DR")
input_dict = {
"r0": {
"ospf": {
"neighbors": {
"r1": {"state": "Full", "role": "Backup"},
"r2": {"state": "Full", "role": "DROther"},
"r3": {"state": "Full", "role": "DROther"},
}
}
}
}
dut = "r0"
result = verify_ospf_neighbor(tgen, topo, dut, input_dict, lan=True)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
ospf_gr_r0 = {
"r0": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}}
}
result = create_router_ospf(tgen, topo, ospf_gr_r0)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
ospf_gr_r1 = {
"r1": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}}
}
result = create_router_ospf(tgen, topo, ospf_gr_r1)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Verify that DUT enters into helper mode.")
input_dict = {"activeRestarterCnt": 1}
gracelsa_sent = False
repeat = 0
dut = "r0"
while not gracelsa_sent and repeat < Iters:
gracelsa_sent = scapy_send_raw_packet(tgen, topo, "r1", intf1, pkt)
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
if isinstance(result, str):
repeat += 1
gracelsa_sent = False
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
delete_ospf()
write_test_footer(tc_name)
def test_ospf_gr_helper_tc4_p1(request):
"""
OSPF GR on Broadcast : Verify DUT enters Helper mode when neighbor
sends grace lsa, helps RR to restart gracefully (RR = DRother)
"""
tc_name = request.node.name
write_test_header(tc_name)
tgen = get_topogen()
# Don't run this test if we have any failure.
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
global topo, intf, intf1, pkt
step("Bring up the base config as per the topology")
step(
"Configure DR priority as 99 in RR , DUT dr priority = 98 "
"& reset ospf process in all the routers"
)
reset_config_on_routers(tgen)
ospf_covergence = verify_ospf_neighbor(tgen, topo, lan=True)
assert (
ospf_covergence is True
), "OSPF is not after reset config \n Error:" " {}".format(ospf_covergence)
step(
"Configure DR pririty 100 on R0 and clear ospf neighbors " "on all the routers."
)
input_dict = {
"r0": {
"links": {
sw_name: {
"interface": topo["routers"]["r0"]["links"][sw_name]["interface"],
"ospf": {"priority": 0},
}
}
}
}
result = create_interfaces_cfg(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
step("Clear ospf neighbours in all routers")
for rtr in topo["routers"]:
clear_ospf(tgen, rtr)
step("Verify that DR election is triggered and R0 is elected as 2-Way")
input_dict = {
"r0": {
"ospf": {
"neighbors": {
"r1": {"state": "Full", "role": "DR"},
"r2": {"state": "2-Way", "role": "DROther"},
"r3": {"state": "2-Way", "role": "DROther"},
}
}
}
}
dut = "r0"
result = verify_ospf_neighbor(tgen, topo, dut, input_dict, lan=True)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
ospf_gr_r0 = {
"r0": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}}
}
result = create_router_ospf(tgen, topo, ospf_gr_r0)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
ospf_gr_r1 = {
"r1": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}}
}
result = create_router_ospf(tgen, topo, ospf_gr_r1)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Verify that DUT enters into helper mode.")
input_dict = {"activeRestarterCnt": 1}
gracelsa_sent = False
repeat = 0
dut = "r0"
while not gracelsa_sent and repeat < Iters:
gracelsa_sent = scapy_send_raw_packet(tgen, topo, "r1", intf1, pkt)
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
if isinstance(result, str):
repeat += 1
gracelsa_sent = False
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
delete_ospf()
write_test_footer(tc_name)
def test_ospf_gr_helper_tc7_p1(request):
"""
Test ospf gr helper
Verify helper when grace lsa is received with different configured
value in process level (higher, lower, grace lsa timer above 1800)
"""
tc_name = request.node.name
write_test_header(tc_name)
tgen = get_topogen()
# Don't run this test if we have any failure.
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
global topo, intf, intf1, pkt
step("Bring up the base config as per the topology")
step(
"Configure DR priority as 99 in RR , DUT dr priority = 98 "
"& reset ospf process in all the routers"
)
step(
"Enable GR on RR and DUT with grace period on RR = 333"
"and grace period on DUT = 300"
)
reset_config_on_routers(tgen)
ospf_covergence = verify_ospf_neighbor(tgen, topo, lan=True)
assert (
ospf_covergence is True
), "OSPF is not after reset config \n Error:" " {}".format(ospf_covergence)
ospf_gr_r0 = {
"r0": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}}
}
result = create_router_ospf(tgen, topo, ospf_gr_r0)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
ospf_gr_r1 = {
"r1": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}}
}
result = create_router_ospf(tgen, topo, ospf_gr_r1)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
input_dict = {"supportedGracePeriod": 1800}
dut = "r0"
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Configure grace period = 1801 on RR and restart ospf .")
grace_period_1801 = "01005e00000570708bd051ef080045c0005cbeb10000015907d111010101e00000050204004801010101000000009714000000000000000000000000000100010209030000000101010180000001c8e9002c000100040000016800020001010000000003000411010101"
gracelsa_sent = scapy_send_raw_packet(tgen, topo, "r1", intf1, grace_period_1801)
step("Verify R0 does not enter helper mode.")
input_dict = {"activeRestarterCnt": 1}
dut = "r0"
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict, expected=False)
assert (
result is not True
), "Testcase {} : Failed. DUT entered helper role " " \n Error: {}".format(
tc_name, result
)
delete_ospf()
write_test_footer(tc_name)
def test_ospf_gr_helper_tc8_p1(request):
"""
Test ospf gr helper
Verify helper functionality when dut is helping RR and new grace lsa
is received from RR.
"""
tc_name = request.node.name
write_test_header(tc_name)
tgen = get_topogen()
# Don't run this test if we have any failure.
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
global topo, intf, intf1, pkt
step("Bring up the base config as per the topology")
step("Enable GR")
reset_config_on_routers(tgen)
ospf_covergence = verify_ospf_neighbor(tgen, topo, lan=True)
assert (
ospf_covergence is True
), "OSPF is not after reset config \n Error:" " {}".format(ospf_covergence)
ospf_gr_r0 = {
"r0": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}}
}
result = create_router_ospf(tgen, topo, ospf_gr_r0)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
ospf_gr_r1 = {
"r1": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}}
}
result = create_router_ospf(tgen, topo, ospf_gr_r1)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
input_dict = {"supportedGracePeriod": 1800}
dut = "r0"
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Verify that DUT enters into helper mode.")
input_dict = {"activeRestarterCnt": 1}
gracelsa_sent = False
repeat = 0
dut = "r0"
while not gracelsa_sent and repeat < Iters:
gracelsa_sent = scapy_send_raw_packet(tgen, topo, "r1", intf1, pkt)
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
if isinstance(result, str):
repeat += 1
gracelsa_sent = False
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
step("Send the Grace LSA again to DUT when RR is in GR.")
input_dict = {"activeRestarterCnt": 1}
gracelsa_sent = False
repeat = 0
dut = "r0"
while not gracelsa_sent and repeat < Iters:
gracelsa_sent = scapy_send_raw_packet(tgen, topo, "r1", intf1, pkt)
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
if isinstance(result, str):
repeat += 1
gracelsa_sent = False
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
delete_ospf()
write_test_footer(tc_name)
if __name__ == "__main__":
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))