tests: Adding 5 test cases to bgp-basic suite

Signed-off-by: Ashish Pant <ashish12pant@gmail.com>

Adding test cases and verfication API for new test cases
This commit is contained in:
Ashish Pant 2019-06-25 06:33:09 +05:30
parent 77ef1af6e3
commit cb6b1d9058
3 changed files with 539 additions and 16 deletions

View File

@ -53,7 +53,8 @@ from mininet.topo import Topo
from lib.common_config import (
start_topology, stop_topology, write_test_header,
write_test_footer, reset_config_on_routers
write_test_footer, reset_config_on_routers, create_static_routes,
verify_rib, verify_admin_distance_for_static_routes
)
from lib.topolog import logger
from lib.bgp import (
@ -311,6 +312,277 @@ def test_bgp_timers_functionality(request):
write_test_footer(tc_name)
def test_static_routes(request):
""" Test to create and verify static routes. """
tgen = get_topogen()
if BGP_CONVERGENCE is not True:
pytest.skip('skipped because of BGP Convergence failure')
# test case name
tc_name = request.node.name
write_test_header(tc_name)
# Creating configuration from JSON
reset_config_on_routers(tgen)
# Api call to create static routes
input_dict = {
"r1": {
"static_routes": [{
"network": "10.0.20.1/32",
"no_of_ip": 9,
"admin_distance": 100,
"next_hop": "10.0.0.2"
}]
}
}
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
# Api call to redistribute static routes
input_dict_1 = {
"r1": {
"bgp": {
"address_family": {
"ipv4": {
"unicast": {
"redistribute": [
{"redist_type": "static"},
{"redist_type": "connected"}
]
}
}
}
}
}
}
result = create_router_bgp(tgen, topo, input_dict_1)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
# Verifying RIB routes
dut = 'r3'
protocol = 'bgp'
next_hop = '10.0.0.2'
result = verify_rib(tgen, 'ipv4', dut, input_dict, next_hop=next_hop,
protocol=protocol)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
write_test_footer(tc_name)
def test_admin_distance_for_existing_static_routes(request):
""" Test to modify and verify admin distance for existing static routes."""
tgen = get_topogen()
if BGP_CONVERGENCE is not True:
pytest.skip('skipped because of BGP Convergence failure')
# test case name
tc_name = request.node.name
write_test_header(tc_name)
# Creating configuration from JSON
reset_config_on_routers(tgen)
input_dict = {
"r1": {
"static_routes": [{
"network": "10.0.20.1/32",
"admin_distance": 10,
"next_hop": "10.0.0.2"
}]
}
}
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
# Verifying admin distance once modified
result = verify_admin_distance_for_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
write_test_footer(tc_name)
def test_advertise_network_using_network_command(request):
""" Test advertise networks using network command."""
tgen = get_topogen()
if BGP_CONVERGENCE is not True:
pytest.skip('skipped because of BGP Convergence failure')
# test case name
tc_name = request.node.name
write_test_header(tc_name)
# Creating configuration from JSON
reset_config_on_routers(tgen)
# Api call to advertise networks
input_dict = {
"r1": {
"bgp": {
"address_family": {
"ipv4": {
"unicast": {
"advertise_networks": [
{
"network": "20.0.0.0/32",
"no_of_network": 10
},
{
"network": "30.0.0.0/32",
"no_of_network": 10
}
]
}
}
}
}
}
}
result = create_router_bgp(tgen, topo, input_dict)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
# Verifying RIB routes
dut = 'r2'
protocol = "bgp"
result = verify_rib(tgen, 'ipv4', dut, input_dict, protocol=protocol)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
write_test_footer(tc_name)
def test_clear_bgp_and_verify(request):
"""
Created few static routes and verified all routes are learned via BGP
cleared BGP and verified all routes are intact
"""
tgen = get_topogen()
if BGP_CONVERGENCE is not True:
pytest.skip('skipped because of BGP Convergence failure')
# test case name
tc_name = request.node.name
write_test_header(tc_name)
# Creating configuration from JSON
reset_config_on_routers(tgen)
# clear ip bgp
result = clear_bgp_and_verify(tgen, topo, 'r1')
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
write_test_footer(tc_name)
def test_bgp_with_loopback_interface(request):
"""
Test BGP with loopback interface
Adding keys:value pair "dest_link": "lo" and "source_link": "lo"
peer dict of input json file for all router's creating config using
loopback interface. Once BGP neighboship is up then verifying BGP
convergence
"""
tgen = get_topogen()
if BGP_CONVERGENCE is not True:
pytest.skip('skipped because of BGP Convergence failure')
# test case name
tc_name = request.node.name
write_test_header(tc_name)
# Creating configuration from JSON
reset_config_on_routers(tgen)
for routerN in sorted(topo['routers'].keys()):
for bgp_neighbor in \
topo['routers'][routerN]['bgp']['address_family']['ipv4'][
'unicast']['neighbor'].keys():
# Adding ['source_link'] = 'lo' key:value pair
topo['routers'][routerN]['bgp']['address_family']['ipv4'][
'unicast']['neighbor'][bgp_neighbor]["dest_link"] = {
'lo': {
"source_link": "lo",
}
}
# Creating configuration from JSON
build_config_from_json(tgen, topo)
input_dict = {
"r1": {
"static_routes": [{
"network": "1.0.2.17/32",
"next_hop": "10.0.0.2"
},
{
"network": "1.0.3.17/32",
"next_hop": "10.0.0.6"
}
]
},
"r2": {
"static_routes": [{
"network": "1.0.1.17/32",
"next_hop": "10.0.0.1"
},
{
"network": "1.0.3.17/32",
"next_hop": "10.0.0.10"
}
]
},
"r3": {
"static_routes": [{
"network": "1.0.1.17/32",
"next_hop": "10.0.0.5"
},
{
"network": "1.0.2.17/32",
"next_hop": "10.0.0.9"
},
{
"network": "1.0.4.17/32",
"next_hop": "10.0.0.14"
}
]
},
"r4": {
"static_routes": [{
"network": "1.0.3.17/32",
"next_hop": "10.0.0.13"
}]
}
}
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
# Api call verify whether BGP is converged
result = verify_bgp_convergence(tgen, topo)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
write_test_footer(tc_name)
if __name__ == '__main__':
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))

