mirror of
https://git.proxmox.com/git/mirror_frr
synced 2025-05-28 17:44:38 +00:00
Merge pull request #7499 from mjstapp/fix_topo_dup_verify_fib
tests: remove duplicate verify_fib_routes from common_config.py
This commit is contained in:
commit
9da6c9bf20
@ -2926,262 +2926,6 @@ def verify_rib(
|
||||
return True
|
||||
|
||||
|
||||
@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
|
||||
def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
|
||||
"""
|
||||
Data will be read from input_dict or input JSON file, API will generate
|
||||
same prefixes, which were redistributed by either create_static_routes() or
|
||||
advertise_networks_using_network_command() and will verify next_hop and
|
||||
each prefix/routes is present in "show ip/ipv6 fib json"
|
||||
command o/p.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
* `tgen` : topogen object
|
||||
* `addr_type` : ip type, ipv4/ipv6
|
||||
* `dut`: Device Under Test, for which user wants to test the data
|
||||
* `input_dict` : input dict, has details of static routes
|
||||
* `next_hop`[optional]: next_hop which needs to be verified,
|
||||
default: static
|
||||
|
||||
Usage
|
||||
-----
|
||||
input_routes_r1 = {
|
||||
"r1": {
|
||||
"static_routes": [{
|
||||
"network": ["1.1.1.1/32],
|
||||
"next_hop": "Null0",
|
||||
"vrf": "RED"
|
||||
}]
|
||||
}
|
||||
}
|
||||
result = result = verify_fib_routes(tgen, "ipv4, "r1", input_routes_r1)
|
||||
|
||||
Returns
|
||||
-------
|
||||
errormsg(str) or True
|
||||
"""
|
||||
|
||||
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
|
||||
|
||||
router_list = tgen.routers()
|
||||
for routerInput in input_dict.keys():
|
||||
for router, rnode in router_list.items():
|
||||
if router != dut:
|
||||
continue
|
||||
|
||||
logger.info("Checking router %s FIB routes:", router)
|
||||
|
||||
# Verifying RIB routes
|
||||
if addr_type == "ipv4":
|
||||
command = "show ip fib"
|
||||
else:
|
||||
command = "show ipv6 fib"
|
||||
|
||||
found_routes = []
|
||||
missing_routes = []
|
||||
|
||||
if "static_routes" in input_dict[routerInput]:
|
||||
static_routes = input_dict[routerInput]["static_routes"]
|
||||
|
||||
for static_route in static_routes:
|
||||
if "vrf" in static_route and static_route["vrf"] is not None:
|
||||
|
||||
logger.info(
|
||||
"[DUT: {}]: Verifying routes for VRF:"
|
||||
" {}".format(router, static_route["vrf"])
|
||||
)
|
||||
|
||||
cmd = "{} vrf {}".format(command, static_route["vrf"])
|
||||
|
||||
else:
|
||||
cmd = "{}".format(command)
|
||||
|
||||
cmd = "{} json".format(cmd)
|
||||
|
||||
rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
|
||||
|
||||
# Verifying output dictionary rib_routes_json is not empty
|
||||
if bool(rib_routes_json) is False:
|
||||
errormsg = "[DUT: {}]: No route found in fib".format(router)
|
||||
return errormsg
|
||||
|
||||
network = static_route["network"]
|
||||
if "no_of_ip" in static_route:
|
||||
no_of_ip = static_route["no_of_ip"]
|
||||
else:
|
||||
no_of_ip = 1
|
||||
|
||||
# Generating IPs for verification
|
||||
ip_list = generate_ips(network, no_of_ip)
|
||||
st_found = False
|
||||
nh_found = False
|
||||
|
||||
for st_rt in ip_list:
|
||||
st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
|
||||
# st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
|
||||
|
||||
_addr_type = validate_ip_address(st_rt)
|
||||
if _addr_type != addr_type:
|
||||
continue
|
||||
|
||||
if st_rt in rib_routes_json:
|
||||
st_found = True
|
||||
found_routes.append(st_rt)
|
||||
|
||||
if next_hop:
|
||||
if type(next_hop) is not list:
|
||||
next_hop = [next_hop]
|
||||
|
||||
count = 0
|
||||
for nh in next_hop:
|
||||
for nh_dict in rib_routes_json[st_rt][0][
|
||||
"nexthops"
|
||||
]:
|
||||
if nh_dict["ip"] != nh:
|
||||
continue
|
||||
else:
|
||||
count += 1
|
||||
|
||||
if count == len(next_hop):
|
||||
nh_found = True
|
||||
else:
|
||||
missing_routes.append(st_rt)
|
||||
errormsg = (
|
||||
"Nexthop {} is Missing"
|
||||
" for route {} in "
|
||||
"RIB of router {}\n".format(
|
||||
next_hop, st_rt, dut
|
||||
)
|
||||
)
|
||||
return errormsg
|
||||
|
||||
else:
|
||||
missing_routes.append(st_rt)
|
||||
|
||||
if len(missing_routes) > 0:
|
||||
errormsg = "[DUT: {}]: Missing route in FIB:" " {}".format(
|
||||
dut, missing_routes
|
||||
)
|
||||
return errormsg
|
||||
|
||||
if nh_found:
|
||||
logger.info(
|
||||
"Found next_hop {} for all routes in RIB"
|
||||
" of router {}\n".format(next_hop, dut)
|
||||
)
|
||||
|
||||
if found_routes:
|
||||
logger.info(
|
||||
"[DUT: %s]: Verified routes in FIB, found" " routes are: %s\n",
|
||||
dut,
|
||||
found_routes,
|
||||
)
|
||||
|
||||
continue
|
||||
|
||||
if "bgp" in input_dict[routerInput]:
|
||||
if (
|
||||
"advertise_networks"
|
||||
not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
|
||||
"unicast"
|
||||
]
|
||||
):
|
||||
continue
|
||||
|
||||
found_routes = []
|
||||
missing_routes = []
|
||||
advertise_network = input_dict[routerInput]["bgp"]["address_family"][
|
||||
addr_type
|
||||
]["unicast"]["advertise_networks"]
|
||||
|
||||
# Continue if there are no network advertise
|
||||
if len(advertise_network) == 0:
|
||||
continue
|
||||
|
||||
for advertise_network_dict in advertise_network:
|
||||
if "vrf" in advertise_network_dict:
|
||||
cmd = "{} vrf {} json".format(command, static_route["vrf"])
|
||||
else:
|
||||
cmd = "{} json".format(command)
|
||||
|
||||
rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
|
||||
|
||||
# Verifying output dictionary rib_routes_json is not empty
|
||||
if bool(rib_routes_json) is False:
|
||||
errormsg = "No route found in rib of router {}..".format(router)
|
||||
return errormsg
|
||||
|
||||
start_ip = advertise_network_dict["network"]
|
||||
if "no_of_network" in advertise_network_dict:
|
||||
no_of_network = advertise_network_dict["no_of_network"]
|
||||
else:
|
||||
no_of_network = 1
|
||||
|
||||
# Generating IPs for verification
|
||||
ip_list = generate_ips(start_ip, no_of_network)
|
||||
st_found = False
|
||||
nh_found = False
|
||||
|
||||
for st_rt in ip_list:
|
||||
# st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
|
||||
st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
|
||||
|
||||
_addr_type = validate_ip_address(st_rt)
|
||||
if _addr_type != addr_type:
|
||||
continue
|
||||
|
||||
if st_rt in rib_routes_json:
|
||||
st_found = True
|
||||
found_routes.append(st_rt)
|
||||
|
||||
if next_hop:
|
||||
if type(next_hop) is not list:
|
||||
next_hop = [next_hop]
|
||||
|
||||
count = 0
|
||||
for nh in next_hop:
|
||||
for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
|
||||
if nh_dict["ip"] != nh:
|
||||
continue
|
||||
else:
|
||||
count += 1
|
||||
|
||||
if count == len(next_hop):
|
||||
nh_found = True
|
||||
else:
|
||||
missing_routes.append(st_rt)
|
||||
errormsg = (
|
||||
"Nexthop {} is Missing"
|
||||
" for route {} in "
|
||||
"RIB of router {}\n".format(next_hop, st_rt, dut)
|
||||
)
|
||||
return errormsg
|
||||
else:
|
||||
missing_routes.append(st_rt)
|
||||
|
||||
if len(missing_routes) > 0:
|
||||
errormsg = "[DUT: {}]: Missing route in FIB: " "{} \n".format(
|
||||
dut, missing_routes
|
||||
)
|
||||
return errormsg
|
||||
|
||||
if nh_found:
|
||||
logger.info(
|
||||
"Found next_hop {} for all routes in RIB"
|
||||
" of router {}\n".format(next_hop, dut)
|
||||
)
|
||||
|
||||
if found_routes:
|
||||
logger.info(
|
||||
"[DUT: {}]: Verified routes FIB"
|
||||
", found routes are: {}\n".format(dut, found_routes)
|
||||
)
|
||||
|
||||
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
|
||||
return True
|
||||
|
||||
|
||||
@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
|
||||
def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
|
||||
"""
|
||||
|
Loading…
Reference in New Issue
Block a user