Merge pull request #8683 from kuldeepkash/bgp_gshut

tests: Add bgp_gshut_topo1 test suite
This commit is contained in:
Russ White 2021-06-01 11:43:48 -04:00 committed by GitHub
commit 3d7bd27619
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 1748 additions and 0 deletions

View File

@ -0,0 +1,221 @@
{
"ipv4base": "10.0.0.0",
"ipv4mask": 24,
"ipv6base": "fd00::",
"ipv6mask": 64,
"link_ip_start": {
"ipv4": "10.0.1.0",
"v4mask": 24,
"ipv6": "fd00::",
"v6mask": 64
},
"lo_prefix": {
"ipv4": "1.0.",
"v4mask": 32,
"ipv6": "2001:DB8:F::",
"v6mask": 128
},
"routers": {
"r1": {
"links": {
"r2": {"ipv4": "auto", "ipv6": "auto"},
"r3": {"ipv4": "auto", "ipv6": "auto"}
},
"bgp": {
"local_as": "100",
"address_family": {
"ipv4": {
"unicast": {
"redistribute": [
{"redist_type": "static"}
],
"neighbor": {
"r2": {
"dest_link": {
"r1": {
"keepalivetimer": 1,
"holddowntimer": 3
}
}
},
"r3": {
"dest_link": {
"r1": {
"keepalivetimer": 1,
"holddowntimer": 3
}
}
}
}
}
},
"ipv6": {
"unicast": {
"redistribute": [
{"redist_type": "static"}
],
"neighbor": {
"r2": {
"dest_link": {
"r1": {
"keepalivetimer": 1,
"holddowntimer": 3
}
}
},
"r3": {
"dest_link": {
"r1": {
"keepalivetimer": 1,
"holddowntimer": 3
}
}
}
}
}
}
}
},
"static_routes":[
{
"network":"100.0.10.1/32",
"no_of_ip":5,
"next_hop":"Null0"
},
{
"network":"1::1/128",
"no_of_ip":5,
"next_hop":"Null0"
}]
},
"r2": {
"links": {
"r1": {"ipv4": "auto", "ipv6": "auto"},
"r4": {"ipv4": "auto", "ipv6": "auto"}
},
"bgp": {
"local_as": "200",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r2": {}
}
},
"r4": {
"dest_link": {
"r2": {}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r2": {}
}
},
"r4": {
"dest_link": {
"r2": {}
}
}
}
}
}
}
}
},
"r3": {
"links": {
"r1": {"ipv4": "auto", "ipv6": "auto"},
"r4": {"ipv4": "auto", "ipv6": "auto"}
},
"bgp": {
"local_as": "300",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r3": {}
}
},
"r4": {
"dest_link": {
"r3": {}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r3": {}
}
},
"r4": {
"dest_link": {
"r3": {}
}
}
}
}
}
}
}
},
"r4": {
"links": {
"r2": {"ipv4": "auto", "ipv6": "auto"},
"r3": {"ipv4": "auto", "ipv6": "auto"}
},
"bgp": {
"local_as": "400",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r2": {
"dest_link": {
"r4": {}
}
},
"r3": {
"dest_link": {
"r4": {}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r2": {
"dest_link": {
"r4": {}
}
},
"r3": {
"dest_link": {
"r4": {}
}
}
}
}
}
}
}
}
}
}

View File