View File

@ -883,14 +883,6 @@ def clear_bgp_and_verify(tgen, topo, router):
peer_uptime_before_clear_bgp = {}
# Verifying BGP convergence before bgp clear command
for retry in range(1, 11):
show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
isjson=True)
logger.info(show_bgp_json)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
return errormsg
sleeptime = 2 * retry
if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
# Waiting for BGP to converge
@ -902,6 +894,14 @@ def clear_bgp_and_verify(tgen, topo, router):
" router {}".format(BGP_CONVERGENCE_TIMEOUT, router)
return errormsg
show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
isjson=True)
logger.info(show_bgp_json)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
return errormsg
# To find neighbor ip type
bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
for addr_type in bgp_addr_type.keys():
@ -964,13 +964,6 @@ def clear_bgp_and_verify(tgen, topo, router):
peer_uptime_after_clear_bgp = {}
# Verifying BGP convergence after bgp clear command
for retry in range(1, 11):
show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
isjson=True)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
return errormsg
sleeptime = 2 * retry
if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
# Waiting for BGP to converge
@ -982,6 +975,13 @@ def clear_bgp_and_verify(tgen, topo, router):
" router {}".format(BGP_CONVERGENCE_TIMEOUT, router)
return errormsg
show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
isjson=True)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
return errormsg
# To find neighbor ip type
bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]

View File

