tests: More black fixups

Just another round of fixups found by running black on the code

Signed-off-by: Donald Sharp <sharpd@nvidia.com>
This commit is contained in:
Donald Sharp 2021-04-08 13:04:26 -04:00
parent 98ca91e181
commit 0b25370e95
35 changed files with 1077 additions and 871 deletions

View File

@ -346,9 +346,9 @@ def test_converge_protocols():
print("Show that v4 routes are right\n")
v4_routesFile = "%s/r%s/ipv4_routes.ref" % (thisDir, i)
expected = net["r%s" % i].cmd(
"sort {} 2> /dev/null".format(v4_routesFile)
).rstrip()
expected = (
net["r%s" % i].cmd("sort {} 2> /dev/null".format(v4_routesFile)).rstrip()
)
expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1)
actual = (
@ -379,9 +379,9 @@ def test_converge_protocols():
print("Show that v6 routes are right\n")
v6_routesFile = "%s/r%s/ipv6_routes.ref" % (thisDir, i)
expected = net["r%s" % i].cmd(
"sort {} 2> /dev/null".format(v6_routesFile)
).rstrip()
expected = (
net["r%s" % i].cmd("sort {} 2> /dev/null".format(v6_routesFile)).rstrip()
)
expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1)
actual = (

View File

@ -505,8 +505,10 @@ def test_r1_mplsvpn_VrfTable():
associated_int = r1_snmp.get(
"mplsL3VpnVrfAssociatedInterfaces.{}".format(snmp_str_to_oid("VRF-a"))
)
assertmsg = "mplsL3VpnVrfAssociatedInterfaces incorrect should be 3 value {}".format(
associated_int
assertmsg = (
"mplsL3VpnVrfAssociatedInterfaces incorrect should be 3 value {}".format(
associated_int
)
)
assert associated_int == "3", assertmsg

View File

@ -137,7 +137,7 @@ from lib.common_config import (
kill_mininet_routers_process,
get_frr_ipv6_linklocal,
create_route_maps,
required_linux_kernel_version
required_linux_kernel_version,
)
# Reading the data from JSON File for topology and configuration creation
@ -1329,20 +1329,22 @@ def test_BGP_GR_TC_4_p0(request):
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
logger.info(" Expected behavior: {}".format(result))
logger.info("[Phase 5] : R2 is about to come up now ")
@ -1802,10 +1804,11 @@ def test_BGP_GR_TC_6_1_2_p1(request):
result = verify_r_bit(
tgen, topo, addr_type, input_dict, dut="r2", peer="r1", expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r2: R-bit is set to True\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r2: R-bit is set to True\n Error: {}".format(
tc_name, result
))
)
logger.info("Restart BGPd on R2 ")
kill_router_daemons(tgen, "r2", ["bgpd"])
@ -1823,10 +1826,11 @@ def test_BGP_GR_TC_6_1_2_p1(request):
result = verify_r_bit(
tgen, topo, addr_type, input_dict, dut="r2", peer="r1", expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r2: R-bit is set to True\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r2: R-bit is set to True\n Error: {}".format(
tc_name, result
))
)
write_test_footer(tc_name)
@ -2108,20 +2112,22 @@ def test_BGP_GR_TC_17_p1(request):
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
logger.info(" Expected behavior: {}".format(result))
logger.info("[Phase 5] : R2 is about to come up now ")
@ -2140,10 +2146,11 @@ def test_BGP_GR_TC_17_p1(request):
result = verify_r_bit(
tgen, topo, addr_type, input_dict, dut="r1", peer="r2", expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: R-bit is set to True\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: R-bit is set to True\n Error: {}".format(
tc_name, result
))
)
# Verifying BGP RIB routes
next_hop = next_hop_per_address_family(
@ -2469,20 +2476,22 @@ def test_BGP_GR_TC_20_p1(request):
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
logger.info(" Expected behavior: {}".format(result))
logger.info("[Phase 5] : R2 is about to come up now ")
@ -2755,10 +2764,10 @@ def test_BGP_GR_TC_31_1_p1(request):
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
logger.info("[Phase 4] : R1 is about to come up now ")
start_router_daemons(tgen, "r1", ["bgpd"])
@ -3237,10 +3246,12 @@ def test_BGP_GR_TC_9_p1(request):
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes
@ -3248,10 +3259,10 @@ def test_BGP_GR_TC_9_p1(request):
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
logger.info(" Expected behavior: {}".format(result))
logger.info("[Phase 5] : R2 is about to come up now ")
@ -3281,10 +3292,11 @@ def test_BGP_GR_TC_9_p1(request):
result = verify_f_bit(
tgen, topo, addr_type, input_dict, dut="r1", peer="r2", expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: F-bit is set to True\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: F-bit is set to True\n Error: {}".format(
tc_name, result
))
)
write_test_footer(tc_name)
@ -3416,10 +3428,12 @@ def test_BGP_GR_TC_17_p1(request):
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes
@ -3427,10 +3441,10 @@ def test_BGP_GR_TC_17_p1(request):
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
logger.info(" Expected behavior: {}".format(result))
logger.info("[Phase 5] : R2 is about to come up now ")
@ -3452,10 +3466,11 @@ def test_BGP_GR_TC_17_p1(request):
result = verify_r_bit(
tgen, topo, addr_type, input_dict, dut="r1", peer="r2", expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: R-bit is set to True\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: R-bit is set to True\n Error: {}".format(
tc_name, result
))
)
# Verifying BGP RIB routes
next_hop = next_hop_per_address_family(
@ -3675,10 +3690,12 @@ def test_BGP_GR_TC_43_p1(request):
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r2: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
protocol = "bgp"
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
@ -3983,10 +4000,12 @@ def test_BGP_GR_TC_44_p1(request):
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
@ -5011,10 +5030,10 @@ def test_BGP_GR_TC_48_p1(request):
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
dut = "r2"
peer = "r1"
@ -5025,17 +5044,19 @@ def test_BGP_GR_TC_48_p1(request):
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r2: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r2: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r2: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
step("Bring up BGP on R1 and remove Peer-level GR config from R1")
@ -5394,17 +5415,19 @@ def BGP_GR_TC_52_p1(request):
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop, protocol, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
step("Bring up BGP on R2 and remove Peer-level GR config from R1")

View File

@ -136,7 +136,7 @@ from lib.common_config import (
kill_mininet_routers_process,
get_frr_ipv6_linklocal,
create_route_maps,
required_linux_kernel_version
required_linux_kernel_version,
)
# Reading the data from JSON File for topology and configuration creation
@ -557,10 +557,11 @@ def test_BGP_GR_TC_3_p0(request):
result = verify_eor(
tgen, topo, addr_type, input_dict, dut="r2", peer="r1", expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r2: EOR is set to True\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r2: EOR is set to True\n Error: {}".format(
tc_name, result
))
)
logger.info(
"Waiting for selection deferral timer({} sec)..".format(GR_SELECT_DEFER_TIMER)
@ -703,10 +704,11 @@ def test_BGP_GR_TC_11_p0(request):
result = verify_eor(
tgen, topo, addr_type, input_dict, dut="r1", peer="r3", expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: EOR is set to True\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: EOR is set to True\n Error: {}".format(
tc_name, result
))
)
logger.info(
"Waiting for selection deferral timer({} sec).. ".format(
@ -733,10 +735,11 @@ def test_BGP_GR_TC_11_p0(request):
result = verify_eor(
tgen, topo, addr_type, input_dict, dut="r3", peer="r1", expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r3: EOR is set to True\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r3: EOR is set to True\n Error: {}".format(
tc_name, result
))
)
write_test_footer(tc_name)
@ -1470,35 +1473,39 @@ def test_BGP_GR_18_p1(request):
dut = "r6"
input_dict_1 = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r6: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes before shutting down BGPd daemon
result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r6: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r6: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
logger.info(" Expected behavior: {}".format(result))
# Verifying BGP RIB routes
dut = "r2"
result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r2: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes before shutting down BGPd daemon
result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r6: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r6: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
logger.info(" Expected behavior: {}".format(result))
write_test_footer(tc_name)
@ -1959,18 +1966,20 @@ def test_BGP_GR_chaos_29_p1(request):
# Verifying BGP RIB routes before shutting down BGPd daemon
input_dict = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes before shutting down BGPd daemon
result = verify_rib(tgen, addr_type, dut, input_dict, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r3: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r3: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
logger.info(" Expected behavior: {}".format(result))
logger.info("[Step 4] : Start BGPd daemon on R1..")
@ -2212,10 +2221,12 @@ def test_BGP_GR_chaos_33_p1(request):
result = verify_rib(
tgen, addr_type, dut, input_dict_2, next_hop_4, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info(" Expected behavior: {}".format(result))
if addr_type == "ipv6":
@ -2227,10 +2238,12 @@ def test_BGP_GR_chaos_33_p1(request):
result = verify_rib(
tgen, addr_type, dut, input_dict_2, next_hop_6, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info(" Expected behavior: {}".format(result))
logger.info("[Step 4] : Start BGPd daemon on R1 and R4..")
@ -2411,27 +2424,30 @@ def test_BGP_GR_chaos_34_2_p1(request):
result = verify_f_bit(
tgen, topo, addr_type, input_dict, "r3", "r1", expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r3: F-bit is set to True\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r3: F-bit is set to True\n Error: {}".format(
tc_name, result
))
)
logger.info(" Expected behavior: {}".format(result))
# Verifying BGP RIB routes after starting BGPd daemon
input_dict_1 = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes
result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r3: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r3: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
logger.info(" Expected behavior: {}".format(result))
write_test_footer(tc_name)
@ -2568,10 +2584,11 @@ def test_BGP_GR_chaos_34_1_p1(request):
result = verify_f_bit(
tgen, topo, addr_type, input_dict_2, "r3", "r1", expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r3: F-bit is set to True\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r3: F-bit is set to True\n Error: {}".format(
tc_name, result
))
)
logger.info(" Expected behavior: {}".format(result))
logger.info("[Step 3] : Kill BGPd daemon on R1..")
@ -2587,18 +2604,20 @@ def test_BGP_GR_chaos_34_1_p1(request):
# Verifying BGP RIB routes
input_dict = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes
result = verify_rib(tgen, addr_type, dut, input_dict, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r3: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r3: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
logger.info(" Expected behavior: {}".format(result))
# Start BGPd daemon on R1
@ -2772,27 +2791,30 @@ def test_BGP_GR_chaos_32_p1(request):
result = verify_eor(
tgen, topo, addr_type, input_dict_3, dut="r5", peer="r1", expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r5: EOR is set to TRUE\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r5: EOR is set to TRUE\n Error: {}".format(
tc_name, result
))
)
logger.info(" Expected behavior: {}".format(result))
# Verifying BGP RIB routes after starting BGPd daemon
input_dict_1 = {key: topo["routers"][key] for key in ["r5"]}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes
result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r3: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r3: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
logger.info(" Expected behavior: {}".format(result))
write_test_footer(tc_name)
@ -2898,10 +2920,11 @@ def test_BGP_GR_chaos_37_p1(request):
result = verify_eor(
tgen, topo, addr_type, input_dict, dut="r3", peer="r1", expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r3: EOR is set to True\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r3: EOR is set to True\n Error: {}".format(
tc_name, result
))
)
logger.info(" Expected behavior: {}".format(result))
# Verifying BGP RIB routes after starting BGPd daemon
@ -2964,10 +2987,11 @@ def test_BGP_GR_chaos_37_p1(request):
result = verify_eor(
tgen, topo, addr_type, input_dict_3, dut="r1", peer="r3", expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: EOR is set to True\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: EOR is set to True\n Error: {}".format(
tc_name, result
))
)
write_test_footer(tc_name)
@ -3119,18 +3143,20 @@ def test_BGP_GR_chaos_30_p1(request):
# Verifying BGP RIB routes before shutting down BGPd daemon
input_dict = {key: topo["routers"][key] for key in ["r3"]}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes before shutting down BGPd daemon
result = verify_rib(tgen, addr_type, dut, input_dict, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
logger.info(" Expected behavior: {}".format(result))
write_test_footer(tc_name)
@ -3532,10 +3558,10 @@ def BGP_GR_TC_7_p1(request):
dut = "r1"
input_dict_1 = {key: topo["routers"][key] for key in ["r3"]}
result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r1: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
write_test_footer(tc_name)
@ -3709,10 +3735,11 @@ def test_BGP_GR_TC_23_p1(request):
result = verify_eor(
tgen, topo, addr_type, input_dict, dut="r1", peer="r2", expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: EOR is set to True\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: EOR is set to True\n Error: {}".format(
tc_name, result
))
)
# Verifying BGP RIB routes received from router R1
dut = "r1"
@ -3833,18 +3860,20 @@ def test_BGP_GR_20_p1(request):
dut = "r3"
input_dict_1 = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: routes are still present in BGP RIB\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info(" Expected behavior: {}".format(result))
# Verifying RIB routes before shutting down BGPd daemon
result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r3: routes are still present in ZEBRA\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r3: routes are still present in ZEBRA\n Error: {}".format(tc_name, result)
)
logger.info(" Expected behavior: {}".format(result))
# Start BGPd daemon on R1