@ -0,0 +1,221 @@
{
"ipv4base": "10.0.0.0",
"ipv4mask": 24,
"ipv6base": "fd00::",
"ipv6mask": 64,
"link_ip_start": {
"ipv4": "10.0.1.0",
"v4mask": 24,
"ipv6": "fd00::",
"v6mask": 64
},
"lo_prefix": {
"ipv4": "1.0.",
"v4mask": 32,
"ipv6": "2001:DB8:F::",
"v6mask": 128
},
"routers": {
"r1": {
"links": {
"r2": {"ipv4": "auto", "ipv6": "auto"},
"r3": {"ipv4": "auto", "ipv6": "auto"}
},
"bgp": {
"local_as": "100",
"address_family": {
"ipv4": {
"unicast": {
"redistribute": [
{"redist_type": "static"}
],
"neighbor": {
"r2": {
"dest_link": {
"r1": {
"keepalivetimer": 1,
"holddowntimer": 3
}
}
},
"r3": {
"dest_link": {
"r1": {
"keepalivetimer": 1,
"holddowntimer": 3
}
}
}
}
}
},
"ipv6": {
"unicast": {
"redistribute": [
{"redist_type": "static"}
],
"neighbor": {
"r2": {
"dest_link": {
"r1": {
"keepalivetimer": 1,
"holddowntimer": 3
}
}
},
"r3": {
"dest_link": {
"r1": {
"keepalivetimer": 1,
"holddowntimer": 3
}
}
}
}
}
}
}
},
"static_routes":[
{
"network":"100.0.10.1/32",
"no_of_ip":5,
"next_hop":"Null0"
},
{
"network":"1::1/128",
"no_of_ip":5,
"next_hop":"Null0"
}]
},
"r2": {
"links": {
"r1": {"ipv4": "auto", "ipv6": "auto"},
"r4": {"ipv4": "auto", "ipv6": "auto"}
},
"bgp": {
"local_as": "100",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r2": {}
}
},
"r4": {
"dest_link": {
"r2": {}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r2": {}
}
},
"r4": {
"dest_link": {
"r2": {}
}
}
}
}
}
}
}
},
"r3": {
"links": {
"r1": {"ipv4": "auto", "ipv6": "auto"},
"r4": {"ipv4": "auto", "ipv6": "auto"}
},
"bgp": {
"local_as": "100",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r3": {}
}
},
"r4": {
"dest_link": {
"r3": {}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r1": {
"dest_link": {
"r3": {}
}
},
"r4": {
"dest_link": {
"r3": {}
}
}
}
}
}
}
}
},
"r4": {
"links": {
"r2": {"ipv4": "auto", "ipv6": "auto"},
"r3": {"ipv4": "auto", "ipv6": "auto"}
},
"bgp": {
"local_as": "200",
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r2": {
"dest_link": {
"r4": {}
}
},
"r3": {
"dest_link": {
"r4": {}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r2": {
"dest_link": {
"r4": {}
}
},
"r3": {
"dest_link": {
"r4": {}
}
}
}
}
}
}
}
}
}
}

View File

