topotests: bgp_vpnv4_asbr, wait that mpls entry is installed

Ensure in the test that MPLS entries are installed before
declaring the test fails.

Signed-off-by: Philippe Guibert <philippe.guibert@6wind.com>
This commit is contained in:
Philippe Guibert 2023-07-03 10:25:35 +02:00
parent 6f0aef2ef8
commit 37b602a69d

View File

@ -205,6 +205,28 @@ def bgp_vpnv4_prefix_check(router, rd, prefix, label, nexthop):
logger.info(assert_msg + " found")
def mpls_table_get_entry(router, out_label, out_nexthop):
"""
Get the in_label from tuple (out_label, out_nexthop)
* 'router': the router to check
* 'out_label': The outgoing label expected
* 'out_nexthop': The outgoing nexthop expected
"""
dump = router.vtysh_cmd("show mpls table json", isjson=True)
for in_label, label_info in dump.items():
for nh in label_info["nexthops"]:
if nh["type"] != "BGP" or "installed" not in nh.keys():
continue
if "nexthop" in nh.keys():
if nh["nexthop"] != out_nexthop:
continue
if "outLabelStack" in nh.keys():
if out_label not in nh["outLabelStack"]:
continue
return in_label
return None
def mpls_table_check_entry(router, out_label, out_nexthop):
"""
Dump and check 'show mpls table json' output. An assert is triggered in case test fails
@ -229,13 +251,10 @@ def mpls_table_check_entry(router, out_label, out_nexthop):
router.name, in_label, nh["outLabelStack"], nh["nexthop"]
)
)
return in_label
assert (
0
), "{}, show mpls table, entry matching in_label {} out_label {} out_nexthop {} not found".format(
return None
return "{}, show mpls table, entry matching in_label {} out_label {} out_nexthop {} not found".format(
router.name, in_label, out_label, out_nexthop
)
return None
def check_ping(name, dest_addr, expect_connected):
@ -390,7 +409,13 @@ def check_show_bgp_vpn_ok(router, vpnv4_entries):
router.name, prefix, l3vpn_label, l3vpn_nh
)
)
in_label = mpls_table_check_entry(router, l3vpn_label, vpnv4_nht[l3vpn_nh])
test_func = functools.partial(
mpls_table_check_entry, router, l3vpn_label, vpnv4_nht[l3vpn_nh]
)
success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
assert success, result
in_label = mpls_table_get_entry(router, l3vpn_label, vpnv4_nht[l3vpn_nh])
label_ip_entries[prefix] = in_label
bgp_vpnv4_prefix_check(