diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py index 3754ad999e..2abab6b255 100644 --- a/tests/topotests/lib/common_config.py +++ b/tests/topotests/lib/common_config.py @@ -34,6 +34,7 @@ import socket import subprocess import ipaddress import platform +import pytest try: # Imports from python2 @@ -1601,7 +1602,7 @@ def find_interface_with_greater_ip(topo, router, loopback=True, interface=True): def write_test_header(tc_name): - """ Display message at beginning of test case""" + """Display message at beginning of test case""" count = 20 logger.info("*" * (len(tc_name) + count)) step("START -> Testcase : %s" % tc_name, reset=True) @@ -1609,7 +1610,7 @@ def write_test_header(tc_name): def write_test_footer(tc_name): - """ Display message at end of test case""" + """Display message at end of test case""" count = 21 logger.info("=" * (len(tc_name) + count)) logger.info("Testcase : %s -> PASSED", tc_name) @@ -1892,7 +1893,7 @@ def create_interfaces_cfg(tgen, topo, build=False): "network", "priority", "cost", - "mtu_ignore" + "mtu_ignore", ] if "ospf" in data: interface_data += _create_interfaces_ospf_cfg( @@ -4591,3 +4592,64 @@ def verify_ip_nht(tgen, input_dict): logger.debug("Exiting lib API: verify_ip_nht()") return False + + +def scapy_send_raw_packet( + tgen, topo, senderRouter, intf, packet=None, interval=1, count=1 +): + """ + Using scapy Raw() method to send BSR raw packet from one FRR + to other + + Parameters: + ----------- + * `tgen` : Topogen object + * `topo` : json file data + * `senderRouter` : Sender router + * `packet` : packet in raw format + * `interval` : Interval between the packets + * `count` : Number of packets to be sent + + returns: + -------- + errormsg or True + """ + + global CD + result = "" + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + sender_interface = intf + rnode = tgen.routers()[senderRouter] + + for destLink, data in topo["routers"][senderRouter]["links"].items(): + if "type" in data and data["type"] == "loopback": + continue + + if not packet: + packet = topo["routers"][senderRouter]["pkt"]["test_packets"][packet][ + "data" + ] + + if interval > 1 or count > 1: + cmd = ( + "nohup /usr/bin/python {}/send_bsr_packet.py '{}' '{}' " + "--interval={} --count={} &".format( + CD, packet, sender_interface, interval, count + ) + ) + else: + cmd = ( + "/usr/bin/python {}/send_bsr_packet.py '{}' '{}' " + "--interval={} --count={}".format( + CD, packet, sender_interface, interval, count + ) + ) + + logger.info("Scapy cmd: \n %s", cmd) + result = rnode.run(cmd) + + if result == "": + return result + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return True diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py index 40da7c8fbe..9646daf725 100644 --- a/tests/topotests/lib/ospf.py +++ b/tests/topotests/lib/ospf.py @@ -28,6 +28,7 @@ from time import sleep from lib.topolog import logger from lib.topotest import frr_unicode from ipaddress import IPv6Address +import sys # Import common_config to use commomnly used APIs from lib.common_config import ( @@ -324,8 +325,47 @@ def __create_ospf_global( cmd = "no {}".format(cmd) config_data.append(cmd) + # ospf gr information + gr_data = ospf_data.setdefault("graceful-restart", {}) + if gr_data: + + if "opaque" in gr_data and gr_data["opaque"]: + cmd = "capability opaque" + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) + + if "helper-only" in gr_data and not gr_data["helper-only"]: + cmd = "graceful-restart helper-only" + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) + elif "helper-only" in gr_data and type(gr_data["helper-only"]) is list: + for rtrs in gr_data["helper-only"]: + cmd = "graceful-restart helper-only {}".format(rtrs) + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) + + if "helper" in gr_data: + if type(gr_data["helper"]) is not list: + gr_data["helper"] = list(gr_data["helper"]) + for helper_role in gr_data["helper"]: + cmd = "graceful-restart helper {}".format(helper_role) + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) + + if "supported-grace-time" in gr_data: + cmd = "graceful-restart helper supported-grace-time {}".format( + gr_data["supported-grace-time"] + ) + if gr_data.setdefault("delete", False): + cmd = "no {}".format(cmd) + config_data.append(cmd) + result = create_common_configuration( - tgen, router, config_data, ospf, build, load_config + tgen, router, config_data, "ospf", build, load_config ) except InvalidCLIError: @@ -2375,3 +2415,66 @@ def config_ospf6_interface(tgen, topo, input_dict=None, build=False, load_config ) logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) return result + +@retry(retry_timeout=20) +def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None): + """ + This API is used to vreify gr helper using command + show ip ospf graceful-restart helper + + Parameters + ---------- + * `tgen` : Topogen object + * `topo` : topology descriptions + * 'dut' : router + * 'input_dict' - values to be verified + + Usage: + ------- + input_dict = { + "helperSupport":"Disabled", + "strictLsaCheck":"Enabled", + "restartSupoort":"Planned and Unplanned Restarts", + "supportedGracePeriod":1800 + } + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict) + + """ + logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) + result = False + + if 'ospf' not in topo['routers'][dut]: + errormsg = "[DUT: {}] OSPF is not configured on the router.".format( + dut) + return errormsg + + rnode = tgen.routers()[dut] + logger.info("Verifying OSPF GR details on router %s:", dut) + show_ospf_json = run_frr_cmd(rnode, "show ip ospf graceful-restart helper json", + isjson=True) + + # Verifying output dictionary show_ospf_json is empty or not + if not bool(show_ospf_json): + errormsg = "OSPF is not running" + raise ValueError (errormsg) + return errormsg + + for ospf_gr, gr_data in input_dict.items(): + try: + if input_dict[ospf_gr] == show_ospf_json[ospf_gr]: + logger.info("[DUT: FRR] OSPF GR Helper: %s is %s", ospf_gr, + show_ospf_json[ospf_gr]) + result = True + else: + errormsg = ("[DUT: FRR] OSPF GR Helper: {} expected is {}, Found " + "is {}".format(ospf_gr, input_dict[ospf_gr], show_ospf_json[ + ospf_gr])) + raise ValueError (errormsg) + return errormsg + + except KeyError: + errormsg = ("[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr)) + return errormsg + + logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) + return result diff --git a/tests/topotests/ospf_gr_helper/ospf_gr_helper.json b/tests/topotests/ospf_gr_helper/ospf_gr_helper.json new file mode 100644 index 0000000000..efd339ef88 --- /dev/null +++ b/tests/topotests/ospf_gr_helper/ospf_gr_helper.json @@ -0,0 +1,119 @@ +{ + "ipv4base": "10.0.0.0", + "ipv4mask": 24, + "link_ip_start": { + "ipv4": "10.0.0.0", + "v4mask": 24 + }, + "lo_prefix": { + "ipv4": "1.0.", + "v4mask": 32 + }, + "switches": { + "s1": { + "links": { + "r0": { + "ipv4": "17.1.1.2/24", + "ospf": { + "area": "0.0.0.0", + "hello_interval": 1, + "dead_interval": 40, + "priority": 98 + } + }, + "r1": { + "ipv4": "17.1.1.1/24", + "ospf": { + "area": "0.0.0.0", + "hello_interval": 1, + "dead_interval": 40, + "priority": 99 + } + }, + "r2": { + "ipv4": "17.1.1.3/24", + "ospf": { + "area": "0.0.0.0", + "hello_interval": 1, + "dead_interval": 40, + "priority": 0 + } + }, + "r3": { + "ipv4": "17.1.1.4/24", + "ospf": { + "area": "0.0.0.0", + "hello_interval": 1, + "dead_interval": 40, + "priority": 0 + } + } + } + } + }, + "routers": { + "r0": { + "links": { + "lo": { + "ipv4": "auto", + "type": "loopback" + } + }, + "ospf": { + "router_id": "100.1.1.0", + "neighbors": { + "r1": {}, + "r2": {}, + "r3": {} + } + } + }, + "r1": { + "links": { + "lo": { + "ipv4": "auto", + "type": "loopback" + } + }, + "ospf": { + "router_id": "1.1.1.1", + "neighbors": { + "r0": {}, + "r2": {}, + "r3": {} + } + }, + "opq_lsa_hex": "01005e00000570708bd051ef080045c0005cc18b0000015904f711010101e00000050204004801010101000000001e8d0000000000000000000000000001000102090300000001010101800000013bd1002c000100040000070800020001010000000003000411010101" + }, + "r2": { + "links": { + "lo": { + "ipv4": "auto", + "type": "loopback" + } + }, + "ospf": { + "router_id": "100.1.1.2", + "neighbors": { + "r1": {}, + "r0": {} + } + } + }, + "r3": { + "links": { + "lo": { + "ipv4": "auto", + "type": "loopback" + } + }, + "ospf": { + "router_id": "100.1.1.3", + "neighbors": { + "r0": {}, + "r1": {} + } + } + } + } +} \ No newline at end of file diff --git a/tests/topotests/ospf_gr_helper/test_ospf_gr_helper.py b/tests/topotests/ospf_gr_helper/test_ospf_gr_helper.py new file mode 100644 index 0000000000..5363822134 --- /dev/null +++ b/tests/topotests/ospf_gr_helper/test_ospf_gr_helper.py @@ -0,0 +1,752 @@ +#!/usr/bin/python + +# +# Copyright (c) 2021 by VMware, Inc. ("VMware") +# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. +# ("NetDEF") in this file. +# +# Permission to use, copy, modify, and/or distribute this software +# for any purpose with or without fee is hereby granted, provided +# that the above copyright notice and this permission notice appear +# in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# + + +"""OSPF Basic Functionality Automation.""" +import os +import sys +import time +import pytest +import json +from time import sleep +from copy import deepcopy +import ipaddress + +# Save the Current Working Directory to find configuration files. +CWD = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(CWD, "../")) + +# pylint: disable=C0413 +# Import topogen and topotest helpers +from mininet.topo import Topo +from lib.topogen import Topogen, get_topogen + +# Import topoJson from lib, to create topology and initial configuration +from lib.common_config import ( + start_topology, + write_test_header, + write_test_footer, + reset_config_on_routers, + verify_rib, + create_static_routes, + step, + create_route_maps, + shutdown_bringup_interface, + create_interfaces_cfg, + topo_daemons, + scapy_send_raw_packet +) + +from lib.topolog import logger +from lib.topojson import build_topo_from_json, build_config_from_json + +from lib.ospf import ( + verify_ospf_neighbor, + clear_ospf, + verify_ospf_gr_helper, + create_router_ospf, + verify_ospf_interface, + verify_ospf_database, +) + +# Global variables +topo = None +Iters = 5 +sw_name = None +intf = None +intf1 = None +pkt = None + +# Reading the data from JSON File for topology creation +jsonFile = "{}/ospf_gr_helper.json".format(CWD) +try: + with open(jsonFile, "r") as topoJson: + topo = json.load(topoJson) +except IOError: + assert False, "Could not read file {}".format(jsonFile) + +""" +Topology: + + Please view in a fixed-width font such as Courier. + Topo : Broadcast Networks + DUT - HR RR + +---+ +---+ +---+ +---+ + |R0 + +R1 + +R2 + +R3 | + +-+-+ +-+-+ +-+-+ +-+-+ + | | | | + | | | | + --+-----------+--------------+---------------+----- + Ethernet Segment + +Testcases: + +TC1. Verify by default helper support is disabled for FRR ospf +TC2. OSPF GR on Broadcast : Verify DUT enters Helper mode when neighbor + sends grace lsa, helps RR to restart gracefully (RR = DR) +TC3. OSPF GR on Broadcast : Verify DUT enters Helper mode when neighbor + sends grace lsa, helps RR to restart gracefully (RR = BDR) +TC4. OSPF GR on Broadcast : Verify DUT enters Helper mode when neighbor + sends grace lsa, helps RR to restart gracefully (RR = DRother) +TC5. OSPF GR on P2P : Verify DUT enters Helper mode when neighbor sends + grace lsa, helps RR to restart gracefully. +TC6. Verify all the show commands newly introducted as part of ospf + helper support - Json Key verification wrt to show commands. +TC7. Verify helper when grace lsa is received with different configured + value in process level (higher, lower, grace lsa timer above 1800) +TC8. Verify helper functionality when dut is helping RR and new grace lsa + is received from RR. +""" + + +class CreateTopo(Topo): + """ + Test topology builder. + + * `Topo`: Topology object + """ + + def build(self, *_args, **_opts): + """Build function.""" + tgen = get_topogen(self) + + # Building topology from json file + build_topo_from_json(tgen, topo) + + +def setup_module(mod): + """ + Sets up the pytest environment + + * `mod`: module name + """ + global topo, intf, intf1, sw_name, pkt + testsuite_run_time = time.asctime(time.localtime(time.time())) + logger.info("Testsuite start time: {}".format(testsuite_run_time)) + logger.info("=" * 40) + + logger.info("Running setup_module to create topology") + + # This function initiates the topology build with Topogen... + tgen = Topogen(CreateTopo, mod.__name__) + # ... and here it calls Mininet initialization functions. + + # get list of daemons needs to be started for this suite. + daemons = topo_daemons(tgen, topo) + + # Starting topology, create tmp files which are loaded to routers + # to start deamons and then start routers + start_topology(tgen, daemons) + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + ospf_covergence = verify_ospf_neighbor(tgen, topo, lan=True) + assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format( + ospf_covergence + ) + + sw_name = topo["switches"].keys()[0] + intf = topo["routers"]["r0"]["links"][sw_name]["interface"] + intf1 = topo["routers"]["r1"]["links"][sw_name]["interface"] + pkt = topo["routers"]["r1"]["opq_lsa_hex"] + + logger.info("Running setup_module() done") + + +def teardown_module(): + """Teardown the pytest environment""" + + logger.info("Running teardown_module to delete topology") + + tgen = get_topogen() + + try: + # Stop toplogy and Remove tmp files + tgen.stop_topology + + except OSError: + # OSError exception is raised when mininet tries to stop switch + # though switch is stopped once but mininet tries to stop same + # switch again, where it ended up with exception + pass + + +def delete_ospf(): + """delete ospf process after each test""" + tgen = get_topogen() + step("Delete ospf process") + for rtr in topo["routers"]: + ospf_del = {rtr: {"ospf": {"delete": True}}} + result = create_router_ospf(tgen, topo, ospf_del) + assert result is True, "Testcase: Failed \n Error: {}".format(result) + + +# ################################## +# Test cases start here. +# ################################## + + +def test_ospf_gr_helper_tc1_p0(request): + """Verify by default helper support is disabled for FRR ospf""" + + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + global topo, intf, intf1, pkt + + step("Bring up the base config as per the topology") + reset_config_on_routers(tgen) + ospf_covergence = verify_ospf_neighbor(tgen, topo, lan=True) + assert ( + ospf_covergence is True + ), "OSPF is not after reset config \n Error:" " {}".format(ospf_covergence) + + step("Verify that GR helper route is disabled by default to the in" "the DUT.") + input_dict = { + "helperSupport": "Disabled", + "strictLsaCheck": "Enabled", + "restartSupoort": "Planned and Unplanned Restarts", + "supportedGracePeriod": 1800, + } + dut = "r0" + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Verify that DUT does not enter helper mode upon receiving the " "grace lsa.") + + # send grace lsa + scapy_send_raw_packet(tgen, topo, "r1", intf1, pkt) + + input_dict = {"activeRestarterCnt": 1} + dut = "r0" + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict, expected=False) + assert ( + result is not True + ), "Testcase {} : Failed. DUT entered helper role " " \n Error: {}".format( + tc_name, result + ) + + step("Configure graceful restart in the DUT") + ospf_gr_r0 = { + "r0": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}} + } + result = create_router_ospf(tgen, topo, ospf_gr_r0) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Verify that GR helper route is enabled in the DUT.") + input_dict = { + "helperSupport": "Enabled", + "strictLsaCheck": "Enabled", + "restartSupoort": "Planned and Unplanned Restarts", + "supportedGracePeriod": 1800, + } + dut = "r0" + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + ospf_gr_r1 = { + "r1": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}} + } + result = create_router_ospf(tgen, topo, ospf_gr_r1) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Perform GR in RR.") + step("Verify that DUT does enter helper mode upon receiving" " the grace lsa.") + input_dict = {"activeRestarterCnt": 1} + gracelsa_sent = False + repeat = 0 + dut = "r0" + while not gracelsa_sent and repeat < Iters: + gracelsa_sent = scapy_send_raw_packet(tgen, topo, "r1", intf1, pkt) + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict) + if isinstance(result, str): + repeat += 1 + gracelsa_sent = False + + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Unconfigure the GR helper command.") + ospf_gr_r0 = { + "r0": { + "ospf": { + "graceful-restart": {"helper-only": [], "opaque": True, "delete": True} + } + } + } + result = create_router_ospf(tgen, topo, ospf_gr_r0) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + input_dict = {"helperSupport": "Disabled"} + dut = "r0" + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Configure gr helper using the router id") + ospf_gr_r0 = { + "r0": { + "ospf": {"graceful-restart": {"helper-only": ["1.1.1.1"], "opaque": True}} + } + } + result = create_router_ospf(tgen, topo, ospf_gr_r0) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Verify that DUT does enter helper mode upon receiving" " the grace lsa.") + input_dict = {"activeRestarterCnt": 1} + gracelsa_sent = False + repeat = 0 + dut = "r0" + while not gracelsa_sent and repeat < Iters: + gracelsa_sent = scapy_send_raw_packet(tgen, topo, "r1", intf1, pkt) + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict) + if isinstance(result, str): + repeat += 1 + gracelsa_sent = False + + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Un Configure gr helper using the router id") + ospf_gr_r0 = { + "r0": { + "ospf": { + "graceful-restart": { + "helper-only": ["1.1.1.1"], + "opaque": True, + "delete": True, + } + } + } + } + result = create_router_ospf(tgen, topo, ospf_gr_r0) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Verify that GR helper router is disabled in the DUT for" " router id x.x.x.x") + input_dict = {"enabledRouterIds": [{"routerId": "1.1.1.1"}]} + dut = "r0" + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict, expected=False) + assert ( + result is not True + ), "Testcase {} : Failed, Helper role enabled for RR\n Error: {}".format( + tc_name, result + ) + delete_ospf() + write_test_footer(tc_name) + + +def test_ospf_gr_helper_tc2_p0(request): + """ + OSPF GR on Broadcast : Verify DUT enters Helper mode when neighbor + sends grace lsa, helps RR to restart gracefully (RR = DR) + """ + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + global topo, intf, intf1, pkt + + step("Bring up the base config as per the topology") + step( + "Configure DR priority as 99 in RR , DUT dr priority = 98 " + "& reset ospf process in all the routers" + ) + reset_config_on_routers(tgen) + ospf_covergence = verify_ospf_neighbor(tgen, topo, lan=True) + assert ( + ospf_covergence is True + ), "OSPF is not after reset config \n Error:" " {}".format(ospf_covergence) + ospf_gr_r0 = { + "r0": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}} + } + result = create_router_ospf(tgen, topo, ospf_gr_r0) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + ospf_gr_r1 = { + "r1": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}} + } + result = create_router_ospf(tgen, topo, ospf_gr_r1) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Verify that DUT enters into helper mode.") + + input_dict = {"activeRestarterCnt": 1} + gracelsa_sent = False + repeat = 0 + dut = "r0" + while not gracelsa_sent and repeat < Iters: + gracelsa_sent = scapy_send_raw_packet(tgen, topo, "r1", intf1, pkt) + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict) + if isinstance(result, str): + repeat += 1 + gracelsa_sent = False + + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + delete_ospf() + write_test_footer(tc_name) + + +def test_ospf_gr_helper_tc3_p1(request): + """ + OSPF GR on Broadcast : Verify DUT enters Helper mode when neighbor + sends grace lsa, helps RR to restart gracefully (RR = BDR) + """ + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + global topo, intf, intf1, pkt + + step("Bring up the base config as per the topology") + step( + "Configure DR priority as 99 in RR , DUT dr priority = 98 " + "& reset ospf process in all the routers" + ) + reset_config_on_routers(tgen) + ospf_covergence = verify_ospf_neighbor(tgen, topo, lan=True) + assert ( + ospf_covergence is True + ), "OSPF is not after reset config \n Error:" " {}".format(ospf_covergence) + step( + "Configure DR pririty 100 on R0 and clear ospf neighbors " "on all the routers." + ) + + input_dict = { + "r0": { + "links": { + sw_name: { + "interface": topo["routers"]["r0"]["links"][sw_name]["interface"], + "ospf": {"priority": 100}, + } + } + } + } + + result = create_interfaces_cfg(tgen, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + step("Clear ospf neighbours in all routers") + for rtr in topo["routers"]: + clear_ospf(tgen, rtr) + + step("Verify that DR election is triggered and R0 is elected as DR") + input_dict = { + "r0": { + "ospf": { + "neighbors": { + "r1": {"state": "Full", "role": "Backup"}, + "r2": {"state": "Full", "role": "DROther"}, + "r3": {"state": "Full", "role": "DROther"}, + } + } + } + } + dut = "r0" + result = verify_ospf_neighbor(tgen, topo, dut, input_dict, lan=True) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + ospf_gr_r0 = { + "r0": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}} + } + result = create_router_ospf(tgen, topo, ospf_gr_r0) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + ospf_gr_r1 = { + "r1": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}} + } + result = create_router_ospf(tgen, topo, ospf_gr_r1) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Verify that DUT enters into helper mode.") + + input_dict = {"activeRestarterCnt": 1} + gracelsa_sent = False + repeat = 0 + dut = "r0" + while not gracelsa_sent and repeat < Iters: + gracelsa_sent = scapy_send_raw_packet(tgen, topo, "r1", intf1, pkt) + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict) + if isinstance(result, str): + repeat += 1 + gracelsa_sent = False + + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + delete_ospf() + write_test_footer(tc_name) + + +def test_ospf_gr_helper_tc4_p1(request): + """ + OSPF GR on Broadcast : Verify DUT enters Helper mode when neighbor + sends grace lsa, helps RR to restart gracefully (RR = DRother) + """ + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + global topo, intf, intf1, pkt + + step("Bring up the base config as per the topology") + step( + "Configure DR priority as 99 in RR , DUT dr priority = 98 " + "& reset ospf process in all the routers" + ) + reset_config_on_routers(tgen) + ospf_covergence = verify_ospf_neighbor(tgen, topo, lan=True) + assert ( + ospf_covergence is True + ), "OSPF is not after reset config \n Error:" " {}".format(ospf_covergence) + step( + "Configure DR pririty 100 on R0 and clear ospf neighbors " "on all the routers." + ) + + input_dict = { + "r0": { + "links": { + sw_name: { + "interface": topo["routers"]["r0"]["links"][sw_name]["interface"], + "ospf": {"priority": 0}, + } + } + } + } + + result = create_interfaces_cfg(tgen, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result) + + step("Clear ospf neighbours in all routers") + for rtr in topo["routers"]: + clear_ospf(tgen, rtr) + + step("Verify that DR election is triggered and R0 is elected as 2-Way") + input_dict = { + "r0": { + "ospf": { + "neighbors": { + "r1": {"state": "Full", "role": "DR"}, + "r2": {"state": "2-Way", "role": "DROther"}, + "r3": {"state": "2-Way", "role": "DROther"}, + } + } + } + } + dut = "r0" + result = verify_ospf_neighbor(tgen, topo, dut, input_dict, lan=True) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + ospf_gr_r0 = { + "r0": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}} + } + result = create_router_ospf(tgen, topo, ospf_gr_r0) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + ospf_gr_r1 = { + "r1": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}} + } + result = create_router_ospf(tgen, topo, ospf_gr_r1) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Verify that DUT enters into helper mode.") + + input_dict = {"activeRestarterCnt": 1} + gracelsa_sent = False + repeat = 0 + dut = "r0" + while not gracelsa_sent and repeat < Iters: + gracelsa_sent = scapy_send_raw_packet(tgen, topo, "r1", intf1, pkt) + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict) + if isinstance(result, str): + repeat += 1 + gracelsa_sent = False + + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + delete_ospf() + + write_test_footer(tc_name) + + +def test_ospf_gr_helper_tc7_p1(request): + """ + Test ospf gr helper + Verify helper when grace lsa is received with different configured + value in process level (higher, lower, grace lsa timer above 1800) + """ + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + global topo, intf, intf1, pkt + + step("Bring up the base config as per the topology") + step( + "Configure DR priority as 99 in RR , DUT dr priority = 98 " + "& reset ospf process in all the routers" + ) + step( + "Enable GR on RR and DUT with grace period on RR = 333" + "and grace period on DUT = 300" + ) + reset_config_on_routers(tgen) + ospf_covergence = verify_ospf_neighbor(tgen, topo, lan=True) + assert ( + ospf_covergence is True + ), "OSPF is not after reset config \n Error:" " {}".format(ospf_covergence) + ospf_gr_r0 = { + "r0": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}} + } + result = create_router_ospf(tgen, topo, ospf_gr_r0) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + ospf_gr_r1 = { + "r1": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}} + } + result = create_router_ospf(tgen, topo, ospf_gr_r1) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + input_dict = {"supportedGracePeriod": 1800} + dut = "r0" + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Configure grace period = 1801 on RR and restart ospf .") + grace_period_1801 = "01005e00000570708bd051ef080045c0005cbeb10000015907d111010101e00000050204004801010101000000009714000000000000000000000000000100010209030000000101010180000001c8e9002c000100040000016800020001010000000003000411010101" + gracelsa_sent = scapy_send_raw_packet(tgen, topo, "r1", intf1, grace_period_1801) + + step("Verify R0 does not enter helper mode.") + input_dict = {"activeRestarterCnt": 1} + dut = "r0" + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict, expected=False) + assert ( + result is not True + ), "Testcase {} : Failed. DUT entered helper role " " \n Error: {}".format( + tc_name, result + ) + + delete_ospf() + + write_test_footer(tc_name) + + +def test_ospf_gr_helper_tc8_p1(request): + """ + Test ospf gr helper + + Verify helper functionality when dut is helping RR and new grace lsa + is received from RR. + """ + tc_name = request.node.name + write_test_header(tc_name) + tgen = get_topogen() + + # Don't run this test if we have any failure. + if tgen.routers_have_failure(): + pytest.skip(tgen.errors) + + global topo, intf, intf1, pkt + + step("Bring up the base config as per the topology") + step("Enable GR") + reset_config_on_routers(tgen) + ospf_covergence = verify_ospf_neighbor(tgen, topo, lan=True) + assert ( + ospf_covergence is True + ), "OSPF is not after reset config \n Error:" " {}".format(ospf_covergence) + ospf_gr_r0 = { + "r0": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}} + } + result = create_router_ospf(tgen, topo, ospf_gr_r0) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + ospf_gr_r1 = { + "r1": {"ospf": {"graceful-restart": {"helper-only": [], "opaque": True}}} + } + result = create_router_ospf(tgen, topo, ospf_gr_r1) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + input_dict = {"supportedGracePeriod": 1800} + dut = "r0" + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict) + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Verify that DUT enters into helper mode.") + + input_dict = {"activeRestarterCnt": 1} + gracelsa_sent = False + repeat = 0 + dut = "r0" + while not gracelsa_sent and repeat < Iters: + gracelsa_sent = scapy_send_raw_packet(tgen, topo, "r1", intf1, pkt) + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict) + if isinstance(result, str): + repeat += 1 + gracelsa_sent = False + + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + step("Send the Grace LSA again to DUT when RR is in GR.") + input_dict = {"activeRestarterCnt": 1} + gracelsa_sent = False + repeat = 0 + dut = "r0" + while not gracelsa_sent and repeat < Iters: + gracelsa_sent = scapy_send_raw_packet(tgen, topo, "r1", intf1, pkt) + result = verify_ospf_gr_helper(tgen, topo, dut, input_dict) + if isinstance(result, str): + repeat += 1 + gracelsa_sent = False + + assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) + + delete_ospf() + + write_test_footer(tc_name) + + +if __name__ == "__main__": + args = ["-s"] + sys.argv[1:] + sys.exit(pytest.main(args))