tests: BGP : Dynamic route leak VRF lite (BGP-GR)

Authored-by: Shreenidhi A R <rshreenidhi@vmware.com>
Signed-off-by: Shreenidhi A R <rshreenidhi@vmware.com>
This commit is contained in:
ARShreenidhi 2021-12-13 21:52:03 -08:00 committed by Iqra Siddiqui
parent 641065d4fc
commit 771ac547f1
3 changed files with 782 additions and 1 deletions

View File

@ -0,0 +1,222 @@
{
"address_types": [
"ipv4",
"ipv6"
],
"ipv4base": "192.168.0.0",
"ipv4mask": 24,
"ipv6base": "fd00::",
"ipv6mask": 64,
"link_ip_start": {
"ipv4": "192.168.0.0",
"v4mask": 24,
"ipv6": "fd00::",
"v6mask": 64
},
"lo_prefix": {
"ipv4": "1.0.",
"v4mask": 32,
"ipv6": "2001:DB8:F::",
"v6mask": 128
},
"routers": {
"r1": {
"links": {
"r2-link1": {
"ipv4": "auto",
"ipv6": "auto",
"vrf": "RED"
}
},
"vrfs": [
{
"name": "RED",
"id": "1"
}
],
"bgp": [
{
"local_as": "100",
"vrf": "RED",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r2": {
"dest_link": {
"r1-link1": {}
}
}
},
"redistribute": [
{
"redist_type": "static"
}
]
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r2": {
"dest_link": {
"r1-link1": {}
}
}
},
"redistribute": [
{
"redist_type": "static"
}
]
}
}
}
}
]
},
"r2": {
"links": {
"r1-link1": {
"ipv4": "auto",
"ipv6": "auto",
"vrf": "RED"
},
"r3-link1": {
"ipv4": "auto",
"ipv6": "auto"
}
},
"vrfs": [
{
"name": "RED",
"id": "1"
}
],
"bgp": [
{
"local_as": "200",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r3": {
"dest_link": {
"r2-link1": {}
}
}
},
"redistribute": [
{
"redist_type": "static"
}
]
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r3": {
"dest_link": {
"r2-link1": {}
}
}
},
"redistribute": [
{
"redist_type": "static"
}
]
}
}
}
},
{
"local_as": "200",
"vrf": "RED",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r2-link1": {}
}
}
},
"redistribute": [
{
"redist_type": "static"
}
]
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r2-link1": {}
}
}
},
"redistribute": [
{
"redist_type": "static"
}
]
}
}
}
}
]
},
"r3": {
"links": {
"r2-link1": {
"ipv4": "auto",
"ipv6": "auto"
}
},
"bgp": [
{
"local_as": "300",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r2": {
"dest_link": {
"r3-link1": {}
}
}
},
"redistribute": [
{
"redist_type": "static"
}
]
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r2": {
"dest_link": {
"r3-link1": {}
}
}
},
"redistribute": [
{
"redist_type": "static"
}
]
}
}
}
}
]
}
}
}

View File