View File

@ -598,10 +598,12 @@ def test_large_community_lists_with_rmap_apply_and_remove(request):
result = verify_bgp_community(
tgen, adt, dut, NETWORKS[adt], input_dict_4, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"largeCommunity is still present after deleting route-map \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
write_test_footer(tc_name)
@ -899,10 +901,10 @@ def test_large_community_lists_with_rmap_set_none(request):
dut = "r6"
for adt in ADDR_TYPES:
result = verify_bgp_community(tgen, adt, dut, NETWORKS[adt], expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"Community-list is still present \n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"Community-list is still present \n Error: {}".format(tc_name, result)
)
write_test_footer(tc_name)
@ -2238,10 +2240,10 @@ def test_large_community_lists_with_rmap_match_regex(request):
result = verify_bgp_community(
tgen, adt, dut, NETWORKS[adt], input_dict_7, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"largeCommunity is still present \n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"largeCommunity is still present \n Error: {}".format(tc_name, result)
)
write_test_footer(tc_name)

View File

@ -508,10 +508,10 @@ def test_ambiguous_overlapping_addresses_in_different_vrfs_p0(request):
)
result = verify_rib(tgen, addr_type, dut, input_dict_1, tag=500, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"Routes are present with tag value 500 \n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"Routes are present with tag value 500 \n Error: {}".format(tc_name, result)
)
logger.info("Expected Behavior: {}".format(result))
step(
@ -1147,10 +1147,12 @@ def test_prefixes_leaking_p0(request):
result = verify_rib(
tgen, addr_type, dut, input_dict_1, metric=123, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"Routes are present with metric value 123 \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info("Expected Behavior: {}".format(result))
result = verify_rib(tgen, addr_type, dut, input_dict_2, metric=123)
@ -1161,10 +1163,12 @@ def test_prefixes_leaking_p0(request):
result = verify_rib(
tgen, addr_type, dut, input_dict_2, metric=0, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"Routes are present with metric value 0 \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info("Expected Behavior: {}".format(result))
write_test_footer(tc_name)

View File

@ -2222,16 +2222,20 @@ def test_restart_bgpd_daemon_p1(request):
}
result = verify_rib(tgen, addr_type, dut, input_dict_1, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"Routes are still present in VRF RED_A and RED_B \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
result = verify_rib(tgen, addr_type, dut, input_dict_2, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"Routes are still present in VRF BLUE_A and BLUE_B \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("Bring up BGPd daemon on R1.")
start_router_daemons(tgen, "r1", ["bgpd"])

View File

@ -369,10 +369,11 @@ def test_recursive_routes_iBGP_peer_p1(request):
protocol="bgp",
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
"Routes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format(
tc_name, result
))
)
step("Reconfigure the same static route on R2 again")
dut = "r2"
@ -490,10 +491,11 @@ def test_recursive_routes_iBGP_peer_p1(request):
result = verify_rib(
tgen, addr_type, "r2", input_dict_4, protocol="bgp", expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"Routes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format(
tc_name, result
))
)
write_test_footer(tc_name)
@ -602,10 +604,11 @@ def test_next_hop_as_self_ip_p1(request):
next_hop=topo["routers"]["r2"]["links"]["r4"][addr_type].split("/")[0],
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
"Routes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format(
tc_name, result
))
)
step("Shut interface on R2 that has IP from the subnet as BGP next-hop")
intf_r2_r4 = topo["routers"]["r2"]["links"]["r4"]["interface"]
@ -680,10 +683,11 @@ def test_next_hop_as_self_ip_p1(request):
next_hop=topo["routers"]["r2"]["links"]["r4"][addr_type].split("/")[0],
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
"Routes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format(
tc_name, result
))
)
write_test_footer(tc_name)
@ -1626,10 +1630,11 @@ def test_BGP_peering_bw_loopback_and_physical_p1(request):
step("Verify that once eBGP multi-hop is removed, BGP session goes down")
result = verify_bgp_convergence_from_running_config(tgen, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"BGP is converged \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format(
tc_name, result
))
)
step("Add ebgp-multihop command on R3 again")
for addr_type in ADDR_TYPES:
@ -1667,10 +1672,11 @@ def test_BGP_peering_bw_loopback_and_physical_p1(request):
step("Verify that BGP session goes down, when update-source is removed")
result = verify_bgp_convergence_from_running_config(tgen, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"BGP is converged \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format(
tc_name, result
))
)
step("Add update-source command on R1 again")
for addr_type in ADDR_TYPES:
@ -1719,18 +1725,20 @@ def test_BGP_peering_bw_loopback_and_physical_p1(request):
next_hop=topo["routers"]["r1"]["links"]["r3"][addr_type].split("/")[0],
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
"Routes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format(
tc_name, result
))
)
sleep(3)
step("Verify that BGP session goes down, when static route is removed")
result = verify_bgp_convergence_from_running_config(tgen, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"BGP is converged \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format(
tc_name, result
))
)
step("Add static route on R3 again")
for addr_type in ADDR_TYPES:
@ -1772,10 +1780,11 @@ def test_BGP_peering_bw_loopback_and_physical_p1(request):
sleep(3)
step("Verify that BGP neighborship between R1 and R3 goes down")
result = verify_bgp_convergence_from_running_config(tgen, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"BGP is converged \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format(
tc_name, result
))
)
intf_r1_r3 = topo["routers"]["r1"]["links"]["r3"]["interface"]
shutdown_bringup_interface(tgen, "r1", intf_r1_r3, True)
@ -2091,10 +2100,11 @@ def test_BGP_active_standby_preemption_and_ecmp_p1(request):
],
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
"Routes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format(
tc_name, result
))
)
step("Reconfigure multipath-relax command on R4")
result = create_router_bgp(tgen, topo, maxpath_relax)
@ -2151,10 +2161,11 @@ def test_BGP_active_standby_preemption_and_ecmp_p1(request):
],
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
"Routes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "Routes are still present \n Error: {}".format(
tc_name, result
))
)
step("Re-configure maximum-path 2 command on R4")
input_dict_8 = {
@ -2342,10 +2353,11 @@ def test_password_authentication_for_eBGP_and_iBGP_peers_p1(request):
"configured but not peer routers"
)
result = verify_bgp_convergence(tgen, topo, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"BGP is converged \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format(
tc_name, result
))
)
step("configure same password on R2 and R3")
for routerN in ["r2", "r3"]:
@ -2372,10 +2384,11 @@ def test_password_authentication_for_eBGP_and_iBGP_peers_p1(request):
"strings are in CAPs on R2 and R3"
)
result = verify_bgp_convergence(tgen, topo, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"BGP is converged \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format(
tc_name, result
))
)
step("Configure same password on R2 and R3 without CAPs")
for routerN in ["r2", "r3"]:
@ -2399,10 +2412,11 @@ def test_password_authentication_for_eBGP_and_iBGP_peers_p1(request):
step("Verify if password is removed from R1, both sessions go down again")
result = verify_bgp_convergence(tgen, topo, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"BGP is converged \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "BGP is converged \n Error: {}".format(
tc_name, result
))
)
step("Configure alphanumeric password on R1 and peer routers R2,R3")
for bgp_neighbor in ["r2", "r3"]:

View File

@ -498,6 +498,7 @@ def disable_route_map_to_prefer_global_next_hop(tgen, topo):
#
#####################################################
def test_dynamic_imported_routes_advertised_to_iBGP_peer_p0(request):
"""
TC5_FUNC_5:
@ -762,9 +763,7 @@ def test_dynamic_imported_routes_advertised_to_iBGP_peer_p0(request):
for addr_type in ADDR_TYPES:
step(
"On router R1 delete static routes in vrf ISR to LOOPBACK_1"
)
step("On router R1 delete static routes in vrf ISR to LOOPBACK_1")
input_routes_r1 = {
"r1": {
@ -772,7 +771,7 @@ def test_dynamic_imported_routes_advertised_to_iBGP_peer_p0(request):
{
"network": [NETWORK1_3[addr_type], NETWORK1_4[addr_type]],
"next_hop": (intf_r2_r1[addr_type]).split("/")[0],
"delete": True
"delete": True,
}
]
}

View File

@ -11,6 +11,7 @@ from lib.topotest import json_cmp_result
from lib.topotest import g_extra_config as topotest_extra_config
from lib.topolog import logger
def pytest_addoption(parser):
"""
Add topology-only option to the topology tester. This option makes pytest
@ -143,9 +144,7 @@ def pytest_configure(config):
vtysh_on_error = config.getoption("--vtysh-on-error")
topotest_extra_config["vtysh_on_error"] = vtysh_on_error
topotest_extra_config["pause_after"] = (
pause_after or shell or vtysh
)
topotest_extra_config["pause_after"] = pause_after or shell or vtysh
topotest_extra_config["topology_only"] = config.getoption("--topology-only")
@ -177,9 +176,11 @@ def pytest_runtest_makereport(item, call):
else:
error = True
# Handle assert failures
parent._previousfailed = item # pylint: disable=W0212
parent._previousfailed = item # pylint: disable=W0212
logger.error(
'assert failed at "{}/{}": {}'.format(modname, item.name, call.excinfo.value)
'assert failed at "{}/{}": {}'.format(
modname, item.name, call.excinfo.value
)
)
# (topogen) Set topology error to avoid advancing in the test.
@ -188,7 +189,6 @@ def pytest_runtest_makereport(item, call):
# This will cause topogen to report error on `routers_have_failure`.
tgen.set_error("{}/{}".format(modname, item.name))
if error and topotest_extra_config["shell_on_error"]:
for router in tgen.routers():
pause = True

