diff --git a/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py index 040b2f4a43..1d6ea0cda0 100755 --- a/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py +++ b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py @@ -53,7 +53,8 @@ from mininet.topo import Topo from lib.common_config import ( start_topology, stop_topology, write_test_header, - write_test_footer, reset_config_on_routers + write_test_footer, reset_config_on_routers, create_static_routes, + verify_rib, verify_admin_distance_for_static_routes ) from lib.topolog import logger from lib.bgp import ( @@ -311,6 +312,277 @@ def test_bgp_timers_functionality(request): write_test_footer(tc_name) + + +def test_static_routes(request): + """ Test to create and verify static routes. """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to create static routes + input_dict = { + "r1": { + "static_routes": [{ + "network": "10.0.20.1/32", + "no_of_ip": 9, + "admin_distance": 100, + "next_hop": "10.0.0.2" + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + # Api call to redistribute static routes + input_dict_1 = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "redistribute": [ + {"redist_type": "static"}, + {"redist_type": "connected"} + ] + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict_1) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + # Verifying RIB routes + dut = 'r3' + protocol = 'bgp' + next_hop = '10.0.0.2' + result = verify_rib(tgen, 'ipv4', dut, input_dict, next_hop=next_hop, + protocol=protocol) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + write_test_footer(tc_name) + + +def test_admin_distance_for_existing_static_routes(request): + """ Test to modify and verify admin distance for existing static routes.""" + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + input_dict = { + "r1": { + "static_routes": [{ + "network": "10.0.20.1/32", + "admin_distance": 10, + "next_hop": "10.0.0.2" + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + # Verifying admin distance once modified + result = verify_admin_distance_for_static_routes(tgen, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + write_test_footer(tc_name) + + +def test_advertise_network_using_network_command(request): + """ Test advertise networks using network command.""" + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # Api call to advertise networks + input_dict = { + "r1": { + "bgp": { + "address_family": { + "ipv4": { + "unicast": { + "advertise_networks": [ + { + "network": "20.0.0.0/32", + "no_of_network": 10 + }, + { + "network": "30.0.0.0/32", + "no_of_network": 10 + } + ] + } + } + } + } + } + } + + result = create_router_bgp(tgen, topo, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + # Verifying RIB routes + dut = 'r2' + protocol = "bgp" + result = verify_rib(tgen, 'ipv4', dut, input_dict, protocol=protocol) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + write_test_footer(tc_name) + + +def test_clear_bgp_and_verify(request): + """ + Created few static routes and verified all routes are learned via BGP + cleared BGP and verified all routes are intact + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + # clear ip bgp + result = clear_bgp_and_verify(tgen, topo, 'r1') + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + write_test_footer(tc_name) + + +def test_bgp_with_loopback_interface(request): + """ + Test BGP with loopback interface + + Adding keys:value pair "dest_link": "lo" and "source_link": "lo" + peer dict of input json file for all router's creating config using + loopback interface. Once BGP neighboship is up then verifying BGP + convergence + """ + + tgen = get_topogen() + if BGP_CONVERGENCE is not True: + pytest.skip('skipped because of BGP Convergence failure') + + # test case name + tc_name = request.node.name + write_test_header(tc_name) + + # Creating configuration from JSON + reset_config_on_routers(tgen) + + for routerN in sorted(topo['routers'].keys()): + for bgp_neighbor in \ + topo['routers'][routerN]['bgp']['address_family']['ipv4'][ + 'unicast']['neighbor'].keys(): + + # Adding ['source_link'] = 'lo' key:value pair + topo['routers'][routerN]['bgp']['address_family']['ipv4'][ + 'unicast']['neighbor'][bgp_neighbor]["dest_link"] = { + 'lo': { + "source_link": "lo", + } + } + + # Creating configuration from JSON + build_config_from_json(tgen, topo) + + input_dict = { + "r1": { + "static_routes": [{ + "network": "1.0.2.17/32", + "next_hop": "10.0.0.2" + }, + { + "network": "1.0.3.17/32", + "next_hop": "10.0.0.6" + } + ] + }, + "r2": { + "static_routes": [{ + "network": "1.0.1.17/32", + "next_hop": "10.0.0.1" + }, + { + "network": "1.0.3.17/32", + "next_hop": "10.0.0.10" + } + ] + }, + "r3": { + "static_routes": [{ + "network": "1.0.1.17/32", + "next_hop": "10.0.0.5" + }, + { + "network": "1.0.2.17/32", + "next_hop": "10.0.0.9" + }, + { + "network": "1.0.4.17/32", + "next_hop": "10.0.0.14" + } + ] + }, + "r4": { + "static_routes": [{ + "network": "1.0.3.17/32", + "next_hop": "10.0.0.13" + }] + } + } + result = create_static_routes(tgen, input_dict) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + # Api call verify whether BGP is converged + result = verify_bgp_convergence(tgen, topo) + assert result is True, "Testcase {} :Failed \n Error: {}". \ + format(tc_name, result) + + write_test_footer(tc_name) + + if __name__ == '__main__': args = ["-s"] + sys.argv[1:] sys.exit(pytest.main(args)) diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py index 04e4b6ebac..a4b65cb987 100644 --- a/tests/topotests/lib/bgp.py +++ b/tests/topotests/lib/bgp.py @@ -883,14 +883,6 @@ def clear_bgp_and_verify(tgen, topo, router): peer_uptime_before_clear_bgp = {} # Verifying BGP convergence before bgp clear command for retry in range(1, 11): - show_bgp_json = rnode.vtysh_cmd("show bgp summary json", - isjson=True) - logger.info(show_bgp_json) - # Verifying output dictionary show_bgp_json is empty or not - if not bool(show_bgp_json): - errormsg = "BGP is not running" - return errormsg - sleeptime = 2 * retry if sleeptime <= BGP_CONVERGENCE_TIMEOUT: # Waiting for BGP to converge @@ -902,6 +894,14 @@ def clear_bgp_and_verify(tgen, topo, router): " router {}".format(BGP_CONVERGENCE_TIMEOUT, router) return errormsg + show_bgp_json = rnode.vtysh_cmd("show bgp summary json", + isjson=True) + logger.info(show_bgp_json) + # Verifying output dictionary show_bgp_json is empty or not + if not bool(show_bgp_json): + errormsg = "BGP is not running" + return errormsg + # To find neighbor ip type bgp_addr_type = topo["routers"][router]["bgp"]["address_family"] for addr_type in bgp_addr_type.keys(): @@ -964,13 +964,6 @@ def clear_bgp_and_verify(tgen, topo, router): peer_uptime_after_clear_bgp = {} # Verifying BGP convergence after bgp clear command for retry in range(1, 11): - show_bgp_json = rnode.vtysh_cmd("show bgp summary json", - isjson=True) - # Verifying output dictionary show_bgp_json is empty or not - if not bool(show_bgp_json): - errormsg = "BGP is not running" - return errormsg - sleeptime = 2 * retry if sleeptime <= BGP_CONVERGENCE_TIMEOUT: # Waiting for BGP to converge @@ -982,6 +975,13 @@ def clear_bgp_and_verify(tgen, topo, router): " router {}".format(BGP_CONVERGENCE_TIMEOUT, router) return errormsg + show_bgp_json = rnode.vtysh_cmd("show bgp summary json", + isjson=True) + # Verifying output dictionary show_bgp_json is empty or not + if not bool(show_bgp_json): + errormsg = "BGP is not running" + return errormsg + # To find neighbor ip type bgp_addr_type = topo["routers"][router]["bgp"]["address_family"] diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py index 78d8d022ad..99825dd2ad 100644 --- a/tests/topotests/lib/common_config.py +++ b/tests/topotests/lib/common_config.py @@ -20,6 +20,7 @@ from collections import OrderedDict from datetime import datetime +from time import sleep import StringIO import os import ConfigParser @@ -1095,3 +1096,253 @@ def create_route_maps(tgen, input_dict, build=False): logger.debug("Exiting lib API: create_prefix_lists()") return result + +############################################# +# Verification APIs +############################################# +def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None): + """ + Data will be read from input_dict or input JSON file, API will generate + same prefixes, which were redistributed by either create_static_routes() or + advertise_networks_using_network_command() and do will verify next_hop and + each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json" + command o/p. + + Parameters + ---------- + * `tgen` : topogen object + * `addr_type` : ip type, ipv4/ipv6 + * `dut`: Device Under Test, for which user wants to test the data + * `input_dict` : input dict, has details of static routes + * `next_hop`[optional]: next_hop which needs to be verified, + default: static + * `protocol`[optional]: protocol, default = None + + Usage + ----- + # RIB can be verified for static routes OR network advertised using + network command. Following are input_dicts to create static routes + and advertise networks using network command. Any one of the input_dict + can be passed to verify_rib() to verify routes in DUT"s RIB. + + # Creating static routes for r1 + input_dict = { + "r1": { + "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \ + "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}] + }} + # Advertising networks using network command in router r1 + input_dict = { + "r1": { + "advertise_networks": [{"start_ip": "20.0.0.0/32", + "no_of_network": 10}, + {"start_ip": "30.0.0.0/32"}] + }} + # Verifying ipv4 routes in router r1 learned via BGP + dut = "r2" + protocol = "bgp" + result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol) + + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: verify_rib()") + + router_list = tgen.routers() + for routerInput in input_dict.keys(): + for router, rnode in router_list.iteritems(): + if router != dut: + continue + + # Verifying RIB routes + if addr_type == "ipv4": + if protocol: + command = "show ip route {} json".format(protocol) + else: + command = "show ip route json" + else: + if protocol: + command = "show ipv6 route {} json".format(protocol) + else: + command = "show ipv6 route json" + + sleep(2) + logger.info("Checking router %s RIB:", router) + rib_routes_json = rnode.vtysh_cmd(command, isjson=True) + + # Verifying output dictionary rib_routes_json is not empty + if bool(rib_routes_json) is False: + errormsg = "No {} route found in rib of router {}..". \ + format(protocol, router) + return errormsg + + if "static_routes" in input_dict[routerInput]: + static_routes = input_dict[routerInput]["static_routes"] + st_found = False + nh_found = False + found_routes = [] + missing_routes = [] + for static_route in static_routes: + network = static_route["network"] + if "no_of_ip" in static_route: + no_of_ip = static_route["no_of_ip"] + else: + no_of_ip = 0 + + # Generating IPs for verification + ip_list = generate_ips(network, no_of_ip) + for st_rt in ip_list: + st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + + if st_rt in rib_routes_json: + st_found = True + found_routes.append(st_rt) + + if next_hop: + if type(next_hop) is not list: + next_hop = [next_hop] + + found_hops = [rib_r["ip"] for rib_r in + rib_routes_json[st_rt][0][ + "nexthops"]] + for nh in next_hop: + nh_found = False + if nh and nh in found_hops: + nh_found = True + else: + errormsg = ("Nexthop {} is Missing for {}" + " route {} in RIB of router" + " {}\n".format(next_hop, + protocol, + st_rt, dut)) + + return errormsg + else: + missing_routes.append(st_rt) + if nh_found: + logger.info("Found next_hop %s for all routes in RIB of" + " router %s\n", next_hop, dut) + + if not st_found and len(missing_routes) > 0: + errormsg = "Missing route in RIB of router {}, routes: " \ + "{}\n".format(dut, missing_routes) + return errormsg + + logger.info("Verified routes in router %s RIB, found routes" + " are: %s\n", dut, found_routes) + + advertise_network = input_dict[routerInput].setdefault( + "advertise_networks", {}) + if advertise_network: + found_routes = [] + missing_routes = [] + found = False + for advertise_network_dict in advertise_network: + start_ip = advertise_network_dict["network"] + if "no_of_network" in advertise_network_dict: + no_of_network = advertise_network_dict["no_of_network"] + else: + no_of_network = 0 + + # Generating IPs for verification + ip_list = generate_ips(start_ip, no_of_network) + for st_rt in ip_list: + st_rt = str(ipaddr.IPNetwork(unicode(st_rt))) + + if st_rt in rib_routes_json: + found = True + found_routes.append(st_rt) + else: + missing_routes.append(st_rt) + + if not found and len(missing_routes) > 0: + errormsg = "Missing route in RIB of router {}, are: {}" \ + " \n".format(dut, missing_routes) + return errormsg + + logger.info("Verified routes in router %s RIB, found routes" + " are: %s", dut, found_routes) + + logger.info("Exiting lib API: verify_rib()") + return True + + +def verify_admin_distance_for_static_routes(tgen, input_dict): + """ + API to verify admin distance for static routes as defined in input_dict/ + input JSON by running show ip/ipv6 route json command. + + Parameter + --------- + * `tgen` : topogen object + * `input_dict`: having details like - for which router and static routes + admin dsitance needs to be verified + Usage + ----- + # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop + 10.0.0.2 in router r1 + input_dict = { + "r1": { + "static_routes": [{ + "network": "10.0.20.1/32", + "admin_distance": 10, + "next_hop": "10.0.0.2" + }] + } + } + result = verify_admin_distance_for_static_routes(tgen, input_dict) + + Returns + ------- + errormsg(str) or True + """ + + logger.info("Entering lib API: verify_admin_distance_for_static_routes()") + + for router in input_dict.keys(): + if router not in tgen.routers(): + continue + + rnode = tgen.routers()[router] + + for static_route in input_dict[router]["static_routes"]: + addr_type = validate_ip_address(static_route["network"]) + # Command to execute + if addr_type == "ipv4": + command = "show ip route json" + else: + command = "show ipv6 route json" + show_ip_route_json = rnode.vtysh_cmd(command, isjson=True) + + logger.info("Verifying admin distance for static route %s" + " under dut %s:", static_route, router) + network = static_route["network"] + next_hop = static_route["next_hop"] + admin_distance = static_route["admin_distance"] + route_data = show_ip_route_json[network][0] + if network in show_ip_route_json: + if route_data["nexthops"][0]["ip"] == next_hop: + if route_data["distance"] != admin_distance: + errormsg = ("Verification failed: admin distance" + " for static route {} under dut {}," + " found:{} but expected:{}". + format(static_route, router, + route_data["distance"], + admin_distance)) + return errormsg + else: + logger.info("Verification successful: admin" + " distance for static route %s under" + " dut %s, found:%s", static_route, + router, route_data["distance"]) + + else: + errormsg = ("Static route {} not found in " + "show_ip_route_json for dut {}". + format(network, router)) + return errormsg + + logger.info("Exiting lib API: verify_admin_distance_for_static_routes()") + return True