Merge pull request #4852 from ashish12pant/fix_log

tests: Enhance execution logs in topojson
This commit is contained in:
Martin Winter 2019-08-29 04:35:37 +02:00 committed by GitHub
commit 4298dfd12e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 485 additions and 403 deletions

View File

@ -219,7 +219,8 @@ def test_next_hop_attribute(request):
dut = "r1"
protocol = "bgp"
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False)
assert result is not True
assert result is not True, "Testcase {} : Failed \n Error: Routes still" \
" present in RIB".format(tc_name)
# Configure next-hop-self to bgp neighbor
input_dict_1 = {
@ -484,7 +485,7 @@ def test_localpref_attribute(request):
"neighbor": {
"r1": {
"dest_link": {
"r3": {
"r2": {
"route_maps": [
{"name": "RMAP_LOCAL_PREF",
"direction": "in"}
@ -499,6 +500,7 @@ def test_localpref_attribute(request):
}
}
}
result = create_router_bgp(tgen, topo, input_dict_4)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result)

View File

@ -386,8 +386,8 @@ def test_ip_prefix_lists_out_permit(request):
tc_name, result)
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False)
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result)
assert result is not True, "Testcase {} : Failed \n Error: Routes still" \
" present in RIB".format(tc_name)
write_test_footer(tc_name)
@ -497,8 +497,8 @@ def test_ip_prefix_lists_in_deny_and_permit_any(request):
dut = "r3"
protocol = "bgp"
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False)
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result)
assert result is not True, "Testcase {} : Failed \n Error: Routes still" \
" present in RIB".format(tc_name)
write_test_footer(tc_name)
@ -542,7 +542,6 @@ def test_delete_prefix_lists(request):
result = verify_prefix_lists(tgen, input_dict_2)
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result)
logger.info(result)
# Delete prefix list
input_dict_2 = {
@ -714,9 +713,8 @@ def test_ip_prefix_lists_out_deny_and_permit_any(request):
dut = "r4"
protocol = "bgp"
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False)
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result)
assert result is not True, "Testcase {} : Failed \n Error: Routes still" \
" present in RIB".format(tc_name)
write_test_footer(tc_name)
@ -859,8 +857,8 @@ def test_modify_prefix_lists_in_permit_to_deny(request):
dut = "r3"
protocol = "bgp"
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False)
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result)
assert result is not True, "Testcase {} : Failed \n Error: Routes still" \
" present in RIB".format(tc_name)
write_test_footer(tc_name)
@ -972,8 +970,8 @@ def test_modify_prefix_lists_in_deny_to_permit(request):
dut = "r3"
protocol = "bgp"
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False)
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result)
assert result is not True, "Testcase {} : Failed \n Error: Routes still" \
" present in RIB".format(tc_name)
# Modify ip prefix list
input_dict_1 = {
@ -1152,8 +1150,8 @@ def test_modify_prefix_lists_out_permit_to_deny(request):
dut = "r4"
protocol = "bgp"
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False)
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result)
assert result is not True, "Testcase {} : Failed \n Error: Routes still" \
" present in RIB".format(tc_name)
write_test_footer(tc_name)
@ -1265,8 +1263,8 @@ def test_modify_prefix_lists_out_deny_to_permit(request):
dut = "r4"
protocol = "bgp"
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False)
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result)
assert result is not True, "Testcase {} : Failed \n Error: Routes still" \
" present in RIB".format(tc_name)
# Modify ip prefix list
input_dict_1 = {
@ -1439,8 +1437,8 @@ def test_ip_prefix_lists_implicit_deny(request):
dut = "r4"
protocol = "bgp"
result = verify_rib(tgen, "ipv4", dut, input_dict_1, protocol=protocol, expected=False)
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
tc_name, result)
assert result is not True, "Testcase {} : Failed \n Error: Routes still" \
" present in RIB".format(tc_name)
write_test_footer(tc_name)

View File

