Merge pull request #16091 from y-bharath14/srib-topotests-commits

tests: Organizing variables using format method
This commit is contained in:
Donald Sharp 2024-05-28 10:07:12 -04:00 committed by GitHub
commit c2b44a8bbe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -54,7 +54,7 @@ fatal_error = ""
def build_topo(tgen):
router = tgen.add_router("r1")
for i in range(0, 10):
tgen.add_switch("sw%d" % i).add_link(router)
tgen.add_switch("sw{}".format(i)).add_link(router)
#####################################################
@ -67,7 +67,7 @@ def build_topo(tgen):
def setup_module(module):
global fatal_error
print("\n\n** %s: Setup Topology" % module.__name__)
print("\n\n** {}: Setup Topology".format(module.__name__))
print("******************************************\n")
thisDir = os.path.dirname(os.path.realpath(__file__))
@ -85,34 +85,36 @@ def setup_module(module):
#
# Main router
for i in range(1, 2):
net["r%s" % i].loadConf("mgmtd", "%s/r%s/zebra.conf" % (thisDir, i))
net["r%s" % i].loadConf("zebra", "%s/r%s/zebra.conf" % (thisDir, i))
net["r%s" % i].loadConf("ripd", "%s/r%s/ripd.conf" % (thisDir, i))
net["r%s" % i].loadConf("ripngd", "%s/r%s/ripngd.conf" % (thisDir, i))
net["r%s" % i].loadConf("ospfd", "%s/r%s/ospfd.conf" % (thisDir, i))
net["r{}".format(i)].loadConf("mgmtd", "{}/r{}/zebra.conf".format(thisDir, i))
net["r{}".format(i)].loadConf("zebra", "{}/r{}/zebra.conf".format(thisDir, i))
net["r{}".format(i)].loadConf("ripd", "{}/r{}/ripd.conf".format(thisDir, i))
net["r{}".format(i)].loadConf("ripngd", "{}/r{}/ripngd.conf".format(thisDir, i))
net["r{}".format(i)].loadConf("ospfd", "{}/r{}/ospfd.conf".format(thisDir, i))
if net["r1"].checkRouterVersion("<", "4.0"):
net["r%s" % i].loadConf(
"ospf6d", "%s/r%s/ospf6d.conf-pre-v4" % (thisDir, i)
net["r{}".format(i)].loadConf(
"ospf6d", "{}/r{}/ospf6d.conf-pre-v4".format(thisDir, i)
)
else:
net["r%s" % i].loadConf("ospf6d", "%s/r%s/ospf6d.conf" % (thisDir, i))
net["r%s" % i].loadConf("isisd", "%s/r%s/isisd.conf" % (thisDir, i))
net["r%s" % i].loadConf("bgpd", "%s/r%s/bgpd.conf" % (thisDir, i))
if net["r%s" % i].daemon_available("ldpd"):
net["r{}".format(i)].loadConf(
"ospf6d", "{}/r{}/ospf6d.conf".format(thisDir, i)
)
net["r{}".format(i)].loadConf("isisd", "{}/r{}/isisd.conf".format(thisDir, i))
net["r{}".format(i)].loadConf("bgpd", "{}/r{}/bgpd.conf".format(thisDir, i))
if net["r{}".format(i)].daemon_available("ldpd"):
# Only test LDPd if it's installed and Kernel >= 4.5
net["r%s" % i].loadConf("ldpd", "%s/r%s/ldpd.conf" % (thisDir, i))
net["r%s" % i].loadConf("sharpd")
net["r%s" % i].loadConf("nhrpd", "%s/r%s/nhrpd.conf" % (thisDir, i))
net["r%s" % i].loadConf("babeld", "%s/r%s/babeld.conf" % (thisDir, i))
net["r%s" % i].loadConf("pbrd", "%s/r%s/pbrd.conf" % (thisDir, i))
tgen.gears["r%s" % i].start()
net["r{}".format(i)].loadConf("ldpd", "{}/r{}/ldpd.conf".format(thisDir, i))
net["r{}".format(i)].loadConf("sharpd")
net["r{}".format(i)].loadConf("nhrpd", "{}/r{}/nhrpd.conf".format(thisDir, i))
net["r{}".format(i)].loadConf("babeld", "{}/r{}/babeld.conf".format(thisDir, i))
net["r{}".format(i)].loadConf("pbrd", "{}/r{}/pbrd.conf".format(thisDir, i))
tgen.gears["r{}".format(i)].start()
# For debugging after starting FRR daemons, uncomment the next line
# tgen.mininet_cli()
def teardown_module(module):
print("\n\n** %s: Shutdown Topology" % module.__name__)
print("\n\n** {}: Shutdown Topology".format(module.__name__))
print("******************************************\n")
tgen = get_topogen()
tgen.stop_topology()
@ -133,7 +135,7 @@ def test_router_running():
# Starting Routers
for i in range(1, 2):
fatal_error = net["r%s" % i].checkRouterRunning()
fatal_error = net["r{}".format(i)].checkRouterRunning()
assert fatal_error == "", fatal_error
# For debugging after starting FRR daemons, uncomment the next line
@ -158,7 +160,9 @@ def test_error_messages_vtysh():
#
# VTYSH output from router
vtystdout = net["r%s" % i].cmd('vtysh -c "show version" 2> /dev/null').rstrip()
vtystdout = (
net["r{}".format(i)].cmd('vtysh -c "show version" 2> /dev/null').rstrip()
)
# Fix newlines (make them all the same)
vtystdout = ("\n".join(vtystdout.splitlines()) + "\n").rstrip()
@ -166,16 +170,20 @@ def test_error_messages_vtysh():
vtystdout = re.sub(r"FRRouting [0-9]+.*", "", vtystdout, flags=re.DOTALL)
if vtystdout == "":
print("r%s StdOut ok" % i)
print("r{} StdOut ok".format(i))
assert vtystdout == "", "Vtysh StdOut Output check failed for router r%s" % i
assert (
vtystdout == ""
), "Vtysh StdOut Output check failed for router r{}".format(i)
#
# Second checking Standard Error
#
# VTYSH StdErr output from router
vtystderr = net["r%s" % i].cmd('vtysh -c "show version" > /dev/null').rstrip()
vtystderr = (
net["r{}".format(i)].cmd('vtysh -c "show version" > /dev/null').rstrip()
)
# Fix newlines (make them all the same)
vtystderr = ("\n".join(vtystderr.splitlines()) + "\n").rstrip()
@ -183,13 +191,15 @@ def test_error_messages_vtysh():
# vtystderr = re.sub(r"FRRouting [0-9]+.*", "", vtystderr, flags=re.DOTALL)
if vtystderr == "":
print("r%s StdErr ok" % i)
print("r{} StdErr ok".format(i))
assert vtystderr == "", "Vtysh StdErr Output check failed for router r%s" % i
assert (
vtystderr == ""
), "Vtysh StdErr Output check failed for router r{}".format(i)
# Make sure that all daemons are running
for i in range(1, 2):
fatal_error = net["r%s" % i].checkRouterRunning()
fatal_error = net["r{}".format(i)].checkRouterRunning()
assert fatal_error == "", fatal_error
@ -213,37 +223,37 @@ def test_error_messages_daemons():
error_logs = ""
for i in range(1, 2):
log = net["r%s" % i].getStdErr("ripd")
log = net["r{}".format(i)].getStdErr("ripd")
if log:
error_logs += "r%s RIPd StdErr Output:\n" % i
error_logs += "r{} RIPd StdErr Output:\n".format(i)
error_logs += log
log = net["r%s" % i].getStdErr("ripngd")
log = net["r{}".format(i)].getStdErr("ripngd")
if log:
error_logs += "r%s RIPngd StdErr Output:\n" % i
error_logs += "r{} RIPngd StdErr Output:\n".format(i)
error_logs += log
log = net["r%s" % i].getStdErr("ospfd")
log = net["r{}".format(i)].getStdErr("ospfd")
if log:
error_logs += "r%s OSPFd StdErr Output:\n" % i
error_logs += "r{} OSPFd StdErr Output:\n".format(i)
error_logs += log
log = net["r%s" % i].getStdErr("ospf6d")
log = net["r{}".format(i)].getStdErr("ospf6d")
if log:
error_logs += "r%s OSPF6d StdErr Output:\n" % i
error_logs += "r{} OSPF6d StdErr Output:\n".format(i)
error_logs += log
log = net["r%s" % i].getStdErr("isisd")
log = net["r{}".format(i)].getStdErr("isisd")
# ISIS shows debugging enabled status on StdErr
# Remove these messages
log = re.sub(r"^IS-IS .* debugging is on.*", "", log).rstrip()
if log:
error_logs += "r%s ISISd StdErr Output:\n" % i
error_logs += "r{} ISISd StdErr Output:\n".format(i)
error_logs += log
log = net["r%s" % i].getStdErr("bgpd")
log = net["r{}".format(i)].getStdErr("bgpd")
if log:
error_logs += "r%s BGPd StdErr Output:\n" % i
error_logs += "r{} BGPd StdErr Output:\n".format(i)
error_logs += log
if net["r%s" % i].daemon_available("ldpd"):
log = net["r%s" % i].getStdErr("ldpd")
if net["r{}".format(i)].daemon_available("ldpd"):
log = net["r{}".format(i)].getStdErr("ldpd")
if log:
error_logs += "r%s LDPd StdErr Output:\n" % i
error_logs += "r{} LDPd StdErr Output:\n".format(i)
error_logs += log
log = net["r1"].getStdErr("nhrpd")
@ -251,27 +261,27 @@ def test_error_messages_daemons():
# Ignore these
log = re.sub(r".*YANG model.*not embedded.*", "", log).rstrip()
if log:
error_logs += "r%s NHRPd StdErr Output:\n" % i
error_logs += "r{} NHRPd StdErr Output:\n".format(i)
error_logs += log
log = net["r1"].getStdErr("babeld")
if log:
error_logs += "r%s BABELd StdErr Output:\n" % i
error_logs += "r{} BABELd StdErr Output:\n".format(i)
error_logs += log
log = net["r1"].getStdErr("pbrd")
if log:
error_logs += "r%s PBRd StdErr Output:\n" % i
error_logs += "r{} PBRd StdErr Output:\n".format(i)
error_logs += log
log = net["r%s" % i].getStdErr("zebra")
log = net["r{}".format(i)].getStdErr("zebra")
if log:
error_logs += "r%s Zebra StdErr Output:\n" % i
error_logs += "r{} Zebra StdErr Output:\n".format(i)
error_logs += log
if error_logs:
sys.stderr.write(
"Failed check for StdErr Output on daemons:\n%s\n" % error_logs
"Failed check for StdErr Output on daemons:\n{}\n".format(error_logs)
)
# Ignoring the issue if told to ignore (ie not yet fixed)
@ -317,18 +327,20 @@ def test_converge_protocols():
# Make sure that all daemons are running
failures = 0
for i in range(1, 2):
fatal_error = net["r%s" % i].checkRouterRunning()
fatal_error = net["r{}".format(i)].checkRouterRunning()
assert fatal_error == "", fatal_error
print("Show that v4 routes are right\n")
v4_routesFile = "%s/r%s/ipv4_routes.ref" % (thisDir, i)
v4_routesFile = "{}/r{}/ipv4_routes.ref".format(thisDir, i)
expected = (
net["r%s" % i].cmd("sort {} 2> /dev/null".format(v4_routesFile)).rstrip()
net["r{}".format(i)]
.cmd("sort {} 2> /dev/null".format(v4_routesFile))
.rstrip()
)
expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1)
actual = (
net["r%s" % i]
net["r{}".format(i)]
.cmd(
"vtysh -c \"show ip route\" | sed -e '/^Codes: /,/^\\s*$/d' | sort 2> /dev/null"
)
@ -344,24 +356,26 @@ def test_converge_protocols():
title2="Expected IP RoutingTable",
)
if diff:
sys.stderr.write("r%s failed IP Routing table check:\n%s\n" % (i, diff))
sys.stderr.write("r{} failed IP Routing table check:\n{}\n".format(i, diff))
failures += 1
else:
print("r%s ok" % i)
print("r{} ok".format(i))
assert failures == 0, "IP Routing table failed for r%s\n%s" % (i, diff)
assert failures == 0, "IP Routing table failed for r{}\n{}".format(i, diff)
failures = 0
print("Show that v6 routes are right\n")
v6_routesFile = "%s/r%s/ipv6_routes.ref" % (thisDir, i)
v6_routesFile = "{}/r{}/ipv6_routes.ref".format(thisDir, i)
expected = (
net["r%s" % i].cmd("sort {} 2> /dev/null".format(v6_routesFile)).rstrip()
net["r{}".format(i)]
.cmd("sort {} 2> /dev/null".format(v6_routesFile))
.rstrip()
)
expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1)
actual = (
net["r%s" % i]
net["r{}".format(i)]
.cmd(
"vtysh -c \"show ipv6 route\" | sed -e '/^Codes: /,/^\\s*$/d' | sort 2> /dev/null"
)
@ -377,20 +391,24 @@ def test_converge_protocols():
title2="Expected IPv6 RoutingTable",
)
if diff:
sys.stderr.write("r%s failed IPv6 Routing table check:\n%s\n" % (i, diff))
sys.stderr.write(
"r{} failed IPv6 Routing table check:\n{}\n".format(i, diff)
)
failures += 1
else:
print("r%s ok" % i)
print("r{} ok".format(i))
assert failures == 0, "IPv6 Routing table failed for r%s\n%s" % (i, diff)
assert failures == 0, "IPv6 Routing table failed for r{}\n{}".format(i, diff)
def route_get_nhg_id(route_str):
net = get_topogen().net
output = net["r1"].cmd('vtysh -c "show ip route %s nexthop-group"' % route_str)
output = net["r1"].cmd(
'vtysh -c "show ip route {} nexthop-group"'.format(route_str)
)
match = re.search(r"Nexthop Group ID: (\d+)", output)
assert match is not None, (
"Nexthop Group ID not found for sharpd route %s" % route_str
assert match is not None, "Nexthop Group ID not found for sharpd route {}".format(
route_str
)
nhg_id = int(match.group(1))
@ -410,7 +428,7 @@ def verify_nexthop_group(nhg_id, recursive=False, ecmp=0):
while not found and count < 10:
count += 1
# Verify NHG is valid/installed
output = net["r1"].cmd('vtysh -c "show nexthop-group rib %d"' % nhg_id)
output = net["r1"].cmd('vtysh -c "show nexthop-group rib {}"'.format(nhg_id))
valid = re.search(r"Valid", output)
if valid is None:
found = False
@ -449,20 +467,24 @@ def verify_nexthop_group(nhg_id, recursive=False, ecmp=0):
continue
found = True
assert valid is not None, "Nexthop Group ID=%d not marked Valid" % nhg_id
assert valid is not None, "Nexthop Group ID={} not marked Valid".format(nhg_id)
if ecmp or recursive:
assert ecmpcount is not None, "Nexthop Group ID=%d has no depends" % nhg_id
assert ecmpcount is not None, "Nexthop Group ID={} has no depends".format(
nhg_id
)
if ecmp:
assert len(depends) == ecmp, (
"Nexthop Group ID=%d doesn't match ecmp size" % nhg_id
)
assert (
len(depends) == ecmp
), "Nexthop Group ID={} doesn't match ecmp size".format(nhg_id)
else:
assert len(depends) == 1, (
"Nexthop Group ID=%d should only have one recursive depend" % nhg_id
assert (
len(depends) == 1
), "Nexthop Group ID={} should only have one recursive depend".format(
nhg_id
)
else:
assert installed is not None, (
"Nexthop Group ID=%d not marked Installed" % nhg_id
assert installed is not None, "Nexthop Group ID={} not marked Installed".format(
nhg_id
)
@ -600,7 +622,7 @@ def test_nexthop_groups():
dups = []
nhg_id = route_get_nhg_id("6.6.6.1/32")
while (len(dups) != 4) and count < 10:
output = net["r1"].cmd('vtysh -c "show nexthop-group rib %d"' % nhg_id)
output = net["r1"].cmd('vtysh -c "show nexthop-group rib {}"'.format(nhg_id))
dups = re.findall(r"(via 1\.1\.1\.1)", output)
if len(dups) != 4:
@ -608,9 +630,10 @@ def test_nexthop_groups():
sleep(1)
# Should find 3, itself is inactive
assert len(dups) == 4, (
"Route 6.6.6.1/32 with Nexthop Group ID=%d has wrong number of resolved nexthops"
% nhg_id
assert (
len(dups) == 4
), "Route 6.6.6.1/32 with Nexthop Group ID={} has wrong number of resolved nexthops".format(
nhg_id
)
## Remove all NHG routes
@ -640,7 +663,7 @@ def test_rip_status():
print("******************************************\n")
failures = 0
for i in range(1, 2):
refTableFile = "%s/r%s/rip_status.ref" % (thisDir, i)
refTableFile = "{}/r{}/rip_status.ref".format(thisDir, i)
if os.path.isfile(refTableFile):
# Read expected result from file
expected = open(refTableFile).read().rstrip()
@ -649,7 +672,7 @@ def test_rip_status():
# Actual output from router
actual = (
net["r%s" % i]
net["r{}".format(i)]
.cmd('vtysh -c "show ip rip status" 2> /dev/null')
.rstrip()
)
@ -670,16 +693,20 @@ def test_rip_status():
# Empty string if it matches, otherwise diff contains unified diff
if diff:
sys.stderr.write("r%s failed IP RIP status check:\n%s\n" % (i, diff))
sys.stderr.write(
"r{} failed IP RIP status check:\n{}\n".format(i, diff)
)
failures += 1
else:
print("r%s ok" % i)
print("r{} ok".format(i))
assert failures == 0, "IP RIP status failed for router r%s:\n%s" % (i, diff)
assert failures == 0, "IP RIP status failed for router r{}:\n{}".format(
i, diff
)
# Make sure that all daemons are running
for i in range(1, 2):
fatal_error = net["r%s" % i].checkRouterRunning()
fatal_error = net["r{}".format(i)].checkRouterRunning()
assert fatal_error == "", fatal_error
@ -697,7 +724,7 @@ def test_ripng_status():
print("******************************************\n")
failures = 0
for i in range(1, 2):
refTableFile = "%s/r%s/ripng_status.ref" % (thisDir, i)
refTableFile = "{}/r{}/ripng_status.ref".format(thisDir, i)
if os.path.isfile(refTableFile):
# Read expected result from file
expected = open(refTableFile).read().rstrip()
@ -706,7 +733,7 @@ def test_ripng_status():
# Actual output from router
actual = (
net["r%s" % i]
net["r{}".format(i)]
.cmd('vtysh -c "show ipv6 ripng status" 2> /dev/null')
.rstrip()
)
@ -730,20 +757,19 @@ def test_ripng_status():
# Empty string if it matches, otherwise diff contains unified diff
if diff:
sys.stderr.write(
"r%s failed IPv6 RIPng status check:\n%s\n" % (i, diff)
"r{} failed IPv6 RIPng status check:\n{}\n".format(i, diff)
)
failures += 1
else:
print("r%s ok" % i)
print("r{} ok".format(i))
assert failures == 0, "IPv6 RIPng status failed for router r%s:\n%s" % (
i,
diff,
assert failures == 0, "IPv6 RIPng status failed for router r{}:\n{}".format(
i, diff
)
# Make sure that all daemons are running
for i in range(1, 2):
fatal_error = net["r%s" % i].checkRouterRunning()
fatal_error = net["r{}".format(i)].checkRouterRunning()
assert fatal_error == "", fatal_error
@ -761,7 +787,7 @@ def test_ospfv2_interfaces():
print("******************************************\n")
failures = 0
for i in range(1, 2):
refTableFile = "%s/r%s/show_ip_ospf_interface.ref" % (thisDir, i)
refTableFile = "{}/r{}/show_ip_ospf_interface.ref".format(thisDir, i)
if os.path.isfile(refTableFile):
# Read expected result from file
expected = open(refTableFile).read().rstrip()
@ -770,7 +796,7 @@ def test_ospfv2_interfaces():
# Actual output from router
actual = (
net["r%s" % i]
net["r{}".format(i)]
.cmd('vtysh -c "show ip ospf interface" 2> /dev/null')
.rstrip()
)
@ -803,11 +829,11 @@ def test_ospfv2_interfaces():
# Empty string if it matches, otherwise diff contains unified diff
if diff:
sys.stderr.write(
"r%s failed SHOW IP OSPF INTERFACE check:\n%s\n" % (i, diff)
"r{} failed SHOW IP OSPF INTERFACE check:\n{}\n".format(i, diff)
)
failures += 1
else:
print("r%s ok" % i)
print("r{} ok".format(i))
# Ignoring the issue if told to ignore (ie not yet fixed)
if failures != 0:
@ -821,11 +847,11 @@ def test_ospfv2_interfaces():
assert (
failures == 0
), "SHOW IP OSPF INTERFACE failed for router r%s:\n%s" % (i, diff)
), "SHOW IP OSPF INTERFACE failed for router r{}:\n{}".format(i, diff)
# Make sure that all daemons are running
for i in range(1, 2):
fatal_error = net["r%s" % i].checkRouterRunning()
fatal_error = net["r{}".format(i)].checkRouterRunning()
assert fatal_error == "", fatal_error
@ -843,7 +869,7 @@ def test_isis_interfaces():
print("******************************************\n")
failures = 0
for i in range(1, 2):
refTableFile = "%s/r%s/show_isis_interface_detail.ref" % (thisDir, i)
refTableFile = "{}/r{}/show_isis_interface_detail.ref".format(thisDir, i)
if os.path.isfile(refTableFile):
# Read expected result from file
expected = open(refTableFile).read().rstrip()
@ -852,7 +878,7 @@ def test_isis_interfaces():
# Actual output from router
actual = (
net["r%s" % i]
net["r{}".format(i)]
.cmd('vtysh -c "show isis interface detail" 2> /dev/null')
.rstrip()
)
@ -876,19 +902,19 @@ def test_isis_interfaces():
# Empty string if it matches, otherwise diff contains unified diff
if diff:
sys.stderr.write(
"r%s failed SHOW ISIS INTERFACE DETAIL check:\n%s\n" % (i, diff)
"r{} failed SHOW ISIS INTERFACE DETAIL check:\n{}\n".format(i, diff)
)
failures += 1
else:
print("r%s ok" % i)
print("r{} ok".format(i))
assert (
failures == 0
), "SHOW ISIS INTERFACE DETAIL failed for router r%s:\n%s" % (i, diff)
), "SHOW ISIS INTERFACE DETAIL failed for router r{}:\n{}".format(i, diff)
# Make sure that all daemons are running
for i in range(1, 2):
fatal_error = net["r%s" % i].checkRouterRunning()
fatal_error = net["r{}".format(i)].checkRouterRunning()
assert fatal_error == "", fatal_error
@ -906,7 +932,7 @@ def test_bgp_summary():
print("******************************************\n")
failures = 0
for i in range(1, 2):
refTableFile = "%s/r%s/show_ip_bgp_summary.ref" % (thisDir, i)
refTableFile = "{}/r{}/show_ip_bgp_summary.ref".format(thisDir, i)
if os.path.isfile(refTableFile):
# Read expected result from file
expected_original = open(refTableFile).read().rstrip()
@ -933,7 +959,7 @@ def test_bgp_summary():
]:
# Actual output from router
actual = (
net["r%s" % i]
net["r{}".format(i)]
.cmd(
'vtysh -c "show ip bgp summary ' + arguments + '" 2> /dev/null'
)
@ -1049,22 +1075,19 @@ def test_bgp_summary():
# Empty string if it matches, otherwise diff contains unified diff
if diff:
sys.stderr.write(
"r%s failed SHOW IP BGP SUMMARY check:\n%s\n" % (i, diff)
"r{} failed SHOW IP BGP SUMMARY check:\n{}\n".format(i, diff)
)
failures += 1
else:
print("r%s ok" % i)
print("r{} ok".format(i))
assert (
failures == 0
), "SHOW IP BGP SUMMARY failed for router r%s:\n%s" % (
i,
diff,
)
), "SHOW IP BGP SUMMARY failed for router r{}:\n{}".format(i, diff)
# Make sure that all daemons are running
for i in range(1, 2):
fatal_error = net["r%s" % i].checkRouterRunning()
fatal_error = net["r{}".format(i)].checkRouterRunning()
assert fatal_error == "", fatal_error
@ -1082,7 +1105,7 @@ def test_bgp_ipv6_summary():
print("******************************************\n")
failures = 0
for i in range(1, 2):
refTableFile = "%s/r%s/show_bgp_ipv6_summary.ref" % (thisDir, i)
refTableFile = "{}/r{}/show_bgp_ipv6_summary.ref".format(thisDir, i)
if os.path.isfile(refTableFile):
# Read expected result from file
expected = open(refTableFile).read().rstrip()
@ -1091,7 +1114,7 @@ def test_bgp_ipv6_summary():
# Actual output from router
actual = (
net["r%s" % i]
net["r{}".format(i)]
.cmd('vtysh -c "show bgp ipv6 summary" 2> /dev/null')
.rstrip()
)
@ -1147,20 +1170,19 @@ def test_bgp_ipv6_summary():
# Empty string if it matches, otherwise diff contains unified diff
if diff:
sys.stderr.write(
"r%s failed SHOW BGP IPv6 SUMMARY check:\n%s\n" % (i, diff)
"r{} failed SHOW BGP IPv6 SUMMARY check:\n{}\n".format(i, diff)
)
failures += 1
else:
print("r%s ok" % i)
print("r{} ok".format(i))
assert failures == 0, "SHOW BGP IPv6 SUMMARY failed for router r%s:\n%s" % (
i,
diff,
)
assert (
failures == 0
), "SHOW BGP IPv6 SUMMARY failed for router r{}:\n{}".format(i, diff)
# Make sure that all daemons are running
for i in range(1, 2):
fatal_error = net["r%s" % i].checkRouterRunning()
fatal_error = net["r{}".format(i)].checkRouterRunning()
assert fatal_error == "", fatal_error
@ -1177,11 +1199,13 @@ def test_nht():
thisDir = os.path.dirname(os.path.realpath(__file__))
for i in range(1, 2):
nhtFile = "%s/r%s/ip_nht.ref" % (thisDir, i)
nhtFile = "{}/r{}/ip_nht.ref".format(thisDir, i)
expected = open(nhtFile).read().rstrip()
expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1)
actual = net["r%s" % i].cmd('vtysh -c "show ip nht" 2> /dev/null').rstrip()
actual = (
net["r{}".format(i)].cmd('vtysh -c "show ip nht" 2> /dev/null').rstrip()
)
actual = re.sub(r"fd [0-9]+", "fd XX", actual)
actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1)
@ -1193,15 +1217,17 @@ def test_nht():
)
if diff:
assert 0, "r%s failed ip nht check:\n%s\n" % (i, diff)
assert 0, "r{} failed ip nht check:\n{}\n".format(i, diff)
else:
print("show ip nht is ok\n")
nhtFile = "%s/r%s/ipv6_nht.ref" % (thisDir, i)
nhtFile = "{}/r{}/ipv6_nht.ref".format(thisDir, i)
expected = open(nhtFile).read().rstrip()
expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1)
actual = net["r%s" % i].cmd('vtysh -c "show ipv6 nht" 2> /dev/null').rstrip()
actual = (
net["r{}".format(i)].cmd('vtysh -c "show ipv6 nht" 2> /dev/null').rstrip()
)
actual = re.sub(r"fd [0-9]+", "fd XX", actual)
actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1)
@ -1213,7 +1239,7 @@ def test_nht():
)
if diff:
assert 0, "r%s failed ipv6 nht check:\n%s\n" % (i, diff)
assert 0, "r{} failed ipv6 nht check:\n{}\n".format(i, diff)
else:
print("show ipv6 nht is ok\n")
@ -1233,7 +1259,7 @@ def test_bgp_ipv4():
diffresult = {}
for i in range(1, 2):
success = 0
for refTableFile in glob.glob("%s/r%s/show_bgp_ipv4*.ref" % (thisDir, i)):
for refTableFile in glob.glob("{}/r{}/show_bgp_ipv4*.ref".format(thisDir, i)):
if os.path.isfile(refTableFile):
# Read expected result from file
expected = open(refTableFile).read().rstrip()
@ -1242,7 +1268,9 @@ def test_bgp_ipv4():
# Actual output from router
actual = (
net["r%s" % i].cmd('vtysh -c "show bgp ipv4" 2> /dev/null').rstrip()
net["r{}".format(i)]
.cmd('vtysh -c "show bgp ipv4" 2> /dev/null')
.rstrip()
)
# Remove summary line (changed recently)
actual = re.sub(r"Total number.*", "", actual)
@ -1264,24 +1292,26 @@ def test_bgp_ipv4():
diffresult[refTableFile] = diff
else:
success = 1
print("template %s matched: r%s ok" % (refTableFile, i))
print("template {} matched: r{} ok".format(refTableFile, i))
break
if not success:
resultstr = "No template matched.\n"
for f in diffresult.keys():
resultstr += "template %s: r%s failed SHOW BGP IPv4 check:\n%s\n" % (
f,
i,
diffresult[f],
resultstr += (
"template {}: r{} failed SHOW BGP IPv4 check:\n{}\n".format(
f,
i,
diffresult[f],
)
)
raise AssertionError(
"SHOW BGP IPv4 failed for router r%s:\n%s" % (i, resultstr)
"SHOW BGP IPv4 failed for router r{}:\n{}".format(i, resultstr)
)
# Make sure that all daemons are running
for i in range(1, 2):
fatal_error = net["r%s" % i].checkRouterRunning()
fatal_error = net["r{}".format(i)].checkRouterRunning()
assert fatal_error == "", fatal_error
@ -1300,7 +1330,7 @@ def test_bgp_ipv6():
diffresult = {}
for i in range(1, 2):
success = 0
for refTableFile in glob.glob("%s/r%s/show_bgp_ipv6*.ref" % (thisDir, i)):
for refTableFile in glob.glob("{}/r{}/show_bgp_ipv6*.ref".format(thisDir, i)):
if os.path.isfile(refTableFile):
# Read expected result from file
expected = open(refTableFile).read().rstrip()
@ -1309,7 +1339,9 @@ def test_bgp_ipv6():
# Actual output from router
actual = (
net["r%s" % i].cmd('vtysh -c "show bgp ipv6" 2> /dev/null').rstrip()
net["r{}".format(i)]
.cmd('vtysh -c "show bgp ipv6" 2> /dev/null')
.rstrip()
)
# Remove summary line (changed recently)
actual = re.sub(r"Total number.*", "", actual)
@ -1331,23 +1363,25 @@ def test_bgp_ipv6():
diffresult[refTableFile] = diff
else:
success = 1
print("template %s matched: r%s ok" % (refTableFile, i))
print("template {} matched: r{} ok".format(refTableFile, i))
if not success:
resultstr = "No template matched.\n"
for f in diffresult.keys():
resultstr += "template %s: r%s failed SHOW BGP IPv6 check:\n%s\n" % (
f,
i,
diffresult[f],
resultstr += (
"template {}: r{} failed SHOW BGP IPv6 check:\n{}\n".format(
f,
i,
diffresult[f],
)
)
raise AssertionError(
"SHOW BGP IPv6 failed for router r%s:\n%s" % (i, resultstr)
"SHOW BGP IPv6 failed for router r{}:\n{}".format(i, resultstr)
)
# Make sure that all daemons are running
for i in range(1, 2):
fatal_error = net["r%s" % i].checkRouterRunning()
fatal_error = net["r{}".format(i)].checkRouterRunning()
assert fatal_error == "", fatal_error
@ -1364,13 +1398,15 @@ def test_route_map():
print("*******************************************************\n")
failures = 0
for i in range(1, 2):
refroutemap = "%s/r%s/show_route_map.ref" % (thisDir, i)
refroutemap = "{}/r{}/show_route_map.ref".format(thisDir, i)
if os.path.isfile(refroutemap):
expected = open(refroutemap).read().rstrip()
expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1)
actual = (
net["r%s" % i].cmd('vtysh -c "show route-map" 2> /dev/null').rstrip()
net["r{}".format(i)]
.cmd('vtysh -c "show route-map" 2> /dev/null')
.rstrip()
)
actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1)
@ -1383,15 +1419,15 @@ def test_route_map():
if diff:
sys.stderr.write(
"r%s failed show route-map command Check:\n%s\n" % (i, diff)
"r{} failed show route-map command Check:\n{}\n".format(i, diff)
)
failures += 1
else:
print("r%s ok" % i)
print("r{} ok".format(i))
assert (
failures == 0
), "Show route-map command failed for router r%s:\n%s" % (i, diff)
), "Show route-map command failed for router r{}:\n{}".format(i, diff)
def test_nexthop_groups_with_route_maps():
@ -1418,28 +1454,34 @@ def test_nexthop_groups_with_route_maps():
src_str = "192.168.0.1"
net["r1"].cmd(
'vtysh -c "c t" -c "route-map NH-SRC permit 111" -c "set src %s"' % src_str
'vtysh -c "c t" -c "route-map NH-SRC permit 111" -c "set src {}"'.format(
src_str
)
)
net["r1"].cmd('vtysh -c "c t" -c "ip protocol sharp route-map NH-SRC"')
net["r1"].cmd('vtysh -c "sharp install routes %s nexthop-group test 1"' % route_str)
net["r1"].cmd(
'vtysh -c "sharp install routes {} nexthop-group test 1"'.format(route_str)
)
verify_route_nexthop_group("%s/32" % route_str)
verify_route_nexthop_group("{}/32".format(route_str))
# Only a valid test on linux using nexthop objects
if sys.platform.startswith("linux"):
output = net["r1"].cmd("ip route show %s/32" % route_str)
match = re.search(r"src %s" % src_str, output)
assert match is not None, "Route %s/32 not installed with src %s" % (
output = net["r1"].cmd("ip route show {}/32".format(route_str))
match = re.search(r"src {}".format(src_str), output)
assert match is not None, "Route {}/32 not installed with src {}".format(
route_str,
src_str,
)
# Remove NHG routes and route-map
net["r1"].cmd('vtysh -c "sharp remove routes %s 1"' % route_str)
net["r1"].cmd('vtysh -c "sharp remove routes {} 1"'.format(route_str))
net["r1"].cmd('vtysh -c "c t" -c "no ip protocol sharp route-map NH-SRC"')
net["r1"].cmd(
'vtysh -c "c t" -c "no route-map NH-SRC permit 111" # -c "set src %s"' % src_str
'vtysh -c "c t" -c "no route-map NH-SRC permit 111" # -c "set src {}"'.format(
src_str
)
)
net["r1"].cmd('vtysh -c "c t" -c "no route-map NH-SRC"')
@ -1449,7 +1491,9 @@ def test_nexthop_groups_with_route_maps():
deny_route_str = "3.3.3.2"
net["r1"].cmd(
'vtysh -c "c t" -c "ip prefix-list NOPE seq 5 permit %s/32"' % permit_route_str
'vtysh -c "c t" -c "ip prefix-list NOPE seq 5 permit {}/32"'.format(
permit_route_str
)
)
net["r1"].cmd(
'vtysh -c "c t" -c "route-map NOPE permit 111" -c "match ip address prefix-list NOPE"'
@ -1459,35 +1503,42 @@ def test_nexthop_groups_with_route_maps():
# This route should be permitted
net["r1"].cmd(
'vtysh -c "sharp install routes %s nexthop-group test 1"' % permit_route_str
'vtysh -c "sharp install routes {} nexthop-group test 1"'.format(
permit_route_str
)
)
verify_route_nexthop_group("%s/32" % permit_route_str)
verify_route_nexthop_group("{}/32".format(permit_route_str))
# This route should be denied
net["r1"].cmd(
'vtysh -c "sharp install routes %s nexthop-group test 1"' % deny_route_str
'vtysh -c "sharp install routes {} nexthop-group test 1"'.format(deny_route_str)
)
nhg_id = route_get_nhg_id(deny_route_str)
output = net["r1"].cmd('vtysh -c "show nexthop-group rib %d"' % nhg_id)
output = net["r1"].cmd('vtysh -c "show nexthop-group rib {}"'.format(nhg_id))
match = re.search(r"Valid", output)
assert match is None, "Nexthop Group ID=%d should not be marked Valid" % nhg_id
assert match is None, "Nexthop Group ID={} should not be marked Valid".format(
nhg_id
)
match = re.search(r"Installed", output)
assert match is None, "Nexthop Group ID=%d should not be marked Installed" % nhg_id
assert match is None, "Nexthop Group ID={} should not be marked Installed".format(
nhg_id
)
# Remove NHG routes and route-map
net["r1"].cmd('vtysh -c "sharp remove routes %s 1"' % permit_route_str)
net["r1"].cmd('vtysh -c "sharp remove routes %s 1"' % deny_route_str)
net["r1"].cmd('vtysh -c "sharp remove routes {} 1"'.format(permit_route_str))
net["r1"].cmd('vtysh -c "sharp remove routes {} 1"'.format(deny_route_str))
net["r1"].cmd('vtysh -c "c t" -c "no ip protocol sharp route-map NOPE"')
net["r1"].cmd('vtysh -c "c t" -c "no route-map NOPE permit 111"')
net["r1"].cmd('vtysh -c "c t" -c "no route-map NOPE deny 222"')
net["r1"].cmd('vtysh -c "c t" -c "no route-map NOPE"')
net["r1"].cmd(
'vtysh -c "c t" -c "no ip prefix-list NOPE seq 5 permit %s/32"'
% permit_route_str
'vtysh -c "c t" -c "no ip prefix-list NOPE seq 5 permit {}/32"'.format(
permit_route_str
)
)
@ -1550,7 +1601,7 @@ def test_mpls_interfaces():
print("******************************************\n")
failures = 0
for i in range(1, 2):
refTableFile = "%s/r%s/show_mpls_ldp_interface.ref" % (thisDir, i)
refTableFile = "{}/r{}/show_mpls_ldp_interface.ref".format(thisDir, i)
if os.path.isfile(refTableFile):
# Read expected result from file
expected = open(refTableFile).read().rstrip()
@ -1559,7 +1610,7 @@ def test_mpls_interfaces():
# Actual output from router
actual = (
net["r%s" % i]
net["r{}".format(i)]
.cmd('vtysh -c "show mpls ldp interface" 2> /dev/null')
.rstrip()
)
@ -1579,22 +1630,22 @@ def test_mpls_interfaces():
# Empty string if it matches, otherwise diff contains unified diff
if diff:
sys.stderr.write(
"r%s failed MPLS LDP Interface status Check:\n%s\n" % (i, diff)
"r{} failed MPLS LDP Interface status Check:\n{}\n".format(i, diff)
)
failures += 1
else:
print("r%s ok" % i)
print("r{} ok".format(i))
if failures > 0:
fatal_error = "MPLS LDP Interface status failed"
assert (
failures == 0
), "MPLS LDP Interface status failed for router r%s:\n%s" % (i, diff)
), "MPLS LDP Interface status failed for router r{}:\n{}".format(i, diff)
# Make sure that all daemons are running
for i in range(1, 2):
fatal_error = net["r%s" % i].checkRouterRunning()
fatal_error = net["r{}".format(i)].checkRouterRunning()
assert fatal_error == "", fatal_error
@ -1707,8 +1758,8 @@ def test_shutdown_check_memleak():
thisDir = os.path.dirname(os.path.realpath(__file__))
for i in range(1, 2):
net["r%s" % i].stopRouter()
net["r%s" % i].report_memory_leaks(
net["r{}".format(i)].stopRouter()
net["r{}".format(i)].report_memory_leaks(
os.environ.get("TOPOTESTS_CHECK_MEMLEAK"), os.path.basename(__file__)
)