tests: Adding timers and bgp 4 bytes ASN test

Signed-off-by: Ashish Pant <ashish12pant@gmail.com>

Adding verify and config apis for bgp timer testcase
Adding verify and config apis for changing ASN to 4 bytes
This commit is contained in:
Ashish Pant 2019-06-25 06:04:17 +05:30
parent e6db2bb135
commit 77ef1af6e3
3 changed files with 749 additions and 2 deletions

View File

@ -53,11 +53,13 @@ from mininet.topo import Topo
from lib.common_config import (
start_topology, stop_topology, write_test_header,
write_test_footer
write_test_footer, reset_config_on_routers
)
from lib.topolog import logger
from lib.bgp import (
verify_bgp_convergence, create_router_bgp, verify_router_id
verify_bgp_convergence, create_router_bgp, verify_router_id,
modify_as_number, verify_as_numbers, clear_bgp_and_verify,
verify_bgp_timers_and_functionality
)
from lib.topojson import build_topo_from_json, build_config_from_json
@ -209,6 +211,106 @@ def test_modify_and_delete_router_id(request):
write_test_footer(tc_name)
def test_bgp_config_with_4byte_as_number(request):
"""
Configure BGP with 4 byte ASN and verify it works fine
"""
tgen = get_topogen()
if BGP_CONVERGENCE is not True:
pytest.skip('skipped because of BGP Convergence failure')
# test case name
tc_name = request.node.name
write_test_header(tc_name)
input_dict = {
"r1": {
"bgp": {
"local_as": 131079
}
},
"r2": {
"bgp": {
"local_as": 131079
}
},
"r3": {
"bgp": {
"local_as": 131079
}
},
"r4": {
"bgp": {
"local_as": 131080
}
}
}
result = modify_as_number(tgen, topo, input_dict)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
result = verify_as_numbers(tgen, topo, input_dict)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
write_test_footer(tc_name)
def test_bgp_timers_functionality(request):
"""
Test to modify bgp timers and verify timers functionality.
"""
tgen = get_topogen()
if BGP_CONVERGENCE is not True:
pytest.skip('skipped because of BGP Convergence failure')
# test case name
tc_name = request.node.name
write_test_header(tc_name)
# Creating configuration from JSON
reset_config_on_routers(tgen)
# Api call to modfiy BGP timerse
input_dict = {
"r1": {
"bgp": {
"address_family": {
"ipv4": {
"unicast": {
"neighbor": {
"r2": {
"dest_link":{
"r1": {
"keepalivetimer": 60,
"holddowntimer": 180,
}
}
}
}
}
}
}
}
}
}
result = create_router_bgp(tgen, topo, deepcopy(input_dict))
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
# Api call to clear bgp, so timer modification would take place
clear_bgp_and_verify(tgen, topo, 'r1')
# Verifying bgp timers functionality
result = verify_bgp_timers_and_functionality(tgen, topo, input_dict)
assert result is True, "Testcase {} :Failed \n Error: {}". \
format(tc_name, result)
write_test_footer(tc_name)
if __name__ == '__main__':
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))

View File

