topotests: Add supported topotests for bgpd vrf-lite best path selection

Co-authored-by: Kuldeep Kashyap <kashyapk@vmware.com>
Signed-off-by: Iqra Siddiqui <imujeebsiddi@vmware.com>
This commit is contained in:
Iqra Siddiqui 2021-09-30 03:24:49 -07:00 committed by Kuldeep Kashyap
parent ad1844f7bd
commit 687c62fc2a
5 changed files with 3196 additions and 2 deletions

View File

@ -0,0 +1,563 @@
{
"address_types": ["ipv4","ipv6"],
"ipv4base": "10.0.0.0",
"ipv4mask": 30,
"ipv6base": "fd00::",
"ipv6mask": 64,
"link_ip_start": {
"ipv4": "10.0.0.0",
"v4mask": 30,
"ipv6": "fd00::",
"v6mask": 64
},
"lo_prefix": {
"ipv4": "1.0.",
"v4mask": 32,
"ipv6": "2001:db8:f::",
"v6mask": 128
},
"routers": {
"r1": {
"links": {
"r2-link1": {"ipv4": "auto", "ipv6": "auto", "vrf": "ISR"},
"r3-link1": {"ipv4": "auto", "ipv6": "auto"},
"r4-link1": {"ipv4": "auto", "ipv6": "auto"}
},
"vrfs":[
{
"name": "ISR",
"id": "1"
}
],
"bgp":
[
{
"local_as": "100",
"vrf": "ISR",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r2": {
"dest_link": {
"r1-link1": {
"keepalivetimer": 1,
"holddowntimer": 3,
"next_hop_self": true
}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r2": {
"dest_link": {
"r1-link1": {
"keepalivetimer": 1,
"holddowntimer": 3,
"next_hop_self": true,
"route_maps": [{
"name": "rmap_global",
"direction": "in"
}]
}
}
}
}
}
}
}
},
{
"local_as": "100",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r3": {
"dest_link": {
"r1-link1": {
"keepalivetimer": 1,
"holddowntimer": 3
}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r3": {
"dest_link": {
"r1-link1": {
"keepalivetimer": 1,
"holddowntimer": 3,
"route_maps": [{
"name": "rmap_global",
"direction": "in"
}]
}
}
}
}
}
}
}
},
{
"local_as": "100",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r4": {
"dest_link": {
"r1-link1": {
"keepalivetimer": 1,
"holddowntimer": 3
}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r4": {
"dest_link": {
"r1-link1": {
"keepalivetimer": 1,
"holddowntimer": 3,
"route_maps": [{
"name": "rmap_global",
"direction": "in"
}]
}
}
}
}
}
}
}
}
],
"static_routes":[
{
"network": ["11.11.11.1/32", "11.11.11.11/32"],
"next_hop":"Null0",
"vrf": "ISR"
},
{
"network": ["11:11::1/128", "11:11::11/128"],
"next_hop":"Null0",
"vrf": "ISR"
},
{
"network": ["10.10.10.10/32", "10.10.10.100/32"],
"next_hop":"Null0"
},
{
"network": ["10:10::10/128", "10:10::100/128"],
"next_hop":"Null0"
}
],
"route_maps": {
"rmap_global": [{
"action": "permit",
"set": {
"ipv6": {
"nexthop": "prefer-global"
}
}
}]
}
},
"r2": {
"links": {
"r1-link1": {"ipv4": "auto", "ipv6": "auto", "vrf": "ISR"},
"r3-link1": {"ipv4": "auto", "ipv6": "auto"},
"r4-link1": {"ipv4": "auto", "ipv6": "auto"}
},
"vrfs":[
{
"name": "ISR",
"id": "1"
}
],
"bgp":
[
{
"local_as": "100",
"vrf": "ISR",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r2-link1": {
"keepalivetimer": 1,
"holddowntimer": 3,
"next_hop_self": true
}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r2-link1": {
"keepalivetimer": 1,
"holddowntimer": 3,
"next_hop_self": true,
"route_maps": [{
"name": "rmap_global",
"direction": "in"
}]
}
}
}
}
}
}
}
},
{
"local_as": "100",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r3": {
"dest_link": {
"r2-link1": {
"keepalivetimer": 1,
"holddowntimer": 3
}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r3": {
"dest_link": {
"r2-link1": {
"keepalivetimer": 1,
"holddowntimer": 3,
"route_maps": [{
"name": "rmap_global",
"direction": "in"
}]
}
}
}
}
}
}
}
},
{
"local_as": "100",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r4": {
"dest_link": {
"r2-link1": {
"keepalivetimer": 1,
"holddowntimer": 3
}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r4": {
"dest_link": {
"r2-link1": {
"keepalivetimer": 1,
"holddowntimer": 3,
"route_maps": [{
"name": "rmap_global",
"direction": "in"
}]
}
}
}
}
}
}
}
}
],
"static_routes":[
{
"network": ["22.22.22.2/32", "22.22.22.22/32"],
"next_hop":"Null0",
"vrf": "ISR"
},
{
"network": ["22:22::2/128", "22:22::22/128"],
"next_hop":"Null0",
"vrf": "ISR"
},
{
"network": ["20.20.20.20/32", "20.20.20.200/32"],
"next_hop":"Null0"
},
{
"network": ["20:20::20/128", "20:20::200/128"],
"next_hop":"Null0"
}
],
"route_maps": {
"rmap_global": [{
"action": "permit",
"set": {
"ipv6": {
"nexthop": "prefer-global"
}
}
}]
}
},
"r3": {
"links": {
"r1-link1": {"ipv4": "auto", "ipv6": "auto"},
"r2-link1": {"ipv4": "auto", "ipv6": "auto"}
},
"bgp":
[
{
"local_as": "300",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r3-link1": {
"keepalivetimer": 1,
"holddowntimer": 3
}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r3-link1": {
"keepalivetimer": 1,
"holddowntimer": 3,
"route_maps": [{
"name": "rmap_global",
"direction": "in"
}]
}
}
}
}
}
}
}
},
{
"local_as": "300",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r2": {
"dest_link": {
"r3-link1": {
"keepalivetimer": 1,
"holddowntimer": 3
}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r2": {
"dest_link": {
"r3-link1": {
"keepalivetimer": 1,
"holddowntimer": 3,
"route_maps": [{
"name": "rmap_global",
"direction": "in"
}]
}
}
}
}
}
}
}
}
],
"static_routes":[
{
"network": ["30.30.30.3/32", "30.30.30.30/32"],
"next_hop":"Null0"
},
{
"network": ["30:30::3/128", "30:30::30/128"],
"next_hop":"Null0"
},
{
"network": ["50.50.50.5/32", "50.50.50.50/32"],
"next_hop":"Null0"
},
{
"network": ["50:50::5/128", "50:50::50/128"],
"next_hop":"Null0"
}
],
"route_maps": {
"rmap_global": [{
"action": "permit",
"set": {
"ipv6": {
"nexthop": "prefer-global"
}
}
}]
}
},
"r4": {
"links": {
"r1-link1": {"ipv4": "auto", "ipv6": "auto"},
"r2-link1": {"ipv4": "auto", "ipv6": "auto"}
},
"bgp":
[
{
"local_as": "400",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r4-link1": {
"keepalivetimer": 1,
"holddowntimer": 3
}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r4-link1": {
"keepalivetimer": 1,
"holddowntimer": 3,
"route_maps": [{
"name": "rmap_global",
"direction": "in"
}]
}
}
}
}
}
}
}
},
{
"local_as": "400",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r2": {
"dest_link": {
"r4-link1": {
"keepalivetimer": 1,
"holddowntimer": 3
}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r2": {
"dest_link": {
"r4-link1": {
"keepalivetimer": 1,
"holddowntimer": 3,
"route_maps": [{
"name": "rmap_global",
"direction": "in"
}]
}
}
}
}
}
}
}
}
],
"static_routes":[
{
"network": ["40.40.40.4/32", "40.40.40.40/32"],
"next_hop":"Null0"
},
{
"network": ["40:40::4/128", "40:40::40/128"],
"next_hop":"Null0"
},
{
"network": ["50.50.50.5/32", "50.50.50.50/32"],
"next_hop":"Null0"
},
{
"network": ["50:50::5/128", "50:50::50/128"],
"next_hop":"Null0"
}
],
"route_maps": {
"rmap_global": [{
"action": "permit",
"set": {
"ipv6": {
"nexthop": "prefer-global"
}
}
}]
}
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,916 @@
#!/usr/bin/env python
#
# Copyright (c) 2020 by VMware, Inc. ("VMware")
# Used Copyright (c) 2018 by Network Device Education Foundation,
# Inc. ("NetDEF") in this file.
#
# Permission to use, copy, modify, and/or distribute this software
# for any purpose with or without fee is hereby granted, provided
# that the above copyright notice and this permission notice appear
# in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
# OF THIS SOFTWARE.
#
"""
Following tests are covered to test BGP VRF Lite:
1. Verify BGP best path selection algorithm works fine when
routes are imported from ISR to default vrf and vice versa.
"""
import os
import sys
import time
import pytest
import platform
# Save the Current Working Directory to find configuration files.
CWD = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(CWD, "../"))
sys.path.append(os.path.join(CWD, "../lib/"))
# Required to instantiate the topology builder class.
# pylint: disable=C0413
# Import topogen and topotest helpers
from lib.topogen import Topogen, get_topogen
from lib.topotest import version_cmp
from lib.common_config import (
start_topology,
write_test_header,
check_address_types,
write_test_footer,
step,
create_route_maps,
create_prefix_lists,
check_router_status,
get_frr_ipv6_linklocal,
shutdown_bringup_interface,
)
from lib.topolog import logger
from lib.bgp import (
verify_bgp_convergence,
create_router_bgp,
verify_bgp_community,
verify_bgp_rib,
clear_bgp,
verify_best_path_as_per_bgp_attribute
)
from lib.topojson import build_config_from_json
pytestmark = [pytest.mark.bgpd, pytest.mark.staticd]
# Global variables
NETWORK1_1 = {"ipv4": "11.11.11.1/32", "ipv6": "11:11::1/128"}
NETWORK1_2 = {"ipv4": "11.11.11.11/32", "ipv6": "11:11::11/128"}
NETWORK1_3 = {"ipv4": "10.10.10.10/32", "ipv6": "10:10::10/128"}
NETWORK1_4 = {"ipv4": "10.10.10.100/32", "ipv6": "10:10::100/128"}
NETWORK2_1 = {"ipv4": "22.22.22.2/32", "ipv6": "22:22::2/128"}
NETWORK2_2 = {"ipv4": "22.22.22.22/32", "ipv6": "22:22::22/128"}
NETWORK2_3 = {"ipv4": "20.20.20.20/32", "ipv6": "20:20::20/128"}
NETWORK2_4 = {"ipv4": "20.20.20.200/32", "ipv6": "20:20::200/128"}
NETWORK3_1 = {"ipv4": "30.30.30.3/32", "ipv6": "30:30::3/128"}
NETWORK3_2 = {"ipv4": "30.30.30.30/32", "ipv6": "30:30::30/128"}
NETWORK3_3 = {"ipv4": "50.50.50.5/32", "ipv6": "50:50::5/128"}
NETWORK3_4 = {"ipv4": "50.50.50.50/32", "ipv6": "50:50::50/128"}
NETWORK4_1 = {"ipv4": "40.40.40.4/32", "ipv6": "40:40::4/128"}
NETWORK4_2 = {"ipv4": "40.40.40.40/32", "ipv6": "40:40::40/128"}
NETWORK4_3 = {"ipv4": "50.50.50.5/32", "ipv6": "50:50::5/128"}
NETWORK4_4 = {"ipv4": "50.50.50.50/32", "ipv6": "50:50::50/128"}
NEXT_HOP_IP = {"ipv4": "Null0", "ipv6": "Null0"}
LOOPBACK_1 = {
"ipv4": "10.0.0.7/24",
"ipv6": "fd00:0:0:1::7/64",
"ipv4_mask": "255.255.255.0",
"ipv6_mask": None,
}
LOOPBACK_2 = {
"ipv4": "10.0.0.16/24",
"ipv6": "fd00:0:0:3::5/64",
"ipv4_mask": "255.255.255.0",
"ipv6_mask": None,
}
PREFERRED_NEXT_HOP = "global"
def setup_module(mod):
"""
Sets up the pytest environment
* `mod`: module name
"""
testsuite_run_time = time.asctime(time.localtime(time.time()))
logger.info("Testsuite start time: {}".format(testsuite_run_time))
logger.info("=" * 40)
logger.info("Running setup_module to create topology")
# This function initiates the topology build with Topogen...
json_file = "{}/bgp_vrf_lite_best_path_topo1.json".format(CWD)
tgen = Topogen(json_file, mod.__name__)
global topo
topo = tgen.json_topo
# ... and here it calls Mininet initialization functions.
# Starting topology, create tmp files which are loaded to routers
# to start deamons and then start routers
start_topology(tgen)
# Run these tests for kernel version 4.19 or above
if version_cmp(platform.release(), "4.19") < 0:
error_msg = (
"BGP vrf dynamic route leak tests will not run "
'(have kernel "{}", but it requires >= 4.19)'.format(platform.release())
)
pytest.skip(error_msg)
# Creating configuration from JSON
build_config_from_json(tgen, topo)
global BGP_CONVERGENCE
global ADDR_TYPES
ADDR_TYPES = check_address_types()
BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
assert BGP_CONVERGENCE is True, "setup_module : Failed \n Error: {}".format(
BGP_CONVERGENCE
)
logger.info("Running setup_module() done")
def teardown_module():
"""Teardown the pytest environment"""
logger.info("Running teardown_module to delete topology")
tgen = get_topogen()
# Stop toplogy and Remove tmp files
tgen.stop_topology()
logger.info(
"Testsuite end time: {}".format(time.asctime(time.localtime(time.time())))
)
logger.info("=" * 40)
#####################################################
#
# Testcases
#
#####################################################
def disable_route_map_to_prefer_global_next_hop(tgen, topo):
"""
This API is to remove prefer global route-map applied on neighbors
Parameter:
----------
* `tgen` : Topogen object
* `topo` : Input JSON data
Returns:
--------
True/errormsg
"""
logger.info("Remove prefer-global rmap applied on neighbors")
input_dict = {
"r1": {
"bgp": [
{
"local_as": "100",
"vrf": "ISR",
"address_family": {
"ipv6": {
"unicast": {
"neighbor": {
"r2": {
"dest_link": {
"r1-link1": {
"route_maps": [
{
"name": "rmap_global",
"direction": "in",
"delete": True,
}
]
}
}
}
}
}
}
},
},
{
"local_as": "100",
"address_family": {
"ipv6": {
"unicast": {
"neighbor": {
"r3": {
"dest_link": {
"r1-link1": {
"route_maps": [
{
"name": "rmap_global",
"direction": "in",
"delete": True,
}
]
}
}
}
}
}
}
},
},
{
"local_as": "100",
"address_family": {
"ipv6": {
"unicast": {
"neighbor": {
"r4": {
"dest_link": {
"r1-link1": {
"route_maps": [
{
"name": "rmap_global",
"direction": "in",
"delete": True,
}
]
}
}
}
}
}
}
},
},
]
},
"r2": {
"bgp": [
{
"local_as": "100",
"vrf": "ISR",
"address_family": {
"ipv6": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r2-link1": {
"route_maps": [
{
"name": "rmap_global",
"direction": "in",
"delete": True,
}
]
}
}
}
}
}
}
},
},
{
"local_as": "100",
"address_family": {
"ipv6": {
"unicast": {
"neighbor": {
"r3": {
"dest_link": {
"r2-link1": {
"route_maps": [
{
"name": "rmap_global",
"direction": "in",
"delete": True,
}
]
}
}
}
}
}
}
},
},
{
"local_as": "100",
"address_family": {
"ipv6": {
"unicast": {
"neighbor": {
"r4": {
"dest_link": {
"r2-link1": {
"route_maps": [
{
"name": "rmap_global",
"direction": "in",
"delete": True,
}
]
}
}
}
}
}
}
},
},
]
},
"r3": {
"bgp": [
{
"local_as": "300",
"address_family": {
"ipv6": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r3-link1": {
"route_maps": [
{
"name": "rmap_global",
"direction": "in",
"delete": True,
}
]
}
}
}
}
}
}
},
},
{
"local_as": "300",
"address_family": {
"ipv6": {
"unicast": {
"neighbor": {
"r2": {
"dest_link": {
"r3-link1": {
"route_maps": [
{
"name": "rmap_global",
"direction": "in",
"delete": True,
}
]
}
}
}
}
}
}
},
},
]
},
"r4": {
"bgp": [
{
"local_as": "400",
"address_family": {
"ipv6": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r4-link1": {
"route_maps": [
{
"name": "rmap_global",
"direction": "in",
"delete": True,
}
]
}
}
}
}
}
}
},
},
{
"local_as": "400",
"address_family": {
"ipv6": {
"unicast": {
"neighbor": {
"r2": {
"dest_link": {
"r4-link1": {
"route_maps": [
{
"name": "rmap_global",
"direction": "in",
"delete": True,
}
]
}
}
}
}
}
}
},
},
]
},
}
result = create_router_bgp(tgen, topo, input_dict)
assert result is True, "Testcase :Failed \n Error: {}".format(result)
return True
def test_bgp_best_path_with_dynamic_import_p0(request):
"""
1.5.6. Verify BGP best path selection algorithm works fine when
routes are imported from ISR to default vrf and vice versa.
"""
tgen = get_topogen()
tc_name = request.node.name
write_test_header(tc_name)
build_config_from_json(tgen, topo)
if tgen.routers_have_failure():
check_router_status(tgen)
for addr_type in ADDR_TYPES:
step(
"Redistribute configured static routes into BGP process" " on R1/R2 and R3"
)
input_dict_1 = {}
DUT = ["r1", "r2", "r3", "r4"]
VRFS = ["ISR", "ISR", "default", "default"]
AS_NUM = [100, 100, 300, 400]
for dut, vrf, as_num in zip(DUT, VRFS, AS_NUM):
temp = {dut: {"bgp": []}}
input_dict_1.update(temp)
temp[dut]["bgp"].append(
{
"local_as": as_num,
"vrf": vrf,
"address_family": {
addr_type: {
"unicast": {"redistribute": [{"redist_type": "static"}]}
}
},
}
)
result = create_router_bgp(tgen, topo, input_dict_1)
assert result is True, "Testcase {} :Failed \n Error: {}".format(
tc_name, result
)
for addr_type in ADDR_TYPES:
step("Import from default vrf into vrf ISR on R1 and R2 as below")
input_dict_vrf = {}
DUT = ["r1", "r2"]
VRFS = ["ISR", "ISR"]
AS_NUM = [100, 100]
for dut, vrf, as_num in zip(DUT, VRFS, AS_NUM):
temp = {dut: {"bgp": []}}
input_dict_vrf.update(temp)
temp[dut]["bgp"].append(
{
"local_as": as_num,
"vrf": vrf,
"address_family": {
addr_type: {"unicast": {"import": {"vrf": "default"}}}
},
}
)
result = create_router_bgp(tgen, topo, input_dict_vrf)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result
)
input_dict_default = {}
DUT = ["r1", "r2"]
VRFS = ["default", "default"]
AS_NUM = [100, 100]
for dut, vrf, as_num in zip(DUT, VRFS, AS_NUM):
temp = {dut: {"bgp": []}}
input_dict_default.update(temp)
temp[dut]["bgp"].append(
{
"local_as": as_num,
"vrf": vrf,
"address_family": {
addr_type: {"unicast": {"import": {"vrf": "ISR"}}}
},
}
)
result = create_router_bgp(tgen, topo, input_dict_default)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result
)
step(
"Verify ECMP/Next-hop/Imported routes Vs Locally originated "
"routes/eBGP routes vs iBGP routes --already covered in almost"
" all tests"
)
for addr_type in ADDR_TYPES:
step("Verify Pre-emption")
input_routes_r3 = {
"r3": {"static_routes": [{"network": [NETWORK3_3[addr_type]]}]}
}
intf_r3_r1 = topo["routers"]["r3"]["links"]["r1-link1"]["interface"]
intf_r4_r1 = topo["routers"]["r4"]["links"]["r1-link1"]["interface"]
if addr_type == "ipv6" and "link_local" in PREFERRED_NEXT_HOP:
nh_r3_r1 = get_frr_ipv6_linklocal(tgen, "r3", intf=intf_r3_r1)
nh_r4_r1 = get_frr_ipv6_linklocal(tgen, "r4", intf=intf_r4_r1)
else:
nh_r3_r1 = topo["routers"]["r3"]["links"]["r1-link1"][addr_type].split("/")[
0
]
nh_r4_r1 = topo["routers"]["r4"]["links"]["r1-link1"][addr_type].split("/")[
0
]
result = verify_bgp_rib(
tgen, addr_type, "r1", input_routes_r3, next_hop=[nh_r4_r1]
)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
)
step("Shutdown interface connected to r1 from r4:")
shutdown_bringup_interface(tgen, "r4", intf_r4_r1, False)
for addr_type in ADDR_TYPES:
input_routes_r3 = {
"r3": {"static_routes": [{"network": [NETWORK3_3[addr_type]]}]}
}
intf_r3_r1 = topo["routers"]["r3"]["links"]["r1-link1"]["interface"]
intf_r4_r1 = topo["routers"]["r4"]["links"]["r1-link1"]["interface"]
if addr_type == "ipv6" and "link_local" in PREFERRED_NEXT_HOP:
nh_r3_r1 = get_frr_ipv6_linklocal(tgen, "r3", intf=intf_r3_r1)
nh_r4_r1 = get_frr_ipv6_linklocal(tgen, "r4", intf=intf_r4_r1)
else:
nh_r3_r1 = topo["routers"]["r3"]["links"]["r1-link1"][addr_type].split("/")[
0
]
nh_r4_r1 = topo["routers"]["r4"]["links"]["r1-link1"][addr_type].split("/")[
0
]
step("Verify next-hop is changed")
result = verify_bgp_rib(
tgen, addr_type, "r1", input_routes_r3, next_hop=[nh_r3_r1]
)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
)
step("Bringup interface connected to r1 from r4:")
shutdown_bringup_interface(tgen, "r4", intf_r4_r1, True)
for addr_type in ADDR_TYPES:
input_routes_r3 = {
"r3": {"static_routes": [{"network": [NETWORK3_3[addr_type]]}]}
}
intf_r3_r1 = topo["routers"]["r3"]["links"]["r1-link1"]["interface"]
intf_r4_r1 = topo["routers"]["r4"]["links"]["r1-link1"]["interface"]
if addr_type == "ipv6" and "link_local" in PREFERRED_NEXT_HOP:
nh_r3_r1 = get_frr_ipv6_linklocal(tgen, "r3", intf=intf_r3_r1)
nh_r4_r1 = get_frr_ipv6_linklocal(tgen, "r4", intf=intf_r4_r1)
else:
nh_r3_r1 = topo["routers"]["r3"]["links"]["r1-link1"][addr_type].split("/")[
0
]
nh_r4_r1 = topo["routers"]["r4"]["links"]["r1-link1"][addr_type].split("/")[
0
]
step("Verify next-hop is not chnaged aftr shutdown:")
result = verify_bgp_rib(
tgen, addr_type, "r1", input_routes_r3, next_hop=[nh_r3_r1]
)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
)
step("Active-Standby scenario(as-path prepend and Local pref)")
for addr_type in ADDR_TYPES:
step("Create prefix-list")
input_dict_pf = {
"r1": {
"prefix_lists": {
addr_type: {
"pf_ls_{}".format(addr_type): [
{
"seqid": 10,
"network": NETWORK3_4[addr_type],
"action": "permit",
}
]
}
}
}
}
result = create_prefix_lists(tgen, input_dict_pf)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result
)
for addr_type in ADDR_TYPES:
step("Create route-map to match prefix-list and set localpref 500")
input_dict_rm = {
"r1": {
"route_maps": {
"rmap_PATH1_{}".format(addr_type): [
{
"action": "permit",
"seq_id": 10,
"match": {
addr_type: {
"prefix_lists": "pf_ls_{}".format(addr_type)
}
},
"set": {"locPrf": 500},
}
]
}
}
}
result = create_route_maps(tgen, input_dict_rm)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result
)
step("Create route-map to match prefix-list and set localpref 600")
input_dict_rm = {
"r1": {
"route_maps": {
"rmap_PATH2_{}".format(addr_type): [
{
"action": "permit",
"seq_id": 20,
"match": {
addr_type: {
"prefix_lists": "pf_ls_{}".format(addr_type)
}
},
"set": {"locPrf": 600},
}
]
}
}
}
result = create_route_maps(tgen, input_dict_rm)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result
)
input_dict_rma = {
"r1": {
"bgp": [
{
"local_as": "100",
"address_family": {
addr_type: {
"unicast": {
"neighbor": {
"r3": {
"dest_link": {
"r1-link1": {
"route_maps": [
{
"name": "rmap_PATH1_{}".format(
addr_type
),
"direction": "in",
}
]
}
}
},
"r4": {
"dest_link": {
"r1-link1": {
"route_maps": [
{
"name": "rmap_PATH2_{}".format(
addr_type
),
"direction": "in",
}
]
}
}
},
}
}
}
},
}
]
}
}
result = create_router_bgp(tgen, topo, input_dict_rma)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result
)
dut = "r1"
attribute = "locPrf"
for addr_type in ADDR_TYPES:
step("Verify bestpath is installed as per highest localpref")
input_routes_r3 = {
"r3": {
"static_routes": [
{"network": [NETWORK3_3[addr_type], NETWORK3_4[addr_type]]}
]
}
}
result = verify_best_path_as_per_bgp_attribute(
tgen, addr_type, dut, input_routes_r3, attribute
)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result
)
for addr_type in ADDR_TYPES:
step("Create route-map to match prefix-list and set localpref 700")
input_dict_rm = {
"r1": {
"route_maps": {
"rmap_PATH1_{}".format(addr_type): [
{
"action": "permit",
"seq_id": 10,
"match": {
addr_type: {
"prefix_lists": "pf_ls_{}".format(addr_type)
}
},
"set": {"locPrf": 700},
}
]
}
}
}
result = create_route_maps(tgen, input_dict_rm)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result
)
for addr_type in ADDR_TYPES:
step("Verify bestpath is changed as per highest localpref")
input_routes_r3 = {
"r3": {
"static_routes": [
{"network": [NETWORK3_3[addr_type], NETWORK3_4[addr_type]]}
]
}
}
result = verify_best_path_as_per_bgp_attribute(
tgen, addr_type, dut, input_routes_r3, attribute
)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result
)
for addr_type in ADDR_TYPES:
step("Create route-map to match prefix-list and set as-path prepend")
input_dict_rm = {
"r1": {
"route_maps": {
"rmap_PATH2_{}".format(addr_type): [
{
"action": "permit",
"seq_id": 20,
"match": {
addr_type: {
"prefix_lists": "pf_ls_{}".format(addr_type)
}
},
"set": {
"localpref": 700,
"path": {"as_num": "111", "as_action": "prepend"},
},
}
]
}
}
}
result = create_route_maps(tgen, input_dict_rm)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result
)
attribute = "path"
for addr_type in ADDR_TYPES:
step("Verify bestpath is changed as per shortest as-path")
input_routes_r3 = {
"r3": {
"static_routes": [
{"network": [NETWORK3_3[addr_type], NETWORK3_4[addr_type]]}
]
}
}
result = verify_best_path_as_per_bgp_attribute(
tgen, addr_type, dut, input_routes_r3, attribute
)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result
)
write_test_footer(tc_name)
if __name__ == "__main__":
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))