View File

@ -124,7 +124,6 @@ class TemplateTopo(Topo):
switch.add_link(tgen.gears["r3"])
def setup_module(mod):
"Sets up the pytest environment"
@ -148,20 +147,24 @@ def setup_module(mod):
# Don't start the following in the CE nodes
if router.name[0] == "r":
router.load_config(
TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname)),
TopoRouter.RD_ISIS,
os.path.join(CWD, "{}/isisd.conf".format(rname)),
"-M snmp",
)
router.load_config(
TopoRouter.RD_LDP, os.path.join(CWD, "{}/ldpd.conf".format(rname)),
TopoRouter.RD_LDP,
os.path.join(CWD, "{}/ldpd.conf".format(rname)),
)
router.load_config(
TopoRouter.RD_SNMP, os.path.join(CWD, "{}/snmpd.conf".format(rname)),
TopoRouter.RD_SNMP,
os.path.join(CWD, "{}/snmpd.conf".format(rname)),
"-Le -Ivacm_conf,usmConf,iquery -V -DAgentX,trap",
)
# After loading the configurations, this function loads configured daemons.
tgen.start_router()
def teardown_module(mod):
"Teardown the pytest environment"
tgen = get_topogen()
@ -169,6 +172,7 @@ def teardown_module(mod):
# This function tears down the whole topology.
tgen.stop_topology()
def router_compare_json_output(rname, command, reference):
"Compare router JSON output"
@ -184,6 +188,7 @@ def router_compare_json_output(rname, command, reference):
assertmsg = '"{}" JSON output mismatches the expected result'.format(rname)
assert diff is None, assertmsg
def generate_oid(numoids, index1, index2):
if numoids == 1:
oid = "{}".format(index1)
@ -200,7 +205,9 @@ def test_isis_convergence():
router_compare_json_output(
rname,
"show yang operational-data /frr-interface:lib isisd",
"show_yang_interface_isis_adjacencies.ref")
"show_yang_interface_isis_adjacencies.ref",
)
def test_r1_scalar_snmp():
"Wait for protocol convergence"
@ -213,26 +220,26 @@ def test_r1_scalar_snmp():
r1 = tgen.net.get("r1")
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
assert r1_snmp.test_oid('isisSysVersion', "one(1)")
assert r1_snmp.test_oid('isisSysLevelType', "level1and2(3)")
assert r1_snmp.test_oid('isisSysID',"00 00 00 00 00 01")
assert r1_snmp.test_oid('isisSysMaxPathSplits',"32")
assert r1_snmp.test_oid('isisSysMaxLSPGenInt',"900 seconds")
assert r1_snmp.test_oid('isisSysAdminState',"on(1)")
assert r1_snmp.test_oid('isisSysMaxAge',"1200 seconds")
assert r1_snmp.test_oid('isisSysProtSupported',"07 5 6 7")
assert r1_snmp.test_oid("isisSysVersion", "one(1)")
assert r1_snmp.test_oid("isisSysLevelType", "level1and2(3)")
assert r1_snmp.test_oid("isisSysID", "00 00 00 00 00 01")
assert r1_snmp.test_oid("isisSysMaxPathSplits", "32")
assert r1_snmp.test_oid("isisSysMaxLSPGenInt", "900 seconds")
assert r1_snmp.test_oid("isisSysAdminState", "on(1)")
assert r1_snmp.test_oid("isisSysMaxAge", "1200 seconds")
assert r1_snmp.test_oid("isisSysProtSupported", "07 5 6 7")
r2 = tgen.net.get("r2")
r2_snmp = SnmpTester(r2, "2.2.2.2", "public", "2c")
assert r2_snmp.test_oid('isisSysVersion', "one(1)")
assert r2_snmp.test_oid('isisSysLevelType', "level1and2(3)")
assert r2_snmp.test_oid('isisSysID',"00 00 00 00 00 02")
assert r2_snmp.test_oid('isisSysMaxPathSplits',"32")
assert r2_snmp.test_oid('isisSysMaxLSPGenInt',"900 seconds")
assert r2_snmp.test_oid('isisSysAdminState',"on(1)")
assert r2_snmp.test_oid('isisSysMaxAge',"1200 seconds")
assert r2_snmp.test_oid('isisSysProtSupported',"07 5 6 7")
assert r2_snmp.test_oid("isisSysVersion", "one(1)")
assert r2_snmp.test_oid("isisSysLevelType", "level1and2(3)")
assert r2_snmp.test_oid("isisSysID", "00 00 00 00 00 02")
assert r2_snmp.test_oid("isisSysMaxPathSplits", "32")
assert r2_snmp.test_oid("isisSysMaxLSPGenInt", "900 seconds")
assert r2_snmp.test_oid("isisSysAdminState", "on(1)")
assert r2_snmp.test_oid("isisSysMaxAge", "1200 seconds")
assert r2_snmp.test_oid("isisSysProtSupported", "07 5 6 7")
circtable_test = {
@ -245,7 +252,8 @@ circtable_test = {
"isisCircMeshGroupEnabled": ["inactive(1)", "inactive(1)", "inactive(1)"],
"isisCircSmallHellos": ["false(2)", "false(2)", "false(2)"],
"isisCirc3WayEnabled": ["false(2)", "false(2)", "false(2)"],
}
}
def test_r1_isisCircTable():
tgen = get_topogen()
@ -256,9 +264,9 @@ def test_r1_isisCircTable():
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
oids = []
oids.append(generate_oid(1,1,0))
oids.append(generate_oid(1,2,0))
oids.append(generate_oid(1,3,0))
oids.append(generate_oid(1, 1, 0))
oids.append(generate_oid(1, 2, 0))
oids.append(generate_oid(1, 3, 0))
# check items
for item in circtable_test.keys():
@ -267,14 +275,26 @@ def test_r1_isisCircTable():
)
assert r1_snmp.test_oid_walk(item, circtable_test[item], oids), assertmsg
circleveltable_test = {
"isisCircLevelMetric": ["10", "10", "10", "10"],
"isisCircLevelWideMetric": ["10", "10", "0", "0"],
"isisCircLevelISPriority": ["64", "64", "64", "64"],
"isisCircLevelHelloMultiplier": ["10", "10", "10", "10"],
"isisCircLevelHelloTimer": ["3000 milliseconds", "3000 milliseconds", "3000 milliseconds", "3000 milliseconds"],
"isisCircLevelMinLSPRetransInt": ["1 seconds", "1 seconds", "0 seconds", "0 seconds"],
}
"isisCircLevelHelloTimer": [
"3000 milliseconds",
"3000 milliseconds",
"3000 milliseconds",
"3000 milliseconds",
],
"isisCircLevelMinLSPRetransInt": [
"1 seconds",
"1 seconds",
"0 seconds",
"0 seconds",
],
}
def test_r1_isislevelCircTable():
tgen = get_topogen()
@ -285,10 +305,10 @@ def test_r1_isislevelCircTable():
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
oids = []
oids.append(generate_oid(2,1,"area"))
oids.append(generate_oid(2,2,"area"))
oids.append(generate_oid(2,3,"area"))
oids.append(generate_oid(2,3,"domain"))
oids.append(generate_oid(2, 1, "area"))
oids.append(generate_oid(2, 2, "area"))
oids.append(generate_oid(2, 3, "area"))
oids.append(generate_oid(2, 3, "domain"))
# check items
for item in circleveltable_test.keys():
@ -316,6 +336,7 @@ adjtable_down_test = {
"isisISAdjNeighPriority": ["64"],
}
def test_r1_isisAdjTable():
"check ISIS Adjacency Table"
tgen = get_topogen()
@ -324,11 +345,11 @@ def test_r1_isisAdjTable():
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
oids = []
oids.append(generate_oid(2,1,1))
oids.append(generate_oid(2,2,1))
oids.append(generate_oid(2, 1, 1))
oids.append(generate_oid(2, 2, 1))
oids_down = []
oids_down.append(generate_oid(2,1,1))
oids_down.append(generate_oid(2, 1, 1))
# check items
for item in adjtable_test.keys():
@ -337,7 +358,6 @@ def test_r1_isisAdjTable():
)
assert r1_snmp.test_oid_walk(item, adjtable_test[item], oids), assertmsg
# shutdown interface and one adjacency should be removed
"check ISIS adjacency is removed when interface is shutdown"
r1_cmd.vtysh_cmd("conf t\ninterface r1-eth1\nshutdown")
@ -347,7 +367,9 @@ def test_r1_isisAdjTable():
assertmsg = "{} should be {} oids {} full dict {}:".format(
item, adjtable_down_test[item], oids_down, r1_snmp.walk(item)
)
assert r1_snmp.test_oid_walk(item, adjtable_down_test[item], oids_down), assertmsg
assert r1_snmp.test_oid_walk(
item, adjtable_down_test[item], oids_down
), assertmsg
# no shutdown interface and adjacency should be restored
r1_cmd.vtysh_cmd("conf t\ninterface r1-eth1\nno shutdown")

View File