@ -694,3 +694,565 @@ def verify_bgp_convergence(tgen, topo):
logger.info("Exiting API: verify_bgp_confergence()")
return True
def modify_as_number(tgen, topo, input_dict):
"""
API reads local_as and remote_as from user defined input_dict and
modify router"s ASNs accordingly. Router"s config is modified and
recent/changed config is loadeded to router.
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `input_dict` : defines for which router ASNs needs to be modified
Usage
-----
To modify ASNs for router r1
input_dict = {
"r1": {
"bgp": {
"local_as": 131079
}
}
result = modify_as_number(tgen, topo, input_dict)
Returns
-------
errormsg(str) or True
"""
logger.info("Entering lib API: modify_as_number()")
try:
new_topo = deepcopy(topo["routers"])
router_dict = {}
for router in input_dict.keys():
# Remove bgp configuration
router_dict.update({
router: {
"bgp": {
"delete": True
}
}
})
new_topo[router]["bgp"]["local_as"] = \
input_dict[router]["bgp"]["local_as"]
logger.info("Removing bgp configuration")
create_router_bgp(tgen, topo, router_dict)
logger.info("Applying modified bgp configuration")
create_router_bgp(tgen, new_topo)
except Exception as e:
# handle any exception
logger.error("Error %s occured. Arguments %s.", e.message, e.args)
# Traceback
errormsg = traceback.format_exc()
logger.error(errormsg)
return errormsg
logger.info("Exiting lib API: modify_as_number()")
return True
def verify_as_numbers(tgen, topo, input_dict):
"""
This API is to verify AS numbers for given DUT by running
"show ip bgp neighbor json" command. Local AS and Remote AS
will ve verified with input_dict data and command output.
Parameters
----------
* `tgen`: topogen object
* `topo`: input json file data
* `addr_type` : ip type, ipv4/ipv6
* `input_dict`: defines - for which router, AS numbers needs to be verified
Usage
-----
input_dict = {
"r1": {
"bgp": {
"local_as": 131079
}
}
}
result = verify_as_numbers(tgen, topo, addr_type, input_dict)
Returns
-------
errormsg(str) or True
"""
logger.info("Entering lib API: verify_as_numbers()")
for router in input_dict.keys():
if router not in tgen.routers():
continue
rnode = tgen.routers()[router]
logger.info("Verifying AS numbers for dut %s:", router)
show_ip_bgp_neighbor_json = rnode.vtysh_cmd(
"show ip bgp neighbor json", isjson=True)
local_as = input_dict[router]["bgp"]["local_as"]
bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
for addr_type in bgp_addr_type:
if not check_address_types(addr_type):
continue
bgp_neighbors = bgp_addr_type[addr_type]["unicast"][
"neighbor"]
for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
remote_as = input_dict[bgp_neighbor]["bgp"]["local_as"]
for dest_link, peer_dict in peer_data["dest_link"].iteritems():
neighbor_ip = None
data = topo["routers"][bgp_neighbor]["links"]
if dest_link in data:
neighbor_ip = data[dest_link][addr_type]. \
split("/")[0]
neigh_data = show_ip_bgp_neighbor_json[neighbor_ip]
# Verify Local AS for router
if neigh_data["localAs"] != local_as:
errormsg = "Failed: Verify local_as for dut {}," \
" found: {} but expected: {}".format(
router, neigh_data["localAs"],
local_as)
return errormsg
else:
logger.info("Verified local_as for dut %s, found"
" expected: %s", router, local_as)
# Verify Remote AS for neighbor
if neigh_data["remoteAs"] != remote_as:
errormsg = "Failed: Verify remote_as for dut " \
"{}'s neighbor {}, found: {} but " \
"expected: {}".format(
router, bgp_neighbor,
neigh_data["remoteAs"], remote_as)
return errormsg
else:
logger.info("Verified remote_as for dut %s's "
"neighbor %s, found expected: %s",
router, bgp_neighbor, remote_as)
logger.info("Exiting lib API: verify_AS_numbers()")
return True
def clear_bgp_and_verify(tgen, topo, router):
"""
This API is to clear bgp neighborship and verify bgp neighborship
is coming up(BGP is converged) usinf "show bgp summary json" command
and also verifying for all bgp neighbors uptime before and after
clear bgp sessions is different as the uptime must be changed once
bgp sessions are cleared using "clear ip bgp */clear bgp ipv6 *" cmd.
Parameters
----------
* `tgen`: topogen object
* `topo`: input json file data
* `router`: device under test
Usage
-----
result = clear_bgp_and_verify(tgen, topo, addr_type, dut)
Returns
-------
errormsg(str) or True
"""
logger.info("Entering lib API: clear_bgp_and_verify()")
if router not in tgen.routers():
return False
rnode = tgen.routers()[router]
peer_uptime_before_clear_bgp = {}
# Verifying BGP convergence before bgp clear command
for retry in range(1, 11):
show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
isjson=True)
logger.info(show_bgp_json)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
return errormsg
sleeptime = 2 * retry
if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
# Waiting for BGP to converge
logger.info("Waiting for %s sec for BGP to converge on router"
" %s...", sleeptime, router)
sleep(sleeptime)
else:
errormsg = "TIMEOUT!! BGP is not converged in {} seconds for" \
" router {}".format(BGP_CONVERGENCE_TIMEOUT, router)
return errormsg
# To find neighbor ip type
bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
for addr_type in bgp_addr_type.keys():
total_peer = 0
if not check_address_types(addr_type):
continue
bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
for bgp_neighbor in bgp_neighbors:
total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
for addr_type in bgp_addr_type:
bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
no_of_peer = 0
for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
for dest_link, peer_dict in peer_data["dest_link"].iteritems():
data = topo["routers"][bgp_neighbor]["links"]
if dest_link in data:
neighbor_ip = data[dest_link][addr_type].split(
"/")[0]
if addr_type == "ipv4":
ipv4_data = show_bgp_json["ipv4Unicast"][
"peers"]
nh_state = ipv4_data[neighbor_ip]["state"]
# Peer up time dictionary
peer_uptime_before_clear_bgp[bgp_neighbor] = \
ipv4_data[neighbor_ip]["peerUptime"]
else:
ipv6_data = show_bgp_json["ipv6Unicast"][
"peers"]
nh_state = ipv6_data[neighbor_ip]["state"]
# Peer up time dictionary
peer_uptime_before_clear_bgp[bgp_neighbor] = \
ipv6_data[neighbor_ip]["peerUptime"]
if nh_state == "Established":
no_of_peer += 1
if no_of_peer == total_peer:
logger.info("BGP is Converged for router %s before bgp"
" clear", router)
break
else:
logger.warning("BGP is not yet Converged for router %s "
"before bgp clear", router)
# Clearing BGP
logger.info("Clearing BGP neighborship for router %s..", router)
for addr_type in bgp_addr_type.keys():
if addr_type == "ipv4":
rnode.vtysh_cmd("clear ip bgp *")
elif addr_type == "ipv6":
rnode.vtysh_cmd("clear bgp ipv6 *")
peer_uptime_after_clear_bgp = {}
# Verifying BGP convergence after bgp clear command
for retry in range(1, 11):
show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
isjson=True)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
return errormsg
sleeptime = 2 * retry
if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
# Waiting for BGP to converge
logger.info("Waiting for %s sec for BGP to converge on router"
" %s...", sleeptime, router)
sleep(sleeptime)
else:
errormsg = "TIMEOUT!! BGP is not converged in {} seconds for" \
" router {}".format(BGP_CONVERGENCE_TIMEOUT, router)
return errormsg
# To find neighbor ip type
bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
for addr_type in bgp_addr_type.keys():
total_peer = 0
if not check_address_types(addr_type):
continue
bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
for bgp_neighbor in bgp_neighbors:
total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
for addr_type in bgp_addr_type:
bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
no_of_peer = 0
for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
for dest_link, peer_dict in peer_data["dest_link"].iteritems():
data = topo["routers"][bgp_neighbor]["links"]
if dest_link in data:
neighbor_ip = data[dest_link][addr_type]. \
split("/")[0]
if addr_type == "ipv4":
ipv4_data = show_bgp_json["ipv4Unicast"][
"peers"]
nh_state = ipv4_data[neighbor_ip]["state"]
peer_uptime_after_clear_bgp[bgp_neighbor] = \
ipv4_data[neighbor_ip]["peerUptime"]
else:
ipv6_data = show_bgp_json["ipv6Unicast"][
"peers"]
nh_state = ipv6_data[neighbor_ip]["state"]
# Peer up time dictionary
peer_uptime_after_clear_bgp[bgp_neighbor] = \
ipv6_data[neighbor_ip]["peerUptime"]
if nh_state == "Established":
no_of_peer += 1
if no_of_peer == total_peer:
logger.info("BGP is Converged for router %s after bgp clear",
router)
break
else:
logger.warning("BGP is not yet Converged for router %s after"
" bgp clear", router)
# Compariung peerUptime dictionaries
if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp:
logger.info("BGP neighborship is reset after clear BGP on router %s",
router)
else:
errormsg = "BGP neighborship is not reset after clear bgp on router" \
" {}".format(router)
return errormsg
logger.info("Exiting lib API: clear_bgp_and_verify()")
return True
def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
"""
To verify BGP timer config, execute "show ip bgp neighbor json" command
and verify bgp timers with input_dict data.
To veirfy bgp timers functonality, shutting down peer interface
and verify BGP neighborship status.
Parameters
----------
* `tgen`: topogen object
* `topo`: input json file data
* `addr_type`: ip type, ipv4/ipv6
* `input_dict`: defines for which router, bgp timers needs to be verified
Usage:
# To verify BGP timers for neighbor r2 of router r1
input_dict = {
"r1": {
"bgp": {
"bgp_neighbors":{
"r2":{
"keepalivetimer": 5,
"holddowntimer": 15,
}}}}}
result = verify_bgp_timers_and_functionality(tgen, topo, "ipv4",
input_dict)
Returns
-------
errormsg(str) or True
"""
logger.info("Entering lib API: verify_bgp_timers_and_functionality()")
sleep(5)
router_list = tgen.routers()
for router in input_dict.keys():
if router not in router_list:
continue
rnode = router_list[router]
logger.info("Verifying bgp timers functionality, DUT is %s:",
router)
show_ip_bgp_neighbor_json = \
rnode.vtysh_cmd("show ip bgp neighbor json", isjson=True)
bgp_addr_type = input_dict[router]["bgp"]["address_family"]
for addr_type in bgp_addr_type:
if not check_address_types(addr_type):
continue
bgp_neighbors = bgp_addr_type[addr_type]["unicast"][
"neighbor"]
for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
for dest_link, peer_dict in peer_data["dest_link"].iteritems():
data = topo["routers"][bgp_neighbor]["links"]
keepalivetimer = peer_dict["keepalivetimer"]
holddowntimer = peer_dict["holddowntimer"]
if dest_link in data:
neighbor_ip = data[dest_link][addr_type]. \
split("/")[0]
neighbor_intf = data[dest_link]["interface"]
# Verify HoldDownTimer for neighbor
bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[
neighbor_ip]["bgpTimerHoldTimeMsecs"]
if bgpHoldTimeMsecs != holddowntimer * 1000:
errormsg = "Verifying holddowntimer for bgp " \
"neighbor {} under dut {}, found: {} " \
"but expected: {}".format(
neighbor_ip, router,
bgpHoldTimeMsecs,
holddowntimer * 1000)
return errormsg
# Verify KeepAliveTimer for neighbor
bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[
neighbor_ip]["bgpTimerKeepAliveIntervalMsecs"]
if bgpKeepAliveTimeMsecs != keepalivetimer * 1000:
errormsg = "Verifying keepalivetimer for bgp " \
"neighbor {} under dut {}, found: {} " \
"but expected: {}".format(
neighbor_ip, router,
bgpKeepAliveTimeMsecs,
keepalivetimer * 1000)
return errormsg
####################
# Shutting down peer interface after keepalive time and
# after some time bringing up peer interface.
# verifying BGP neighborship in (hold down-keep alive)
# time, it should not go down
####################
# Wait till keep alive time
logger.info("=" * 20)
logger.info("Scenario 1:")
logger.info("Shutdown and bring up peer interface: %s "
"in keep alive time : %s sec and verify "
" BGP neighborship is intact in %s sec ",
neighbor_intf, keepalivetimer,
(holddowntimer - keepalivetimer))
logger.info("=" * 20)
logger.info("Waiting for %s sec..", keepalivetimer)
sleep(keepalivetimer)
# Shutting down peer ineterface
logger.info("Shutting down interface %s on router %s",
neighbor_intf, bgp_neighbor)
topotest.interface_set_status(
router_list[bgp_neighbor], neighbor_intf,
ifaceaction=False)
# Bringing up peer interface
sleep(5)
logger.info("Bringing up interface %s on router %s..",
neighbor_intf, bgp_neighbor)
topotest.interface_set_status(
router_list[bgp_neighbor], neighbor_intf,
ifaceaction=True)
# Verifying BGP neighborship is intact in
# (holddown - keepalive) time
for timer in range(keepalivetimer, holddowntimer,
int(holddowntimer / 3)):
logger.info("Waiting for %s sec..", keepalivetimer)
sleep(keepalivetimer)
sleep(2)
show_bgp_json = \
rnode.vtysh_cmd("show bgp summary json",
isjson=True)
if addr_type == "ipv4":
ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
nh_state = ipv4_data[neighbor_ip]["state"]
else:
ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
nh_state = ipv6_data[neighbor_ip]["state"]
if timer == \
(holddowntimer - keepalivetimer):
if nh_state != "Established":
errormsg = "BGP neighborship has not gone " \
"down in {} sec for neighbor {}\n" \
"show_bgp_json: \n {} ".format(
timer, bgp_neighbor,
show_bgp_json)
return errormsg
else:
logger.info("BGP neighborship is intact in %s"
" sec for neighbor %s \n "
"show_bgp_json : \n %s",
timer, bgp_neighbor,
show_bgp_json)
####################
# Shutting down peer interface and verifying that BGP
# neighborship is going down in holddown time
####################
logger.info("=" * 20)
logger.info("Scenario 2:")
logger.info("Shutdown peer interface: %s and verify BGP"
" neighborship has gone down in hold down "
"time %s sec", neighbor_intf, holddowntimer)
logger.info("=" * 20)
logger.info("Shutting down interface %s on router %s..",
neighbor_intf, bgp_neighbor)
topotest.interface_set_status(router_list[bgp_neighbor],
neighbor_intf,
ifaceaction=False)
# Verifying BGP neighborship is going down in holddown time
for timer in range(keepalivetimer,
(holddowntimer + keepalivetimer),
int(holddowntimer / 3)):
logger.info("Waiting for %s sec..", keepalivetimer)
sleep(keepalivetimer)
sleep(2)
show_bgp_json = \
rnode.vtysh_cmd("show bgp summary json",
isjson=True)
if addr_type == "ipv4":
ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
nh_state = ipv4_data[neighbor_ip]["state"]
else:
ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
nh_state = ipv6_data[neighbor_ip]["state"]
if timer == holddowntimer:
if nh_state == "Established":
errormsg = "BGP neighborship has not gone " \
"down in {} sec for neighbor {}\n" \
"show_bgp_json: \n {} ".format(
timer, bgp_neighbor,
show_bgp_json)
return errormsg
else:
logger.info("BGP neighborship has gone down in"
" %s sec for neighbor %s \n"
"show_bgp_json : \n %s",
timer, bgp_neighbor,
show_bgp_json)
logger.info("Exiting lib API: verify_bgp_timers_and_functionality()")
return True