View File

@ -0,0 +1,539 @@
#!/usr/bin/env python
#
# Copyright (c) 2021 by VMware, Inc. ("VMware")
# Used Copyright (c) 2018 by Network Device Education Foundation,
# Inc. ("NetDEF") in this file.
#
# Permission to use, copy, modify, and/or distribute this software
# for any purpose with or without fee is hereby granted, provided
# that the above copyright notice and this permission notice appear
# in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
# OF THIS SOFTWARE.
#
"""
Following tests are covered to test BGP VRF Lite:
1. Verify that locally imported routes are selected as best path over eBGP imported routes
peers.
2. Verify ECMP for imported routes from different VRFs.
"""
import os
import sys
import time
import pytest
import platform
from time import sleep
# Save the Current Working Directory to find configuration files.
CWD = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(CWD, "../"))
sys.path.append(os.path.join(CWD, "../lib/"))
# Required to instantiate the topology builder class.
# pylint: disable=C0413
# Import topogen and topotest helpers
from lib.topogen import Topogen, get_topogen
from lib.topotest import version_cmp
from lib.common_config import (
start_topology,
write_test_header,
check_address_types,
write_test_footer,
reset_config_on_routers,
verify_rib,
step,
create_static_routes,
check_router_status,
apply_raw_config
)
from lib.topolog import logger
from lib.bgp import (
verify_bgp_convergence,
create_router_bgp,
verify_bgp_rib,
verify_bgp_bestpath
)
from lib.topojson import build_config_from_json
pytestmark = [pytest.mark.bgpd, pytest.mark.staticd]
# Global variables
NETWORK1_1 = {"ipv4": "11.11.11.1/32", "ipv6": "11:11::1/128"}
NETWORK1_2 = {"ipv4": "11.11.11.11/32", "ipv6": "11:11::11/128"}
NETWORK1_3 = {"ipv4": "10.10.10.1/32", "ipv6": "10:10::1/128"}
NETWORK1_4 = {"ipv4": "10.10.10.100/32", "ipv6": "10:10::100/128"}
NETWORK1_5 = {"ipv4": "110.110.110.1/32", "ipv6": "110:110::1/128"}
NETWORK1_6 = {"ipv4": "110.110.110.100/32", "ipv6": "110:110::100/128"}
NETWORK2_1 = {"ipv4": "22.22.22.2/32", "ipv6": "22:22::2/128"}
NETWORK2_2 = {"ipv4": "22.22.22.22/32", "ipv6": "22:22::22/128"}
NETWORK2_3 = {"ipv4": "20.20.20.20/32", "ipv6": "20:20::20/128"}
NETWORK2_4 = {"ipv4": "20.20.20.200/32", "ipv6": "20:20::200/128"}
NETWORK2_5 = {"ipv4": "220.220.220.20/32", "ipv6": "220:220::20/128"}
NETWORK2_6 = {"ipv4": "220.220.220.200/32", "ipv6": "220:220::200/128"}
NETWORK3_1 = {"ipv4": "30.30.30.3/32", "ipv6": "30:30::3/128"}
NETWORK3_2 = {"ipv4": "30.30.30.30/32", "ipv6": "30:30::30/128"}
PREFIX_LIST = {
"ipv4": ["11.11.11.1", "22.22.22.2", "22.22.22.22"],
"ipv6": ["11:11::1", "22:22::2", "22:22::22"],
}
PREFERRED_NEXT_HOP = "global"
VRF_LIST = ["RED", "BLUE", "GREEN"]
COMM_VAL_1 = "100:100"
COMM_VAL_2 = "500:500"
COMM_VAL_3 = "600:600"
BESTPATH = {
"ipv4": "0.0.0.0",
"ipv6": "::"
}
def setup_module(mod):
"""
Sets up the pytest environment
* `mod`: module name
"""
testsuite_run_time = time.asctime(time.localtime(time.time()))
logger.info("Testsuite start time: {}".format(testsuite_run_time))
logger.info("=" * 40)
logger.info("Running setup_module to create topology")
# This function initiates the topology build with Topogen...
json_file = "{}/bgp_vrf_lite_best_path_topo2.json".format(CWD)
tgen = Topogen(json_file, mod.__name__)
global topo
topo = tgen.json_topo
# ... and here it calls Mininet initialization functions.
# Starting topology, create tmp files which are loaded to routers
# to start deamons and then start routers
start_topology(tgen)
# Run these tests for kernel version 4.19 or above
if version_cmp(platform.release(), "4.19") < 0:
error_msg = (
"BGP vrf dynamic route leak tests will not run "
'(have kernel "{}", but it requires >= 4.19)'.format(platform.release())
)
pytest.skip(error_msg)
# Creating configuration from JSON
build_config_from_json(tgen, topo)
global BGP_CONVERGENCE
global ADDR_TYPES
ADDR_TYPES = check_address_types()
BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
assert BGP_CONVERGENCE is True, "setup_module : Failed \n Error: {}".format(
BGP_CONVERGENCE
)
logger.info("Running setup_module() done")
def teardown_module():
"""Teardown the pytest environment"""
logger.info("Running teardown_module to delete topology")
tgen = get_topogen()
# Stop toplogy and Remove tmp files
tgen.stop_topology()
logger.info(
"Testsuite end time: {}".format(time.asctime(time.localtime(time.time())))
)
logger.info("=" * 40)
#####################################################
#
# Testcases
#
#####################################################
def test_dynamic_import_ecmp_imported_routed_diffrent_vrfs_p0(request):
"""
Verify ECMP for imported routes from different VRFs.
"""
tgen = get_topogen()
tc_name = request.node.name
write_test_header(tc_name)
if tgen.routers_have_failure():
check_router_status(tgen)
reset_config_on_routers(tgen)
step("Configure same static routes in tenant vrfs RED and GREEN on router "
"R3 and redistribute in respective BGP process")
for vrf_name in ["RED", "GREEN"]:
for addr_type in ADDR_TYPES:
if vrf_name == "GREEN":
next_hop_vrf = topo["routers"]["r1"]["links"][
"r3-link3"][addr_type].split("/")[0]
else:
next_hop_vrf = topo["routers"]["r2"]["links"][
"r3-link1"][addr_type].split("/")[0]
static_routes = {
"r3": {
"static_routes": [
{
"network": [NETWORK1_1[addr_type]],
"next_hop": next_hop_vrf,
"vrf": vrf_name
}
]
}
}
result = create_static_routes(tgen, static_routes)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
step("Redistribute static route on BGP VRF : {}".format(vrf_name))
temp = {}
for addr_type in ADDR_TYPES:
temp.update({
addr_type: {
"unicast": {
"redistribute": [{
"redist_type": "static"
}]
}
}
})
redist_dict = {"r3": {"bgp": [{
"vrf": vrf_name, "local_as": 3, "address_family": temp
}]}}
result = create_router_bgp(tgen, topo, redist_dict)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
step("Verify that configured static routes are installed in respective "
"BGP table for vrf RED & GREEN")
for vrf_name in ["RED", "GREEN"]:
for addr_type in ADDR_TYPES:
if vrf_name == "GREEN":
next_hop_vrf = topo["routers"]["r1"]["links"][
"r3-link3"][addr_type].split("/")[0]
else:
next_hop_vrf = topo["routers"]["r2"]["links"][
"r3-link1"][addr_type].split("/")[0]
static_routes = {
"r3": {
"static_routes": [
{
"network": [NETWORK1_1[addr_type]],
"vrf": vrf_name
}
]
}
}
result = verify_bgp_rib(tgen, addr_type, "r3", static_routes,
next_hop=next_hop_vrf)
assert result is True, "Testcase {} : Failed \n Error {}". \
format(tc_name, result)
result = verify_rib(tgen, addr_type, "r3", static_routes,
next_hop=next_hop_vrf)
assert result is True, "Testcase {} : Failed \n Error {}". \
format(tc_name, result)
step("Import vrf RED and GREEN into default vrf and Configure ECMP")
bgp_val = []
for vrf_name in ["RED", "GREEN"]:
temp = {}
for addr_type in ADDR_TYPES:
temp.update({
addr_type: {
"unicast": {
"import": {
"vrf": vrf_name
},
"maximum_paths": {
"ebgp": 2
}
}
}
})
bgp_val.append({
"local_as": 3, "address_family": temp
})
import_dict = {"r3": {"bgp": bgp_val}}
result = create_router_bgp(tgen, topo, import_dict)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
step("Configure bgp bestpath on router r3")
r3_raw_config = {
"r3": {
"raw_config": [
"router bgp 3",
"bgp bestpath as-path multipath-relax"
]
}
}
result = apply_raw_config(tgen, r3_raw_config)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
step("Verify that routes are imported with two different next-hop vrfs "
"and IPs. Additionally R3 must do ECMP for both the routes.")
for addr_type in ADDR_TYPES:
next_hop_vrf = [
topo["routers"]["r2"]["links"]["r3-link1"][addr_type]. \
split("/")[0],
topo["routers"]["r1"]["links"]["r3-link3"][addr_type]. \
split("/")[0]
]
static_routes = {
"r3": {
"static_routes": [
{
"network": [NETWORK1_1[addr_type]],
}
]
}
}
result = verify_bgp_rib(tgen, addr_type, "r3", static_routes,
next_hop=next_hop_vrf)
assert result is True, "Testcase {} : Failed \n Error {}". \
format(tc_name, result)
result = verify_rib(tgen, addr_type, "r3", static_routes,
next_hop=next_hop_vrf)
assert result is True, "Testcase {} : Failed \n Error {}". \
format(tc_name, result)
step("Now change the next-hop of static routes in vrf RED and GREEN to "
"same IP address")
for addr_type in ADDR_TYPES:
next_hop_vrf = topo["routers"]["r1"]["links"][
"r3-link3"][addr_type].split("/")[0]
static_routes = {
"r3": {
"static_routes": [
{
"network": [NETWORK1_1[addr_type]],
"next_hop": next_hop_vrf,
"vrf": "RED"
},
{
"network": [NETWORK1_1[addr_type]],
"next_hop": topo["routers"]["r2"]["links"][
"r3-link1"][addr_type].split("/")[0],
"vrf": "RED",
"delete": True
}
]
}
}
result = create_static_routes(tgen, static_routes)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
step("Verify that now routes are imported with two different next-hop "
"vrfs but same IPs. Additionally R3 must do ECMP for both the routes")
for addr_type in ADDR_TYPES:
next_hop_vrf = [
topo["routers"]["r1"]["links"]["r3-link3"][addr_type].\
split("/")[0],
topo["routers"]["r1"]["links"]["r3-link3"][addr_type]. \
split("/")[0]
]
static_routes = {
"r3": {
"static_routes": [
{
"network": [NETWORK1_1[addr_type]],
}
]
}
}
result = verify_bgp_rib(tgen, addr_type, "r3", static_routes,
next_hop=next_hop_vrf)
assert result is True, "Testcase {} : Failed \n Error {}". \
format(tc_name, result)
result = verify_rib(tgen, addr_type, "r3", static_routes,
next_hop=next_hop_vrf)
assert result is True, "Testcase {} : Failed \n Error {}". \
format(tc_name, result)
write_test_footer(tc_name)
def test_locally_imported_routes_selected_as_bestpath_over_ebgp_imported_routes_p0(request):
"""
Verify ECMP for imported routes from different VRFs.
"""
tgen = get_topogen()
tc_name = request.node.name
write_test_header(tc_name)
if tgen.routers_have_failure():
check_router_status(tgen)
reset_config_on_routers(tgen)
step("Configure same static routes on R2 and R3 vrfs and redistribute in BGP "
"for GREEN and RED vrf instances")
for dut, network in zip(["r2", "r3"], [
[NETWORK1_1, NETWORK1_2], [NETWORK1_1, NETWORK1_2]]):
for vrf_name, network_vrf in zip(["RED", "GREEN"], network):
step("Configure static route for VRF : {} on {}".format(vrf_name,
dut))
for addr_type in ADDR_TYPES:
static_routes = {
dut: {
"static_routes": [
{
"network": [network_vrf[addr_type]],
"next_hop": "blackhole",
"vrf": vrf_name
}
]
}
}
result = create_static_routes(tgen, static_routes)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
for dut, as_num in zip(["r2", "r3"], ["2", "3"]):
for vrf_name in ["RED", "GREEN"]:
step("Redistribute static route on BGP VRF : {}".format(vrf_name))
temp = {}
for addr_type in ADDR_TYPES:
temp.update({
addr_type: {
"unicast": {
"redistribute": [{
"redist_type": "static"
}]
}
}
})
redist_dict = {dut: {"bgp": [{
"vrf": vrf_name, "local_as": as_num, "address_family": temp
}]}}
result = create_router_bgp(tgen, topo, redist_dict)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
step("Verify that R2 and R3 has installed redistributed routes in default "
"and RED vrfs and GREEN respectively:")
for dut, network in zip(["r2", "r3"],
[[NETWORK1_1, NETWORK1_2],
[NETWORK1_1, NETWORK1_2]]):
for vrf_name, network_vrf in zip(["RED", "GREEN"], network):
for addr_type in ADDR_TYPES:
static_routes = {
dut: {
"static_routes": [
{
"network": [network_vrf[addr_type]],
"next_hop": "blackhole",
"vrf": vrf_name
}
]
}
}
result = verify_bgp_rib(tgen, addr_type, dut, static_routes)
assert result is True, "Testcase {} : Failed \n Error {}". \
format(tc_name, result)
step("Import vrf RED's route in vrf GREEN on R3")
temp = {}
for addr_type in ADDR_TYPES:
temp.update({
addr_type: {
"unicast": {
"import": {
"vrf": "RED"
}
}
}
})
import_dict = {"r3": {"bgp": [{
"vrf": "GREEN", "local_as": 3, "address_family": temp
}]}}
result = create_router_bgp(tgen, topo, import_dict)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
step("Verify that locally imported routes are installed over eBGP imported"
" routes from VRF RED into VRF GREEN")
for addr_type in ADDR_TYPES:
static_routes = {
"r3": {
"static_routes": [
{
"network": [NETWORK1_2[addr_type]],
"next_hop": "blackhole",
"vrf": "GREEN"
}
]
}
}
input_routes = {
"r3": {
addr_type: [
{
"network": NETWORK1_2[addr_type],
"bestpath": BESTPATH[addr_type],
"vrf": "GREEN"
}
]
}
}
result = verify_bgp_bestpath(tgen, addr_type, input_routes)
assert result is True, "Testcase {} : Failed \n Error {}". \
format(tc_name, result)
result = verify_rib(tgen, addr_type, "r3", static_routes)
assert result is True, "Testcase {} : Failed \n Error {}". \
format(tc_name, result)
write_test_footer(tc_name)
if __name__ == "__main__":
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))