@ -32,7 +32,8 @@ from lib.common_config import (create_common_configuration,
load_config_to_router,
check_address_types,
generate_ips,
find_interface_with_greater_ip)
find_interface_with_greater_ip,
run_frr_cmd, retry)
BGP_CONVERGENCE_TIMEOUT = 10
@ -116,8 +117,8 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False):
logger.debug("Router %s: 'bgp' not present in input_dict", router)
continue
result = __create_bgp_global(tgen, input_dict, router, build)
if result is True:
data_all_bgp = __create_bgp_global(tgen, input_dict, router, build)
if data_all_bgp:
bgp_data = input_dict[router]["bgp"]
bgp_addr_data = bgp_data.setdefault("address_family", {})
@ -134,8 +135,18 @@ def create_router_bgp(tgen, topo, input_dict=None, build=False):
or ipv6_data.setdefault("unicast", {}) else False
if neigh_unicast:
result = __create_bgp_unicast_neighbor(
tgen, topo, input_dict, router, build)
data_all_bgp = __create_bgp_unicast_neighbor(
tgen, topo, input_dict, router,
config_data=data_all_bgp)
try:
result = create_common_configuration(tgen, router, data_all_bgp,
"bgp", build)
except InvalidCLIError:
# Traceback
errormsg = traceback.format_exc()
logger.error(errormsg)
return errormsg
logger.debug("Exiting lib API: create_router_bgp()")
return result
@ -157,77 +168,66 @@ def __create_bgp_global(tgen, input_dict, router, build=False):
True or False
"""
result = False
logger.debug("Entering lib API: __create_bgp_global()")
try:
bgp_data = input_dict[router]["bgp"]
del_bgp_action = bgp_data.setdefault("delete", False)
if del_bgp_action:
config_data = ["no router bgp"]
result = create_common_configuration(tgen, router, config_data,
"bgp", build=build)
return result
bgp_data = input_dict[router]["bgp"]
del_bgp_action = bgp_data.setdefault("delete", False)
if del_bgp_action:
config_data = ["no router bgp"]
config_data = []
return config_data
if "local_as" not in bgp_data and build:
logger.error("Router %s: 'local_as' not present in input_dict"
"for BGP", router)
return False
config_data = []
local_as = bgp_data.setdefault("local_as", "")
cmd = "router bgp {}".format(local_as)
vrf_id = bgp_data.setdefault("vrf", None)
if vrf_id:
cmd = "{} vrf {}".format(cmd, vrf_id)
if "local_as" not in bgp_data and build:
logger.error("Router %s: 'local_as' not present in input_dict"
"for BGP", router)
return False
config_data.append(cmd)
local_as = bgp_data.setdefault("local_as", "")
cmd = "router bgp {}".format(local_as)
vrf_id = bgp_data.setdefault("vrf", None)
if vrf_id:
cmd = "{} vrf {}".format(cmd, vrf_id)
router_id = bgp_data.setdefault("router_id", None)
del_router_id = bgp_data.setdefault("del_router_id", False)
if del_router_id:
config_data.append("no bgp router-id")
if router_id:
config_data.append("bgp router-id {}".format(
router_id))
config_data.append(cmd)
aggregate_address = bgp_data.setdefault("aggregate_address",
{})
if aggregate_address:
network = aggregate_address.setdefault("network", None)
if not network:
logger.error("Router %s: 'network' not present in "
"input_dict for BGP", router)
else:
cmd = "aggregate-address {}".format(network)
router_id = bgp_data.setdefault("router_id", None)
del_router_id = bgp_data.setdefault("del_router_id", False)
if del_router_id:
config_data.append("no bgp router-id")
if router_id:
config_data.append("bgp router-id {}".format(
router_id))
as_set = aggregate_address.setdefault("as_set", False)
summary = aggregate_address.setdefault("summary", False)
del_action = aggregate_address.setdefault("delete", False)
if as_set:
cmd = "{} {}".format(cmd, "as-set")
if summary:
cmd = "{} {}".format(cmd, "summary")
aggregate_address = bgp_data.setdefault("aggregate_address",
{})
if aggregate_address:
network = aggregate_address.setdefault("network", None)
if not network:
logger.error("Router %s: 'network' not present in "
"input_dict for BGP", router)
else:
cmd = "aggregate-address {}".format(network)
if del_action:
cmd = "no {}".format(cmd)
as_set = aggregate_address.setdefault("as_set", False)
summary = aggregate_address.setdefault("summary", False)
del_action = aggregate_address.setdefault("delete", False)
if as_set:
cmd = "{} {}".format(cmd, "as-set")
if summary:
cmd = "{} {}".format(cmd, "summary")
config_data.append(cmd)
if del_action:
cmd = "no {}".format(cmd)
result = create_common_configuration(tgen, router, config_data,
"bgp", build=build)
except InvalidCLIError:
# Traceback
errormsg = traceback.format_exc()
logger.error(errormsg)
return errormsg
config_data.append(cmd)
logger.debug("Exiting lib API: create_bgp_global()")
return result
return config_data
def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, build=False):
def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router,
config_data=None):
"""
Helper API to create configuration for address-family unicast
@ -240,124 +240,118 @@ def __create_bgp_unicast_neighbor(tgen, topo, input_dict, router, build=False):
* `build` : Only for initial setup phase this is set as True.
"""
result = False
logger.debug("Entering lib API: __create_bgp_unicast_neighbor()")
try:
config_data = ["router bgp"]
bgp_data = input_dict[router]["bgp"]["address_family"]
for addr_type, addr_dict in bgp_data.iteritems():
if not addr_dict:
continue
add_neigh = True
if "router bgp "in config_data:
add_neigh = False
bgp_data = input_dict[router]["bgp"]["address_family"]
if not check_address_types(addr_type):
continue
for addr_type, addr_dict in bgp_data.iteritems():
if not addr_dict:
continue
if not check_address_types(addr_type):
continue
addr_data = addr_dict["unicast"]
if addr_data:
config_data.append("address-family {} unicast".format(
addr_type
))
addr_data = addr_dict["unicast"]
advertise_network = addr_data.setdefault("advertise_networks",
[])
for advertise_network_dict in advertise_network:
network = advertise_network_dict["network"]
if type(network) is not list:
network = [network]
advertise_network = addr_data.setdefault("advertise_networks",
[])
for advertise_network_dict in advertise_network:
network = advertise_network_dict["network"]
if type(network) is not list:
network = [network]
if "no_of_network" in advertise_network_dict:
no_of_network = advertise_network_dict["no_of_network"]
if "no_of_network" in advertise_network_dict:
no_of_network = advertise_network_dict["no_of_network"]
else:
no_of_network = 1
del_action = advertise_network_dict.setdefault("delete",
False)
# Generating IPs for verification
prefix = str(
ipaddr.IPNetwork(unicode(network[0])).prefixlen)
network_list = generate_ips(network, no_of_network)
for ip in network_list:
ip = str(ipaddr.IPNetwork(unicode(ip)).network)
cmd = "network {}/{}".format(ip, prefix)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
max_paths = addr_data.setdefault("maximum_paths", {})
if max_paths:
ibgp = max_paths.setdefault("ibgp", None)
ebgp = max_paths.setdefault("ebgp", None)
if ibgp:
config_data.append("maximum-paths ibgp {}".format(
ibgp
))
if ebgp:
config_data.append("maximum-paths {}".format(
ebgp
))
aggregate_address = addr_data.setdefault("aggregate_address",
{})
if aggregate_address:
ip = aggregate_address("network", None)
attribute = aggregate_address("attribute", None)
if ip:
cmd = "aggregate-address {}".format(ip)
if attribute:
cmd = "{} {}".format(cmd, attribute)
config_data.append(cmd)
redistribute_data = addr_data.setdefault("redistribute", {})
if redistribute_data:
for redistribute in redistribute_data:
if "redist_type" not in redistribute:
logger.error("Router %s: 'redist_type' not present in "
"input_dict", router)
else:
no_of_network = 1
del_action = advertise_network_dict.setdefault("delete",
False)
# Generating IPs for verification
prefix = str(
ipaddr.IPNetwork(unicode(network[0])).prefixlen)
network_list = generate_ips(network, no_of_network)
for ip in network_list:
ip = str(ipaddr.IPNetwork(unicode(ip)).network)
cmd = "network {}/{}\n".format(ip, prefix)
cmd = "redistribute {}".format(
redistribute["redist_type"])
redist_attr = redistribute.setdefault("attribute",
None)
if redist_attr:
cmd = "{} {}".format(cmd, redist_attr)
del_action = redistribute.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
max_paths = addr_data.setdefault("maximum_paths", {})
if max_paths:
ibgp = max_paths.setdefault("ibgp", None)
ebgp = max_paths.setdefault("ebgp", None)
if ibgp:
config_data.append("maximum-paths ibgp {}".format(
ibgp
))
if ebgp:
config_data.append("maximum-paths {}".format(
ebgp
))
if "neighbor" in addr_data:
neigh_data = __create_bgp_neighbor(topo, input_dict,
router, addr_type, add_neigh)
config_data.extend(neigh_data)
aggregate_address = addr_data.setdefault("aggregate_address",
{})
if aggregate_address:
ip = aggregate_address("network", None)
attribute = aggregate_address("attribute", None)
if ip:
cmd = "aggregate-address {}".format(ip)
if attribute:
cmd = "{} {}".format(cmd, attribute)
for addr_type, addr_dict in bgp_data.iteritems():
if not addr_dict or not check_address_types(addr_type):
continue
config_data.append(cmd)
addr_data = addr_dict["unicast"]
if "neighbor" in addr_data:
neigh_addr_data = __create_bgp_unicast_address_family(
topo, input_dict, router, addr_type, add_neigh)
redistribute_data = addr_data.setdefault("redistribute", {})
if redistribute_data:
for redistribute in redistribute_data:
if "redist_type" not in redistribute:
logger.error("Router %s: 'redist_type' not present in "
"input_dict", router)
else:
cmd = "redistribute {}".format(
redistribute["redist_type"])
redist_attr = redistribute.setdefault("attribute",
None)
if redist_attr:
cmd = "{} {}".format(cmd, redist_attr)
del_action = redistribute.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
config_data.extend(neigh_addr_data)
if "neighbor" in addr_data:
neigh_data = __create_bgp_neighbor(topo, input_dict,
router, addr_type)
config_data.extend(neigh_data)
for addr_type, addr_dict in bgp_data.iteritems():
if not addr_dict or not check_address_types(addr_type):
continue
addr_data = addr_dict["unicast"]
if "neighbor" in addr_data:
neigh_addr_data = __create_bgp_unicast_address_family(
topo, input_dict, router, addr_type)
config_data.extend(neigh_addr_data)
result = create_common_configuration(tgen, router, config_data,
None, build=build)
except InvalidCLIError:
# Traceback
errormsg = traceback.format_exc()
logger.error(errormsg)
return errormsg
logger.debug("Exiting lib API: __create_bgp_unicast_neighbor()")
return result
return config_data
def __create_bgp_neighbor(topo, input_dict, router, addr_type):
def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
"""
Helper API to create neighbor specific configuration
@ -391,7 +385,8 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type):
neigh_cxt = "neighbor {}".format(ip_addr)
config_data.append("{} remote-as {}".format(neigh_cxt, remote_as))
if add_neigh:
config_data.append("{} remote-as {}".format(neigh_cxt, remote_as))
if addr_type == "ipv6":
config_data.append("address-family ipv6 unicast")
config_data.append("{} activate".format(neigh_cxt))
@ -429,7 +424,8 @@ def __create_bgp_neighbor(topo, input_dict, router, addr_type):
return config_data
def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type):
def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type,
add_neigh=True):
"""
API prints bgp global config to bgp_json file.
@ -474,9 +470,9 @@ def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type):
dest_link]["ipv6"].split("/")[0]
neigh_cxt = "neighbor {}".format(ip_addr)
config_data.append("address-family {} unicast".format(
addr_type
))
#config_data.append("address-family {} unicast".format(
# addr_type
#))
if deactivate:
config_data.append(
"no neighbor {} activate".format(deactivate))
@ -531,6 +527,7 @@ def __create_bgp_unicast_address_family(topo, input_dict, router, addr_type):
#############################################
# Verification APIs
#############################################
@retry(attempts=3, wait=2, return_is_str=True)
def verify_router_id(tgen, topo, input_dict):
"""
Running command "show ip bgp json" for DUT and reading router-id
@ -565,7 +562,7 @@ def verify_router_id(tgen, topo, input_dict):
errormsg(str) or True
"""
logger.info("Entering lib API: verify_router_id()")
logger.debug("Entering lib API: verify_router_id()")
for router in input_dict.keys():
if router not in tgen.routers():
continue
@ -576,9 +573,9 @@ def verify_router_id(tgen, topo, input_dict):
"del_router_id", False)
logger.info("Checking router %s router-id", router)
show_bgp_json = rnode.vtysh_cmd("show ip bgp json",
show_bgp_json = run_frr_cmd(rnode, "show bgp summary json",
isjson=True)
router_id_out = show_bgp_json["routerId"]
router_id_out = show_bgp_json["ipv4Unicast"]["routerId"]
router_id_out = ipaddr.IPv4Address(unicode(router_id_out))
# Once router-id is deleted, highest interface ip should become
@ -598,100 +595,84 @@ def verify_router_id(tgen, topo, input_dict):
router_id_out)
return errormsg
logger.info("Exiting lib API: verify_router_id()")
logger.debug("Exiting lib API: verify_router_id()")
return True
@retry(attempts=20, wait=2, return_is_str=True)
def verify_bgp_convergence(tgen, topo):
"""
API will verify if BGP is converged with in the given time frame.
Running "show bgp summary json" command and verify bgp neighbor
state is established,
Parameters
----------
* `tgen`: topogen object
* `topo`: input json file data
* `addr_type`: ip_type, ipv4/ipv6
Usage
-----
# To veriry is BGP is converged for all the routers used in
topology
results = verify_bgp_convergence(tgen, topo, "ipv4")
Returns
-------
errormsg(str) or True
"""
logger.info("Entering lib API: verify_bgp_confergence()")
logger.debug("Entering lib API: verify_bgp_convergence()")
for router, rnode in tgen.routers().iteritems():
logger.info("Verifying BGP Convergence on router %s:", router)
logger.info("Verifying BGP Convergence on router %s", router)
show_bgp_json = run_frr_cmd(rnode, "show bgp summary json",
isjson=True)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
return errormsg
for retry in range(1, 11):
show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
isjson=True)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
return errormsg
# To find neighbor ip type
# To find neighbor ip type
bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
for addr_type in bgp_addr_type.keys():
if not check_address_types(addr_type):
continue
total_peer = 0
bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
for addr_type in bgp_addr_type.keys():
if not check_address_types(addr_type):
continue
bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
for bgp_neighbor in bgp_neighbors:
total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
for bgp_neighbor in bgp_neighbors:
total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
for addr_type in bgp_addr_type.keys():
if not check_address_types(addr_type):
continue
bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
for addr_type in bgp_addr_type.keys():
bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
no_of_peer = 0
for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
for dest_link in peer_data["dest_link"].keys():
data = topo["routers"][bgp_neighbor]["links"]
if dest_link in data:
neighbor_ip = \
data[dest_link][addr_type].split("/")[0]
if addr_type == "ipv4":
ipv4_data = show_bgp_json["ipv4Unicast"][
"peers"]
nh_state = ipv4_data[neighbor_ip]["state"]
else:
ipv6_data = show_bgp_json["ipv6Unicast"][
"peers"]
nh_state = ipv6_data[neighbor_ip]["state"]
no_of_peer = 0
for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
for dest_link in peer_data["dest_link"].keys():
data = topo["routers"][bgp_neighbor]["links"]
if dest_link in data:
neighbor_ip = \
data[dest_link][addr_type].split("/")[0]
if addr_type == "ipv4":
ipv4_data = show_bgp_json["ipv4Unicast"][
"peers"]
nh_state = ipv4_data[neighbor_ip]["state"]
else:
ipv6_data = show_bgp_json["ipv6Unicast"][
"peers"]
nh_state = ipv6_data[neighbor_ip]["state"]
if nh_state == "Established":
no_of_peer += 1
if no_of_peer == total_peer:
logger.info("BGP is Converged for router %s", router)
else:
errormsg = "BGP is not converged for router {}".format(
router)
return errormsg
if nh_state == "Established":
no_of_peer += 1
if no_of_peer == total_peer:
logger.info("BGP is Converged for router %s", router)
break
else:
logger.warning("BGP is not yet Converged for router %s",
router)
sleeptime = 2 * retry
if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
# Waiting for BGP to converge
logger.info("Waiting for %s sec for BGP to converge on"
" router %s...", sleeptime, router)
sleep(sleeptime)
else:
show_bgp_summary = rnode.vtysh_cmd("show bgp summary")
errormsg = "TIMEOUT!! BGP is not converged in {} " \
"seconds for router {} \n {}".format(
BGP_CONVERGENCE_TIMEOUT, router,
show_bgp_summary)
return errormsg
logger.info("Exiting API: verify_bgp_confergence()")
logger.debug("Exiting API: verify_bgp_convergence()")
return True
@ -723,7 +704,7 @@ def modify_as_number(tgen, topo, input_dict):
errormsg(str) or True
"""
logger.info("Entering lib API: modify_as_number()")
logger.debug("Entering lib API: modify_as_number()")
try:
new_topo = deepcopy(topo["routers"])
@ -757,11 +738,12 @@ def modify_as_number(tgen, topo, input_dict):
logger.error(errormsg)
return errormsg
logger.info("Exiting lib API: modify_as_number()")
logger.debug("Exiting lib API: modify_as_number()")
return True
@retry(attempts=3, wait=2, return_is_str=True)
def verify_as_numbers(tgen, topo, input_dict):
"""
This API is to verify AS numbers for given DUT by running
@ -791,7 +773,7 @@ def verify_as_numbers(tgen, topo, input_dict):
errormsg(str) or True
"""
logger.info("Entering lib API: verify_as_numbers()")
logger.debug("Entering lib API: verify_as_numbers()")
for router in input_dict.keys():
if router not in tgen.routers():
continue
@ -800,7 +782,7 @@ def verify_as_numbers(tgen, topo, input_dict):
logger.info("Verifying AS numbers for dut %s:", router)
show_ip_bgp_neighbor_json = rnode.vtysh_cmd(
show_ip_bgp_neighbor_json = run_frr_cmd(rnode,
"show ip bgp neighbor json", isjson=True)
local_as = input_dict[router]["bgp"]["local_as"]
bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
@ -846,7 +828,7 @@ def verify_as_numbers(tgen, topo, input_dict):
"neighbor %s, found expected: %s",
router, bgp_neighbor, remote_as)
logger.info("Exiting lib API: verify_AS_numbers()")
logger.debug("Exiting lib API: verify_AS_numbers()")
return True
@ -873,7 +855,7 @@ def clear_bgp_and_verify(tgen, topo, router):
errormsg(str) or True
"""
logger.info("Entering lib API: clear_bgp_and_verify()")
logger.debug("Entering lib API: clear_bgp_and_verify()")
if router not in tgen.routers():
return False
@ -883,20 +865,14 @@ def clear_bgp_and_verify(tgen, topo, router):
peer_uptime_before_clear_bgp = {}
# Verifying BGP convergence before bgp clear command
for retry in range(1, 11):
sleeptime = 2 * retry
if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
# Waiting for BGP to converge
logger.info("Waiting for %s sec for BGP to converge on router"
" %s...", sleeptime, router)
sleep(sleeptime)
else:
errormsg = "TIMEOUT!! BGP is not converged in {} seconds for" \
" router {}".format(BGP_CONVERGENCE_TIMEOUT, router)
return errormsg
sleeptime = 3
# Waiting for BGP to converge
logger.info("Waiting for %s sec for BGP to converge on router"
" %s...", sleeptime, router)
sleep(sleeptime)
show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
show_bgp_json = run_frr_cmd(rnode, "show bgp summary json",
isjson=True)
logger.info(show_bgp_json)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
errormsg = "BGP is not running"
@ -950,33 +926,33 @@ def clear_bgp_and_verify(tgen, topo, router):
" clear", router)
break
else:
logger.warning("BGP is not yet Converged for router %s "
"before bgp clear", router)
logger.info("BGP is not yet Converged for router %s "
"before bgp clear", router)
else:
errormsg = "TIMEOUT!! BGP is not converged in 30 seconds for" \
" router {}".format(router)
return errormsg
logger.info(peer_uptime_before_clear_bgp)
# Clearing BGP
logger.info("Clearing BGP neighborship for router %s..", router)
for addr_type in bgp_addr_type.keys():
if addr_type == "ipv4":
rnode.vtysh_cmd("clear ip bgp *")
run_frr_cmd(rnode, "clear ip bgp *")
elif addr_type == "ipv6":
rnode.vtysh_cmd("clear bgp ipv6 *")
run_frr_cmd(rnode, "clear bgp ipv6 *")
peer_uptime_after_clear_bgp = {}
# Verifying BGP convergence after bgp clear command
for retry in range(1, 11):
sleeptime = 2 * retry
if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
# Waiting for BGP to converge
logger.info("Waiting for %s sec for BGP to converge on router"
" %s...", sleeptime, router)
sleep(sleeptime)
else:
errormsg = "TIMEOUT!! BGP is not converged in {} seconds for" \
" router {}".format(BGP_CONVERGENCE_TIMEOUT, router)
return errormsg
for retry in range(11):
sleeptime = 3
# Waiting for BGP to converge
logger.info("Waiting for %s sec for BGP to converge on router"
" %s...", sleeptime, router)
sleep(sleeptime)
show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
show_bgp_json = run_frr_cmd(rnode, "show bgp summary json",
isjson=True)
# Verifying output dictionary show_bgp_json is empty or not
if not bool(show_bgp_json):
@ -1028,9 +1004,12 @@ def clear_bgp_and_verify(tgen, topo, router):
router)
break
else:
logger.warning("BGP is not yet Converged for router %s after"
" bgp clear", router)
logger.info("BGP is not yet Converged for router %s after"
" bgp clear", router)
else:
errormsg = "TIMEOUT!! BGP is not converged in 30 seconds for" \
" router {}".format(router)
return errormsg
logger.info(peer_uptime_after_clear_bgp)
# Comparing peerUptimeEstablishedEpoch dictionaries
if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp:
@ -1041,7 +1020,7 @@ def clear_bgp_and_verify(tgen, topo, router):
" {}".format(router)
return errormsg
logger.info("Exiting lib API: clear_bgp_and_verify()")
logger.debug("Exiting lib API: clear_bgp_and_verify()")
return True
@ -1077,7 +1056,7 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
errormsg(str) or True
"""
logger.info("Entering lib API: verify_bgp_timers_and_functionality()")
logger.debug("Entering lib API: verify_bgp_timers_and_functionality()")
sleep(5)
router_list = tgen.routers()
for router in input_dict.keys():
@ -1090,7 +1069,7 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
router)
show_ip_bgp_neighbor_json = \
rnode.vtysh_cmd("show ip bgp neighbor json", isjson=True)
run_frr_cmd(rnode, "show ip bgp neighbor json", isjson=True)
bgp_addr_type = input_dict[router]["bgp"]["address_family"]
@ -1178,7 +1157,7 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
sleep(keepalivetimer)
sleep(2)
show_bgp_json = \
rnode.vtysh_cmd("show bgp summary json",
run_frr_cmd(rnode, "show bgp summary json",
isjson=True)
if addr_type == "ipv4":
@ -1192,17 +1171,13 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
(holddowntimer - keepalivetimer):
if nh_state != "Established":
errormsg = "BGP neighborship has not gone " \
"down in {} sec for neighbor {}\n" \
"show_bgp_json: \n {} ".format(
timer, bgp_neighbor,
show_bgp_json)
"down in {} sec for neighbor {}" \
.format(timer, bgp_neighbor)
return errormsg
else:
logger.info("BGP neighborship is intact in %s"
" sec for neighbor %s \n "
"show_bgp_json : \n %s",
timer, bgp_neighbor,
show_bgp_json)
" sec for neighbor %s",
timer, bgp_neighbor)
####################
# Shutting down peer interface and verifying that BGP
@ -1229,7 +1204,7 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
sleep(keepalivetimer)
sleep(2)
show_bgp_json = \
rnode.vtysh_cmd("show bgp summary json",
run_frr_cmd(rnode, "show bgp summary json",
isjson=True)
if addr_type == "ipv4":
@ -1242,22 +1217,19 @@ def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
if timer == holddowntimer:
if nh_state == "Established":
errormsg = "BGP neighborship has not gone " \
"down in {} sec for neighbor {}\n" \
"show_bgp_json: \n {} ".format(
timer, bgp_neighbor,
show_bgp_json)
"down in {} sec for neighbor {}" \
.format(timer, bgp_neighbor)
return errormsg
else:
logger.info("BGP neighborship has gone down in"
" %s sec for neighbor %s \n"
"show_bgp_json : \n %s",
timer, bgp_neighbor,
show_bgp_json)
" %s sec for neighbor %s",
timer, bgp_neighbor)
logger.info("Exiting lib API: verify_bgp_timers_and_functionality()")
logger.debug("Exiting lib API: verify_bgp_timers_and_functionality()")
return True
@retry(attempts=3, wait=2, return_is_str=True)
def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict,
attribute):
"""
@ -1319,7 +1291,7 @@ def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict,
sleep(2)
logger.info("Verifying router %s RIB for best path:", router)
sh_ip_bgp_json = rnode.vtysh_cmd(command, isjson=True)
sh_ip_bgp_json = run_frr_cmd(rnode, command, isjson=True)
for route_val in input_dict.values():
net_data = route_val["bgp"]["address_family"]["ipv4"]["unicast"]
@ -1380,7 +1352,7 @@ def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict,
else:
command = "show ipv6 route json"
rib_routes_json = rnode.vtysh_cmd(command, isjson=True)
rib_routes_json = run_frr_cmd(rnode, command, isjson=True)
# Verifying output dictionary rib_routes_json is not empty
if not bool(rib_routes_json):
@ -1417,6 +1389,7 @@ def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict,
return True
@retry(attempts=3, wait=2, return_is_str=True)
def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict,
attribute):
"""
@ -1451,7 +1424,7 @@ def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict,
errormsg(str) or True
"""
logger.info("Entering lib API: verify_best_path_as_per_admin_distance()")
logger.debug("Entering lib API: verify_best_path_as_per_admin_distance()")
router_list = tgen.routers()
if router not in router_list:
return False
@ -1490,7 +1463,7 @@ def verify_best_path_as_per_admin_distance(tgen, addr_type, router, input_dict,
compare = "LOWEST"
# Show ip route
rib_routes_json = rnode.vtysh_cmd(command, isjson=True)
rib_routes_json = run_frr_cmd(rnode, command, isjson=True)
# Verifying output dictionary rib_routes_json is not empty
if not bool(rib_routes_json):

View File

@ -23,6 +23,11 @@ from datetime import datetime
from time import sleep
from subprocess import call
from subprocess import STDOUT as SUB_STDOUT
from subprocess import PIPE as SUB_PIPE
from subprocess import Popen
from functools import wraps
from re import search as re_search
import StringIO
import os
import ConfigParser
@ -41,6 +46,7 @@ FRRCFG_FILE = "frr_json.conf"
FRRCFG_BKUP_FILE = "frr_json_initial.conf"
ERROR_LIST = ["Malformed", "Failure", "Unknown"]
ROUTER_LIST = []
####
CD = os.path.dirname(os.path.realpath(__file__))
@ -142,6 +148,35 @@ class InvalidCLIError(Exception):
pass
def run_frr_cmd(rnode, cmd, isjson=False):
"""
Execute frr show commands in priviledged mode
* `rnode`: router node on which commands needs to executed
* `cmd`: Command to be executed on frr
* `isjson`: If command is to get json data or not
:return str:
"""
if cmd:
ret_data = rnode.vtysh_cmd(cmd, isjson=isjson)
if True:
if isjson:
logger.debug(ret_data)
print_data = rnode.vtysh_cmd(cmd.rstrip("json"), isjson=False)
else:
print_data = ret_data
logger.info('Output for command [ %s] on router %s:\n%s',
cmd.rstrip("json"), rnode.name, print_data)
return ret_data
else:
raise InvalidCLIError('No actual cmd passed')
def create_common_configuration(tgen, router, data, config_type=None,
build=False):
"""
@ -186,6 +221,7 @@ def create_common_configuration(tgen, router, data, config_type=None,
frr_cfg_fd.write(config_map[config_type])
for line in data:
frr_cfg_fd.write("{} \n".format(str(line)))
frr_cfg_fd.write("\n")
except IOError as err:
logger.error("Unable to open FRR Config File. error(%s): %s" %
@ -215,10 +251,13 @@ def reset_config_on_routers(tgen, routerName=None):
logger.debug("Entering API: reset_config_on_routers")
router_list = tgen.routers()
for rname, router in router_list.iteritems():
for rname in ROUTER_LIST:
if routerName and routerName != rname:
continue
router = router_list[rname]
logger.info("Configuring router %s to initial test configuration",
rname)
cfg = router.run("vtysh -c 'show running'")
fname = "{}/{}/frr.sav".format(TMPDIR, rname)
dname = "{}/{}/delta.conf".format(TMPDIR, rname)
@ -235,16 +274,35 @@ def reset_config_on_routers(tgen, routerName=None):
f.close()
command = "/usr/lib/frr/frr-reload.py --input {}/{}/frr.sav" \
" --test {}/{}/frr_json_initial.conf > {}". \
format(TMPDIR, rname, TMPDIR, rname, dname)
result = call(command, shell=True, stderr=SUB_STDOUT)
run_cfg_file = "{}/{}/frr.sav".format(TMPDIR, rname)
init_cfg_file = "{}/{}/frr_json_initial.conf".format(TMPDIR, rname)
command = "/usr/lib/frr/frr-reload.py --input {} --test {} > {}". \
format(run_cfg_file, init_cfg_file, dname)
result = call(command, shell=True, stderr=SUB_STDOUT,
stdout=SUB_PIPE)
# Assert if command fail
if result > 0:
errormsg = ("Command:{} is failed due to non-zero exit"
" code".format(command))
return errormsg
logger.error("Delta file creation failed. Command executed %s",
command)
with open(run_cfg_file, 'r') as fd:
logger.info('Running configuration saved in %s is:\n%s',
run_cfg_file, fd.read())
with open(init_cfg_file, 'r') as fd:
logger.info('Test configuration saved in %s is:\n%s',
init_cfg_file, fd.read())
err_cmd = ['/usr/bin/vtysh', '-m', '-f', run_cfg_file]
result = Popen(err_cmd, stdout=SUB_PIPE, stderr=SUB_PIPE)
output = result.communicate()
for out_data in output:
temp_data = out_data.decode('utf-8').lower()
for out_err in ERROR_LIST:
if out_err.lower() in temp_data:
logger.error("Found errors while validating data in"
" %s", run_cfg_file)
raise InvalidCLIError(out_data)
raise InvalidCLIError("Unknown error in %s", output)
f = open(dname, "r")
delta = StringIO.StringIO()
@ -264,7 +322,7 @@ def reset_config_on_routers(tgen, routerName=None):
delta.write("end\n")
output = router.vtysh_multicmd(delta.getvalue(),
pretty_output=False)
logger.info("New configuration for router {}:".format(rname))
delta.close()
delta = StringIO.StringIO()
cfg = router.run("vtysh -c 'show running'")
@ -276,6 +334,8 @@ def reset_config_on_routers(tgen, routerName=None):
# Router current configuration to log file or console if
# "show_router_config" is defined in "pytest.ini"
if show_router_config:
logger.info("Configuration on router {} after config reset:".
format(rname))
logger.info(delta.getvalue())
delta.close()
@ -297,34 +357,39 @@ def load_config_to_router(tgen, routerName, save_bkup=False):
logger.debug("Entering API: load_config_to_router")
router_list = tgen.routers()
for rname, router in router_list.iteritems():
if rname == routerName:
try:
frr_cfg_file = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_FILE)
frr_cfg_bkup = "{}/{}/{}".format(TMPDIR, rname,
FRRCFG_BKUP_FILE)
with open(frr_cfg_file, "r") as cfg:
data = cfg.read()
if save_bkup:
with open(frr_cfg_bkup, "w") as bkup:
bkup.write(data)
for rname in ROUTER_LIST:
if routerName and routerName != rname:
continue
output = router.vtysh_multicmd(data, pretty_output=False)
for out_err in ERROR_LIST:
if out_err.lower() in output.lower():
raise InvalidCLIError("%s" % output)
except IOError as err:
errormsg = ("Unable to open config File. error(%s):"
" %s", (err.errno, err.strerror))
return errormsg
router = router_list[rname]
try:
frr_cfg_file = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_FILE)
frr_cfg_bkup = "{}/{}/{}".format(TMPDIR, rname,
FRRCFG_BKUP_FILE)
with open(frr_cfg_file, "r+") as cfg:
data = cfg.read()
logger.info("Applying following configuration on router"
" {}:\n{}".format(rname, data))
if save_bkup:
with open(frr_cfg_bkup, "w") as bkup:
bkup.write(data)
logger.info("New configuration for router {}:".format(rname))
output = router.vtysh_multicmd(data, pretty_output=False)
for out_err in ERROR_LIST:
if out_err.lower() in output.lower():
raise InvalidCLIError("%s" % output)
cfg.truncate(0)
except IOError as err:
errormsg = ("Unable to open config File. error(%s):"
" %s", (err.errno, err.strerror))
return errormsg
# Router current configuration to log file or console if
# "show_router_config" is defined in "pytest.ini"
if show_router_config:
new_config = router.run("vtysh -c 'show running'")
# Router current configuration to log file or console if
# "show_router_config" is defined in "pytest.ini"
if show_router_config:
logger.info(new_config)
logger.info(new_config)
logger.debug("Exting API: load_config_to_router")
return True
@ -337,21 +402,25 @@ def start_topology(tgen):
* `tgen` : topogen object
"""
global TMPDIR
global TMPDIR, ROUTER_LIST
# Starting topology
tgen.start_topology()
# Starting deamons
router_list = tgen.routers()
ROUTER_LIST = sorted(router_list.keys(),
key=lambda x: int(re_search('\d+', x).group(0)))
TMPDIR = os.path.join(LOGDIR, tgen.modname)
for rname, router in router_list.iteritems():
router_list = tgen.routers()
for rname in ROUTER_LIST:
router = router_list[rname]
try:
os.chdir(TMPDIR)
# Creating rouer named dir and empty zebra.conf bgpd.conf files
# Creating router named dir and empty zebra.conf bgpd.conf files
# inside the current directory
if os.path.isdir('{}'.format(rname)):
os.system("rm -rf {}".format(rname))
os.mkdir('{}'.format(rname))
@ -371,13 +440,11 @@ def start_topology(tgen):
router.load_config(
TopoRouter.RD_ZEBRA,
'{}/{}/zebra.conf'.format(TMPDIR, rname)
# os.path.join(tmpdir, '{}/zebra.conf'.format(rname))
)
# Loading empty bgpd.conf file to router, to start the bgp deamon
router.load_config(
TopoRouter.RD_BGP,
'{}/{}/bgpd.conf'.format(TMPDIR, rname)
# os.path.join(tmpdir, '{}/bgpd.conf'.format(rname))
)
# Starting routers
@ -548,7 +615,7 @@ def write_test_header(tc_name):
""" Display message at beginning of test case"""
count = 20
logger.info("*"*(len(tc_name)+count))
logger.info("START -> Testcase : %s", tc_name)
logger.info("START -> Testcase : %s" % tc_name)
logger.info("*"*(len(tc_name)+count))
@ -556,10 +623,65 @@ def write_test_footer(tc_name):
""" Display message at end of test case"""
count = 21
logger.info("="*(len(tc_name)+count))
logger.info("PASSED -> Testcase : %s", tc_name)
logger.info("Testcase : %s -> PASSED", tc_name)
logger.info("="*(len(tc_name)+count))
def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0):
"""
Retries function execution, if return is an errormsg or exception
* `attempts`: Number of attempts to make
* `wait`: Number of seconds to wait between each attempt
* `return_is_str`: Return val is an errormsg in case of failure
* `initial_wait`: Sleeps for this much seconds before executing function
"""
def _retry(func):
@wraps(func)
def func_retry(*args, **kwargs):
_wait = kwargs.pop('wait', wait)
_attempts = kwargs.pop('attempts', attempts)
_attempts = int(_attempts)
if _attempts < 0:
raise ValueError("attempts must be 0 or greater")
if initial_wait > 0:
logger.info("Waiting for [%s]s as initial delay", initial_wait)
sleep(initial_wait)
_return_is_str = kwargs.pop('return_is_str', return_is_str)
for i in range(1, _attempts + 1):
try:
_expected = kwargs.setdefault('expected', True)
kwargs.pop('expected')
ret = func(*args, **kwargs)
logger.debug("Function returned %s" % ret)
if return_is_str and isinstance(ret, bool):
return ret
elif return_is_str and _expected is False:
return ret
if _attempts == i:
return ret
except Exception as err:
if _attempts == i:
logger.info("Max number of attempts (%r) reached",
_attempts)
raise
else:
logger.info("Function returned %s", err)
if i < _attempts:
logger.info("Retry [#%r] after sleeping for %ss"
% (i, _wait))
sleep(_wait)
func_retry._original = func
return func_retry
return _retry
#############################################
# These APIs, will used by testcase
#############################################
@ -589,17 +711,17 @@ def create_interfaces_cfg(tgen, topo, build=False):
interface_name = destRouterLink
else:
interface_name = data["interface"]
interface_data.append("interface {}\n".format(
interface_data.append("interface {}".format(
str(interface_name)
))
if "ipv4" in data:
intf_addr = c_data["links"][destRouterLink]["ipv4"]
interface_data.append("ip address {}\n".format(
interface_data.append("ip address {}".format(
intf_addr
))
if "ipv6" in data:
intf_addr = c_data["links"][destRouterLink]["ipv6"]
interface_data.append("ipv6 address {}\n".format(
interface_data.append("ipv6 address {}".format(
intf_addr
))
result = create_common_configuration(tgen, c_router,
@ -662,7 +784,7 @@ def create_static_routes(tgen, input_dict, build=False):
for router in input_dict.keys():
if "static_routes" not in input_dict[router]:
errormsg = "static_routes not present in input_dict"
logger.info(errormsg)
logger.debug(errormsg)
continue
static_routes_list = []
@ -768,7 +890,7 @@ def create_prefix_lists(tgen, input_dict, build=False):
for router in input_dict.keys():
if "prefix_lists" not in input_dict[router]:
errormsg = "prefix_lists not present in input_dict"
logger.info(errormsg)
logger.debug(errormsg)
continue
config_data = []
@ -922,7 +1044,7 @@ def create_route_maps(tgen, input_dict, build=False):
for router in input_dict.keys():
if "route_maps" not in input_dict[router]:
errormsg = "route_maps not present in input_dict"
logger.info(errormsg)
logger.debug(errormsg)
continue
rmap_data = []
for rmap_name, rmap_value in \
@ -1014,7 +1136,7 @@ def create_route_maps(tgen, input_dict, build=False):
# Weight
if weight:
rmap_data.append("set weight {} \n".format(
rmap_data.append("set weight {}".format(
weight))
# Adding MATCH and SET sequence to RMAP if defined
@ -1092,7 +1214,8 @@ def create_route_maps(tgen, input_dict, build=False):
#############################################
# Verification APIs
#############################################
def _verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
@retry(attempts=10, return_is_str=True, initial_wait=2)
def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
"""
Data will be read from input_dict or input JSON file, API will generate
same prefixes, which were redistributed by either create_static_routes() or
@ -1140,7 +1263,7 @@ def _verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
errormsg(str) or True
"""
logger.info("Entering lib API: verify_rib()")
logger.debug("Entering lib API: verify_rib()")
router_list = tgen.routers()
for routerInput in input_dict.keys():
@ -1160,9 +1283,8 @@ def _verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
else:
command = "show ipv6 route json"
sleep(10)
logger.info("Checking router %s RIB:", router)
rib_routes_json = rnode.vtysh_cmd(command, isjson=True)
rib_routes_json = run_frr_cmd(rnode, command, isjson=True)
# Verifying output dictionary rib_routes_json is not empty
if bool(rib_routes_json) is False:
@ -1257,30 +1379,10 @@ def _verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
logger.info("Verified routes in router %s RIB, found routes"
" are: %s", dut, found_routes)
logger.info("Exiting lib API: verify_rib()")
logger.debug("Exiting lib API: verify_rib()")
return True
def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None, expected=True):
"""
Wrapper function for `_verify_rib` that tries multiple time to get results.
When the expected result is `False` we actually should expect for an string instead.
"""
# Use currying to hide the parameters and create a test function.
test_func = partial(_verify_rib, tgen, addr_type, dut, input_dict, next_hop, protocol)
# Call the test function and expect it to return True, otherwise try it again.
if expected is True:
_, result = topotest.run_and_expect(test_func, True, count=20, wait=6)
else:
_, result = topotest.run_and_expect_type(test_func, str, count=20, wait=6)
# Return as normal.
return result
def verify_admin_distance_for_static_routes(tgen, input_dict):
"""
API to verify admin distance for static routes as defined in input_dict/
@ -1311,7 +1413,7 @@ def verify_admin_distance_for_static_routes(tgen, input_dict):
errormsg(str) or True
"""
logger.info("Entering lib API: verify_admin_distance_for_static_routes()")
logger.debug("Entering lib API: verify_admin_distance_for_static_routes()")
for router in input_dict.keys():
if router not in tgen.routers():
@ -1326,7 +1428,7 @@ def verify_admin_distance_for_static_routes(tgen, input_dict):
command = "show ip route json"
else:
command = "show ipv6 route json"
show_ip_route_json = rnode.vtysh_cmd(command, isjson=True)
show_ip_route_json = run_frr_cmd(rnode, command, isjson=True)
logger.info("Verifying admin distance for static route %s"
" under dut %s:", static_route, router)
@ -1356,7 +1458,7 @@ def verify_admin_distance_for_static_routes(tgen, input_dict):
format(network, router))
return errormsg
logger.info("Exiting lib API: verify_admin_distance_for_static_routes()")
logger.debug("Exiting lib API: verify_admin_distance_for_static_routes()")
return True
@ -1384,7 +1486,7 @@ def verify_prefix_lists(tgen, input_dict):
errormsg(str) or True
"""
logger.info("Entering lib API: verify_prefix_lists()")
logger.debug("Entering lib API: verify_prefix_lists()")
for router in input_dict.keys():
if router not in tgen.routers():
@ -1393,7 +1495,7 @@ def verify_prefix_lists(tgen, input_dict):
rnode = tgen.routers()[router]
# Show ip prefix list
show_prefix_list = rnode.vtysh_cmd("show ip prefix-list")
show_prefix_list = run_frr_cmd(rnode, "show ip prefix-list")
# Verify Prefix list is deleted
prefix_lists_addr = input_dict[router]["prefix_lists"]
@ -1403,12 +1505,12 @@ def verify_prefix_lists(tgen, input_dict):
for prefix_list in prefix_lists_addr[addr_type].keys():
if prefix_list in show_prefix_list:
errormsg = ("Prefix list {} is not deleted from router"
errormsg = ("Prefix list {} is/are present in the router"
" {}".format(prefix_list, router))
return errormsg
logger.info("Prefix list %s is/are deleted successfully"
logger.info("Prefix list %s is/are not present in the router"
" from router %s", prefix_list, router)
logger.info("Exiting lib API: verify_prefix_lissts()")
logger.debug("Exiting lib API: verify_prefix_lissts()")
return True

View File

@ -20,6 +20,7 @@
from collections import OrderedDict
from json import dumps as json_dumps
from re import search as re_search
import ipaddr
import pytest
@ -38,6 +39,9 @@ from lib.common_config import (
from lib.bgp import create_router_bgp
ROUTER_LIST = []
def build_topo_from_json(tgen, topo):
"""
Reads configuration from JSON file. Adds routers, creates interface
@ -48,13 +52,15 @@ def build_topo_from_json(tgen, topo):
* `topo`: json file data
"""
listRouters = []
for routerN in sorted(topo['routers'].iteritems()):
logger.info('Topo: Add router {}'.format(routerN[0]))
tgen.add_router(routerN[0])
listRouters.append(routerN[0])
ROUTER_LIST = sorted(topo['routers'].keys(),
key=lambda x: int(re_search('\d+', x).group(0)))
listRouters = ROUTER_LIST[:]
for routerN in ROUTER_LIST:
logger.info('Topo: Add router {}'.format(routerN))
tgen.add_router(routerN)
listRouters.append(routerN)
listRouters.sort()
if 'ipv4base' in topo:
ipv4Next = ipaddr.IPv4Address(topo['link_ip_start']['ipv4'])
ipv4Step = 2 ** (32 - topo['link_ip_start']['v4mask'])
@ -78,7 +84,7 @@ def build_topo_from_json(tgen, topo):
elif 'link' in x:
return int(x.split('-link')[1])
else:
return int(x.split('r')[1])
return int(re_search('\d+', x).group(0))
for destRouterLink, data in sorted(topo['routers'][curRouter]['links']. \
iteritems(),
key=lambda x: link_sort(x[0])):
@ -179,12 +185,13 @@ def build_config_from_json(tgen, topo, save_bkup=True):
data = topo["routers"]
for func_type in func_dict.keys():
logger.info('Building configuration for {}'.format(func_type))
logger.info('Checking for {} configuration in input data'.format(
func_type))
func_dict.get(func_type)(tgen, data, build=True)
for router in sorted(topo['routers'].keys()):
logger.info('Configuring router {}...'.format(router))
logger.debug('Configuring router {}...'.format(router))
result = load_config_to_router(tgen, router, save_bkup)
if not result:

View File

@ -15,7 +15,7 @@ norecursedirs = .git example-test lib docker
# Display router current configuration during test execution,
# by default configuration will not be shown
show_router_config = True
# show_router_config = True
# Default daemons binaries path.
#frrdir = /usr/lib/frr