View File

@ -20,6 +20,7 @@
from collections import OrderedDict
from datetime import datetime
import StringIO
import os
import ConfigParser
import traceback
@ -194,6 +195,88 @@ def create_common_configuration(tgen, router, data, config_type=None,
return True
def reset_config_on_routers(tgen, routerName=None):
"""
Resets configuration on routers to the snapshot created using input JSON
file. It replaces existing router configuration with FRRCFG_BKUP_FILE
Parameters
----------
* `tgen` : Topogen object
* `routerName` : router config is to be reset
"""
logger.debug("Entering API: reset_config_on_routers")
router_list = tgen.routers()
for rname, router in router_list.iteritems():
if routerName and routerName != rname:
continue
cfg = router.run("vtysh -c 'show running'")
fname = "{}/{}/frr.sav".format(TMPDIR, rname)
dname = "{}/{}/delta.conf".format(TMPDIR, rname)
f = open(fname, "w")
for line in cfg.split("\n"):
line = line.strip()
if (line == "Building configuration..." or
line == "Current configuration:" or
not line):
continue
f.write(line)
f.write("\n")
f.close()
command = "/usr/lib/frr/frr-reload.py --input {}/{}/frr.sav" \
" --test {}/{}/frr_json_initial.conf > {}". \
format(TMPDIR, rname, TMPDIR, rname, dname)
result = os.system(command)
# Assert if command fail
if result > 0:
errormsg = ("Command:{} is failed due to non-zero exit"
" code".format(command))
return errormsg
f = open(dname, "r")
delta = StringIO.StringIO()
delta.write("configure terminal\n")
t_delta = f.read()
for line in t_delta.split("\n"):
line = line.strip()
if (line == "Lines To Delete" or
line == "===============" or
line == "Lines To Add" or
line == "============" or
not line):
continue
delta.write(line)
delta.write("\n")
delta.write("end\n")
output = router.vtysh_multicmd(delta.getvalue(),
pretty_output=False)
logger.info("New configuration for router {}:".format(rname))
delta.close()
delta = StringIO.StringIO()
cfg = router.run("vtysh -c 'show running'")
for line in cfg.split("\n"):
line = line.strip()
delta.write(line)
delta.write("\n")
# Router current configuration to log file or console if
# "show_router_config" is defined in "pytest.ini"
if show_router_config:
logger.info(delta.getvalue())
delta.close()
logger.debug("Exting API: reset_config_on_routers")
return True
def load_config_to_router(tgen, routerName, save_bkup=False):
"""
Loads configuration on router from the file FRRCFG_FILE.