@ -42,8 +42,8 @@ from lib.topogen import Topogen, TopoRouter, get_topogen
from lib.topolog import logger
from lib.topotest import iproute2_is_vrf_capable
from lib.common_config import (
required_linux_kernel_version,
adjust_router_l3mdev,
required_linux_kernel_version,
adjust_router_l3mdev,
)
from mininet.topo import Topo
@ -51,18 +51,19 @@ from mininet.topo import Topo
pytestmark = [pytest.mark.isisd]
VERTEX_TYPE_LIST = [
"pseudo_IS",
"pseudo_TE-IS",
"IS",
"TE-IS",
"ES",
"IP internal",
"IP external",
"IP TE",
"IP6 internal",
"IP6 external",
"UNKNOWN"
]
"pseudo_IS",
"pseudo_TE-IS",
"IS",
"TE-IS",
"ES",
"IP internal",
"IP external",
"IP TE",
"IP6 internal",
"IP6 external",
"UNKNOWN",
]
class ISISTopo1(Topo):
"Simple two layer ISIS vrf topology"
@ -330,8 +331,9 @@ def parse_topology(lines, level):
ipv = "ipv4"
continue
item_match = re.match(r"([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+)"
, line)
item_match = re.match(
r"([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+)", line
)
if (
item_match is not None
and item_match.group(1) == "Vertex"
@ -344,8 +346,12 @@ def parse_topology(lines, level):
# Skip header
continue
item_match = re.match(r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+) ([^\s]+) ([^\s]+)"
.format(vertex_type_regex), line)
item_match = re.match(
r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+) ([^\s]+) ([^\s]+)".format(
vertex_type_regex
),
line,
)
if item_match is not None:
areas[area][level][ipv].append(
{
@ -359,8 +365,10 @@ def parse_topology(lines, level):
)
continue
item_match = re.match(r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+)"
.format(vertex_type_regex), line)
item_match = re.match(
r"([^\s]+) ({}) ([0]|([1-9][0-9]*)) ([^\s]+)".format(vertex_type_regex),
line,
)
if item_match is not None:
areas[area][level][ipv].append(

View File

@ -141,15 +141,16 @@ def setup_module(mod):
TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname))
)
router.load_config(
TopoRouter.RD_LDP, os.path.join(CWD, "{}/ldpd.conf".format(rname)),
"-M snmp"
TopoRouter.RD_LDP,
os.path.join(CWD, "{}/ldpd.conf".format(rname)),
"-M snmp",
)
router.load_config(
TopoRouter.RD_SNMP, os.path.join(CWD, "{}/snmpd.conf".format(rname)),
"-Le -Ivacm_conf,usmConf,iquery -V -DAgentX,trap"
TopoRouter.RD_SNMP,
os.path.join(CWD, "{}/snmpd.conf".format(rname)),
"-Le -Ivacm_conf,usmConf,iquery -V -DAgentX,trap",
)
tgen.start_router()
@ -199,7 +200,7 @@ def test_rib():
# Skip if previous fatal error condition is raised
# TODO: disabling this check to avoid 'snmpd not running' errors
#if tgen.routers_have_failure():
# if tgen.routers_have_failure():
# pytest.skip(tgen.errors)
for rname in ["r1", "r2", "r3"]:
@ -212,7 +213,7 @@ def test_ldp_adjacencies():
# Skip if previous fatal error condition is raised
# TODO: disabling this check to avoid 'snmpd not running' errors
#if tgen.routers_have_failure():
# if tgen.routers_have_failure():
# pytest.skip(tgen.errors)
for rname in ["r1", "r2", "r3"]:
@ -226,7 +227,7 @@ def test_ldp_neighbors():
tgen = get_topogen()
# Skip if previous fatal error condition is raised
#if tgen.routers_have_failure():
# if tgen.routers_have_failure():
# pytest.skip(tgen.errors)
for rname in ["r1", "r2", "r3"]:
@ -242,8 +243,8 @@ def test_r1_ldp_lsr_objects():
r1 = tgen.net.get("r1")
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
assert r1_snmp.test_oid('mplsLdpLsrId', "01 01 01 01")
assert r1_snmp.test_oid('mplsLdpLsrLoopDetectionCapable', 'none(1)')
assert r1_snmp.test_oid("mplsLdpLsrId", "01 01 01 01")
assert r1_snmp.test_oid("mplsLdpLsrLoopDetectionCapable", "none(1)")
def test_r1_ldp_entity_table():
@ -253,52 +254,31 @@ def test_r1_ldp_entity_table():
r1 = tgen.net.get("r1")
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
assert r1_snmp.test_oid_walk("mplsLdpEntityLdpId", ["1.1.1.1:0"])
assert r1_snmp.test_oid_walk("mplsLdpEntityIndex", ["1"])
assert r1_snmp.test_oid_walk("mplsLdpEntityProtocolVersion", ["1"])
assert r1_snmp.test_oid_walk("mplsLdpEntityAdminStatus", ["enable(1)"])
assert r1_snmp.test_oid_walk("mplsLdpEntityOperStatus", ["enabled(2)"])
assert r1_snmp.test_oid_walk("mplsLdpEntityTcpPort", ["646"])
assert r1_snmp.test_oid_walk("mplsLdpEntityUdpDscPort", ["646"])
assert r1_snmp.test_oid_walk("mplsLdpEntityMaxPduLength", ["4096 octets"])
assert r1_snmp.test_oid_walk("mplsLdpEntityKeepAliveHoldTimer", ["180 seconds"])
assert r1_snmp.test_oid_walk("mplsLdpEntityHelloHoldTimer", ["0 seconds"])
assert r1_snmp.test_oid_walk("mplsLdpEntityInitSessionThreshold", ["0"])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityLdpId', ['1.1.1.1:0'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityIndex', ['1'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityProtocolVersion', ['1'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityAdminStatus', ['enable(1)'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityOperStatus', ['enabled(2)'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityTcpPort', ['646'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityUdpDscPort', ['646'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityMaxPduLength', ['4096 octets'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityKeepAliveHoldTimer', ['180 seconds'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityHelloHoldTimer', ['0 seconds'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityInitSessionThreshold', ['0'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityLabelDistMethod', ['downstreamUnsolicited(2)'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityLabelRetentionMode', ['liberal(2)'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityPathVectorLimit', ['0'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityHopCountLimit', ['0'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityTransportAddrKind', ['loopback(2)'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityTargetPeer', ['true(1)'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityTargetPeerAddrType', ['ipv4(1)'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityTargetPeerAddr', ['01 01 01 01'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityLabelType', ['generic(1)'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityDiscontinuityTime', ['(0) 0:00:00.00'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityStorageType', ['nonVolatile(3)'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityRowStatus', ['createAndGo(4)'])
"mplsLdpEntityLabelDistMethod", ["downstreamUnsolicited(2)"]
)
assert r1_snmp.test_oid_walk("mplsLdpEntityLabelRetentionMode", ["liberal(2)"])
assert r1_snmp.test_oid_walk("mplsLdpEntityPathVectorLimit", ["0"])
assert r1_snmp.test_oid_walk("mplsLdpEntityHopCountLimit", ["0"])
assert r1_snmp.test_oid_walk("mplsLdpEntityTransportAddrKind", ["loopback(2)"])
assert r1_snmp.test_oid_walk("mplsLdpEntityTargetPeer", ["true(1)"])
assert r1_snmp.test_oid_walk("mplsLdpEntityTargetPeerAddrType", ["ipv4(1)"])
assert r1_snmp.test_oid_walk("mplsLdpEntityTargetPeerAddr", ["01 01 01 01"])
assert r1_snmp.test_oid_walk("mplsLdpEntityLabelType", ["generic(1)"])
assert r1_snmp.test_oid_walk("mplsLdpEntityDiscontinuityTime", ["(0) 0:00:00.00"])
assert r1_snmp.test_oid_walk("mplsLdpEntityStorageType", ["nonVolatile(3)"])
assert r1_snmp.test_oid_walk("mplsLdpEntityRowStatus", ["createAndGo(4)"])
def test_r1_ldp_entity_stats_table():
@ -308,32 +288,23 @@ def test_r1_ldp_entity_stats_table():
r1 = tgen.net.get("r1")
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
assert r1_snmp.test_oid_walk("mplsLdpEntityStatsSessionAttempts", ["0"])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityStatsSessionAttempts', ['0'])
"mplsLdpEntityStatsSessionRejectedNoHelloErrors", ["0"]
)
assert r1_snmp.test_oid_walk("mplsLdpEntityStatsSessionRejectedAdErrors", ["0"])
assert r1_snmp.test_oid_walk("mplsLdpEntityStatsSessionRejectedMaxPduErrors", ["0"])
assert r1_snmp.test_oid_walk("mplsLdpEntityStatsSessionRejectedLRErrors", ["0"])
assert r1_snmp.test_oid_walk("mplsLdpEntityStatsBadLdpIdentifierErrors", ["0"])
assert r1_snmp.test_oid_walk("mplsLdpEntityStatsBadPduLengthErrors", ["0"])
assert r1_snmp.test_oid_walk("mplsLdpEntityStatsBadMessageLengthErrors", ["0"])
assert r1_snmp.test_oid_walk("mplsLdpEntityStatsBadTlvLengthErrors", ["0"])
assert r1_snmp.test_oid_walk("mplsLdpEntityStatsMalformedTlvValueErrors", ["0"])
assert r1_snmp.test_oid_walk("mplsLdpEntityStatsKeepAliveTimerExpErrors", ["0"])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityStatsSessionRejectedNoHelloErrors', ['0'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityStatsSessionRejectedAdErrors', ['0'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityStatsSessionRejectedMaxPduErrors', ['0'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityStatsSessionRejectedLRErrors', ['0'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityStatsBadLdpIdentifierErrors', ['0'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityStatsBadPduLengthErrors', ['0'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityStatsBadMessageLengthErrors', ['0'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityStatsBadTlvLengthErrors', ['0'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityStatsMalformedTlvValueErrors', ['0'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityStatsKeepAliveTimerExpErrors', ['0'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityStatsShutdownReceivedNotifications', ['0'])
assert r1_snmp.test_oid_walk(
'mplsLdpEntityStatsShutdownSentNotifications', ['0'])
"mplsLdpEntityStatsShutdownReceivedNotifications", ["0"]
)
assert r1_snmp.test_oid_walk("mplsLdpEntityStatsShutdownSentNotifications", ["0"])
def test_r1_ldp_peer_table():
@ -343,17 +314,16 @@ def test_r1_ldp_peer_table():
r1 = tgen.net.get("r1")
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
assert r1_snmp.test_oid_walk("mplsLdpPeerLdpId", ["2.2.2.2:0", "3.3.3.3:0"])
assert r1_snmp.test_oid_walk(
'mplsLdpPeerLdpId', ['2.2.2.2:0', '3.3.3.3:0'])
"mplsLdpPeerLabelDistMethod",
["downstreamUnsolicited(2)", "downstreamUnsolicited(2)"],
)
assert r1_snmp.test_oid_walk("mplsLdpPeerPathVectorLimit", ["0", "0"])
assert r1_snmp.test_oid_walk("mplsLdpPeerTransportAddrType", ["ipv4(1)", "ipv4(1)"])
assert r1_snmp.test_oid_walk(
'mplsLdpPeerLabelDistMethod',
['downstreamUnsolicited(2)', 'downstreamUnsolicited(2)'])
assert r1_snmp.test_oid_walk(
'mplsLdpPeerPathVectorLimit', ['0', '0'])
assert r1_snmp.test_oid_walk(
'mplsLdpPeerTransportAddrType', ['ipv4(1)', 'ipv4(1)'])
assert r1_snmp.test_oid_walk(
'mplsLdpPeerTransportAddr', ['02 02 02 02', '03 03 03 03'])
"mplsLdpPeerTransportAddr", ["02 02 02 02", "03 03 03 03"]
)
def test_r1_ldp_session_table():
@ -363,18 +333,20 @@ def test_r1_ldp_session_table():
r1 = tgen.net.get("r1")
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
assert r1_snmp.test_oid_walk('mplsLdpSessionState',
['operational(5)', 'operational(5)'])
assert r1_snmp.test_oid_walk('mplsLdpSessionRole',
['passive(3)', 'passive(3)'])
assert r1_snmp.test_oid_walk('mplsLdpSessionProtocolVersion',
['1', '1'])
assert r1_snmp.test_oid_walk('mplsLdpSessionKeepAliveTime',
['180 seconds', '180 seconds'])
assert r1_snmp.test_oid_walk('mplsLdpSessionMaxPduLength',
['4096 octets', '4096 octets'])
assert r1_snmp.test_oid_walk('mplsLdpSessionDiscontinuityTime',
['(0) 0:00:00.00', '(0) 0:00:00.00'])
assert r1_snmp.test_oid_walk(
"mplsLdpSessionState", ["operational(5)", "operational(5)"]
)
assert r1_snmp.test_oid_walk("mplsLdpSessionRole", ["passive(3)", "passive(3)"])
assert r1_snmp.test_oid_walk("mplsLdpSessionProtocolVersion", ["1", "1"])
assert r1_snmp.test_oid_walk(
"mplsLdpSessionKeepAliveTime", ["180 seconds", "180 seconds"]
)
assert r1_snmp.test_oid_walk(
"mplsLdpSessionMaxPduLength", ["4096 octets", "4096 octets"]
)
assert r1_snmp.test_oid_walk(
"mplsLdpSessionDiscontinuityTime", ["(0) 0:00:00.00", "(0) 0:00:00.00"]
)
def test_r1_ldp_session_stats_table():
@ -384,10 +356,8 @@ def test_r1_ldp_session_stats_table():
r1 = tgen.net.get("r1")
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
assert r1_snmp.test_oid_walk(
'mplsLdpSessionStatsUnknownMesTypeErrors', ['0', '0'])
assert r1_snmp.test_oid_walk(
'mplsLdpSessionStatsUnknownTlvErrors', ['0', '0'])
assert r1_snmp.test_oid_walk("mplsLdpSessionStatsUnknownMesTypeErrors", ["0", "0"])
assert r1_snmp.test_oid_walk("mplsLdpSessionStatsUnknownTlvErrors", ["0", "0"])
def test_r1_ldp_hello_adjacency_table():
@ -397,12 +367,11 @@ def test_r1_ldp_hello_adjacency_table():
r1 = tgen.net.get("r1")
r1_snmp = SnmpTester(r1, "1.1.1.1", "public", "2c")
assert r1_snmp.test_oid_walk('mplsLdpHelloAdjacencyIndex',
['1', '2', '1'])
assert r1_snmp.test_oid_walk('mplsLdpHelloAdjacencyHoldTime',
['15', '45', '15'])
assert r1_snmp.test_oid_walk('mplsLdpHelloAdjacencyType',
['link(1)', 'targeted(2)', 'link(1)'])
assert r1_snmp.test_oid_walk("mplsLdpHelloAdjacencyIndex", ["1", "2", "1"])
assert r1_snmp.test_oid_walk("mplsLdpHelloAdjacencyHoldTime", ["15", "45", "15"])
assert r1_snmp.test_oid_walk(
"mplsLdpHelloAdjacencyType", ["link(1)", "targeted(2)", "link(1)"]
)
# Memory leak test template

View File

@ -43,7 +43,7 @@ from lib.common_config import (
run_frr_cmd,
FRRCFG_FILE,
retry,
get_ipv6_linklocal_address
get_ipv6_linklocal_address,
)
LOGDIR = "/tmp/topotests/"

View File

@ -75,54 +75,54 @@ config_section = "topogen"
# Debug logs for daemons
DEBUG_LOGS = {
"pimd": [
'debug msdp events',
'debug msdp packets',
'debug igmp events',
'debug igmp trace',
'debug mroute',
'debug mroute detail',
'debug pim events',
'debug pim packets',
'debug pim trace',
'debug pim zebra',
'debug pim bsm',
'debug pim packets joins',
'debug pim packets register',
'debug pim nht'
"debug msdp events",
"debug msdp packets",
"debug igmp events",
"debug igmp trace",
"debug mroute",
"debug mroute detail",
"debug pim events",
"debug pim packets",
"debug pim trace",
"debug pim zebra",
"debug pim bsm",
"debug pim packets joins",
"debug pim packets register",
"debug pim nht",
],
"bgpd": [
'debug bgp neighbor-events',
'debug bgp updates',
'debug bgp zebra',
'debug bgp nht',
'debug bgp neighbor-events',
'debug bgp graceful-restart',
'debug bgp update-groups',
'debug bgp vpn leak-from-vrf',
'debug bgp vpn leak-to-vrf',
'debug bgp zebr',
'debug bgp updates',
'debug bgp nht',
'debug bgp neighbor-events',
'debug vrf'
"debug bgp neighbor-events",
"debug bgp updates",
"debug bgp zebra",
"debug bgp nht",
"debug bgp neighbor-events",
"debug bgp graceful-restart",
"debug bgp update-groups",
"debug bgp vpn leak-from-vrf",
"debug bgp vpn leak-to-vrf",
"debug bgp zebr",
"debug bgp updates",
"debug bgp nht",
"debug bgp neighbor-events",
"debug vrf",
],
"zebra": [
'debug zebra events',
'debug zebra rib',
'debug zebra vxlan',
'debug zebra nht'
"debug zebra events",
"debug zebra rib",
"debug zebra vxlan",
"debug zebra nht",
],
"ospf": [
'debug ospf event',
'debug ospf ism',
'debug ospf lsa',
'debug ospf nsm',
'debug ospf nssa',
'debug ospf packet all',
'debug ospf sr',
'debug ospf te',
'debug ospf zebra',
]
"debug ospf event",
"debug ospf ism",
"debug ospf lsa",
"debug ospf nsm",
"debug ospf nssa",
"debug ospf packet all",
"debug ospf sr",
"debug ospf te",
"debug ospf zebra",
],
}
if config.has_option("topogen", "verbosity"):
@ -1155,10 +1155,8 @@ def create_debug_log_config(tgen, input_dict, build=False):
log_file = debug_dict.setdefault("log_file", None)
if log_file:
_log_file = os.path.join(LOGDIR, tgen.modname,
log_file)
debug_config.append("log file {} \n".\
format(_log_file))
_log_file = os.path.join(LOGDIR, tgen.modname, log_file)
debug_config.append("log file {} \n".format(_log_file))
if type(enable_logs) is list:
for daemon in enable_logs:
@ -1178,10 +1176,9 @@ def create_debug_log_config(tgen, input_dict, build=False):
for debug_log in debug_logs:
debug_config.append("no {}".format(debug_log))
result = create_common_configuration(tgen, router,
debug_config,
"debug_log_config",
build=build)
result = create_common_configuration(
tgen, router, debug_config, "debug_log_config", build=build
)
except InvalidCLIError:
# Traceback
errormsg = traceback.format_exc()
@ -3868,7 +3865,7 @@ def get_ipv6_linklocal_address(topo, node, intf):
"""
tgen = get_topogen()
ext_nh = tgen.net[node].get_ipv6_linklocal()
req_nh = topo[node]['links'][intf]['interface']
req_nh = topo[node]["links"][intf]["interface"]
llip = None
for llips in ext_nh:
if llips[0] == req_nh:
@ -3876,8 +3873,9 @@ def get_ipv6_linklocal_address(topo, node, intf):
logger.info("Link local ip found = %s", llip)
return llip
errormsg = "Failed: Link local ip not found on router {}, "\
"interface {}".format(node, intf)
errormsg = "Failed: Link local ip not found on router {}, " "interface {}".format(
node, intf
)
return errormsg
@ -4553,9 +4551,7 @@ def adjust_router_l3mdev(tgen, router):
)
output = tgen.net[router].cmd("sysctl -n net.ipv4.tcp_l3mdev_accept")
logger.info(
"router {0}: existing tcp_l3mdev_accept was {1}".format(router, output)
)
logger.info("router {0}: existing tcp_l3mdev_accept was {1}".format(router, output))
tgen.net[router].cmd(
"sysctl -w net.ipv4.tcp_l3mdev_accept={}".format(l3mdev_accept)

View File

@ -344,9 +344,7 @@ def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=
for lnk in input_dict[router]["links"].keys():
if "ospf" not in input_dict[router]["links"][lnk]:
logger.debug(
"Router %s: ospf config is not present in"
"input_dict",
router
"Router %s: ospf config is not present in" "input_dict", router
)
continue
ospf_data = input_dict[router]["links"][lnk]["ospf"]

View File

@ -54,6 +54,7 @@ from mininet.term import makeTerm
g_extra_config = {}
def gdb_core(obj, daemon, corefiles):
gdbcmds = """
info threads
@ -544,7 +545,7 @@ def iproute2_is_vrf_capable():
["ip", "route", "show", "vrf"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE
stdin=subprocess.PIPE,
)
iproute2_err = subp.communicate()[1].splitlines()[0].split()[0]
@ -1345,15 +1346,9 @@ class Router(Node):
# Run a command in a new window (gnome-terminal, screen, tmux, xterm)
def runInWindow(self, cmd, title=None):
topo_terminal = os.getenv("FRR_TOPO_TERMINAL")
if topo_terminal or (
"TMUX" not in os.environ and "STY" not in os.environ
):
if topo_terminal or ("TMUX" not in os.environ and "STY" not in os.environ):
term = topo_terminal if topo_terminal else "xterm"
makeTerm(
self,
title=title if title else cmd,
term=term,
cmd=cmd)
makeTerm(self, title=title if title else cmd, term=term, cmd=cmd)
else:
nscmd = "sudo nsenter -m -n -t {} {}".format(self.pid, cmd)
if "TMUX" in os.environ:
@ -1362,9 +1357,7 @@ class Router(Node):
cmd = "{} {}".format(wcmd, nscmd)
elif "STY" in os.environ:
if os.path.exists(
"/run/screen/S-{}/{}".format(
os.environ['USER'], os.environ['STY']
)
"/run/screen/S-{}/{}".format(os.environ["USER"], os.environ["STY"])
):
wcmd = "screen"
else:
@ -1372,7 +1365,6 @@ class Router(Node):
cmd = "{} {}".format(wcmd, nscmd)
self.cmd(cmd)
def startRouter(self, tgen=None):
# Disable integrated-vtysh-config
self.cmd(
@ -1459,7 +1451,7 @@ class Router(Node):
def startRouterDaemons(self, daemons=None):
"Starts all FRR daemons for this router."
gdb_breakpoints = g_extra_config["gdb_breakpoints"]
gdb_breakpoints = g_extra_config["gdb_breakpoints"]
gdb_daemons = g_extra_config["gdb_daemons"]
gdb_routers = g_extra_config["gdb_routers"]
@ -1520,12 +1512,10 @@ class Router(Node):
if (
(gdb_routers or gdb_daemons)
and (not gdb_routers
or self.name in gdb_routers
or "all" in gdb_routers)
and (not gdb_daemons
or daemon in gdb_daemons
or "all" in gdb_daemons)
and (
not gdb_routers or self.name in gdb_routers or "all" in gdb_routers
)
and (not gdb_daemons or daemon in gdb_daemons or "all" in gdb_daemons)
):
if daemon == "snmpd":
cmdopt += " -f "
@ -1546,7 +1536,6 @@ class Router(Node):
self.cmd(" ".join([cmdenv, binary, cmdopt]))
logger.info("{}: {} {} started".format(self, self.routertype, daemon))
# Start Zebra first
if "zebra" in daemons_list:
start_daemon("zebra", "-s 90000000")

View File

@ -648,7 +648,11 @@ def test_BSR_CRP_with_blackhole_address_p1(request):
input_dict = {
"i1": {"static_routes": [{"network": BSR1_ADDR, "next_hop": next_hop_rp}]},
"l1": {"static_routes": [{"network": BSR1_ADDR, "next_hop": next_hop_lhr}]},
"f1": {"static_routes": [{"network": CRP, "next_hop": next_hop_fhr, "delete": True}]},
"f1": {
"static_routes": [
{"network": CRP, "next_hop": next_hop_fhr, "delete": True}
]
},
}
result = create_static_routes(tgen, input_dict)
@ -692,10 +696,11 @@ def test_BSR_CRP_with_blackhole_address_p1(request):
step("Verify if b1 chosen as BSR in l1")
result = verify_pim_bsr(tgen, topo, "l1", BSR_IP_1, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"b1 is not chosen as BSR in l1 \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "b1 is not chosen as BSR in l1 \n Error: {}".format(
tc_name, result
))
)
state_after = verify_pim_interface_traffic(tgen, state_dict)
assert isinstance(
@ -841,10 +846,12 @@ def test_new_router_fwd_p0(request):
# Verify bsr state in l1
step("Verify no BSR in l1 as i1 would not forward the no-forward bsm")
result = verify_pim_bsr(tgen, topo, "l1", bsr_ip, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"BSR data is present after no-forward bsm also \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
# unconfigure unicast bsm on f1-i1-eth2
step("unconfigure unicast bsm on f1-i1-eth2, will forward with only mcast")
@ -966,10 +973,11 @@ def test_int_bsm_config_p1(request):
result = verify_ip_mroutes(
tgen, "i1", src_addr, GROUP_ADDRESS, iif, oil, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"Mroutes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "Mroutes are still present \n Error: {}".format(
tc_name, result
))
)
# unconfigure bsm processing on f1 on f1-i1-eth2
step("unconfigure bsm processing on f1 in f1-i1-eth2, will drop bsm")
@ -989,20 +997,21 @@ def test_int_bsm_config_p1(request):
# Verify bsr state in i1
step("Verify if b1 is not chosen as BSR in i1")
result = verify_pim_bsr(tgen, topo, "i1", bsr_ip, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"b1 is chosen as BSR in i1 \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "b1 is chosen as BSR in i1 \n Error: {}".format(
tc_name, result
))
)
# check if mroute still not installed because of rp not available
step("check if mroute still not installed because of rp not available")
result = verify_ip_mroutes(
tgen, "i1", src_addr, GROUP_ADDRESS, iif, oil, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"mroute installed but rp not available \n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"mroute installed but rp not available \n Error: {}".format(tc_name, result)
)
# configure bsm processing on i1 on f1-i1-eth2
step("configure bsm processing on f1 in f1-i1-eth2, will accept bsm")
@ -1464,10 +1473,11 @@ def test_BSM_timeout_p0(request):
tgen, topo, "f1", group, rp_source="BSR", expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"bsr has not aged out in f1 \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "bsr has not aged out in f1 \n Error: {}".format(
tc_name, result
))
)
# Verify RP mapping removed after hold timer expires
group = "225.1.1.1/32"
@ -1491,20 +1501,23 @@ def test_BSM_timeout_p0(request):
result = verify_join_state_and_timer(
tgen, dut, iif, src_addr, GROUP_ADDRESS, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"join state is up and join timer is running in l1 \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
# Verify ip mroute is not installed
step("Verify mroute not installed in l1")
result = verify_ip_mroutes(
tgen, dut, src_addr, GROUP_ADDRESS, iif, oil, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"mroute installed in l1 \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "mroute installed in l1 \n Error: {}".format(
tc_name, result
))
)
step("clear BSM database before moving to next case")
clear_bsrp_data(tgen, topo)
@ -1657,10 +1670,11 @@ def test_iif_join_state_p0(request):
result = verify_ip_mroutes(
tgen, dut, src_addr, GROUP_ADDRESS, iif, oil, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"mroute installed in l1 \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "mroute installed in l1 \n Error: {}".format(
tc_name, result
))
)
# Add back route for RP to make it reachable
step("Add back route for RP to make it reachable")

View File

@ -458,10 +458,11 @@ def test_starg_mroute_p0(request):
result = verify_ip_mroutes(
tgen, dut, src_addr, GROUP_ADDRESS, iif, oil, wait=20, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"mroute installed in l1 \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "mroute installed in l1 \n Error: {}".format(
tc_name, result
))
)
# Send BSM again to configure rp
step("Add back RP by sending BSM from b1")
@ -811,10 +812,11 @@ def test_BSR_election_p0(request):
# Verify bsr state in FHR
step("Verify if b2 is not chosen as bsr in f1")
result = verify_pim_bsr(tgen, topo, "f1", bsr_ip2, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"b2 is chosen as bsr in f1 \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "b2 is chosen as bsr in f1 \n Error: {}".format(
tc_name, result
))
)
# Verify if b1 is still chosen as bsr
step("Verify if b1 is still chosen as bsr in f1")