@ -0,0 +1,630 @@
#!/usr/bin/env python
#
# Copyright (c) 2021 by VMware, Inc. ("VMware")
# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
# ("NetDEF") in this file.
#
# Permission to use, copy, modify, and/or distribute this software
# for any purpose with or without fee is hereby granted, provided
# that the above copyright notice and this permission notice appear
# in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
# OF THIS SOFTWARE.
#
"""
Following tests are covered to test ecmp functionality on BGP GSHUT.
1. Verify graceful-shutdown functionality with eBGP peers
2. Verify graceful-shutdown functionality when daemons
bgpd/zebra/staticd and frr services are restarted with eBGP peers
"""
import os
import sys
import time
import json
import pytest
# Save the Current Working Directory to find configuration files.
CWD = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(CWD, "../"))
sys.path.append(os.path.join(CWD, "../../"))
# pylint: disable=C0413
# Import topogen and topotest helpers
from lib.topogen import Topogen, get_topogen
from mininet.topo import Topo
from time import sleep
from lib.common_config import (
start_topology,
write_test_header,
write_test_footer,
verify_rib,
create_static_routes,
check_address_types,
interface_status,
reset_config_on_routers,
step,
get_frr_ipv6_linklocal,
kill_router_daemons,
start_router_daemons,
stop_router,
start_router,
create_route_maps,
create_bgp_community_lists,
delete_route_maps,
required_linux_kernel_version,
)
from lib.topolog import logger
from lib.bgp import (
verify_bgp_convergence,
create_router_bgp,
clear_bgp,
verify_bgp_rib,
verify_bgp_attributes,
)
from lib.topojson import build_topo_from_json, build_config_from_json
# Reading the data from JSON File for topology and configuration creation
jsonFile = "{}/ebgp_gshut_topo1.json".format(CWD)
try:
with open(jsonFile, "r") as topoJson:
topo = json.load(topoJson)
except IOError:
logger.info("Could not read file:", jsonFile)
# Global variables
NETWORK = {"ipv4": "100.0.10.1/32", "ipv6": "1::1/128"}
NEXT_HOP_IP_1 = {"ipv4": "10.0.2.1", "ipv6": "fd00:0:0:1::1"}
NEXT_HOP_IP_2 = {"ipv4": "10.0.4.2", "ipv6": "fd00:0:0:3::2"}
PREFERRED_NEXT_HOP = "link_local"
BGP_CONVERGENCE = False
class GenerateTopo(Topo):
"""
Test topology builder
* `Topo`: Topology object
"""
def build(self, *_args, **_opts):
"Build function"
tgen = get_topogen(self)
# This function only purpose is to create topology
# as defined in input json file.
#
# Create topology (setup module)
# Creating 2 routers topology, r1, r2in IBGP
# Bring up topology
# Building topology from json file
build_topo_from_json(tgen, topo)
def setup_module(mod):
"""
Sets up the pytest environment
* `mod`: module name
"""
global ADDR_TYPES
# Required linux kernel version for this suite to run.
result = required_linux_kernel_version("4.16")
if result is not True:
pytest.skip("Kernel requirements are not met")
testsuite_run_time = time.asctime(time.localtime(time.time()))
logger.info("Testsuite start time: {}".format(testsuite_run_time))
logger.info("=" * 40)
logger.info("Running setup_module to create topology")
# This function initiates the topology build with Topogen...
tgen = Topogen(GenerateTopo, mod.__name__)
# ... and here it calls Mininet initialization functions.
# Starting topology, create tmp files which are loaded to routers
# to start deamons and then start routers
start_topology(tgen)
# Creating configuration from JSON
build_config_from_json(tgen, topo)
# Don't run this test if we have any failure.
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
# Api call verify whether BGP is converged
ADDR_TYPES = check_address_types()
BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error:" " {}".format(
BGP_CONVERGENCE
)
logger.info("Running setup_module() done")
def teardown_module():
"""
Teardown the pytest environment.
* `mod`: module name
"""
logger.info("Running teardown_module to delete topology")
tgen = get_topogen()
# Stop toplogy and Remove tmp files
tgen.stop_topology()
###########################
# Local APIs
###########################
def next_hop_per_address_family(
tgen, dut, peer, addr_type, next_hop_dict, preferred_next_hop=PREFERRED_NEXT_HOP
):
"""
This function returns link_local or global next_hop per address-family
"""
intferface = topo["routers"][peer]["links"]["{}".format(dut)]["interface"]
if addr_type == "ipv6" and "link_local" in preferred_next_hop:
next_hop = get_frr_ipv6_linklocal(tgen, peer, intf=intferface)
else:
next_hop = next_hop_dict[addr_type]
return next_hop
###########################
# TESTCASES
###########################
def test_verify_graceful_shutdown_functionality_with_eBGP_peers_p0(request):
"""
Verify graceful-shutdown functionality with eBGP peers
"""
tc_name = request.node.name
write_test_header(tc_name)
tgen = get_topogen()
reset_config_on_routers(tgen)
step("Done in base config: Configure base config as per the topology")
step("Base config should be up, verify using BGP convergence")
result = verify_bgp_convergence(tgen, topo)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step("Done in base config: Advertise prefixes from R1")
step("Verify BGP routes are received at R3 with best path from R3 to R1")
for addr_type in ADDR_TYPES:
dut = "r3"
next_hop1 = next_hop_per_address_family(
tgen, "r3", "r1", addr_type, NEXT_HOP_IP_1
)
next_hop2 = next_hop_per_address_family(
tgen, "r3", "r4", addr_type, NEXT_HOP_IP_2
)
input_topo = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop=[next_hop1, next_hop2]
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
result = verify_rib(tgen, addr_type, dut, input_topo, next_hop=next_hop1)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
step("On R1 configure:")
step("Create standard bgp community-list to permit graceful-shutdown:")
input_dict_1 = {
"r1": {
"bgp_community_lists": [
{
"community_type": "standard",
"action": "permit",
"name": "GSHUT",
"value": "graceful-shutdown",
}
]
}
}
result = create_bgp_community_lists(tgen, input_dict_1)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step("Create route-map to set community GSHUT in OUT direction")
input_dict_2 = {
"r1": {
"route_maps": {
"GSHUT-OUT": [
{
"action": "permit",
"seq_id": "10",
"set": {"community": {"num": "graceful-shutdown"}},
}
]
}
}
}
result = create_route_maps(tgen, input_dict_2)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
input_dict_3 = {
"r1": {
"bgp": {
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r3": {
"dest_link": {
"r1": {
"route_maps": [
{
"name": "GSHUT-OUT",
"direction": "out",
}
]
}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r3": {
"dest_link": {
"r1": {
"route_maps": [
{
"name": "GSHUT-OUT",
"direction": "out",
}
]
}
}
}
}
}
},
}
}
}
}
result = create_router_bgp(tgen, topo, input_dict_3)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step(
"FRR is setting local-pref to 0 by-default on receiver GSHUT community, "
"below step is not needed, but keeping for reference"
)
step(
"On R3, apply route-map IN direction to match GSHUT community "
"and set local-preference to 0."
)
step(
"Verify BGP convergence on R3 and ensure all the neighbours state "
"is established"
)
result = verify_bgp_convergence(tgen, topo)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step("Verify BGP routes on R3:")
step("local pref for routes coming from R1 is set to 0.")
for addr_type in ADDR_TYPES:
rmap_dict = {"r1": {"route_maps": {"GSHUT-OUT": [{"set": {"locPrf": 0}}],}}}
static_routes = [NETWORK[addr_type]]
result = verify_bgp_attributes(
tgen, addr_type, dut, static_routes, "GSHUT-OUT", rmap_dict
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
step("Ensure that best path is selected from R4 to R3.")
for addr_type in ADDR_TYPES:
dut = "r3"
next_hop1 = next_hop_per_address_family(
tgen, "r3", "r1", addr_type, NEXT_HOP_IP_1
)
next_hop2 = next_hop_per_address_family(
tgen, "r3", "r4", addr_type, NEXT_HOP_IP_2
)
input_topo = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop=[next_hop1, next_hop2]
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
result = verify_rib(tgen, addr_type, dut, input_topo, next_hop=next_hop2)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
write_test_footer(tc_name)
def test_verify_restarting_zebra_bgpd_staticd_frr_with_eBGP_peers_p0(request):
"""
Verify graceful-shutdown functionality when daemons bgpd/zebra/staticd and
frr services are restarted with eBGP peers
"""
tc_name = request.node.name
write_test_header(tc_name)
tgen = get_topogen()
reset_config_on_routers(tgen)
step("Done in base config: Configure base config as per the topology")
step("Base config should be up, verify using BGP convergence")
result = verify_bgp_convergence(tgen, topo)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step("Done in base config: Advertise prefixes from R1")
step("Verify BGP routes are received at R3 with best path from R3 to R1")
for addr_type in ADDR_TYPES:
dut = "r3"
next_hop1 = next_hop_per_address_family(
tgen, "r3", "r1", addr_type, NEXT_HOP_IP_1
)
next_hop2 = next_hop_per_address_family(
tgen, "r3", "r4", addr_type, NEXT_HOP_IP_2
)
input_topo = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop=[next_hop1, next_hop2]
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
result = verify_rib(tgen, addr_type, dut, input_topo, next_hop=next_hop1)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
step("On R1 configure:")
step("Create standard bgp community-list to permit graceful-shutdown:")
input_dict_1 = {
"r1": {
"bgp_community_lists": [
{
"community_type": "standard",
"action": "permit",
"name": "GSHUT",
"value": "graceful-shutdown",
}
]
}
}
result = create_bgp_community_lists(tgen, input_dict_1)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step("Create route-map to set community GSHUT in OUT direction")
input_dict_2 = {
"r1": {
"route_maps": {
"GSHUT-OUT": [
{
"action": "permit",
"seq_id": "10",
"set": {"community": {"num": "graceful-shutdown"}},
}
]
}
}
}
result = create_route_maps(tgen, input_dict_2)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
input_dict_3 = {
"r1": {
"bgp": {
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r3": {
"dest_link": {
"r1": {
"route_maps": [
{
"name": "GSHUT-OUT",
"direction": "out",
}
]
}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r3": {
"dest_link": {
"r1": {
"route_maps": [
{
"name": "GSHUT-OUT",
"direction": "out",
}
]
}
}
}
}
}
},
}
}
}
}
result = create_router_bgp(tgen, topo, input_dict_3)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step(
"FRR is setting local-pref to 0 by-default on receiver GSHUT community, "
"below step is not needed, but keeping for reference"
)
step(
"On R3, apply route-map IN direction to match GSHUT community "
"and set local-preference to 0."
)
step(
"Verify BGP convergence on R3 and ensure all the neighbours state "
"is established"
)
result = verify_bgp_convergence(tgen, topo)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step("Verify BGP routes on R3:")
step("local pref for routes coming from R1 is set to 0.")
for addr_type in ADDR_TYPES:
rmap_dict = {"r1": {"route_maps": {"GSHUT-OUT": [{"set": {"locPrf": 0}}]}}}
static_routes = [NETWORK[addr_type]]
result = verify_bgp_attributes(
tgen, addr_type, dut, static_routes, "GSHUT-OUT", rmap_dict
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
step("Ensure that best path is selected from R4 to R3.")
for addr_type in ADDR_TYPES:
dut = "r3"
next_hop1 = next_hop_per_address_family(
tgen, "r3", "r1", addr_type, NEXT_HOP_IP_1
)
next_hop2 = next_hop_per_address_family(
tgen, "r3", "r4", addr_type, NEXT_HOP_IP_2
)
input_topo = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop=[next_hop1, next_hop2]
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
result = verify_rib(tgen, addr_type, dut, input_topo, next_hop=next_hop2)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
step("Restart daemons and frr services")
for daemon in ["bgpd", "zebra", "staticd", "frr"]:
if daemon != "frr":
kill_router_daemons(tgen, "r3", ["staticd"])
start_router_daemons(tgen, "r3", ["staticd"])
else:
stop_router(tgen, "r3")
start_router(tgen, "r3")
step(
"Verify BGP convergence on R3 and ensure all the neighbours state "
"is established"
)
result = verify_bgp_convergence(tgen, topo)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
step("Verify BGP routes on R3:")
step("local pref for routes coming from R1 is set to 0.")
for addr_type in ADDR_TYPES:
rmap_dict = {"r1": {"route_maps": {"GSHUT-OUT": [{"set": {"locPrf": 0}}]}}}
static_routes = [NETWORK[addr_type]]
result = verify_bgp_attributes(
tgen, addr_type, dut, static_routes, "GSHUT-OUT", rmap_dict
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
step("Ensure that best path is selected from R4 to R3.")
for addr_type in ADDR_TYPES:
dut = "r3"
next_hop1 = next_hop_per_address_family(
tgen, "r3", "r1", addr_type, NEXT_HOP_IP_1
)
next_hop2 = next_hop_per_address_family(
tgen, "r3", "r4", addr_type, NEXT_HOP_IP_2
)
input_topo = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop=[next_hop1, next_hop2]
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
result = verify_rib(tgen, addr_type, dut, input_topo, next_hop=next_hop2)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
write_test_footer(tc_name)
if __name__ == "__main__":
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))

