tests: Framework support for BGP Multi-VRF automation

1. Added/Enhanced APIs to created bgp multi-vrf config and to
verify config/functionality

Signed-off-by: Kuldeep Kashyap <kashyapk@vmware.com>
This commit is contained in:
Kuldeep Kashyap 2020-05-13 17:53:05 +00:00
parent e109bded80
commit f8d572e88f
3 changed files with 1179 additions and 326 deletions

View File

@ -75,8 +75,12 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True
"address_family": {
"ipv4": {
"unicast": {
"redistribute": [
{"redist_type": "static"},
"redistribute": [{
"redist_type": "static",
"attribute": {
"metric" : 123
}
},
{"redist_type": "connected"}
],
"advertise_networks": [
@ -143,50 +147,55 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True
logger.debug("Router %s: 'bgp' not present in input_dict", router)
continue
data_all_bgp = __create_bgp_global(tgen, input_dict, router, build)
if data_all_bgp:
bgp_data = input_dict[router]["bgp"]
bgp_data_list = input_dict[router]["bgp"]
bgp_addr_data = bgp_data.setdefault("address_family", {})
if type(bgp_data_list) is not list:
bgp_data_list = [bgp_data_list]
if not bgp_addr_data:
logger.debug(
"Router %s: 'address_family' not present in " "input_dict for BGP",
router,
)
else:
for bgp_data in bgp_data_list:
data_all_bgp = __create_bgp_global(tgen, bgp_data, router, build)
if data_all_bgp:
bgp_addr_data = bgp_data.setdefault("address_family", {})
ipv4_data = bgp_addr_data.setdefault("ipv4", {})
ipv6_data = bgp_addr_data.setdefault("ipv6", {})
neigh_unicast = (
True
if ipv4_data.setdefault("unicast", {})
or ipv6_data.setdefault("unicast", {})
else False
)
if neigh_unicast:
data_all_bgp = __create_bgp_unicast_neighbor(
tgen,
topo,
input_dict,
if not bgp_addr_data:
logger.debug(
"Router %s: 'address_family' not present in "
"input_dict for BGP",
router,
afi_test,
config_data=data_all_bgp,
)
else:
ipv4_data = bgp_addr_data.setdefault("ipv4", {})
ipv6_data = bgp_addr_data.setdefault("ipv6", {})
neigh_unicast = (
True
if ipv4_data.setdefault("unicast", {})
or ipv6_data.setdefault("unicast", {})
else False
)
try:
result = create_common_configuration(
tgen, router, data_all_bgp, "bgp", build, load_config
)
except InvalidCLIError:
# Traceback
errormsg = traceback.format_exc()
logger.error(errormsg)
return errormsg
if neigh_unicast:
data_all_bgp = __create_bgp_unicast_neighbor(
tgen,
topo,
bgp_data,
router,
afi_test,
config_data=data_all_bgp,
)
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
try:
result = create_common_configuration(
tgen, router, data_all_bgp, "bgp", build, load_config
)
except InvalidCLIError:
# Traceback
errormsg = traceback.format_exc()
logger.error(errormsg)
return errormsg
logger.debug("Exiting lib API: create_router_bgp()")
return result
@ -206,19 +215,16 @@ def __create_bgp_global(tgen, input_dict, router, build=False):
True or False
"""
result = False
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
bgp_data = input_dict[router]["bgp"]
bgp_data = input_dict
del_bgp_action = bgp_data.setdefault("delete", False)
if del_bgp_action:
config_data = ["no router bgp"]
return config_data
config_data = []
if "local_as" not in bgp_data and build:
logger.error(
logger.debug(
"Router %s: 'local_as' not present in input_dict" "for BGP", router
)
return False
@ -229,6 +235,12 @@ def __create_bgp_global(tgen, input_dict, router, build=False):
if vrf_id:
cmd = "{} vrf {}".format(cmd, vrf_id)
if del_bgp_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
return config_data
config_data.append(cmd)
config_data.append("no bgp ebgp-requires-policy")
@ -325,12 +337,15 @@ def __create_bgp_unicast_neighbor(
* `build` : Only for initial setup phase this is set as True.
"""
result = False
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
add_neigh = True
bgp_data = input_dict
if "router bgp" in config_data:
add_neigh = False
bgp_data = input_dict[router]["bgp"]["address_family"]
bgp_data = input_dict["address_family"]
for addr_type, addr_dict in bgp_data.iteritems():
if not addr_dict:
@ -403,14 +418,19 @@ def __create_bgp_unicast_neighbor(
if redistribute_data:
for redistribute in redistribute_data:
if "redist_type" not in redistribute:
logger.error(
logger.debug(
"Router %s: 'redist_type' not present in " "input_dict", router
)
else:
cmd = "redistribute {}".format(redistribute["redist_type"])
redist_attr = redistribute.setdefault("attribute", None)
if redist_attr:
cmd = "{} {}".format(cmd, redist_attr)
if isinstance(redist_attr, dict):
for key, value in redist_attr.items():
cmd = "{} {} {}".format(cmd, key, value)
else:
cmd = "{} {}".format(cmd, redist_attr)
del_action = redistribute.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
@ -453,13 +473,18 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
config_data = []
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
bgp_data = input_dict[router]["bgp"]["address_family"]
bgp_data = input_dict["address_family"]
neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
for name, peer_dict in neigh_data.iteritems():
for dest_link, peer in peer_dict["dest_link"].iteritems():
nh_details = topo[name]
remote_as = nh_details["bgp"]["local_as"]
if "vrfs" in topo[router]:
remote_as = nh_details["bgp"][0]["local_as"]
else:
remote_as = nh_details["bgp"]["local_as"]
update_source = None
if dest_link in nh_details["links"].keys():
@ -549,7 +574,7 @@ def __create_bgp_unicast_address_family(
config_data = []
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
bgp_data = input_dict[router]["bgp"]["address_family"]
bgp_data = input_dict["address_family"]
neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
for peer_name, peer_dict in deepcopy(neigh_data).iteritems():
@ -605,8 +630,12 @@ def __create_bgp_unicast_address_family(
allowas_in = peer.setdefault("allowas-in", None)
# next-hop-self
if next_hop_self:
config_data.append("{} next-hop-self".format(neigh_cxt))
if next_hop_self is not None:
if next_hop_self is True:
config_data.append("{} next-hop-self".format(neigh_cxt))
else:
config_data.append("no {} next-hop-self".format(neigh_cxt))
# send_community
if send_community:
config_data.append("{} send-community".format(neigh_cxt))
@ -840,80 +869,291 @@ def verify_router_id(tgen, topo, input_dict):
@retry(attempts=20, wait=2, return_is_str=True)
def verify_bgp_convergence(tgen, topo):
def verify_bgp_convergence(tgen, topo, dut=None):
"""
API will verify if BGP is converged with in the given time frame.
Running "show bgp summary json" command and verify bgp neighbor
state is established,
Parameters
----------
* `tgen`: topogen object
* `topo`: input json file data
* `addr_type`: ip_type, ipv4/ipv6
* `dut`: device under test
Usage
-----
# To veriry is BGP is converged for all the routers used in
topology
results = verify_bgp_convergence(tgen, topo, "ipv4")
results = verify_bgp_convergence(tgen, topo, dut="r1")
Returns
-------
errormsg(str) or True
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
logger.debug("Entering lib API: verify_bgp_convergence()")
for router, rnode in tgen.routers().iteritems():
if "bgp" not in topo["routers"][router]:
continue
logger.info("Verifying BGP Convergence on router %s", router)
show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
if dut is not None and dut != router:
continue
logger.info("Verifying BGP Convergence on router %s:", router)
show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
return errormsg
# To find neighbor ip type
bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
for addr_type in bgp_addr_type.keys():
if not check_address_types(addr_type):
continue
total_peer = 0
bgp_data_list = topo["routers"][router]["bgp"]
bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
if type(bgp_data_list) is not list:
bgp_data_list = [bgp_data_list]
for bgp_neighbor in bgp_neighbors:
total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
for bgp_data in bgp_data_list:
if "vrf" in bgp_data:
vrf = bgp_data["vrf"]
if vrf is None:
vrf = "default"
else:
vrf = "default"
for addr_type in bgp_addr_type.keys():
if not check_address_types(addr_type):
continue
bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
# To find neighbor ip type
bgp_addr_type = bgp_data["address_family"]
if "l2vpn" in bgp_addr_type:
total_evpn_peer = 0
no_of_peer = 0
for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
for dest_link in peer_data["dest_link"].keys():
data = topo["routers"][bgp_neighbor]["links"]
if dest_link in data:
neighbor_ip = data[dest_link][addr_type].split("/")[0]
if addr_type == "ipv4":
ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
nh_state = ipv4_data[neighbor_ip]["state"]
else:
ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
nh_state = ipv6_data[neighbor_ip]["state"]
if "neighbor" not in bgp_addr_type["l2vpn"]["evpn"]:
continue
if nh_state == "Established":
no_of_peer += 1
if no_of_peer == total_peer:
logger.info("BGP is Converged for router %s", router)
else:
errormsg = "BGP is not converged for router {}".format(router)
return errormsg
bgp_neighbors = bgp_addr_type["l2vpn"]["evpn"]["neighbor"]
total_evpn_peer += len(bgp_neighbors)
no_of_evpn_peer = 0
for bgp_neighbor, peer_data in bgp_neighbors.items():
for _addr_type, dest_link_dict in peer_data.items():
data = topo["routers"][bgp_neighbor]["links"]
for dest_link in dest_link_dict.keys():
if dest_link in data:
peer_details = peer_data[_addr_type][dest_link]
neighbor_ip = data[dest_link][_addr_type].split("/")[0]
nh_state = None
if (
"ipv4Unicast" in show_bgp_json[vrf]
or "ipv6Unicast" in show_bgp_json[vrf]
):
errormsg = (
"[DUT: %s] VRF: %s, "
"ipv4Unicast/ipv6Unicast"
" address-family present"
" under l2vpn" % (router, vrf)
)
return errormsg
l2VpnEvpn_data = show_bgp_json[vrf]["l2VpnEvpn"][
"peers"
]
nh_state = l2VpnEvpn_data[neighbor_ip]["state"]
if nh_state == "Established":
no_of_evpn_peer += 1
if no_of_evpn_peer == total_evpn_peer:
logger.info(
"[DUT: %s] VRF: %s, BGP is Converged for " "epvn peers",
router,
vrf,
)
else:
errormsg = (
"[DUT: %s] VRF: %s, BGP is not converged "
"for evpn peers" % (router, vrf)
)
return errormsg
else:
for addr_type in bgp_addr_type.keys():
if not check_address_types(addr_type):
continue
total_peer = 0
bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
for bgp_neighbor in bgp_neighbors:
total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
for addr_type in bgp_addr_type.keys():
if not check_address_types(addr_type):
continue
bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
no_of_peer = 0
for bgp_neighbor, peer_data in bgp_neighbors.items():
for dest_link in peer_data["dest_link"].keys():
data = topo["routers"][bgp_neighbor]["links"]
if dest_link in data:
peer_details = peer_data["dest_link"][dest_link]
# for link local neighbors
if (
"neighbor_type" in peer_details
and peer_details["neighbor_type"] == "link-local"
):
neighbor_ip = get_ipv6_linklocal_address(
topo["routers"], bgp_neighbor, dest_link
)
elif "source_link" in peer_details:
neighbor_ip = topo["routers"][bgp_neighbor][
"links"
][peer_details["source_link"]][addr_type].split(
"/"
)[
0
]
elif (
"neighbor_type" in peer_details
and peer_details["neighbor_type"] == "unnumbered"
):
neighbor_ip = data[dest_link]["peer-interface"]
else:
neighbor_ip = data[dest_link][addr_type].split("/")[
0
]
nh_state = None
if addr_type == "ipv4":
ipv4_data = show_bgp_json[vrf]["ipv4Unicast"][
"peers"
]
nh_state = ipv4_data[neighbor_ip]["state"]
else:
ipv6_data = show_bgp_json[vrf]["ipv6Unicast"][
"peers"
]
nh_state = ipv6_data[neighbor_ip]["state"]
if nh_state == "Established":
no_of_peer += 1
if no_of_peer == total_peer:
logger.info("[DUT: %s] VRF: %s, BGP is Converged", router, vrf)
else:
errormsg = "[DUT: %s] VRF: %s, BGP is not converged" % (router, vrf)
return errormsg
logger.debug("Exiting API: verify_bgp_convergence()")
return True
@retry(attempts=3, wait=4, return_is_str=True)
def verify_bgp_community(
tgen, addr_type, router, network, input_dict=None, vrf=None, bestpath=False
):
"""
API to veiryf BGP large community is attached in route for any given
DUT by running "show bgp ipv4/6 {route address} json" command.
Parameters
----------
* `tgen`: topogen object
* `addr_type` : ip type, ipv4/ipv6
* `dut`: Device Under Test
* `network`: network for which set criteria needs to be verified
* `input_dict`: having details like - for which router, community and
values needs to be verified
* `vrf`: VRF name
* `bestpath`: To check best path cli
Usage
-----
networks = ["200.50.2.0/32"]
input_dict = {
"largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
}
result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
Returns
-------
errormsg(str) or True
"""
logger.debug("Entering lib API: verify_bgp_community()")
if router not in tgen.routers():
return False
rnode = tgen.routers()[router]
logger.info(
"Verifying BGP community attributes on dut %s: for %s " "network %s",
router,
addr_type,
network,
)
command = "show bgp"
sleep(5)
for net in network:
if vrf:
cmd = "{} vrf {} {} {} json".format(command, vrf, addr_type, net)
elif bestpath:
cmd = "{} {} {} bestpath json".format(command, addr_type, net)
else:
cmd = "{} {} {} json".format(command, addr_type, net)
show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
if "paths" not in show_bgp_json:
return "Prefix {} not found in BGP table of router: {}".format(net, router)
as_paths = show_bgp_json["paths"]
found = False
for i in range(len(as_paths)):
if (
"largeCommunity" in show_bgp_json["paths"][i]
or "community" in show_bgp_json["paths"][i]
):
found = True
logger.info(
"Large Community attribute is found for route:" " %s in router: %s",
net,
router,
)
if input_dict is not None:
for criteria, comm_val in input_dict.items():
show_val = show_bgp_json["paths"][i][criteria]["string"]
if comm_val == show_val:
logger.info(
"Verifying BGP %s for prefix: %s"
" in router: %s, found expected"
" value: %s",
criteria,
net,
router,
comm_val,
)
else:
errormsg = (
"Failed: Verifying BGP attribute"
" {} for route: {} in router: {}"
", expected value: {} but found"
": {}".format(criteria, net, router, comm_val, show_val)
)
return errormsg
if not found:
errormsg = (
"Large Community attribute is not found for route: "
"{} in router: {} ".format(net, router)
)
return errormsg
logger.debug("Exiting lib API: verify_bgp_community()")
return True
def modify_as_number(tgen, topo, input_dict):
"""
API reads local_as and remote_as from user defined input_dict and
@ -1090,6 +1330,7 @@ def clear_bgp(tgen, addr_type, router, vrf=None):
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
if router not in tgen.routers():
return False
@ -1102,9 +1343,21 @@ def clear_bgp(tgen, addr_type, router, vrf=None):
# Clearing BGP
logger.info("Clearing BGP neighborship for router %s..", router)
if addr_type == "ipv4":
run_frr_cmd(rnode, "clear ip bgp *")
if vrf:
for _vrf in vrf:
run_frr_cmd(rnode, "clear ip bgp vrf {} *".format(_vrf))
else:
run_frr_cmd(rnode, "clear ip bgp *")
elif addr_type == "ipv6":
run_frr_cmd(rnode, "clear bgp ipv6 *")
if vrf:
for _vrf in vrf:
run_frr_cmd(rnode, "clear bgp vrf {} ipv6 *".format(_vrf))
else:
run_frr_cmd(rnode, "clear bgp ipv6 *")
else:
run_frr_cmd(rnode, "clear bgp *")
sleep(5)
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
@ -1698,7 +1951,6 @@ def verify_best_path_as_per_bgp_attribute(
"show bgp ipv4/6 json" command will be run and verify best path according
to shortest as-path, highest local-preference and med, lowest weight and
route origin IGP>EGP>INCOMPLETE.
Parameters
----------
* `tgen` : topogen object
@ -1707,7 +1959,6 @@ def verify_best_path_as_per_bgp_attribute(
* `attribute` : calculate best path using this attribute
* `input_dict`: defines different routes to calculate for which route
best path is selected
Usage
-----
# To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from
@ -1741,114 +1992,155 @@ def verify_best_path_as_per_bgp_attribute(
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
if router not in tgen.routers():
return False
rnode = tgen.routers()[router]
command = "show bgp {} json".format(addr_type)
# Verifying show bgp json
command = "show bgp"
sleep(5)
sleep(2)
logger.info("Verifying router %s RIB for best path:", router)
sh_ip_bgp_json = run_frr_cmd(rnode, command, isjson=True)
static_route = False
advertise_network = False
for route_val in input_dict.values():
net_data = route_val["bgp"]["address_family"][addr_type]["unicast"]
networks = net_data["advertise_networks"]
if "static_routes" in route_val:
static_route = True
networks = route_val["static_routes"]
else:
advertise_network = True
net_data = route_val["bgp"]["address_family"][addr_type]["unicast"]
networks = net_data["advertise_networks"]
for network in networks:
route = network["network"]
_network = network["network"]
no_of_ip = network.setdefault("no_of_ip", 1)
vrf = network.setdefault("vrf", None)
route_attributes = sh_ip_bgp_json["routes"][route]
_next_hop = None
compare = None
attribute_dict = {}
for route_attribute in route_attributes:
next_hops = route_attribute["nexthops"]
for next_hop in next_hops:
next_hop_ip = next_hop["ip"]
attribute_dict[next_hop_ip] = route_attribute[attribute]
# AS_PATH attribute
if attribute == "path":
# Find next_hop for the route have minimum as_path
_next_hop = min(
attribute_dict, key=lambda x: len(set(attribute_dict[x]))
)
compare = "SHORTEST"
# LOCAL_PREF attribute
elif attribute == "locPrf":
# Find next_hop for the route have highest local preference
_next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k]))
compare = "HIGHEST"
# WEIGHT attribute
elif attribute == "weight":
# Find next_hop for the route have highest weight
_next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k]))
compare = "HIGHEST"
# ORIGIN attribute
elif attribute == "origin":
# Find next_hop for the route have IGP as origin, -
# - rule is IGP>EGP>INCOMPLETE
_next_hop = [
key for (key, value) in attribute_dict.iteritems() if value == "IGP"
][0]
compare = ""
# MED attribute
elif attribute == "metric":
# Find next_hop for the route have LOWEST MED
_next_hop = min(attribute_dict, key=(lambda k: attribute_dict[k]))
compare = "LOWEST"
# Show ip route
if addr_type == "ipv4":
command = "show ip route json"
if vrf:
cmd = "{} vrf {}".format(command, vrf)
else:
command = "show ipv6 route json"
cmd = command
rib_routes_json = run_frr_cmd(rnode, command, isjson=True)
cmd = "{} {}".format(cmd, addr_type)
cmd = "{} json".format(cmd)
sh_ip_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
# Verifying output dictionary rib_routes_json is not empty
if not bool(rib_routes_json):
errormsg = "No route found in RIB of router {}..".format(router)
return errormsg
routes = generate_ips(_network, no_of_ip)
for route in routes:
route = str(ipaddr.IPNetwork(unicode(route)))
st_found = False
nh_found = False
# Find best is installed in RIB
if route in rib_routes_json:
st_found = True
# Verify next_hop in rib_routes_json
if rib_routes_json[route][0]["nexthops"][0]["ip"] in attribute_dict:
nh_found = True
else:
errormsg = (
"Incorrect Nexthop for BGP route {} in "
"RIB of router {}, Expected: {}, Found:"
" {}\n".format(
route,
router,
rib_routes_json[route][0]["nexthops"][0]["ip"],
_next_hop,
if route in sh_ip_bgp_json["routes"]:
route_attributes = sh_ip_bgp_json["routes"][route]
_next_hop = None
compare = None
attribute_dict = {}
for route_attribute in route_attributes:
next_hops = route_attribute["nexthops"]
for next_hop in next_hops:
next_hop_ip = next_hop["ip"]
attribute_dict[next_hop_ip] = route_attribute[attribute]
# AS_PATH attribute
if attribute == "path":
# Find next_hop for the route have minimum as_path
_next_hop = min(
attribute_dict, key=lambda x: len(set(attribute_dict[x]))
)
)
return errormsg
compare = "SHORTEST"
if st_found and nh_found:
logger.info(
"Best path for prefix: %s with next_hop: %s is "
"installed according to %s %s: (%s) in RIB of "
"router %s",
route,
_next_hop,
compare,
attribute,
attribute_dict[_next_hop],
router,
)
# LOCAL_PREF attribute
elif attribute == "locPrf":
# Find next_hop for the route have highest local preference
_next_hop = max(
attribute_dict, key=(lambda k: attribute_dict[k])
)
compare = "HIGHEST"
# WEIGHT attribute
elif attribute == "weight":
# Find next_hop for the route have highest weight
_next_hop = max(
attribute_dict, key=(lambda k: attribute_dict[k])
)
compare = "HIGHEST"
# ORIGIN attribute
elif attribute == "origin":
# Find next_hop for the route have IGP as origin, -
# - rule is IGP>EGP>INCOMPLETE
_next_hop = [
key
for (key, value) in attribute_dict.iteritems()
if value == "IGP"
][0]
compare = ""
# MED attribute
elif attribute == "metric":
# Find next_hop for the route have LOWEST MED
_next_hop = min(
attribute_dict, key=(lambda k: attribute_dict[k])
)
compare = "LOWEST"
# Show ip route
if addr_type == "ipv4":
command_1 = "show ip route"
else:
command_1 = "show ipv6 route"
if vrf:
cmd = "{} vrf {} json".format(command_1, vrf)
else:
cmd = "{} json".format(command_1)
rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
# Verifying output dictionary rib_routes_json is not empty
if not bool(rib_routes_json):
errormsg = "No route found in RIB of router {}..".format(router)
return errormsg
st_found = False
nh_found = False
# Find best is installed in RIB
if route in rib_routes_json:
st_found = True
# Verify next_hop in rib_routes_json
if (
rib_routes_json[route][0]["nexthops"][0]["ip"]
in attribute_dict
):
nh_found = True
else:
errormsg = (
"Incorrect Nexthop for BGP route {} in "
"RIB of router {}, Expected: {}, Found:"
" {}\n".format(
route,
router,
rib_routes_json[route][0]["nexthops"][0]["ip"],
_next_hop,
)
)
return errormsg
if st_found and nh_found:
logger.info(
"Best path for prefix: %s with next_hop: %s is "
"installed according to %s %s: (%s) in RIB of "
"router %s",
route,
_next_hop,
compare,
attribute,
attribute_dict[_next_hop],
router,
)
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True
@ -1965,7 +2257,7 @@ def verify_best_path_as_per_admin_distance(
return True
@retry(attempts=10, wait=2, return_is_str=True, initial_wait=2)
@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None):
"""
This API is to verify whether bgp rib has any
@ -1995,7 +2287,7 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
errormsg(str) or True
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
logger.debug("Entering lib API: verify_bgp_rib()")
router_list = tgen.routers()
additional_nexthops_in_required_nhs = []
@ -2210,7 +2502,7 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
"routes are: {}\n".format(dut, found_routes)
)
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
logger.debug("Exiting lib API: verify_bgp_rib()")
return True

File diff suppressed because it is too large Load Diff

View File

@ -37,6 +37,7 @@ from lib.common_config import (
create_prefix_lists,
create_route_maps,
create_bgp_community_lists,
create_vrf_cfg,
)
from lib.bgp import create_router_bgp
@ -49,7 +50,6 @@ def build_topo_from_json(tgen, topo):
Reads configuration from JSON file. Adds routers, creates interface
names dynamically and link routers as defined in JSON to create
topology. Assigns IPs dynamically to all interfaces of each router.
* `tgen`: Topogen object
* `topo`: json file data
"""
@ -203,6 +203,7 @@ def build_config_from_json(tgen, topo, save_bkup=True):
func_dict = OrderedDict(
[
("vrfs", create_vrf_cfg),
("links", create_interfaces_cfg),
("static_routes", create_static_routes),
("prefix_lists", create_prefix_lists),