View File

@ -3487,11 +3487,11 @@ def test_prune_sent_to_LHR_and_FHR_when_PIMnbr_down_p2(request):
IGMP_JOIN_RANGE_1,
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"upstream is still present after shut the link from "
"FHR to RP from RP node \n Error: {}".format(
tc_name, result
))
"FHR to RP from RP node \n Error: {}".format(tc_name, result)
)
step(" No shut the link from FHR to RP from RP node")
@ -3638,11 +3638,11 @@ def test_prune_sent_to_LHR_and_FHR_when_PIMnbr_down_p2(request):
IGMP_JOIN_RANGE_1,
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"upstream is still present after shut the link from "
"FHR to RP from FHR node \n Error: {}".format(
tc_name, result
))
"FHR to RP from FHR node \n Error: {}".format(tc_name, result)
)
step(" No shut the link from FHR to RP from FHR node")

View File

@ -490,10 +490,12 @@ def test_mroute_when_RP_reachable_default_route_p2(request):
data["oil"],
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"mroutes(S,G) are present after delete of static routes on c1 \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
result = verify_upstream_iif(
tgen,
@ -503,10 +505,12 @@ def test_mroute_when_RP_reachable_default_route_p2(request):
IGMP_JOIN_RANGE_1,
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"upstream is present after delete of static routes on c1 \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
for data in input_dict_starg:
result = verify_ip_mroutes(
@ -518,10 +522,12 @@ def test_mroute_when_RP_reachable_default_route_p2(request):
data["oil"],
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"mroutes(*,G) are present after delete of static routes on c1 \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
result = verify_upstream_iif(
tgen,
@ -531,10 +537,12 @@ def test_mroute_when_RP_reachable_default_route_p2(request):
IGMP_JOIN_RANGE_1,
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"upstream is present after delete of static routes on c1 \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("Configure default routes on c2")
@ -557,10 +565,12 @@ def test_mroute_when_RP_reachable_default_route_p2(request):
result = verify_pim_rp_info(
tgen, topo, dut, GROUP_RANGE_1, "Unknown", rp_address, SOURCE, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"RP info is unknown after removing static route from c2 \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("Verify (s,g) populated after adding default route ")
@ -787,10 +797,11 @@ def test_mroute_with_RP_default_route_all_nodes_p2(request):
data["oil"],
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
"mroutes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "mroutes are still present \n Error: {}".format(
tc_name, result
))
)
result = verify_upstream_iif(
tgen,
@ -800,10 +811,11 @@ def test_mroute_with_RP_default_route_all_nodes_p2(request):
IGMP_JOIN_RANGE_1,
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
"upstream is still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "upstream is still present \n Error: {}".format(
tc_name, result
))
)
step("Configure default routes on all the nodes")
@ -840,10 +852,12 @@ def test_mroute_with_RP_default_route_all_nodes_p2(request):
result = verify_pim_rp_info(
tgen, topo, dut, GROUP_RANGE_1, "Unknown", rp_address, SOURCE, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"RP info is unknown after removing static route from c2 \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("Verify (s,g) populated after adding default route ")

View File

@ -423,10 +423,12 @@ def test_add_delete_static_RP_p0(request):
dut = "r1"
interface = "r1-r0-eth0"
result = verify_igmp_groups(tgen, dut, interface, GROUP_ADDRESS, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: igmp group present without any IGMP join \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r1: Verify show ip pim interface traffic without any IGMP join")
state_dict = {"r1": {"r1-r2-eth1": ["pruneTx"]}}
@ -492,26 +494,29 @@ def test_add_delete_static_RP_p0(request):
result = verify_pim_rp_info(
tgen, topo, dut, GROUP_RANGE_ALL, iif, rp_address, SOURCE, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: RP info present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: RP info present \n Error: {}".format(
tc_name, result
))
)
step("r1: Verify upstream IIF interface")
result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: upstream IIF interface present \n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r1: upstream IIF interface present \n Error: {}".format(tc_name, result)
)
step("r1: Verify upstream join state and join timer")
result = verify_join_state_and_timer(
tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: upstream join state is up and join timer is running \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r1: Verify PIM state")
result = verify_pim_state(tgen, dut, iif, oif, GROUP_ADDRESS, expected=False)
@ -521,10 +526,11 @@ def test_add_delete_static_RP_p0(request):
step("r1: Verify ip mroutes")
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: mroutes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: mroutes are still present \n Error: {}".format(
tc_name, result
))
)
step("r1: Verify show ip pim interface traffic without any IGMP join")
state_after = verify_pim_interface_traffic(tgen, state_dict)
@ -681,10 +687,12 @@ def test_SPT_RPT_path_same_p1(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: (S, G) upstream join state is up and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r2-eth1"
@ -811,17 +819,19 @@ def test_not_reachable_static_RP_p0(request):
"using show ip pim state"
)
result = verify_pim_state(tgen, dut, iif, oif, GROUP_ADDRESS, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"OIL is not same and IIF is not cleared on R1 \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r1: upstream IIF should be unknown , verify using show ip pim" "upstream")
result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: upstream IIF is not unknown \n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r1: upstream IIF is not unknown \n Error: {}".format(tc_name, result)
)
step(
"r1: join state should not be joined and join timer should stop,"
@ -830,10 +840,12 @@ def test_not_reachable_static_RP_p0(request):
result = verify_join_state_and_timer(
tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: join state is joined and timer is not stopped \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step(
"r1: (*,G) prune is sent towards the RP interface, verify using"
@ -850,10 +862,12 @@ def test_not_reachable_static_RP_p0(request):
step("r1: (*, G) cleared from mroute table using show ip mroute")
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: (*, G) are not cleared from mroute table \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
logger.info("Expected behavior: {}".format(result))
# Uncomment next line for debugging
@ -920,10 +934,11 @@ def test_add_RP_after_join_received_p1(request):
result = verify_pim_rp_info(
tgen, topo, dut, GROUP_RANGE_ALL, iif, rp_address, SOURCE, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: rp-info is present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: rp-info is present \n Error: {}".format(
tc_name, result
))
)
step("joinTx value before join sent")
state_dict = {"r1": {"r1-r2-eth1": ["joinTx"]}}
@ -944,34 +959,38 @@ def test_add_RP_after_join_received_p1(request):
step("r1: Verify upstream IIF interface")
result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: upstream IFF interface is present \n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r1: upstream IFF interface is present \n Error: {}".format(tc_name, result)
)
step("r1: Verify upstream join state and join timer")
result = verify_join_state_and_timer(
tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: upstream join state is joined and timer is running \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r1: Verify PIM state")
result = verify_pim_state(tgen, dut, iif, oif, GROUP_ADDRESS, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: PIM state is up\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: PIM state is up\n Error: {}".format(
tc_name, result
))
)
step("r1: Verify ip mroutes")
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: mroutes are still present\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: mroutes are still present\n Error: {}".format(
tc_name, result
))
)
step("r1: Configure static RP")
input_dict = {
@ -1095,33 +1114,37 @@ def test_reachable_static_RP_after_join_p0(request):
step("r1 : Verify upstream IIF interface")
iif = "r1-r2-eth1"
result = verify_upstream_iif(tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: upstream IIF interface is present\n Error: {}".format(
tc_name, result
))
assert result is not True, (
"Testcase {} : Failed \n "
"r1: upstream IIF interface is present\n Error: {}".format(tc_name, result)
)
step("r1 : Verify upstream join state and join timer")
result = verify_join_state_and_timer(
tgen, dut, iif, STAR, GROUP_ADDRESS, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: upstream join state is joined and timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r1 : Verify PIM state")
result = verify_pim_state(tgen, dut, iif, oif, GROUP_ADDRESS, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: PIM state is up\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: PIM state is up\n Error: {}".format(
tc_name, result
))
)
step("r1 : Verify ip mroutes")
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: mroutes are still present\n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: mroutes are still present\n Error: {}".format(
tc_name, result
))
)
step("r1: Make RP reachable")
intf = "r1-r2-eth1"
@ -1362,10 +1385,12 @@ def test_send_join_on_higher_preffered_rp_p1(request):
result = verify_pim_rp_info(
tgen, topo, dut, GROUP_RANGE, oif, rp_address_2, SOURCE, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: rp-info is present for group 225.1.1.1 \n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step(
"r1 : Verify RPF interface updated in mroute when higher preferred"
@ -1619,11 +1644,11 @@ def test_RP_configured_as_LHR_1_p1(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: (S, G) upstream join state is joined and join"
" timer is running \n Error: {}".format(
tc_name, result
))
" timer is running \n Error: {}".format(tc_name, result)
)
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
@ -1828,10 +1853,12 @@ def test_RP_configured_as_LHR_2_p1(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
@ -2036,10 +2063,12 @@ def test_RP_configured_as_FHR_1_p1(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
@ -2245,10 +2274,12 @@ def test_RP_configured_as_FHR_2_p2(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
@ -2372,10 +2403,12 @@ def test_SPT_RPT_path_different_p1(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
@ -2394,10 +2427,12 @@ def test_SPT_RPT_path_different_p1(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r2: Verify (S, G) ip mroutes")
oif = "none"
@ -2623,10 +2658,12 @@ def test_restart_pimd_process_p2(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
@ -2791,10 +2828,12 @@ def test_multiple_groups_same_RP_address_p2(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
@ -2813,10 +2852,12 @@ def test_multiple_groups_same_RP_address_p2(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r2: Verify (S, G) ip mroutes")
oif = "none"
@ -2932,10 +2973,12 @@ def test_multiple_groups_same_RP_address_p2(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r2: Verify (S, G) ip mroutes")
oif = "none"
@ -2952,10 +2995,12 @@ def test_multiple_groups_same_RP_address_p2(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
@ -3132,10 +3177,12 @@ def test_multiple_groups_different_RP_address_p2(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r2: Verify (S, G) ip mroutes")
oif = "none"
@ -3154,10 +3201,12 @@ def test_multiple_groups_different_RP_address_p2(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
@ -3224,10 +3273,12 @@ def test_multiple_groups_different_RP_address_p2(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r4: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r4: Verify (S, G) ip mroutes")
oif = "none"
@ -3399,10 +3450,12 @@ def test_multiple_groups_different_RP_address_p2(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r2: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r2: Verify (S, G) ip mroutes")
oif = "none"
@ -3421,10 +3474,12 @@ def test_multiple_groups_different_RP_address_p2(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_1, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
@ -3491,10 +3546,12 @@ def test_multiple_groups_different_RP_address_p2(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r4: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r4: Verify (S, G) ip mroutes")
oif = "none"
@ -3513,10 +3570,12 @@ def test_multiple_groups_different_RP_address_p2(request):
result = verify_join_state_and_timer(
tgen, dut, iif, SOURCE_ADDRESS, GROUP_ADDRESS_LIST_2, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: (S,G) upstream state is joined and join timer is running\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r3: Verify (S, G) ip mroutes")
oif = "r3-r1-eth0"
@ -3643,30 +3702,36 @@ def test_shutdown_primary_path_p1(request):
iif = "r1-r3-eth2"
oif = "r1-r0-eth0"
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: (*,G) mroutes are not cleared after shut of R1 to R3 link\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r2: Verify (*, G) ip mroutes")
dut = "r2"
iif = "lo"
oif = "r2-r3-eth1"
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r2: (*,G) mroutes are not cleared after shut of R1 to R3 link\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r3: Verify (*, G) ip mroutes")
dut = "r3"
iif = "r3-r2-eth1"
oif = "r3-r1-eth0"
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r3: (*,G) mroutes are not cleared after shut of R1 to R3 link\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r3: No shutdown the link from R1 to R3 from R3 node")
dut = "r3"
@ -3826,20 +3891,24 @@ def test_delete_RP_shut_noshut_upstream_interface_p1(request):
iif = "r1-r2-eth1"
oif = "r1-r0-eth0"
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: (*,G) mroutes are not cleared after shut of R1 to R0 link\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r2: Verify (*, G) ip mroutes cleared")
dut = "r2"
iif = "lo"
oif = "r2-r1-eth0"
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r2: (*,G) mroutes are not cleared after shut of R1 to R0 link\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
write_test_footer(tc_name)
@ -3949,20 +4018,24 @@ def test_delete_RP_shut_noshut_RP_interface_p1(request):
iif = "r1-r2-eth1"
oif = "r1-r0-eth0"
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: (*,G) mroutes are not cleared after shut of R1 to R2 and R3 link\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
step("r2: Verify (*, G) ip mroutes cleared")
dut = "r2"
iif = "lo"
oif = "r2-r1-eth0"
result = verify_ip_mroutes(tgen, dut, STAR, GROUP_ADDRESS, iif, oif, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r2: (*,G) mroutes are not cleared after shut of R1 to R2 and R3 link\n Error: {}".format(
tc_name, result
))
tc_name, result
)
)
write_test_footer(tc_name)

View File

@ -86,6 +86,7 @@ from mininet.topo import Topo
pytestmark = [pytest.mark.ospfd]
class TemplateTopo(Topo):
"Test topology builder"
@ -385,9 +386,7 @@ def test_rib_ipv4_step5():
pytest.skip(tgen.errors)
logger.info("Disabling SR on rt6")
tgen.net["rt6"].cmd(
'vtysh -c "conf t" -c "router ospf" -c "no segment-routing on"'
)
tgen.net["rt6"].cmd('vtysh -c "conf t" -c "router ospf" -c "no segment-routing on"')
for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:
router_compare_json_output(
@ -652,7 +651,7 @@ def test_mpls_lib_step10():
# Expected changes:
# -All commands should be rejected
#
#def test_ospf_invalid_config_step11():
# def test_ospf_invalid_config_step11():
# logger.info("Test (step 11): check if invalid configuration is rejected")
# tgen = get_topogen()
#

View File

@ -79,6 +79,7 @@ import pytest
pytestmark = [pytest.mark.ospfd]
class OspfTeTopo(Topo):
"Test topology builder"
@ -175,6 +176,7 @@ def setup_testcase(msg):
# Note that all routers must discover the same Network Topology, so the same TED.
def test_step1():
"Step1: Check initial topology"
@ -190,12 +192,8 @@ def test_step2():
tgen = setup_testcase("Step2: Shutdown interface between r1 & r2")
tgen.net["r1"].cmd(
'vtysh -c "conf t" -c "interface r1-eth1" -c "shutdown"'
)
tgen.net["r2"].cmd(
'vtysh -c "conf t" -c "interface r2-eth1" -c "shutdown"'
)
tgen.net["r1"].cmd('vtysh -c "conf t" -c "interface r1-eth1" -c "shutdown"')
tgen.net["r2"].cmd('vtysh -c "conf t" -c "interface r2-eth1" -c "shutdown"')
for rname in ["r1", "r2", "r3", "r4"]:
compare_ted_json_output(tgen, rname, "ted_step2.json")
@ -207,9 +205,7 @@ def test_step3():
tgen = setup_testcase("Step3: Disable Inter-AS on r3")
tgen.net["r3"].cmd(
'vtysh -c "conf t" -c "router ospf" -c "no mpls-te inter-as"'
)
tgen.net["r3"].cmd('vtysh -c "conf t" -c "router ospf" -c "no mpls-te inter-as"')
for rname in ["r1", "r2", "r3", "r4"]:
compare_ted_json_output(tgen, rname, "ted_step3.json")
@ -221,18 +217,14 @@ def test_step4():
tgen = setup_testcase("Step4: Enable Segment Routing on r1 & r2")
tgen.net["r1"].cmd(
'vtysh -c "conf t" -c "router ospf" -c "segment-routing on"'
)
tgen.net["r1"].cmd('vtysh -c "conf t" -c "router ospf" -c "segment-routing on"')
tgen.net["r1"].cmd(
'vtysh -c "conf t" -c "router ospf" -c "segment-routing global-block 20000 23999"'
)
tgen.net["r1"].cmd(
'vtysh -c "conf t" -c "router ospf" -c "segment-routing prefix 10.0.255.1/32 index 10"'
)
tgen.net["r2"].cmd(
'vtysh -c "conf t" -c "router ospf" -c "segment-routing on"'
)
tgen.net["r2"].cmd('vtysh -c "conf t" -c "router ospf" -c "segment-routing on"')
tgen.net["r2"].cmd(
'vtysh -c "conf t" -c "router ospf" -c "segment-routing node-msd 16"'
)
@ -253,12 +245,8 @@ def test_step5():
tgen = setup_testcase("Step5: Re-enable interface between r1 & r2")
tgen.net["r1"].cmd(
'vtysh -c "conf t" -c "interface r1-eth1" -c "no shutdown"'
)
tgen.net["r2"].cmd(
'vtysh -c "conf t" -c "interface r2-eth1" -c "no shutdown"'
)
tgen.net["r1"].cmd('vtysh -c "conf t" -c "interface r1-eth1" -c "no shutdown"')
tgen.net["r2"].cmd('vtysh -c "conf t" -c "interface r2-eth1" -c "no shutdown"')
for rname in ["r1", "r2", "r3", "r4"]:
compare_ted_json_output(tgen, rname, "ted_step5.json")
@ -291,9 +279,7 @@ def test_step7():
tgen = setup_testcase("Step7: Disable OSPF on r4")
tgen.net["r4"].cmd(
'vtysh -c "conf t" -c "no router ospf"'
)
tgen.net["r4"].cmd('vtysh -c "conf t" -c "no router ospf"')
for rname in ["r1", "r2", "r3"]:
compare_ted_json_output(tgen, rname, "ted_step7.json")

View File

@ -135,25 +135,35 @@ def test_wait_protocol_convergence():
* Full/DROther
* Full/Backup
"""
result = tgen.gears[router].vtysh_cmd('show ip ospf neighbor json',
isjson=True)
if topotest.json_cmp(result, {"neighbors": {neighbor: [
{"state": "Full/DR"}]}}) is None:
result = tgen.gears[router].vtysh_cmd(
"show ip ospf neighbor json", isjson=True
)
if (
topotest.json_cmp(
result, {"neighbors": {neighbor: [{"state": "Full/DR"}]}}
)
is None
):
return None
if topotest.json_cmp(result, {"neighbors": {neighbor: [
{"state": "Full/DROther"}]}}) is None:
if (
topotest.json_cmp(
result, {"neighbors": {neighbor: [{"state": "Full/DROther"}]}}
)
is None
):
return None
return topotest.json_cmp(result, {"neighbors": {neighbor: [
{"state": "Full/Backup"}]}})
return topotest.json_cmp(
result, {"neighbors": {neighbor: [{"state": "Full/Backup"}]}}
)
_, result = topotest.run_and_expect(run_command_and_expect, None,
count=130, wait=1)
_, result = topotest.run_and_expect(
run_command_and_expect, None, count=130, wait=1
)
assertmsg = '"{}" convergence failure'.format(router)
assert result is None, assertmsg
def expect_ospfv3_neighbor_full(router, neighbor):
"Wait until OSPFv3 convergence."
logger.info("waiting OSPFv3 router '{}'".format(router))

View File

@ -141,12 +141,19 @@ def test_ospf6_default_route():
topotest.router_json_cmp,
tgen.gears[router],
"show ipv6 ospf6 database inter-prefix detail json",
{"areaScopedLinkStateDb": [{
"areaId": area,
"lsa": [{
"prefix": prefix,
"metric": metric,
}]}]},
{
"areaScopedLinkStateDb": [
{
"areaId": area,
"lsa": [
{
"prefix": prefix,
"metric": metric,
}
],
}
]
},
)
_, result = topotest.run_and_expect(test_func, None, count=4, wait=1)
assertmsg = '"{}" convergence failure'.format(router)

View File

@ -249,17 +249,20 @@ def test_ospf_chaos_tc31_p1(request):
dut = "r1"
protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: OSPF routes are present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
))
)
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol,
expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present \n Error: {}".format(
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
)
assert (
result is not True
), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
))
)
step("Bring up OSPFd daemon on R0.")
start_router_daemons(tgen, "r0", ["ospfd"])
@ -482,17 +485,20 @@ def test_ospf_chaos_tc34_p1(request):
dut = "r1"
protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: OSPF routes are present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
))
)
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol,
expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present \n Error: {}".format(
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
)
assert (
result is not True
), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
))
)
step("Bring up staticd daemon on R0.")
start_router_daemons(tgen, "r0", ["staticd"])

View File

@ -259,19 +259,21 @@ def test_ospf_ecmp_tc16_p0(request):
shutdown_bringup_interface(tgen, dut, intf, False)
result = verify_ospf_rib(tgen, dut, input_dict, next_hop=nh, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: OSPF routes are present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
))
)
protocol = "ospf"
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, next_hop=nh, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
))
)
for intfr in range(1, 7):
intf = topo["routers"]["r1"]["links"]["r0-link{}".format(intfr)]["interface"]
@ -326,10 +328,11 @@ def test_ospf_ecmp_tc16_p0(request):
result = verify_ospf_rib(
tgen, dut, input_dict, next_hop=nh, attempts=5, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: OSPF routes are present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
))
)
protocol = "ospf"
result = verify_rib(
@ -342,10 +345,11 @@ def test_ospf_ecmp_tc16_p0(request):
attempts=5,
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
))
)
step("Re configure the static route in R0.")
dut = "r0"
@ -432,10 +436,11 @@ def test_ospf_ecmp_tc17_p0(request):
result = verify_ospf_rib(
tgen, dut, input_dict, next_hop=nh, attempts=5, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: OSPF routes are present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
))
)
protocol = "ospf"
result = verify_rib(
@ -448,10 +453,11 @@ def test_ospf_ecmp_tc17_p0(request):
attempts=5,
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
))
)
step("Reconfigure the static route in R0.Change ECMP value to 2.")
dut = "r0"

View File

@ -307,10 +307,11 @@ def test_ospf_lan_ecmp_tc18_p0(request):
result = verify_ospf_rib(
tgen, dut, input_dict, next_hop=nh, attempts=5, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: OSPF routes are present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
))
)
protocol = "ospf"
result = verify_rib(
@ -323,10 +324,11 @@ def test_ospf_lan_ecmp_tc18_p0(request):
attempts=5,
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
))
)
write_test_footer(tc_name)

View File

@ -397,10 +397,11 @@ def test_ospf_lan_tc1_p0(request):
shutdown_bringup_interface(tgen, dut, intf, False)
result = verify_ospf_neighbor(tgen, topo, dut, lan=True, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r0: OSPF neighbors-hip is up \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r0: OSPF neighbors-hip is up \n Error: {}".format(
tc_name, result
))
)
step("No Shut interface on R0")
dut = "r0"

View File

@ -332,18 +332,20 @@ def test_ospf_routemaps_functionality_tc19_p0(request):
}
}
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: OSPF routes are present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
))
)
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are present in fib \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: routes are present in fib \n Error: {}".format(
tc_name, result
))
)
step("Delete and reconfigure prefix list.")
# Create ip prefix list
@ -383,18 +385,20 @@ def test_ospf_routemaps_functionality_tc19_p0(request):
}
}
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: OSPF routes are present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
))
)
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: OSPF routes are present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
))
)
pfx_list = {
"r0": {
@ -438,18 +442,20 @@ def test_ospf_routemaps_functionality_tc19_p0(request):
}
}
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: OSPF routes are present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
))
)
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
))
)
write_test_footer(tc_name)
@ -496,18 +502,20 @@ def test_ospf_routemaps_functionality_tc20_p0(request):
dut = "r1"
protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, attempts=2, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: OSPF routes are present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
))
)
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, attempts=2, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
))
)
step(
"configure the route map with the same name that is used "
@ -523,18 +531,20 @@ def test_ospf_routemaps_functionality_tc20_p0(request):
dut = "r1"
protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: OSPF routes are present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
))
)
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
))
)
# Create route map
routemaps = {"r0": {"route_maps": {"rmap_ipv4": [{"action": "deny"}]}}}
@ -545,18 +555,20 @@ def test_ospf_routemaps_functionality_tc20_p0(request):
dut = "r1"
protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: OSPF routes are present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
))
)
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
))
)
step("Delete the route map.")
# Create route map
@ -573,18 +585,20 @@ def test_ospf_routemaps_functionality_tc20_p0(request):
dut = "r1"
protocol = "ospf"
result = verify_ospf_rib(tgen, dut, input_dict, expected=False)
assert result is not True, ("Testcase {} : Failed \n "
"r1: OSPF routes are present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: OSPF routes are present \n Error: {}".format(
tc_name, result
))
)
result = verify_rib(
tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False
)
assert result is not True, ("Testcase {} : Failed \n "
"r1: routes are still present \n Error: {}".format(
assert (
result is not True
), "Testcase {} : Failed \n " "r1: routes are still present \n Error: {}".format(
tc_name, result
))
)
write_test_footer(tc_name)

View File

@ -247,11 +247,11 @@ def test_ospf_redistribution_tc5_p0(request):
if result is not True:
break
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: OSPF routes are present after deleting ip address of newly "
"configured interface of R0 \n Error: {}".format(
tc_name, result
))
"configured interface of R0 \n Error: {}".format(tc_name, result)
)
protocol = "ospf"
result = verify_rib(
@ -264,11 +264,11 @@ def test_ospf_redistribution_tc5_p0(request):
attempts=5,
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: OSPF routes are present in fib after deleting ip address of newly "
"configured interface of R0 \n Error: {}".format(
tc_name, result
))
"configured interface of R0 \n Error: {}".format(tc_name, result)
)
step("Add back the deleted ip address on newly configured interface of R0")
topo1 = {
@ -370,11 +370,11 @@ def test_ospf_redistribution_tc6_p0(request):
result = verify_ospf_rib(tgen, dut, input_dict, next_hop=nh, expected=False)
if result is not True:
break
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: OSPF routes are present after deleting ip address of newly "
"configured loopback of R0 \n Error: {}".format(
tc_name, result
))
"configured loopback of R0 \n Error: {}".format(tc_name, result)
)
protocol = "ospf"
result = verify_rib(
@ -386,11 +386,11 @@ def test_ospf_redistribution_tc6_p0(request):
next_hop=nh,
expected=False,
)
assert result is not True, ("Testcase {} : Failed \n "
assert result is not True, (
"Testcase {} : Failed \n "
"r1: OSPF routes are present in fib after deleting ip address of newly "
"configured loopback of R0 \n Error: {}".format(
tc_name, result
))
"configured loopback of R0 \n Error: {}".format(tc_name, result)
)
step("Add back the deleted ip address on newly configured interface of R0")
topo1 = {

View File

@ -94,12 +94,14 @@ def setup_module(mod):
tgen.start_router()
def teardown_module(_mod):
"Teardown the pytest environment"
tgen = get_topogen()
tgen.stop_topology()
def test_converge_protocols():
"Wait for protocol convergence"
@ -110,19 +112,26 @@ def test_converge_protocols():
topotest.sleep(10, "Waiting for OSPF convergence")
def ospf_configure_suppress_fa(router_name, area):
"Configure OSPF suppress-fa in router_name"
tgen = get_topogen()
router = tgen.gears[router_name]
router.vtysh_cmd("conf t\nrouter ospf\narea {} nssa suppress-fa\nexit\n".format(area))
router.vtysh_cmd(
"conf t\nrouter ospf\narea {} nssa suppress-fa\nexit\n".format(area)
)
def ospf_unconfigure_suppress_fa(router_name, area):
"Remove OSPF suppress-fa in router_name"
tgen = get_topogen()
router = tgen.gears[router_name]
router.vtysh_cmd("conf t\nrouter ospf\nno area {} nssa suppress-fa\nexit\n".format(area))
router.vtysh_cmd(
"conf t\nrouter ospf\nno area {} nssa suppress-fa\nexit\n".format(area)
)
def ospf_get_lsa_type5(router_name):
"Return a dict with link state id as key and forwarding addresses as value"
@ -141,7 +150,8 @@ def ospf_get_lsa_type5(router_name):
result[lsa] = re1.group(1)
return result
@pytest.fixture(scope='module', name='original')
@pytest.fixture(scope="module", name="original")
def test_ospf_set_suppress_fa():
"Test OSPF area [x] nssa suppress-fa"
@ -162,6 +172,7 @@ def test_ospf_set_suppress_fa():
# in the test_ospf_unset_supress_fa
return initial
def test_ospf_unset_supress_fa(original):
"Test OSPF no area [x] nssa suppress-fa"

View File

@ -949,10 +949,11 @@ def static_routes_rmap_pfxlist_p0_tc7_ebgp(request):
result4 = verify_rib(
tgen, addr_type, dut, input_dict, protocol=protocol, expected=False
)
assert result4 is not True, ("Testcase {} : Failed \n"
"routes are still present \n Error: {}".format(
assert (
result4 is not True
), "Testcase {} : Failed \n" "routes are still present \n Error: {}".format(
tc_name, result4
))
)
step("vm4 should be present in FRR1")
dut = "r1"

View File

@ -947,10 +947,11 @@ def test_static_routes_rmap_pfxlist_p0_tc7_ibgp(request):
result4 = verify_rib(
tgen, addr_type, dut, input_dict, protocol=protocol, expected=False
)
assert result4 is not True, ("Testcase {} : Failed \n"
"routes are still present \n Error: {}".format(
assert (
result4 is not True
), "Testcase {} : Failed \n" "routes are still present \n Error: {}".format(
tc_name, result4
))
)
step("vm4 should be present in FRR1")
dut = "r1"