View File

@ -0,0 +1,676 @@
#!/usr/bin/env python
#
# Copyright (c) 2021 by VMware, Inc. ("VMware")
# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
# ("NetDEF") in this file.
#
# Permission to use, copy, modify, and/or distribute this software
# for any purpose with or without fee is hereby granted, provided
# that the above copyright notice and this permission notice appear
# in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
# OF THIS SOFTWARE.
#
"""
Following tests are covered to test ecmp functionality on BGP GSHUT.
1. Verify graceful-shutdown functionality with iBGP peers
2. Verify graceful-shutdown functionality after
deleting/re-adding route-map with iBGP peers
"""
import os
import sys
import time
import json
import pytest
# Save the Current Working Directory to find configuration files.
CWD = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(CWD, "../"))
sys.path.append(os.path.join(CWD, "../../"))
# pylint: disable=C0413
# Import topogen and topotest helpers
from lib.topogen import Topogen, get_topogen
from mininet.topo import Topo
from time import sleep
from lib.common_config import (
start_topology,
write_test_header,
write_test_footer,
verify_rib,
create_static_routes,
check_address_types,
interface_status,
reset_config_on_routers,
step,
get_frr_ipv6_linklocal,
kill_router_daemons,
start_router_daemons,
stop_router,
start_router,
create_route_maps,
create_bgp_community_lists,
delete_route_maps,
required_linux_kernel_version,
)
from lib.topolog import logger
from lib.bgp import (
verify_bgp_convergence,
create_router_bgp,
clear_bgp,
verify_bgp_rib,
verify_bgp_attributes,
)
from lib.topojson import build_topo_from_json, build_config_from_json
# Reading the data from JSON File for topology and configuration creation
jsonFile = "{}/ibgp_gshut_topo1.json".format(CWD)
try:
with open(jsonFile, "r") as topoJson:
topo = json.load(topoJson)
except IOError:
logger.info("Could not read file:", jsonFile)
# Global variables
NETWORK = {"ipv4": "100.0.10.1/32", "ipv6": "1::1/128"}
NEXT_HOP_IP_1 = {"ipv4": "10.0.3.1", "ipv6": "fd00:0:0:3::1"}
NEXT_HOP_IP_2 = {"ipv4": "10.0.4.1", "ipv6": "fd00:0:0:2::1"}
PREFERRED_NEXT_HOP = "link_local"
BGP_CONVERGENCE = False
class GenerateTopo(Topo):
"""
Test topology builder
* `Topo`: Topology object
"""
def build(self, *_args, **_opts):
"Build function"
tgen = get_topogen(self)
# This function only purpose is to create topology
# as defined in input json file.
#
# Create topology (setup module)
# Creating 2 routers topology, r1, r2in IBGP
# Bring up topology
# Building topology from json file
build_topo_from_json(tgen, topo)
def setup_module(mod):
"""
Sets up the pytest environment
* `mod`: module name
"""
global ADDR_TYPES
# Required linux kernel version for this suite to run.
result = required_linux_kernel_version("4.16")
if result is not True:
pytest.skip("Kernel requirements are not met")
testsuite_run_time = time.asctime(time.localtime(time.time()))
logger.info("Testsuite start time: {}".format(testsuite_run_time))
logger.info("=" * 40)
logger.info("Running setup_module to create topology")
# This function initiates the topology build with Topogen...
tgen = Topogen(GenerateTopo, mod.__name__)
# ... and here it calls Mininet initialization functions.
# Starting topology, create tmp files which are loaded to routers
# to start deamons and then start routers
start_topology(tgen)
# Creating configuration from JSON
build_config_from_json(tgen, topo)
# Don't run this test if we have any failure.
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
# Api call verify whether BGP is converged
ADDR_TYPES = check_address_types()
BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error:" " {}".format(
BGP_CONVERGENCE
)
logger.info("Running setup_module() done")
def teardown_module():
"""
Teardown the pytest environment.
* `mod`: module name
"""
logger.info("Running teardown_module to delete topology")
tgen = get_topogen()
# Stop toplogy and Remove tmp files
tgen.stop_topology()
###########################
# Local APIs
###########################
def next_hop_per_address_family(
tgen, dut, peer, addr_type, next_hop_dict, preferred_next_hop=PREFERRED_NEXT_HOP
):
"""
This function returns link_local or global next_hop per address-family
"""
intferface = topo["routers"][peer]["links"]["{}".format(dut)]["interface"]
if addr_type == "ipv6" and "link_local" in preferred_next_hop:
next_hop = get_frr_ipv6_linklocal(tgen, peer, intf=intferface)
else:
next_hop = next_hop_dict[addr_type]
return next_hop
###########################
# TESTCASES
###########################
def test_verify_graceful_shutdown_functionality_with_iBGP_peers_p0(request):
"""
Verify graceful-shutdown functionality with iBGP peers
"""
tc_name = request.node.name
write_test_header(tc_name)
tgen = get_topogen()
reset_config_on_routers(tgen)
step("Done in base config: Configure base config as per the topology")
step("Base config should be up, verify using BGP convergence")
result = verify_bgp_convergence(tgen, topo)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step("Done in base config: Advertise prefixes from R1")
step("Verify BGP routes are received at R4 with best path from R3 to R1")
for addr_type in ADDR_TYPES:
dut = "r4"
next_hop1 = next_hop_per_address_family(
tgen, "r4", "r2", addr_type, NEXT_HOP_IP_1
)
next_hop2 = next_hop_per_address_family(
tgen, "r4", "r3", addr_type, NEXT_HOP_IP_2
)
input_topo = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop=[next_hop1, next_hop2]
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop=[next_hop1, next_hop2]
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
step("On R1 configure:")
step("Create standard bgp community-list to permit graceful-shutdown:")
input_dict_1 = {
"r1": {
"bgp_community_lists": [
{
"community_type": "standard",
"action": "permit",
"name": "GSHUT",
"value": "graceful-shutdown",
}
]
}
}
result = create_bgp_community_lists(tgen, input_dict_1)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step("Create route-map to set community GSHUT in OUT direction")
input_dict_2 = {
"r1": {
"route_maps": {
"GSHUT-OUT": [
{
"action": "permit",
"seq_id": "10",
"set": {"community": {"num": "graceful-shutdown"}},
}
]
}
}
}
result = create_route_maps(tgen, input_dict_2)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
input_dict_3 = {
"r1": {
"bgp": {
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r3": {
"dest_link": {
"r1": {
"route_maps": [
{
"name": "GSHUT-OUT",
"direction": "out",
}
]
}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r3": {
"dest_link": {
"r1": {
"route_maps": [
{
"name": "GSHUT-OUT",
"direction": "out",
}
]
}
}
}
}
}
},
}
}
}
}
result = create_router_bgp(tgen, topo, input_dict_3)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step(
"FRR is setting local-pref to 0 by-default on receiver GSHUT community, "
"below step is not needed, but keeping for reference"
)
step(
"On R3, apply route-map IN direction to match GSHUT community "
"and set local-preference to 0."
)
step(
"Verify BGP convergence on R4 and ensure all the neighbours state "
"is established"
)
result = verify_bgp_convergence(tgen, topo)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step("Verify BGP routes on R4:")
step("local pref for routes coming from R1 is set to 0.")
for addr_type in ADDR_TYPES:
rmap_dict = {"r1": {"route_maps": {"GSHUT-OUT": [{"set": {"locPrf": 0}}],}}}
static_routes = [NETWORK[addr_type]]
result = verify_bgp_attributes(
tgen, addr_type, dut, static_routes, "GSHUT-OUT", rmap_dict
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
step("Ensure that best path is selected from R4 to R3.")
for addr_type in ADDR_TYPES:
dut = "r4"
next_hop1 = next_hop_per_address_family(
tgen, "r4", "r2", addr_type, NEXT_HOP_IP_1
)
next_hop2 = next_hop_per_address_family(
tgen, "r4", "r3", addr_type, NEXT_HOP_IP_2
)
input_topo = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop=[next_hop1, next_hop2]
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
result = verify_rib(tgen, addr_type, dut, input_topo, next_hop=next_hop1)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
write_test_footer(tc_name)
def test_verify_deleting_re_adding_route_map_with_iBGP_peers_p0(request):
"""
Verify graceful-shutdown functionality after deleting/re-adding route-map
with iBGP peers
"""
tc_name = request.node.name
write_test_header(tc_name)
tgen = get_topogen()
reset_config_on_routers(tgen)
step("Done in base config: Configure base config as per the topology")
step("Base config should be up, verify using BGP convergence")
result = verify_bgp_convergence(tgen, topo)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step("Done in base config: Advertise prefixes from R1")
step("Verify BGP routes are received at R4 with best path from R3 to R1")
for addr_type in ADDR_TYPES:
dut = "r4"
next_hop1 = next_hop_per_address_family(
tgen, "r4", "r2", addr_type, NEXT_HOP_IP_1
)
next_hop2 = next_hop_per_address_family(
tgen, "r4", "r3", addr_type, NEXT_HOP_IP_2
)
input_topo = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop=[next_hop1, next_hop2]
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
result = verify_rib(
tgen, addr_type, dut, input_topo, next_hop=[next_hop1, next_hop2]
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
step("On R1 configure:")
step("Create standard bgp community-list to permit graceful-shutdown:")
input_dict_1 = {
"r1": {
"bgp_community_lists": [
{
"community_type": "standard",
"action": "permit",
"name": "GSHUT",
"value": "graceful-shutdown",
}
]
}
}
result = create_bgp_community_lists(tgen, input_dict_1)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step("Create route-map to set community GSHUT in OUT direction")
input_dict_2 = {
"r1": {
"route_maps": {
"GSHUT-OUT": [
{
"action": "permit",
"seq_id": "10",
"set": {"community": {"num": "graceful-shutdown"}},
}
]
}
}
}
result = create_route_maps(tgen, input_dict_2)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
input_dict_3 = {
"r1": {
"bgp": {
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r3": {
"dest_link": {
"r1": {
"route_maps": [
{
"name": "GSHUT-OUT",
"direction": "out",
}
]
}
}
}
}
}
},
"ipv6": {
"unicast": {
"neighbor": {
"r3": {
"dest_link": {
"r1": {
"route_maps": [
{
"name": "GSHUT-OUT",
"direction": "out",
}
]
}
}
}
}
}
},
}
}
}
}
result = create_router_bgp(tgen, topo, input_dict_3)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step(
"FRR is setting local-pref to 0 by-default on receiver GSHUT community, "
"below step is not needed, but keeping for reference"
)
step(
"On R3, apply route-map IN direction to match GSHUT community "
"and set local-preference to 0."
)
step(
"Verify BGP convergence on R4 and ensure all the neighbours state "
"is established"
)
result = verify_bgp_convergence(tgen, topo)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step("Verify BGP routes on R4:")
step("local pref for routes coming from R1 is set to 0.")
for addr_type in ADDR_TYPES:
rmap_dict = {"r1": {"route_maps": {"GSHUT-OUT": [{"set": {"locPrf": 0}}],}}}
static_routes = [NETWORK[addr_type]]
result = verify_bgp_attributes(
tgen, addr_type, dut, static_routes, "GSHUT-OUT", rmap_dict
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
step("Ensure that best path is selected from R4 to R3.")
for addr_type in ADDR_TYPES:
dut = "r4"
next_hop1 = next_hop_per_address_family(
tgen, "r4", "r2", addr_type, NEXT_HOP_IP_1
)
next_hop2 = next_hop_per_address_family(
tgen, "r4", "r3", addr_type, NEXT_HOP_IP_2
)
input_topo = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop=[next_hop1, next_hop2]
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
result = verify_rib(tgen, addr_type, dut, input_topo, next_hop=next_hop1)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
step("Delete route-map from R1")
del_rmap_dict = {
"r1": {
"route_maps": {
"GSHUT-OUT": [
{
"action": "permit",
"seq_id": "10",
"set": {
"community": {"num": "graceful-shutdown", "delete": True}
},
}
]
}
}
}
result = create_route_maps(tgen, del_rmap_dict)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step(
"Verify BGP convergence on R3 and ensure that all neighbor state "
"is established"
)
result = verify_bgp_convergence(tgen, topo)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step("Verify BGP routes on R4:")
step("Ensure that best path is selected from R1->R3")
for addr_type in ADDR_TYPES:
dut = "r4"
next_hop1 = next_hop_per_address_family(
tgen, "r4", "r2", addr_type, NEXT_HOP_IP_1
)
next_hop2 = next_hop_per_address_family(
tgen, "r4", "r3", addr_type, NEXT_HOP_IP_2
)
input_topo = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(tgen, addr_type, dut, input_topo, next_hop=[next_hop1])
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
result = verify_rib(tgen, addr_type, dut, input_topo, next_hop=[next_hop1])
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
step("Re-add route-map in R1")
result = create_route_maps(tgen, input_dict_2)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step(
"Verify BGP convergence on R3 and ensure all the neighbours state "
"is established"
)
result = verify_bgp_convergence(tgen, topo)
assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)
step("Verify BGP routes on R4:")
step("local pref for routes coming from R1 is set to 0.")
for addr_type in ADDR_TYPES:
rmap_dict = {"r1": {"route_maps": {"GSHUT-OUT": [{"set": {"locPrf": 0}}]}}}
static_routes = [NETWORK[addr_type]]
result = verify_bgp_attributes(
tgen, addr_type, dut, static_routes, "GSHUT-OUT", rmap_dict
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
step("Ensure that best path is selected from R4 to R3.")
for addr_type in ADDR_TYPES:
dut = "r4"
next_hop1 = next_hop_per_address_family(
tgen, "r4", "r2", addr_type, NEXT_HOP_IP_1
)
next_hop2 = next_hop_per_address_family(
tgen, "r4", "r3", addr_type, NEXT_HOP_IP_2
)
input_topo = {key: topo["routers"][key] for key in ["r1"]}
result = verify_bgp_rib(
tgen, addr_type, dut, input_topo, next_hop=[next_hop1, next_hop2]
)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
result = verify_rib(tgen, addr_type, dut, input_topo, next_hop=next_hop1)
assert result is True, "Test case {} : Failed \n Error: {}".format(
tc_name, result
)
write_test_footer(tc_name)
if __name__ == "__main__":
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))