mirror_frr/tests/topotests/lib/ospf.py
nguggarigoud b29a56b3ae tests: removing initial wait time in ospf.py
1. Removed initial_wait in ospf library.
2. Removed one test case which was random
   failure, will add back after traiging.

Signed-off-by: naveen <nguggarigoud@vmware.com>
2020-09-18 15:13:46 +05:30

1183 lines
44 KiB
Python

#
# Copyright (c) 2020 by VMware, Inc. ("VMware")
# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
# ("NetDEF") in this file.
#
# Permission to use, copy, modify, and/or distribute this software
# for any purpose with or without fee is hereby granted, provided
# that the above copyright notice and this permission notice appear
# in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
# OF THIS SOFTWARE.
#
from copy import deepcopy
import traceback
from time import sleep
from lib.topolog import logger
import ipaddr
# Import common_config to use commomnly used APIs
from lib.common_config import (create_common_configuration,
InvalidCLIError, retry,
generate_ips,
check_address_types,
validate_ip_address,
run_frr_cmd)
LOGDIR = "/tmp/topotests/"
TMPDIR = None
################################
# Configure procs
################################
def create_router_ospf(
tgen, topo, input_dict=None, build=False,
load_config=True):
"""
API to configure ospf on router.
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `input_dict` : Input dict data, required when configuring from testcase
* `build` : Only for initial setup phase this is set as True.
* `load_config` : Loading the config to router this is set as True.
Usage
-----
input_dict = {
"r1": {
"ospf": {
"router_id": "22.22.22.22",
"area": [{ "id":0.0.0.0, "type": "nssa"}]
}
}
result = create_router_ospf(tgen, topo, input_dict)
Returns
-------
True or False
"""
logger.debug("Entering lib API: create_router_ospf()")
result = False
if not input_dict:
input_dict = deepcopy(topo)
else:
topo = topo["routers"]
input_dict = deepcopy(input_dict)
for router in input_dict.keys():
if "ospf" not in input_dict[router]:
logger.debug("Router %s: 'ospf' not present in input_dict", router)
continue
result = __create_ospf_global(
tgen, input_dict, router, build, load_config)
if result is True:
ospf_data = input_dict[router]["ospf"]
logger.debug("Exiting lib API: create_router_ospf()")
return result
def __create_ospf_global(
tgen, input_dict, router, build=False,
load_config=True):
"""
Helper API to create ospf global configuration.
Parameters
----------
* `tgen` : Topogen object
* `input_dict` : Input dict data, required when configuring from testcase
* `router` : router to be configured.
* `build` : Only for initial setup phase this is set as True.
* `load_config` : Loading the config to router this is set as True.
Returns
-------
True or False
"""
result = False
logger.debug("Entering lib API: __create_ospf_global()")
try:
ospf_data = input_dict[router]["ospf"]
del_ospf_action = ospf_data.setdefault("delete", False)
if del_ospf_action:
config_data = ["no router ospf"]
result = create_common_configuration(tgen, router, config_data,
"ospf", build,
load_config)
return result
config_data = []
cmd = "router ospf"
config_data.append(cmd)
# router id
router_id = ospf_data.setdefault("router_id", None)
del_router_id = ospf_data.setdefault("del_router_id", False)
if del_router_id:
config_data.append("no ospf router-id")
if router_id:
config_data.append("ospf router-id {}".format(
router_id))
# redistribute command
redistribute_data = ospf_data.setdefault("redistribute", {})
if redistribute_data:
for redistribute in redistribute_data:
if "redist_type" not in redistribute:
logger.debug("Router %s: 'redist_type' not present in "
"input_dict", router)
else:
cmd = "redistribute {}".format(
redistribute["redist_type"])
for red_type in redistribute_data:
if "route_map" in red_type:
cmd = cmd + " route-map {}".format(red_type[
'route_map'])
del_action = redistribute.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
#area information
area_data = ospf_data.setdefault("area", {})
if area_data:
for area in area_data:
if "id" not in area:
logger.debug("Router %s: 'area id' not present in "
"input_dict", router)
else:
cmd = "area {}".format(area["id"])
if "type" in area:
cmd = cmd + " {}".format(area["type"])
del_action = area.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
result = create_common_configuration(tgen, router, config_data,
"ospf", build, load_config)
# summary information
summary_data = ospf_data.setdefault("summary-address", {})
if summary_data:
for summary in summary_data:
if "prefix" not in summary:
logger.debug("Router %s: 'summary-address' not present in "
"input_dict", router)
else:
cmd = "summary {}/{}".format(summary["prefix"], summary[
"mask"])
_tag = summary.setdefault("tag", None)
if _tag:
cmd = "{} tag {}".format(cmd, _tag)
_advertise = summary.setdefault("advertise", True)
if not _advertise:
cmd = "{} no-advertise".format(cmd)
del_action = summary.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
result = create_common_configuration(tgen, router, config_data,
"ospf", build, load_config)
except InvalidCLIError:
# Traceback
errormsg = traceback.format_exc()
logger.error(errormsg)
return errormsg
logger.debug("Exiting lib API: create_ospf_global()")
return result
def create_router_ospf6(
tgen, topo, input_dict=None, build=False,
load_config=True):
"""
API to configure ospf on router
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `input_dict` : Input dict data, required when configuring from testcase
* `build` : Only for initial setup phase this is set as True.
Usage
-----
input_dict = {
"r1": {
"ospf6": {
"router_id": "22.22.22.22",
}
}
Returns
-------
True or False
"""
logger.debug("Entering lib API: create_router_ospf()")
result = False
if not input_dict:
input_dict = deepcopy(topo)
else:
topo = topo["routers"]
input_dict = deepcopy(input_dict)
for router in input_dict.keys():
if "ospf" not in input_dict[router]:
logger.debug("Router %s: 'ospf' not present in input_dict", router)
continue
result = __create_ospf_global(
tgen, input_dict, router, build, load_config)
logger.debug("Exiting lib API: create_router_ospf()")
return result
def __create_ospf6_global(
tgen, input_dict, router, build=False,
load_config=True):
"""
Helper API to create ospf global configuration.
Parameters
----------
* `tgen` : Topogen object
* `input_dict` : Input dict data, required when configuring from testcase
* `router` : router id to be configured.
* `build` : Only for initial setup phase this is set as True.
Returns
-------
True or False
"""
result = False
logger.debug("Entering lib API: __create_ospf_global()")
try:
ospf_data = input_dict[router]["ospf6"]
del_ospf_action = ospf_data.setdefault("delete", False)
if del_ospf_action:
config_data = ["no ipv6 router ospf"]
result = create_common_configuration(tgen, router, config_data,
"ospf", build,
load_config)
return result
config_data = []
cmd = "router ospf"
config_data.append(cmd)
router_id = ospf_data.setdefault("router_id", None)
del_router_id = ospf_data.setdefault("del_router_id", False)
if del_router_id:
config_data.append("no ospf router-id")
if router_id:
config_data.append("ospf router-id {}".format(
router_id))
result = create_common_configuration(tgen, router, config_data,
"ospf", build, load_config)
except InvalidCLIError:
# Traceback
errormsg = traceback.format_exc()
logger.error(errormsg)
return errormsg
logger.debug("Exiting lib API: create_ospf_global()")
return result
def config_ospf_interface (tgen, topo, input_dict=None, build=False,
load_config=True):
"""
API to configure ospf on router.
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `input_dict` : Input dict data, required when configuring from testcase
* `build` : Only for initial setup phase this is set as True.
* `load_config` : Loading the config to router this is set as True.
Usage
-----
r1_ospf_auth = {
"r1": {
"links": {
"r2": {
"ospf": {
"authentication": 'message-digest',
"authentication-key": "ospf",
"message-digest-key": "10"
}
}
}
}
}
result = config_ospf_interface(tgen, topo, r1_ospf_auth)
Returns
-------
True or False
"""
logger.debug("Enter lib config_ospf_interface")
if not input_dict:
input_dict = deepcopy(topo)
else:
input_dict = deepcopy(input_dict)
for router in input_dict.keys():
config_data = []
for lnk in input_dict[router]['links'].keys():
if "ospf" not in input_dict[router]['links'][lnk]:
logger.debug("Router %s: ospf configs is not present in"
"input_dict, passed input_dict", router,
input_dict)
continue
ospf_data = input_dict[router]['links'][lnk]['ospf']
data_ospf_area = ospf_data.setdefault("area", None)
data_ospf_auth = ospf_data.setdefault("authentication", None)
data_ospf_dr_priority = ospf_data.setdefault("priority", None)
data_ospf_cost = ospf_data.setdefault("cost", None)
try:
intf = topo['routers'][router]['links'][lnk]['interface']
except KeyError:
intf = topo['switches'][router]['links'][lnk]['interface']
# interface
cmd = "interface {}".format(intf)
config_data.append(cmd)
# interface area config
if data_ospf_area:
cmd = "ip ospf area {}".format(data_ospf_area)
config_data.append(cmd)
# interface ospf auth
if data_ospf_auth:
if data_ospf_auth == 'null':
cmd = "ip ospf authentication null"
elif data_ospf_auth == 'message-digest':
cmd = "ip ospf authentication message-digest"
else:
cmd = "ip ospf authentication"
if 'del_action' in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if "message-digest-key" in ospf_data:
cmd = "ip ospf message-digest-key {} md5 {}".format(
ospf_data["message-digest-key"],ospf_data[
"authentication-key"])
if 'del_action' in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if "authentication-key" in ospf_data and \
"message-digest-key" not in ospf_data:
cmd = "ip ospf authentication-key {}".format(ospf_data[
"authentication-key"])
if 'del_action' in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# interface ospf dr priority
if data_ospf_dr_priority in ospf_data:
cmd = "ip ospf priority {}".format(
ospf_data["priority"])
if 'del_action' in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# interface ospf cost
if data_ospf_cost in ospf_data:
cmd = "ip ospf cost {}".format(
ospf_data["cost"])
if 'del_action' in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if build:
return config_data
else:
result = create_common_configuration(tgen, router, config_data,
"interface_config",
build=build)
logger.debug("Exiting lib API: create_igmp_config()")
return result
def clear_ospf(tgen, router):
"""
This API is to clear ospf neighborship by running
clear ip ospf interface * command,
Parameters
----------
* `tgen`: topogen object
* `router`: device under test
Usage
-----
clear_ospf(tgen, "r1")
"""
logger.debug("Entering lib API: clear_ospf()")
if router not in tgen.routers():
return False
rnode = tgen.routers()[router]
# Clearing OSPF
logger.info("Clearing ospf process for router %s..", router)
run_frr_cmd(rnode, "clear ip ospf interface ")
logger.debug("Exiting lib API: clear_ospf()")
################################
# Verification procs
################################
@retry(attempts=40, wait=2, return_is_str=True)
def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False):
"""
This API is to verify ospf neighborship by running
show ip ospf neighbour command,
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
* `lan` : verify neighbors in lan topology
Usage
-----
1. To check FULL neighbors.
verify_ospf_neighbor(tgen, topo, dut=dut)
2. To check neighbors with their roles.
input_dict = {
"r0": {
"ospf": {
"neighbors": {
"r1": {
"state": "Full",
"role": "DR"
},
"r2": {
"state": "Full",
"role": "DROther"
},
"r3": {
"state": "Full",
"role": "DROther"
}
}
}
}
}
result = verify_ospf_neighbor(tgen, topo, dut, input_dict, lan=True)
Returns
-------
True or False (Error Message)
"""
logger.debug("Entering lib API: verify_ospf_neighbor()")
result = False
if input_dict:
for router, rnode in tgen.routers().iteritems():
if 'ospf' not in topo['routers'][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF neighborship on router %s:", router)
show_ospf_json = run_frr_cmd(rnode,
"show ip ospf neighbor all json", isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
ospf_data_list = input_dict[router]["ospf"]
ospf_nbr_list = ospf_data_list['neighbors']
for ospf_nbr, nbr_data in ospf_nbr_list.items():
data_ip = topo['routers'][ospf_nbr]['links']
data_rid = topo['routers'][ospf_nbr]['ospf']['router_id']
if ospf_nbr in data_ip:
nbr_details = nbr_data[ospf_nbr]
elif lan:
for switch in topo['switches']:
if 'ospf' in topo['switches'][switch]['links'][router]:
neighbor_ip = data_ip[switch]['ipv4'].split("/")[0]
else:
continue
else:
neighbor_ip = data_ip[router]['ipv4'].split("/")[0]
nh_state = None
neighbor_ip = neighbor_ip.lower()
nbr_rid = data_rid
try:
nh_state = show_ospf_json[nbr_rid][0][
'state'].split('/')[0]
intf_state = show_ospf_json[nbr_rid][0][
'state'].split('/')[1]
except KeyError:
errormsg = "[DUT: {}] OSPF peer {} missing".format(router,
nbr_rid)
return errormsg
nbr_state = nbr_data.setdefault("state",None)
nbr_role = nbr_data.setdefault("role",None)
if nbr_state:
if nbr_state == nh_state:
logger.info("[DUT: {}] OSPF Nbr is {}:{} State {}".format
(router, ospf_nbr, nbr_rid, nh_state))
result = True
else:
errormsg = ("[DUT: {}] OSPF is not Converged, neighbor"
" state is {}".format(router, nh_state))
return errormsg
if nbr_role:
if nbr_role == intf_state:
logger.info("[DUT: {}] OSPF Nbr is {}: {} Role {}".format(
router, ospf_nbr, nbr_rid, nbr_role))
else:
errormsg = ("[DUT: {}] OSPF is not Converged with rid"
"{}, role is {}".format(router, nbr_rid, intf_state))
return errormsg
continue
else:
for router, rnode in tgen.routers().iteritems():
if 'ospf' not in topo['routers'][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF neighborship on router %s:", router)
show_ospf_json = run_frr_cmd(rnode,
"show ip ospf neighbor all json", isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
ospf_data_list = topo["routers"][router]["ospf"]
ospf_neighbors = ospf_data_list['neighbors']
total_peer = 0
total_peer = len(ospf_neighbors.keys())
no_of_ospf_nbr = 0
ospf_nbr_list = ospf_data_list['neighbors']
no_of_peer = 0
for ospf_nbr, nbr_data in ospf_nbr_list.items():
if nbr_data:
data_ip = topo['routers'][nbr_data["nbr"]]['links']
data_rid = topo['routers'][nbr_data["nbr"]][
'ospf']['router_id']
else:
data_ip = topo['routers'][ospf_nbr]['links']
data_rid = topo['routers'][ospf_nbr]['ospf']['router_id']
if ospf_nbr in data_ip:
nbr_details = nbr_data[ospf_nbr]
elif lan:
for switch in topo['switches']:
if 'ospf' in topo['switches'][switch]['links'][router]:
neighbor_ip = data_ip[switch]['ipv4'].split("/")[0]
else:
continue
else:
neighbor_ip = data_ip[router]['ipv4'].split("/")[0]
nh_state = None
neighbor_ip = neighbor_ip.lower()
nbr_rid = data_rid
try:
nh_state = show_ospf_json[nbr_rid][0][
'state'].split('/')[0]
except KeyError:
errormsg = "[DUT: {}] OSPF peer {} missing,from "\
"{} ".format(router,
nbr_rid, ospf_nbr)
return errormsg
if nh_state == 'Full':
no_of_peer += 1
if no_of_peer == total_peer:
logger.info("[DUT: {}] OSPF is Converged".format(router))
result = True
else:
errormsg = ("[DUT: {}] OSPF is not Converged".format(router))
return errormsg
logger.debug("Exiting API: verify_ospf_neighbor()")
return result
@retry(attempts=21, wait=2, return_is_str=True)
def verify_ospf_rib(tgen, dut, input_dict, next_hop=None,
tag=None, metric=None, fib=None):
"""
This API is to verify ospf routes by running
show ip ospf route command.
Parameters
----------
* `tgen` : Topogen object
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
* `next_hop` : next to be verified
* `tag` : tag to be verified
* `metric` : metric to be verified
* `fib` : True if the route is installed in FIB.
Usage
-----
input_dict = {
"r1": {
"static_routes": [
{
"network": ip_net,
"no_of_ip": 1,
"routeType": "N"
}
]
}
}
result = verify_ospf_rib(tgen, dut, input_dict,next_hop=nh)
Returns
-------
True or False (Error Message)
"""
logger.info("Entering lib API: verify_ospf_rib()")
result = False
router_list = tgen.routers()
additional_nexthops_in_required_nhs = []
found_hops = []
for routerInput in input_dict.keys():
for router, rnode in router_list.iteritems():
if router != dut:
continue
logger.info("Checking router %s RIB:", router)
# Verifying RIB routes
command = "show ip ospf route"
found_routes = []
missing_routes = []
if "static_routes" in input_dict[routerInput] or \
"prefix" in input_dict[routerInput]:
if "prefix" in input_dict[routerInput]:
static_routes = input_dict[routerInput]["prefix"]
else:
static_routes = input_dict[routerInput]["static_routes"]
for static_route in static_routes:
cmd = "{}".format(command)
cmd = "{} json".format(cmd)
ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True)
# Verifying output dictionary ospf_rib_json is not empty
if bool(ospf_rib_json) is False:
errormsg = "[DUT: {}] No routes found in OSPF route " \
"table".format(router)
return errormsg
network = static_route["network"]
no_of_ip = static_route.setdefault("no_of_ip", 1)
_tag = static_route.setdefault("tag", None)
_rtype = static_route.setdefault("routeType", None)
# Generating IPs for verification
ip_list = generate_ips(network, no_of_ip)
st_found = False
nh_found = False
for st_rt in ip_list:
st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
_addr_type = validate_ip_address(st_rt)
if _addr_type != 'ipv4':
continue
if st_rt in ospf_rib_json:
st_found = True
found_routes.append(st_rt)
if fib and next_hop:
if type(next_hop) is not list:
next_hop = [next_hop]
for mnh in range(0, len(ospf_rib_json[st_rt])):
if 'fib' in ospf_rib_json[st_rt][
mnh]["nexthops"][0]:
found_hops.append([rib_r[
"ip"] for rib_r in ospf_rib_json[
st_rt][mnh]["nexthops"]])
if found_hops[0]:
missing_list_of_nexthops = \
set(found_hops[0]).difference(next_hop)
additional_nexthops_in_required_nhs = \
set(next_hop).difference(found_hops[0])
if additional_nexthops_in_required_nhs:
logger.info(
"Nexthop "
"%s is not active for route %s in "
"RIB of router %s\n",
additional_nexthops_in_required_nhs,
st_rt, dut)
errormsg = (
"Nexthop {} is not active"
" for route {} in RIB of router"
" {}\n".format(
additional_nexthops_in_required_nhs,
st_rt, dut))
return errormsg
else:
nh_found = True
elif next_hop and fib is None:
if type(next_hop) is not list:
next_hop = [next_hop]
found_hops = [rib_r["ip"] for rib_r in
ospf_rib_json[st_rt][
"nexthops"]]
if found_hops:
missing_list_of_nexthops = \
set(found_hops).difference(next_hop)
additional_nexthops_in_required_nhs = \
set(next_hop).difference(found_hops)
if additional_nexthops_in_required_nhs:
logger.info(
"Missing nexthop %s for route"\
" %s in RIB of router %s\n", \
additional_nexthops_in_required_nhs, \
st_rt, dut)
errormsg=("Nexthop {} is Missing for "\
"route {} in RIB of router {}\n".format(
additional_nexthops_in_required_nhs,
st_rt, dut))
return errormsg
else:
nh_found = True
if _rtype:
if "routeType" not in ospf_rib_json[
st_rt]:
errormsg = ("[DUT: {}]: routeType missing"
"for route {} in OSPF RIB \n".\
format(dut, st_rt))
return errormsg
elif _rtype != ospf_rib_json[st_rt][
"routeType"]:
errormsg = ("[DUT: {}]: routeType mismatch"
"for route {} in OSPF RIB \n".\
format(dut, st_rt))
return errormsg
else:
logger.info("DUT: {}]: Found routeType {}"
"for route {}".\
format(dut, _rtype, st_rt))
if tag:
if "tag" not in ospf_rib_json[
st_rt]:
errormsg = ("[DUT: {}]: tag is not"
" present for"
" route {} in RIB \n".\
format(dut, st_rt
))
return errormsg
if _tag != ospf_rib_json[
st_rt]["tag"]:
errormsg = ("[DUT: {}]: tag value {}"
" is not matched for"
" route {} in RIB \n".\
format(dut, _tag, st_rt,
))
return errormsg
if metric is not None:
if "type2cost" not in ospf_rib_json[
st_rt]:
errormsg = ("[DUT: {}]: metric is"
" not present for"
" route {} in RIB \n".\
format(dut, st_rt))
return errormsg
if metric != ospf_rib_json[
st_rt]["type2cost"]:
errormsg = ("[DUT: {}]: metric value "
"{} is not matched for "
"route {} in RIB \n".\
format(dut, metric, st_rt,
))
return errormsg
else:
missing_routes.append(st_rt)
if nh_found:
logger.info("[DUT: {}]: Found next_hop {} for all OSPF"
" routes in RIB".format(router, next_hop))
if len(missing_routes) > 0:
errormsg = ("[DUT: {}]: Missing route in RIB, "
"routes: {}".\
format(dut, missing_routes))
return errormsg
if found_routes:
logger.info("[DUT: %s]: Verified routes in RIB, found"
" routes are: %s\n", dut, found_routes)
result = True
logger.info("Exiting lib API: verify_ospf_rib()")
return result
@retry(attempts=10, wait=2, return_is_str=True)
def verify_ospf_interface(tgen, topo, dut=None,lan=False, input_dict=None):
"""
This API is to verify ospf routes by running
show ip ospf interface command.
Parameters
----------
* `tgen` : Topogen object
* `topo` : topology descriptions
* `dut`: device under test
* `lan`: if set to true this interface belongs to LAN.
* `input_dict` : Input dict data, required when configuring from testcase
Usage
-----
input_dict= {
'r0': {
'links':{
's1': {
'ospf':{
'priority':98,
'timerDeadSecs': 4,
'area': '0.0.0.3',
'mcastMemberOspfDesignatedRouters': True,
'mcastMemberOspfAllRouters': True,
'ospfEnabled': True,
}
}
}
}
}
result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)
Returns
-------
True or False (Error Message)
"""
logger.debug("Entering lib API: verify_ospf_interface()")
result = False
for router, rnode in tgen.routers().iteritems():
if 'ospf' not in topo['routers'][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF interface on router %s:", router)
show_ospf_json = run_frr_cmd(rnode, "show ip ospf interface json",
isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
# To find neighbor ip type
ospf_intf_data = input_dict[router]["links"]
for ospf_intf, intf_data in ospf_intf_data.items():
intf = topo['routers'][router]['links'][ospf_intf]['interface']
if intf in show_ospf_json['interfaces']:
for intf_attribute in intf_data['ospf']:
if intf_data['ospf'][intf_attribute] == show_ospf_json[
'interfaces'][intf][intf_attribute]:
logger.info("[DUT: %s] OSPF interface %s: %s is %s",
router, intf, intf_attribute, intf_data['ospf'][
intf_attribute])
else:
errormsg= "[DUT: {}] OSPF interface {}: {} is {}, \
Expected is {}".format(router, intf, intf_attribute,
intf_data['ospf'][intf_attribute], show_ospf_json[
'interfaces'][intf][intf_attribute])
return errormsg
result = True
logger.debug("Exiting API: verify_ospf_interface()")
return result
@retry(attempts=11, wait=2, return_is_str=True)
def verify_ospf_database(tgen, topo, dut, input_dict):
"""
This API is to verify ospf lsa's by running
show ip ospf database command.
Parameters
----------
* `tgen` : Topogen object
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
* `topo` : next to be verified
Usage
-----
input_dict = {
"areas": {
"0.0.0.0": {
"Router Link States": {
"100.1.1.0-100.1.1.0": {
"LSID": "100.1.1.0",
"Advertised router": "100.1.1.0",
"LSA Age": 130,
"Sequence Number": "80000006",
"Checksum": "a703",
"Router links": 3
}
},
"Net Link States": {
"10.0.0.2-100.1.1.1": {
"LSID": "10.0.0.2",
"Advertised router": "100.1.1.1",
"LSA Age": 137,
"Sequence Number": "80000001",
"Checksum": "9583"
}
},
},
}
}
result = verify_ospf_database(tgen, topo, dut, input_dict)
Returns
-------
True or False (Error Message)
"""
result = False
router = dut
logger.debug("Entering lib API: verify_ospf_database()")
if 'ospf' not in topo['routers'][dut]:
errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
dut)
return errormsg
rnode = tgen.routers()[dut]
logger.info("Verifying OSPF interface on router %s:", dut)
show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json",
isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
# for inter and inter lsa's
ospf_db_data = input_dict.setdefault("areas", None)
ospf_external_lsa = input_dict.setdefault(
'AS External Link States', None)
if ospf_db_data:
for ospf_area, area_lsa in ospf_db_data.items():
if ospf_area in show_ospf_json['areas']:
if 'Router Link States' in area_lsa:
for lsa in area_lsa['Router Link States']:
if lsa in show_ospf_json['areas'][ospf_area][
'Router Link States']:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Router "
"LSA %s", router, ospf_area, lsa)
result = True
else:
errormsg = \
"[DUT: {}] OSPF LSDB area {}: expected" \
" Router LSA is {}".format(router, ospf_area, lsa)
return errormsg
if 'Net Link States' in area_lsa:
for lsa in area_lsa['Net Link States']:
if lsa in show_ospf_json['areas'][ospf_area][
'Net Link States']:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Network "
"LSA %s", router, ospf_area, lsa)
result = True
else:
errormsg = \
"[DUT: {}] OSPF LSDB area {}: expected" \
" Network LSA is {}".format(router, ospf_area, lsa)
return errormsg
if 'Summary Link States' in area_lsa:
for lsa in area_lsa['Summary Link States']:
if lsa in show_ospf_json['areas'][ospf_area][
'Summary Link States']:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Summary "
"LSA %s", router, ospf_area, lsa)
result = True
else:
errormsg = \
"[DUT: {}] OSPF LSDB area {}: expected" \
" Summary LSA is {}".format(router, ospf_area, lsa)
return errormsg
if 'ASBR-Summary Link States' in area_lsa:
for lsa in area_lsa['ASBR-Summary Link States']:
if lsa in show_ospf_json['areas'][ospf_area][
'ASBR-Summary Link States']:
logger.info(
"[DUT: %s] OSPF LSDB area %s:ASBR Summary "
"LSA %s", router, ospf_area, lsa)
result = True
else:
errormsg = \
"[DUT: {}] OSPF LSDB area {}: expected" \
" ASBR Summary LSA is {}".format(
router, ospf_area, lsa)
return errormsg
if ospf_external_lsa:
for ospf_ext_lsa, ext_lsa_data in ospf_external_lsa.items():
if ospf_ext_lsa in show_ospf_json['AS External Link States']:
logger.info(
"[DUT: %s] OSPF LSDB:External LSA %s",
router, ospf_ext_lsa)
result = True
else:
errormsg = \
"[DUT: {}] OSPF LSDB : expected" \
" External LSA is {}".format(router, ospf_ext_lsa)
return errormsg
logger.debug("Exiting API: verify_ospf_database()")
return result
@retry(attempts=10, wait=2, return_is_str=True)
def verify_ospf_summary(tgen, topo, dut, input_dict):
"""
This API is to verify ospf routes by running
show ip ospf interface command.
Parameters
----------
* `tgen` : Topogen object
* `topo` : topology descriptions
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
Usage
-----
input_dict = {
"11.0.0.0/8": {
"Summary address": "11.0.0.0/8",
"Metric-type": "E2",
"Metric": 20,
"Tag": 0,
"External route count": 5
}
}
result = verify_ospf_summary(tgen, topo, dut, input_dict)
Returns
-------
True or False (Error Message)
"""
logger.debug("Entering lib API: verify_ospf_summary()")
result = False
router = dut
logger.info("Verifying OSPF summary on router %s:", router)
if 'ospf' not in topo['routers'][dut]:
errormsg = "[DUT: {}] OSPF is not configured on the router.".format(
router)
return errormsg
rnode = tgen.routers()[dut]
show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json",
isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
# To find neighbor ip type
ospf_summary_data = input_dict
for ospf_summ, summ_data in ospf_summary_data.items():
if ospf_summ not in show_ospf_json:
continue
summary = ospf_summary_data[ospf_summ]['Summary address']
if summary in show_ospf_json:
for summ in summ_data:
if summ_data[summ] == show_ospf_json[summary][summ]:
logger.info("[DUT: %s] OSPF summary %s:%s is %s",
router, summary, summ, summ_data[summ])
result = True
else:
errormsg = ("[DUT: {}] OSPF summary {}:{} is %s, "
"Expected is {}".format(router,summary, summ,
show_ospf_json[summary][summ]))
return errormsg
logger.debug("Exiting API: verify_ospf_summary()")
return result