tests: Adding new test suite bgp_as_allow_in

1. Added 5 test cases to verify BGP AS-allow-in behavior in FRR
2. Enhanced framework to support BGP AS-allow-in config(lib/bgp.py)
3. Added API in bgp.py to verify BGP RIB table(lib/bgp.py)

Signed-off-by: Kuldeep Kashyap <kashyapk@vmware.com>
This commit is contained in:
Kuldeep Kashyap 2020-04-01 04:41:45 +00:00
parent fdf44b9038
commit 089f79edd4
2 changed files with 588 additions and 364 deletions

File diff suppressed because it is too large Load Diff

View File

@ -1953,3 +1953,252 @@ def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None)
logger.debug("Exiting lib API: verify_bgp_rib()")
return True
@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None):
"""
This API is to verify whether bgp rib has any
matching route for a nexthop.
Parameters
----------
* `tgen`: topogen object
* `dut`: input dut router name
* `addr_type` : ip type ipv4/ipv6
* `input_dict` : input dict, has details of static routes
* `next_hop`[optional]: next_hop which needs to be verified,
default = static
* 'aspath'[optional]: aspath which needs to be verified
Usage
-----
dut = 'r1'
next_hop = "192.168.1.10"
input_dict = topo['routers']
aspath = "100 200 300"
result = verify_bgp_rib(tgen, addr_type, dut, tgen, input_dict,
next_hop, aspath)
Returns
-------
errormsg(str) or True
"""
logger.debug("Entering lib API: verify_bgp_rib()")
router_list = tgen.routers()
additional_nexthops_in_required_nhs = []
list1 = []
list2 = []
for routerInput in input_dict.keys():
for router, rnode in router_list.iteritems():
if router != dut:
continue
# Verifying RIB routes
command = "show bgp"
# Static routes
sleep(2)
logger.info("Checking router {} BGP RIB:".format(dut))
if "static_routes" in input_dict[routerInput]:
static_routes = input_dict[routerInput]["static_routes"]
for static_route in static_routes:
found_routes = []
missing_routes = []
st_found = False
nh_found = False
vrf = static_route.setdefault("vrf", None)
if vrf:
cmd = "{} vrf {} {}".format(command, vrf, addr_type)
else:
cmd = "{} {}".format(command, addr_type)
cmd = "{} json".format(cmd)
rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
# Verifying output dictionary rib_routes_json is not empty
if bool(rib_routes_json) == False:
errormsg = "No route found in rib of router {}..".format(router)
return errormsg
network = static_route["network"]
if "no_of_ip" in static_route:
no_of_ip = static_route["no_of_ip"]
else:
no_of_ip = 1
# Generating IPs for verification
ip_list = generate_ips(network, no_of_ip)
for st_rt in ip_list:
st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
_addr_type = validate_ip_address(st_rt)
if _addr_type != addr_type:
continue
if st_rt in rib_routes_json["routes"]:
st_found = True
found_routes.append(st_rt)
if next_hop:
if not isinstance(next_hop, list):
next_hop = [next_hop]
list1 = next_hop
found_hops = [
rib_r["ip"]
for rib_r in rib_routes_json["routes"][st_rt][0][
"nexthops"
]
]
list2 = found_hops
missing_list_of_nexthops = set(list2).difference(list1)
additional_nexthops_in_required_nhs = set(
list1
).difference(list2)
if list2:
if additional_nexthops_in_required_nhs:
logger.info(
"Missing nexthop %s for route"
" %s in RIB of router %s\n",
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
errormsg = (
"Nexthop {} is Missing for "
"route {} in RIB of router {}\n".format(
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
)
return errormsg
else:
nh_found = True
if aspath:
found_paths = rib_routes_json["routes"][st_rt][0][
"path"
]
if aspath == found_paths:
aspath_found = True
logger.info(
"Found AS path {} for route"
" {} in RIB of router "
"{}\n".format(aspath, st_rt, dut)
)
else:
errormsg = (
"AS Path {} is missing for route"
"for route {} in RIB of router {}\n".format(
aspath, st_rt, dut
)
)
return errormsg
else:
missing_routes.append(st_rt)
if nh_found:
logger.info(
"Found next_hop {} for all bgp"
" routes in RIB of"
" router {}\n".format(next_hop, router)
)
if len(missing_routes) > 0:
errormsg = (
"Missing route in RIB of router {}, "
"routes: {}\n".format(dut, missing_routes)
)
return errormsg
if found_routes:
logger.info(
"Verified routes in router {} BGP RIB, "
"found routes are: {} \n".format(dut, found_routes)
)
continue
if "bgp" not in input_dict[routerInput]:
continue
# Advertise networks
bgp_data_list = input_dict[routerInput]["bgp"]
if type(bgp_data_list) is not list:
bgp_data_list = [bgp_data_list]
for bgp_data in bgp_data_list:
vrf_id = bgp_data.setdefault("vrf", None)
if vrf_id:
cmd = "{} vrf {} {}".format(command, vrf_id, addr_type)
else:
cmd = "{} {}".format(command, addr_type)
cmd = "{} json".format(cmd)
rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
# Verifying output dictionary rib_routes_json is not empty
if bool(rib_routes_json) == False:
errormsg = "No route found in rib of router {}..".format(router)
return errormsg
bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"]
advertise_network = bgp_net_advertise.setdefault(
"advertise_networks", []
)
for advertise_network_dict in advertise_network:
found_routes = []
missing_routes = []
found = False
network = advertise_network_dict["network"]
if "no_of_network" in advertise_network_dict:
no_of_network = advertise_network_dict["no_of_network"]
else:
no_of_network = 1
# Generating IPs for verification
ip_list = generate_ips(network, no_of_network)
for st_rt in ip_list:
st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
_addr_type = validate_ip_address(st_rt)
if _addr_type != addr_type:
continue
if st_rt in rib_routes_json["routes"]:
found = True
found_routes.append(st_rt)
else:
found = False
missing_routes.append(st_rt)
if len(missing_routes) > 0:
errormsg = (
"Missing route in BGP RIB of router {},"
" are: {}\n".format(dut, missing_routes)
)
return errormsg
if found_routes:
logger.info(
"Verified routes in router {} BGP RIB, found "
"routes are: {}\n".format(dut, found_routes)
)
logger.debug("Exiting lib API: verify_bgp_rib()")
return True