Merge pull request #13915 from pguibert6WIND/bgp_vpnv6_per_nexthop_better_test

topotests: label per nexthop ipv6 test adds add a while loop for mpls…
This commit is contained in:
Donald Sharp 2023-07-06 16:21:19 -04:00 committed by GitHub
commit 3b496e9b5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -226,6 +226,48 @@ def bgp_vpnv6_table_check_all(router, label_list=None, same=False):
bgp_vpnv6_table_check(router, group=group, label_list=label_list)
def check_show_mpls_table(router, blacklist=None, label_list=None, whitelist=None):
nexthop_list = []
if blacklist:
nexthop_list.append(blacklist)
dump = router.vtysh_cmd("show mpls table json", isjson=True)
for in_label, label_info in dump.items():
if label_list is not None:
label_list.add(in_label)
for nh in label_info["nexthops"]:
if "installed" not in nh.keys():
return "{} {} is not installed yet on {}".format(in_label, label_info, router.name)
if nh["installed"] != True or nh["type"] != "BGP":
return "{}, show mpls table, nexthop is not installed".format(
router.name
)
if "nexthop" in nh.keys():
if nh["nexthop"] in nexthop_list:
return "{}, show mpls table, duplicated or blacklisted nexthop address".format(
router.name
)
nexthop_list.append(nh["nexthop"])
elif "interface" in nh.keys():
if nh["interface"] in nexthop_list:
return "{}, show mpls table, duplicated or blacklisted nexthop interface".format(
router.name
)
nexthop_list.append(nh["interface"])
else:
return "{}, show mpls table, entry with neither nexthop nor interface".format(
router.name
)
if whitelist:
for entry in whitelist:
if entry not in nexthop_list:
return "{}, show mpls table, entry with nexthop {} not present in nexthop list".format(
router.name, entry
)
return None
def mpls_table_check(router, blacklist=None, label_list=None, whitelist=None):
"""
Dump and check 'show mpls table json' output. An assert is triggered in case test fails
@ -234,46 +276,14 @@ def mpls_table_check(router, blacklist=None, label_list=None, whitelist=None):
* 'label_list': the list of labels that should be in inLabel value
* 'whitelist': the list of nexthops (IP or interface) that should be on output
"""
nexthop_list = []
if blacklist:
nexthop_list.append(blacklist)
logger.info("Checking MPLS labels on {}".format(router.name))
dump = router.vtysh_cmd("show mpls table json", isjson=True)
for in_label, label_info in dump.items():
if label_list is not None:
label_list.add(in_label)
for nh in label_info["nexthops"]:
assert (
nh["installed"] == True and nh["type"] == "BGP"
), "{}, show mpls table, nexthop is not installed".format(router.name)
if "nexthop" in nh.keys():
assert (
nh["nexthop"] not in nexthop_list
), "{}, show mpls table, duplicated or blacklisted nexthop address".format(
router.name
)
nexthop_list.append(nh["nexthop"])
elif "interface" in nh.keys():
assert (
nh["interface"] not in nexthop_list
), "{}, show mpls table, duplicated or blacklisted nexthop interface".format(
router.name
)
nexthop_list.append(nh["interface"])
else:
assert (
0
), "{}, show mpls table, entry with neither nexthop nor interface".format(
router.name
)
if whitelist:
for entry in whitelist:
assert (
entry in nexthop_list
), "{}, show mpls table, entry with nexthop {} not present in nexthop list".format(
router.name, entry
)
logger.info("Checking MPLS labels on {}".format(router.name))
# Check r2 removed 172.31.0.30 vpnv4 update
test_func = functools.partial(
check_show_mpls_table, router, blacklist, label_list, whitelist
)
success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
assert success, "{}, MPLS labels check fail: {}".format(router.name, result)
def check_show_bgp_vpn_prefix_not_found(router, ipversion, prefix, rd, label=None):