@ -0,0 +1,554 @@
#!/usr/bin/env python
#
# Copyright (c) 2019 by VMware, Inc. ("VMware")
# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. ("NetDEF")
# in this file.
#
# Permission to use, copy, modify, and/or distribute this software
# for any purpose with or without fee is hereby granted, provided
# that the above copyright notice and this permission notice appear
# in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
# OF THIS SOFTWARE.
#
import os
import sys
import time
import pytest
from time import sleep
import traceback
import ipaddress
# Save the Current Working Directory to find configuration files.
CWD = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join("../"))
sys.path.append(os.path.join("../lib/"))
# pylint: disable=C0413
# Import topogen and topotest helpers
from lib.topogen import Topogen, get_topogen
from lib.topolog import logger
# Required to instantiate the topology builder class.
# Import topoJson from lib, to create topology and initial configuration
from lib.topojson import build_config_from_json
from lib.bgp import (
clear_bgp,
verify_bgp_rib,
verify_graceful_restart,
create_router_bgp,
verify_r_bit,
verify_eor,
verify_f_bit,
verify_bgp_convergence,
verify_gr_address_family,
modify_bgp_config_when_bgpd_down,
verify_graceful_restart_timers,
verify_bgp_convergence_from_running_config,
)
# Import common_config to use commomnly used APIs
from lib.common_config import (create_common_configuration,
InvalidCLIError, retry,
generate_ips, FRRCFG_FILE,
find_interface_with_greater_ip,
check_address_types,
validate_ip_address,
run_frr_cmd,
get_frr_ipv6_linklocal)
from lib.common_config import (
write_test_header,
reset_config_on_routers,
start_topology,
kill_router_daemons,
start_router_daemons,
verify_rib,
check_address_types,
write_test_footer,
check_router_status,
step,
get_frr_ipv6_linklocal,
create_static_routes,
required_linux_kernel_version,
)
pytestmark = [pytest.mark.bgpd]
# Global variables
BGP_CONVERGENCE = False
GR_RESTART_TIMER = 5
GR_SELECT_DEFER_TIMER = 5
GR_STALEPATH_TIMER = 5
# Global variables
# STATIC_ROUTES=[]
NETWORK1_1 = {"ipv4": "192.0.2.1/32", "ipv6": "2001:DB8::1:1/128"}
NETWORK1_2 = {"ipv4": "192.0.2.2/32", "ipv6": "2001:DB8::2:1/128"}
NETWORK2_1 = {"ipv4": "192.0.2.3/32", "ipv6": "2001:DB8::3:1/128"}
NETWORK2_2 = {"ipv4": "192.0.2.4/32", "ipv6": "2001:DB8::4:1/128"}
NETWORK3_1 = {"ipv4": "192.0.2.5/32", "ipv6": "2001:DB8::5:1/128"}
NETWORK3_2 = {"ipv4": "192.0.2.6/32", "ipv6": "2001:DB8::6:1/128"}
NETWORK4_1 = {"ipv4": "192.0.2.7/32", "ipv6": "2001:DB8::7:1/128"}
NETWORK4_2 = {"ipv4": "192.0.2.8/32", "ipv6": "2001:DB8::8:1/128"}
NETWORK5_1 = {"ipv4": "192.0.2.9/32", "ipv6": "2001:DB8::9:1/128"}
NETWORK5_2 = {"ipv4": "192.0.2.10/32", "ipv6": "2001:DB8::10:1/128"}
NEXT_HOP_IP = {"ipv4": "Null0", "ipv6": "Null0"}
PREFERRED_NEXT_HOP = "link_local"
def configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name,
dut, peer):
"""
result = configure_gr_followed_by_clear(tgen, topo, dut)
assert result is True, \
"Testcase {} :Failed \n Error {}". \
format(tc_name, result)
"""
result = create_router_bgp(tgen, topo, input_dict)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result)
for addr_type in ADDR_TYPES:
clear_bgp(tgen, addr_type, dut)
for addr_type in ADDR_TYPES:
clear_bgp(tgen, addr_type, peer)
result = verify_bgp_convergence_from_running_config(tgen)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
return True
def verify_stale_routes_list(tgen, addr_type, dut, input_dict):
"""
This API is use verify Stale routes on refering the network with next hop value
Parameters
----------
* `tgen`: topogen object
* `dut`: input dut router name
* `addr_type` : ip type ipv4/ipv6
* `input_dict` : input dict, has details of static routes
Usage
-----
dut = 'r1'
input_dict = {
"r3": {
"static_routes": [
{
"network": [NETWORK1_1[addr_type]],
"no_of_ip": 2,
"vrf": "RED"
}
]
}
}
result = verify_stale_routes_list(tgen, addr_type, dut, input_dict)
Returns
-------
errormsg(str) or True
"""
logger.debug("Entering lib API: verify_stale_routes_list()")
router_list = tgen.routers()
additional_nexthops_in_required_nhs = []
list1 = []
list2 = []
found_hops = []
for routerInput in input_dict.keys():
for router, rnode in router_list.items():
if router != dut:
continue
# Verifying RIB routes
command = "show bgp"
# Static routes
sleep(2)
logger.info('Checking router {} BGP RIB:'.format(dut))
if 'static_routes' in input_dict[routerInput]:
static_routes = input_dict[routerInput]["static_routes"]
for static_route in static_routes:
found_routes = []
missing_routes = []
st_found = False
nh_found = False
vrf = static_route.setdefault("vrf", None)
community = static_route.setdefault("community", None)
largeCommunity = \
static_route.setdefault("largeCommunity", None)
if vrf:
cmd = "{} vrf {} {}".\
format(command, vrf, addr_type)
if community:
cmd = "{} community {}".\
format(cmd, community)
if largeCommunity:
cmd = "{} large-community {}".\
format(cmd, largeCommunity)
else:
cmd = "{} {}".\
format(command, addr_type)
cmd = "{} json".format(cmd)
rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
# Verifying output dictionary rib_routes_json is not empty
if bool(rib_routes_json) == False:
errormsg = "[DUT: {}]: No route found in rib of router". \
format(router)
return errormsg
elif "warning" in rib_routes_json:
errormsg = "[DUT: {}]: {}". \
format(router, rib_routes_json["warning"])
return errormsg
network = static_route["network"]
if "no_of_ip" in static_route:
no_of_ip = static_route["no_of_ip"]
else:
no_of_ip = 1
# Generating IPs for verification
ip_list = generate_ips(network, no_of_ip)
for st_rt in ip_list:
st_rt = str(ipaddress.ip_network(st_rt))
_addr_type = validate_ip_address(st_rt)
if _addr_type != addr_type:
continue
if st_rt in rib_routes_json["routes"]:
st_found = True
found_routes.append(st_rt)
for mnh in range(0, len(rib_routes_json[
'routes'][st_rt])):
found_hops.append([rib_r[
"ip"] for rib_r in rib_routes_json[
'routes'][st_rt][
mnh]["nexthops"]])
return found_hops
else:
return 'error msg - no hops found'
def setup_module(mod):
"""
Sets up the pytest environment
* `mod`: module name
"""
# Required linux kernel version for this suite to run.
result = required_linux_kernel_version("4.16")
if result is not True:
pytest.skip("Kernel requirements are not met")
global ADDR_TYPES
testsuite_run_time = time.asctime(time.localtime(time.time()))
logger.info("Testsuite start time: {}".format(testsuite_run_time))
logger.info("=" * 40)
logger.info("Running setup_module to create topology")
# This function initiates the topology build with Topogen...
json_file = "{}/bgp_gr_functionality_topo3.json".format(CWD)
tgen = Topogen(json_file, mod.__name__)
global topo
topo = tgen.json_topo
# ... and here it calls Mininet initialization functions.
# Starting topology, create tmp files which are loaded to routers
# to start deamons and then start routers
start_topology(tgen)
# Creating configuration from JSON
build_config_from_json(tgen, topo)
# Api call verify whether BGP is converged
ADDR_TYPES = check_address_types()
for addr_type in ADDR_TYPES:
BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
assert BGP_CONVERGENCE is True, "setup_module : Failed \n Error:" " {}".format(
BGP_CONVERGENCE
)
logger.info("Running setup_module() done")
def teardown_module(mod):
"""
Teardown the pytest environment
* `mod`: module name
"""
logger.info("Running teardown_module to delete topology")
tgen = get_topogen()
# Stop toplogy and Remove tmp files
tgen.stop_topology()
logger.info(
"Testsuite end time: {}".format(time.asctime(time.localtime(time.time())))
)
logger.info("=" * 40)
################################################################################
#
# TEST CASES
#
################################################################################
def test_bgp_gr_stale_routes(request):
tgen = get_topogen()
tc_name = request.node.name
write_test_header(tc_name)
step("Verify the router failures")
if tgen.routers_have_failure():
check_router_status(tgen)
step("Creating 5 static Routes in Router R3 with NULL0 as Next hop")
for addr_type in ADDR_TYPES:
input_dict_1 = {
"r3": {
"static_routes": [{
"network": [NETWORK1_1[addr_type]] + [NETWORK1_2[addr_type]],
"next_hop": NEXT_HOP_IP[addr_type]
},
{
"network": [NETWORK2_1[addr_type]] + [NETWORK2_2[addr_type]],
"next_hop": NEXT_HOP_IP[addr_type]
},
{
"network": [NETWORK3_1[addr_type]] + [NETWORK3_2[addr_type]],
"next_hop": NEXT_HOP_IP[addr_type]
},
{
"network": [NETWORK4_1[addr_type]] + [NETWORK4_2[addr_type]],
"next_hop": NEXT_HOP_IP[addr_type]
},
{
"network": [NETWORK5_1[addr_type]] + [NETWORK5_2[addr_type]],
"next_hop": NEXT_HOP_IP[addr_type]
}]
}
}
result = create_static_routes(tgen, input_dict_1)
assert result is True, 'Testcase {} : Failed \n Error: {}'.format(
tc_name, result)
step("verifying Created Route at R3 in VRF default")
for addr_type in ADDR_TYPES:
dut = 'r3'
input_dict_1= {'r3': topo['routers']['r3']}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
assert result is True, \
"Testcase {} :Failed \n Error {}". \
format(tc_name, result)
#done
step("verifying Created Route at R2 in VRF default")
for addr_type in ADDR_TYPES:
dut = 'r2'
input_dict_1= {'r2': topo['routers']['r2']}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
assert result is True, \
"Testcase {} :Failed \n Error {}". \
format(tc_name, result)
step("importing vrf RED on R2 under Address Family")
for addr_type in ADDR_TYPES:
input_import_vrf={
"r2": {
"bgp": [
{
"local_as": 200,
"vrf": "RED",
"address_family": {addr_type: {"unicast": {"import": {"vrf": "default"}}}},
}
]
}
}
result = create_router_bgp(tgen, topo,input_import_vrf)
assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
#done
step("verifying static Routes at R2 in VRF RED")
for addr_type in ADDR_TYPES:
dut = 'r2'
input_dict_1= {'r2': topo['routers']['r2']}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
assert result is True, \
"Testcase {} :Failed \n Error {}". \
format(tc_name, result)
step("verifying static Routes at R1 in VRF RED")
for addr_type in ADDR_TYPES:
dut = 'r1'
input_dict_1= {'r1': topo['routers']['r1']}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
assert result is True, \
"Testcase {} :Failed \n Error {}". \
format(tc_name, result)
step("Configuring Graceful restart at R2 and R3 ")
input_dict = {
"r2": {
"bgp": {
"local_as": "200",
"graceful-restart": {
"graceful-restart": True,
}
}
},
"r3": {
"bgp": {
"local_as": "300",
"graceful-restart": {
"graceful-restart": True
}
}
}
}
configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name,dut='r2', peer='r3')
step("verify Graceful restart at R2")
for addr_type in ADDR_TYPES:
result = verify_graceful_restart(tgen, topo, addr_type, input_dict,
dut='r2', peer='r3')
assert result is True, \
"Testcase {} :Failed \n Error {}". \
format(tc_name, result)
step("verify Graceful restart at R3")
for addr_type in ADDR_TYPES:
result = verify_graceful_restart(tgen, topo, addr_type, input_dict,
dut='r3', peer='r2')
assert result is True, \
"Testcase {} :Failed \n Error {}". \
format(tc_name, result)
step("Configuring Graceful-restart-disable at R3")
input_dict = {
"r2": {
"bgp": {
"local_as": "200",
"graceful-restart": {
"graceful-restart": False,
}
}
},
"r3": {
"bgp": {
"local_as": "300",
"graceful-restart": {
"graceful-restart": False
}
}
}
}
configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name,dut='r3', peer='r2')
step("Verify Graceful-restart-disable at R3")
for addr_type in ADDR_TYPES:
result = verify_graceful_restart(tgen, topo, addr_type, input_dict,
dut='r3', peer='r2')
assert result is True, \
"Testcase {} :Failed \n Error {}". \
format(tc_name, result)
for iteration in range(5):
step("graceful-restart-disable:True at R3")
input_dict = {
"r3": {
"bgp": {
"graceful-restart": {
"graceful-restart-disable": True,
}
}
}
}
configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name,
dut='r3', peer='r2')
step("Verifying Routes at R2 on enabling GRD")
dut = 'r2'
for addr_type in ADDR_TYPES:
input_dict_1= {'r2': topo['routers']['r2']}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
step("Verify stale Routes in Router R2 enabling GRD")
for addr_type in ADDR_TYPES:
dut = "r2"
protocol = "bgp"
verify_nh_for_static_rtes = {
"r3": {
"static_routes": [
{
"network": [NETWORK1_1[addr_type]],
"no_of_ip": 2,
"vrf": "RED"
}
]
}
}
bgp_rib_next_hops = verify_stale_routes_list(
tgen, addr_type, dut, verify_nh_for_static_rtes)
assert (len(bgp_rib_next_hops)== 1) is True, "Testcase {} : Failed \n Error: {}".format(
tc_name, bgp_rib_next_hops,expected=True)
step("graceful-restart-disable:False at R3")
input_dict = {
"r3": {
"bgp": {
"graceful-restart": {
"graceful-restart-disable": False,
}
}
}
}
configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name,
dut='r3', peer='r2')
step("Verifying Routes at R2 on disabling GRD")
dut = 'r2'
for addr_type in ADDR_TYPES:
input_dict_1= {'r2': topo['routers']['r2']}
result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
step("Verify stale Routes in Router R2 on disabling GRD")
for addr_type in ADDR_TYPES:
dut = "r2"
protocol = "bgp"
verify_nh_for_static_rtes = {
"r3": {
"static_routes": [
{
"network": [NETWORK1_1[addr_type]],
"no_of_ip": 2,
"vrf": "RED"
}
]
}
}
bgp_rib_next_hops = verify_stale_routes_list(tgen, addr_type, dut, verify_nh_for_static_rtes)
stale_route_status=len(bgp_rib_next_hops)== 1
assert stale_route_status is True, "Testcase {} : Failed \n Error: {}".format(
tc_name, stale_route_status,expected=True)
write_test_footer(tc_name)

View File

@ -3061,7 +3061,12 @@ def verify_graceful_restart(
if router != dut:
continue
bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
try:
bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
except TypeError:
bgp_addr_type = topo["routers"][dut]["bgp"][0]["address_family"]
# bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
if addr_type in bgp_addr_type:
if not check_address_types(addr_type):