@ -20,6 +20,7 @@
from collections import OrderedDict
from datetime import datetime
from time import sleep
import StringIO
import os
import ConfigParser
@ -1095,3 +1096,253 @@ def create_route_maps(tgen, input_dict, build=False):
logger.debug("Exiting lib API: create_prefix_lists()")
return result
#############################################
# Verification APIs
#############################################
def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
"""
Data will be read from input_dict or input JSON file, API will generate
same prefixes, which were redistributed by either create_static_routes() or
advertise_networks_using_network_command() and do will verify next_hop and
each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
command o/p.
Parameters
----------
* `tgen` : topogen object
* `addr_type` : ip type, ipv4/ipv6
* `dut`: Device Under Test, for which user wants to test the data
* `input_dict` : input dict, has details of static routes
* `next_hop`[optional]: next_hop which needs to be verified,
default: static
* `protocol`[optional]: protocol, default = None
Usage
-----
# RIB can be verified for static routes OR network advertised using
network command. Following are input_dicts to create static routes
and advertise networks using network command. Any one of the input_dict
can be passed to verify_rib() to verify routes in DUT"s RIB.
# Creating static routes for r1
input_dict = {
"r1": {
"static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \
"admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
}}
# Advertising networks using network command in router r1
input_dict = {
"r1": {
"advertise_networks": [{"start_ip": "20.0.0.0/32",
"no_of_network": 10},
{"start_ip": "30.0.0.0/32"}]
}}
# Verifying ipv4 routes in router r1 learned via BGP
dut = "r2"
protocol = "bgp"
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
Returns
-------
errormsg(str) or True
"""
logger.info("Entering lib API: verify_rib()")
router_list = tgen.routers()
for routerInput in input_dict.keys():
for router, rnode in router_list.iteritems():
if router != dut:
continue
# Verifying RIB routes
if addr_type == "ipv4":
if protocol:
command = "show ip route {} json".format(protocol)
else:
command = "show ip route json"
else:
if protocol:
command = "show ipv6 route {} json".format(protocol)
else:
command = "show ipv6 route json"
sleep(2)
logger.info("Checking router %s RIB:", router)
rib_routes_json = rnode.vtysh_cmd(command, isjson=True)
# Verifying output dictionary rib_routes_json is not empty
if bool(rib_routes_json) is False:
errormsg = "No {} route found in rib of router {}..". \
format(protocol, router)
return errormsg
if "static_routes" in input_dict[routerInput]:
static_routes = input_dict[routerInput]["static_routes"]
st_found = False
nh_found = False
found_routes = []
missing_routes = []
for static_route in static_routes:
network = static_route["network"]
if "no_of_ip" in static_route:
no_of_ip = static_route["no_of_ip"]
else:
no_of_ip = 0
# Generating IPs for verification
ip_list = generate_ips(network, no_of_ip)
for st_rt in ip_list:
st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
if st_rt in rib_routes_json:
st_found = True
found_routes.append(st_rt)
if next_hop:
if type(next_hop) is not list:
next_hop = [next_hop]
found_hops = [rib_r["ip"] for rib_r in
rib_routes_json[st_rt][0][
"nexthops"]]
for nh in next_hop:
nh_found = False
if nh and nh in found_hops:
nh_found = True
else:
errormsg = ("Nexthop {} is Missing for {}"
" route {} in RIB of router"
" {}\n".format(next_hop,
protocol,
st_rt, dut))
return errormsg
else:
missing_routes.append(st_rt)
if nh_found:
logger.info("Found next_hop %s for all routes in RIB of"
" router %s\n", next_hop, dut)
if not st_found and len(missing_routes) > 0:
errormsg = "Missing route in RIB of router {}, routes: " \
"{}\n".format(dut, missing_routes)
return errormsg
logger.info("Verified routes in router %s RIB, found routes"
" are: %s\n", dut, found_routes)
advertise_network = input_dict[routerInput].setdefault(
"advertise_networks", {})
if advertise_network:
found_routes = []
missing_routes = []
found = False
for advertise_network_dict in advertise_network:
start_ip = advertise_network_dict["network"]
if "no_of_network" in advertise_network_dict:
no_of_network = advertise_network_dict["no_of_network"]
else:
no_of_network = 0
# Generating IPs for verification
ip_list = generate_ips(start_ip, no_of_network)
for st_rt in ip_list:
st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
if st_rt in rib_routes_json:
found = True
found_routes.append(st_rt)
else:
missing_routes.append(st_rt)
if not found and len(missing_routes) > 0:
errormsg = "Missing route in RIB of router {}, are: {}" \
" \n".format(dut, missing_routes)
return errormsg
logger.info("Verified routes in router %s RIB, found routes"
" are: %s", dut, found_routes)
logger.info("Exiting lib API: verify_rib()")
return True
def verify_admin_distance_for_static_routes(tgen, input_dict):
"""
API to verify admin distance for static routes as defined in input_dict/
input JSON by running show ip/ipv6 route json command.
Parameter
---------
* `tgen` : topogen object
* `input_dict`: having details like - for which router and static routes
admin dsitance needs to be verified
Usage
-----
# To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
10.0.0.2 in router r1
input_dict = {
"r1": {
"static_routes": [{
"network": "10.0.20.1/32",
"admin_distance": 10,
"next_hop": "10.0.0.2"
}]
}
}
result = verify_admin_distance_for_static_routes(tgen, input_dict)
Returns
-------
errormsg(str) or True
"""
logger.info("Entering lib API: verify_admin_distance_for_static_routes()")
for router in input_dict.keys():
if router not in tgen.routers():
continue
rnode = tgen.routers()[router]
for static_route in input_dict[router]["static_routes"]:
addr_type = validate_ip_address(static_route["network"])
# Command to execute
if addr_type == "ipv4":
command = "show ip route json"
else:
command = "show ipv6 route json"
show_ip_route_json = rnode.vtysh_cmd(command, isjson=True)
logger.info("Verifying admin distance for static route %s"
" under dut %s:", static_route, router)
network = static_route["network"]
next_hop = static_route["next_hop"]
admin_distance = static_route["admin_distance"]
route_data = show_ip_route_json[network][0]
if network in show_ip_route_json:
if route_data["nexthops"][0]["ip"] == next_hop:
if route_data["distance"] != admin_distance:
errormsg = ("Verification failed: admin distance"
" for static route {} under dut {},"
" found:{} but expected:{}".
format(static_route, router,
route_data["distance"],
admin_distance))
return errormsg
else:
logger.info("Verification successful: admin"
" distance for static route %s under"
" dut %s, found:%s", static_route,
router, route_data["distance"])
else:
errormsg = ("Static route {} not found in "
"show_ip_route_json for dut {}".
format(network, router))
return errormsg
logger.info("Exiting lib API: verify_admin_distance_for_static_routes()")
return True