View File

@ -989,6 +989,14 @@ def __create_bgp_unicast_address_family(
if "no_allowas_in" in peer:
allow_as_in = peer["no_allowas_in"]
config_data.append("no {} allowas-in {}".format(neigh_cxt, allow_as_in))
if "shutdown" in peer:
shut_val = peer["shutdown"]
if shut_val is True:
config_data.append("{} shutdown".format(neigh_cxt))
elif shut_val is False:
config_data.append("no {} shutdown".format(neigh_cxt))
if prefix_lists:
for prefix_list in prefix_lists:
name = prefix_list.setdefault("name", {})
@ -2221,6 +2229,7 @@ def verify_bgp_attributes(
rmap_name=None,
input_dict=None,
seq_id=None,
vrf=None,
nexthop=None,
expected=True,
):
@ -2275,7 +2284,10 @@ def verify_bgp_attributes(
logger.info("Verifying BGP set attributes for dut {}:".format(router))
for static_route in static_routes:
cmd = "show bgp {} {} json".format(addr_type, static_route)
if vrf:
cmd = "show bgp vrf {} {} {} json".format(vrf, addr_type, static_route)
else:
cmd = "show bgp {} {} json".format(addr_type, static_route)
show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
dict_to_test = []
@ -2821,7 +2833,6 @@ def verify_bgp_rib(
st_rt,
dut,
)
return errormsg
else:
nh_found = True
@ -4428,6 +4439,83 @@ def verify_evpn_routes(
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return False
@retry(retry_timeout=10)
def verify_bgp_bestpath(tgen, addr_type, input_dict):
"""
Verifies bgp next hop values in best-path output
* `dut` : device under test
* `addr_type` : Address type ipv4/ipv6
* `input_dict`: having details like multipath and bestpath
Usage
-----
input_dict_1 = {
"r1": {
"ipv4" : {
"bestpath": "50.0.0.1",
"multipath": ["50.0.0.1", "50.0.0.2"],
"network": "100.0.0.0/24"
}
"ipv6" : {
"bestpath": "1000::1",
"multipath": ["1000::1", "1000::2"]
"network": "2000::1/128"
}
}
}
result = verify_bgp_bestpath(tgen, input_dict)
"""
result = False
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
for dut in input_dict.keys():
rnode = tgen.routers()[dut]
logger.info("[DUT: %s]: Verifying bgp bestpath and multipath " "routes:", dut)
result = False
for network_dict in input_dict[dut][addr_type]:
nw_addr = network_dict.setdefault("network", None)
vrf = network_dict.setdefault("vrf", None)
bestpath = network_dict.setdefault("bestpath", None)
if vrf:
cmd = "show bgp vrf {} {} {} bestpath json".format(
vrf, addr_type, nw_addr
)
else:
cmd = "show bgp {} {} bestpath json".format(addr_type, nw_addr)
data = run_frr_cmd(rnode, cmd, isjson=True)
route = data["paths"][0]
if "bestpath" in route:
if route["bestpath"]["overall"] is True:
_bestpath = route["nexthops"][0]["ip"]
if _bestpath != bestpath:
return (
"DUT:[{}] Bestpath do not match for"
" network: {}, Expected "
" {} as bgp bestpath found {}".format(
dut, nw_addr, bestpath, _bestpath
)
)
logger.info(
"DUT:[{}] Found expected bestpath: "
" {} for network: {}".format(dut, _bestpath, nw_addr)
)
result = True
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
def verify_tcp_mss(tgen, dut, neighbour, configured_tcp_mss, vrf=None):
"""
This api is used to verify the tcp-mss value assigned to a neigbour of DUT