mirror_frr/tests/topotests/lib/ospf.py
Manoj Naragund 044561b1bb tests: Added ospfv2 flood reduction topotest changes.
Have added topotest to verify below scenarios.

1. Verify OSPF Flood reduction functionality with ospf enabled on process level.
2. Verify OSPF Flood reduction functionality with ospf enabled on area level.
3. Verify OSPF Flood reduction functionality between different area's

Have sussessfully tested these in my local setup

Signed-off-by: nguggarigoud <nguggarigoud@vmware.com>
2023-02-13 04:36:29 -08:00

3035 lines
115 KiB
Python

#
# Copyright (c) 2020 by VMware, Inc. ("VMware")
# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
# ("NetDEF") in this file.
#
# Permission to use, copy, modify, and/or distribute this software
# for any purpose with or without fee is hereby granted, provided
# that the above copyright notice and this permission notice appear
# in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
# OF THIS SOFTWARE.
#
import ipaddress
import sys
from copy import deepcopy
from time import sleep
# Import common_config to use commomnly used APIs
from lib.common_config import (
create_common_configurations,
InvalidCLIError,
generate_ips,
retry,
run_frr_cmd,
validate_ip_address,
)
from lib.topolog import logger
from lib.topotest import frr_unicode
################################
# Configure procs
################################
def create_router_ospf(tgen, topo=None, input_dict=None, build=False, load_config=True):
"""
API to configure ospf on router.
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `input_dict` : Input dict data, required when configuring from testcase
* `build` : Only for initial setup phase this is set as True.
* `load_config` : Loading the config to router this is set as True.
Usage
-----
input_dict = {
"r1": {
"ospf": {
"router_id": "22.22.22.22",
"area": [{ "id": "0.0.0.0", "type": "nssa"}]
}
}
result = create_router_ospf(tgen, topo, input_dict)
Returns
-------
True or False
"""
logger.debug("Entering lib API: create_router_ospf()")
result = False
if topo is None:
topo = tgen.json_topo
if not input_dict:
input_dict = deepcopy(topo)
else:
topo = topo["routers"]
input_dict = deepcopy(input_dict)
for ospf in ["ospf", "ospf6"]:
config_data_dict = {}
for router in input_dict.keys():
if ospf not in input_dict[router]:
logger.debug("Router %s: %s not present in input_dict", router, ospf)
continue
config_data = __create_ospf_global(
tgen, input_dict, router, build, load_config, ospf
)
if config_data:
if router not in config_data_dict:
config_data_dict[router] = config_data
else:
config_data_dict[router].extend(config_data)
try:
result = create_common_configurations(
tgen, config_data_dict, ospf, build, load_config
)
except InvalidCLIError:
logger.error("create_router_ospf (ipv4)", exc_info=True)
result = False
logger.debug("Exiting lib API: create_router_ospf()")
return result
def __create_ospf_global(tgen, input_dict, router, build, load_config, ospf):
"""
Helper API to create ospf global configuration.
Parameters
----------
* `tgen` : Topogen object
* `input_dict` : Input dict data, required when configuring from testcase
* `router` : router to be configured.
* `build` : Only for initial setup phase this is set as True.
* `load_config` : Loading the config to router this is set as True.
* `ospf` : either 'ospf' or 'ospf6'
Usage
-----
input_dict = {
"routers": {
"r1": {
"links": {
"r3": {
"ipv6": "2013:13::1/64",
"ospf6": {
"hello_interval": 1,
"dead_interval": 4,
"network": "point-to-point"
}
}
},
"ospf6": {
"router_id": "1.1.1.1",
"neighbors": {
"r3": {
"area": "1.1.1.1"
}
}
}
}
}
Returns
-------
list of configuration commands
"""
config_data = []
if ospf not in input_dict[router]:
return config_data
logger.debug("Entering lib API: __create_ospf_global()")
ospf_data = input_dict[router][ospf]
del_ospf_action = ospf_data.setdefault("delete", False)
if del_ospf_action:
config_data = ["no router {}".format(ospf)]
return config_data
cmd = "router {}".format(ospf)
config_data.append(cmd)
# router id
router_id = ospf_data.setdefault("router_id", None)
del_router_id = ospf_data.setdefault("del_router_id", False)
if del_router_id:
config_data.append("no {} router-id".format(ospf))
if router_id:
config_data.append("{} router-id {}".format(ospf, router_id))
# log-adjacency-changes
log_adj_changes = ospf_data.setdefault("log_adj_changes", None)
del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False)
if del_log_adj_changes:
config_data.append("no log-adjacency-changes detail")
if log_adj_changes:
config_data.append("log-adjacency-changes {}".format(log_adj_changes))
# aggregation timer
aggr_timer = ospf_data.setdefault("aggr_timer", None)
del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False)
if del_aggr_timer:
config_data.append("no aggregation timer")
if aggr_timer:
config_data.append("aggregation timer {}".format(aggr_timer))
# maximum path information
ecmp_data = ospf_data.setdefault("maximum-paths", {})
if ecmp_data:
cmd = "maximum-paths {}".format(ecmp_data)
del_action = ospf_data.setdefault("del_max_path", False)
if del_action:
cmd = "no maximum-paths"
config_data.append(cmd)
# Flood reduction.
flood_data = ospf_data.setdefault("flood-reduction", {})
if flood_data:
cmd = "flood-reduction"
del_action = ospf_data.setdefault("del_flood_reduction", False)
if del_action:
cmd = "no flood-reduction"
config_data.append(cmd)
# LSA refresh timer - A hidden command.
refresh_data = ospf_data.setdefault("lsa-refresh", {})
if refresh_data:
cmd = "ospf lsa-refresh {}".format(refresh_data)
del_action = ospf_data.setdefault("del_lsa_refresh", False)
if del_action:
cmd = "no ospf lsa-refresh"
config_data.append(cmd)
# redistribute command
redistribute_data = ospf_data.setdefault("redistribute", {})
if redistribute_data:
for redistribute in redistribute_data:
if "redist_type" not in redistribute:
logger.debug(
"Router %s: 'redist_type' not present in " "input_dict", router
)
else:
cmd = "redistribute {}".format(redistribute["redist_type"])
for red_type in redistribute_data:
if "route_map" in red_type:
cmd = cmd + " route-map {}".format(red_type["route_map"])
del_action = redistribute.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# area information
area_data = ospf_data.setdefault("area", {})
if area_data:
for area in area_data:
if "id" not in area:
logger.debug(
"Router %s: 'area id' not present in " "input_dict", router
)
else:
cmd = "area {}".format(area["id"])
if "type" in area:
cmd = cmd + " {}".format(area["type"])
if "flood-reduction" in area:
cmd = cmd + " flood-reduction"
del_action = area.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# def route information
def_rte_data = ospf_data.setdefault("default-information", {})
if def_rte_data:
if "originate" not in def_rte_data:
logger.debug(
"Router %s: 'originate key' not present in " "input_dict", router
)
else:
cmd = "default-information originate"
if "always" in def_rte_data:
cmd = cmd + " always"
if "metric" in def_rte_data:
cmd = cmd + " metric {}".format(def_rte_data["metric"])
if "metric-type" in def_rte_data:
cmd = cmd + " metric-type {}".format(def_rte_data["metric-type"])
if "route-map" in def_rte_data:
cmd = cmd + " route-map {}".format(def_rte_data["route-map"])
del_action = def_rte_data.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# summary information
summary_data = ospf_data.setdefault("summary-address", {})
if summary_data:
for summary in summary_data:
if "prefix" not in summary:
logger.debug(
"Router %s: 'summary-address' not present in " "input_dict",
router,
)
else:
cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])
_tag = summary.setdefault("tag", None)
if _tag:
cmd = "{} tag {}".format(cmd, _tag)
_advertise = summary.setdefault("advertise", True)
if not _advertise:
cmd = "{} no-advertise".format(cmd)
del_action = summary.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# ospf gr information
gr_data = ospf_data.setdefault("graceful-restart", {})
if gr_data:
if "opaque" in gr_data and gr_data["opaque"]:
cmd = "capability opaque"
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
if "helper enable" in gr_data and not gr_data["helper enable"]:
cmd = "graceful-restart helper enable"
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
elif "helper enable" in gr_data and type(gr_data["helper enable"]) is list:
for rtrs in gr_data["helper enable"]:
cmd = "graceful-restart helper enable {}".format(rtrs)
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
if "helper" in gr_data:
if type(gr_data["helper"]) is not list:
gr_data["helper"] = list(gr_data["helper"])
for helper_role in gr_data["helper"]:
cmd = "graceful-restart helper {}".format(helper_role)
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
if "supported-grace-time" in gr_data:
cmd = "graceful-restart helper supported-grace-time {}".format(
gr_data["supported-grace-time"]
)
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
logger.debug("Exiting lib API: create_ospf_global()")
return config_data
def config_ospf_interface(
tgen, topo=None, input_dict=None, build=False, load_config=True
):
"""
API to configure ospf on router.
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `input_dict` : Input dict data, required when configuring from testcase
* `build` : Only for initial setup phase this is set as True.
* `load_config` : Loading the config to router this is set as True.
Usage
-----
r1_ospf_auth = {
"r1": {
"links": {
"r2": {
"ospf": {
"authentication": "message-digest",
"authentication-key": "ospf",
"message-digest-key": "10"
}
}
}
}
}
result = config_ospf_interface(tgen, topo, r1_ospf_auth)
Returns
-------
True or False
"""
logger.debug("Enter lib config_ospf_interface")
result = False
if topo is None:
topo = tgen.json_topo
if not input_dict:
input_dict = deepcopy(topo)
else:
input_dict = deepcopy(input_dict)
config_data_dict = {}
for router in input_dict.keys():
config_data = []
for lnk in input_dict[router]["links"].keys():
if "ospf" not in input_dict[router]["links"][lnk]:
logger.debug(
"Router %s: ospf config is not present in" "input_dict", router
)
continue
ospf_data = input_dict[router]["links"][lnk]["ospf"]
data_ospf_area = ospf_data.setdefault("area", None)
data_ospf_auth = ospf_data.setdefault("authentication", None)
data_ospf_dr_priority = ospf_data.setdefault("priority", None)
data_ospf_cost = ospf_data.setdefault("cost", None)
data_ospf_mtu = ospf_data.setdefault("mtu_ignore", None)
try:
intf = topo["routers"][router]["links"][lnk]["interface"]
except KeyError:
intf = topo["switches"][router]["links"][lnk]["interface"]
# interface
cmd = "interface {}".format(intf)
config_data.append(cmd)
# interface area config
if data_ospf_area:
cmd = "ip ospf area {}".format(data_ospf_area)
config_data.append(cmd)
# interface ospf auth
if data_ospf_auth:
if data_ospf_auth == "null":
cmd = "ip ospf authentication null"
elif data_ospf_auth == "message-digest":
cmd = "ip ospf authentication message-digest"
else:
cmd = "ip ospf authentication"
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if "message-digest-key" in ospf_data:
cmd = "ip ospf message-digest-key {} md5 {}".format(
ospf_data["message-digest-key"], ospf_data["authentication-key"]
)
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if (
"authentication-key" in ospf_data
and "message-digest-key" not in ospf_data
):
cmd = "ip ospf authentication-key {}".format(
ospf_data["authentication-key"]
)
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# interface ospf dr priority
if data_ospf_dr_priority:
cmd = "ip ospf priority {}".format(ospf_data["priority"])
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# interface ospf cost
if data_ospf_cost:
cmd = "ip ospf cost {}".format(ospf_data["cost"])
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# interface ospf mtu
if data_ospf_mtu:
cmd = "ip ospf mtu-ignore"
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if build:
return config_data
if config_data:
config_data_dict[router] = config_data
result = create_common_configurations(
tgen, config_data_dict, "interface_config", build=build
)
logger.debug("Exiting lib API: config_ospf_interface()")
return result
def clear_ospf(tgen, router, ospf=None):
"""
This API is to clear ospf neighborship by running
clear ip ospf interface * command,
Parameters
----------
* `tgen`: topogen object
* `router`: device under test
Usage
-----
clear_ospf(tgen, "r1")
"""
logger.debug("Entering lib API: clear_ospf()")
if router not in tgen.routers():
return False
rnode = tgen.routers()[router]
# Clearing OSPF
if ospf:
version = "ipv6"
else:
version = "ip"
cmd = "clear {} ospf interface".format(version)
logger.info("Clearing ospf process on router %s.. using command '%s'", router, cmd)
run_frr_cmd(rnode, cmd)
logger.debug("Exiting lib API: clear_ospf()")
def redistribute_ospf(tgen, topo, dut, route_type, **kwargs):
"""
Redstribution of routes inside ospf.
Parameters
----------
* `tgen`: Topogen object
* `topo` : json file data
* `dut`: device under test
* `route_type`: "static" or "connected" or ....
* `kwargs`: pass extra information (see below)
Usage
-----
redistribute_ospf(tgen, topo, "r0", "static", delete=True)
redistribute_ospf(tgen, topo, "r0", "static", route_map="rmap_ipv4")
"""
ospf_red = {dut: {"ospf": {"redistribute": [{"redist_type": route_type}]}}}
for k, v in kwargs.items():
ospf_red[dut]["ospf"]["redistribute"][0][k] = v
result = create_router_ospf(tgen, topo, ospf_red)
assert result is True, "Testcase : Failed \n Error: {}".format(result)
################################
# Verification procs
################################
@retry(retry_timeout=80)
def verify_ospf_neighbor(
tgen, topo=None, dut=None, input_dict=None, lan=False, expected=True
):
"""
This API is to verify ospf neighborship by running
show ip ospf neighbour command,
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
* `lan` : verify neighbors in lan topology
* `expected` : expected results from API, by-default True
Usage
-----
1. To check FULL neighbors.
verify_ospf_neighbor(tgen, topo, dut=dut)
2. To check neighbors with their roles.
input_dict = {
"r0": {
"ospf": {
"neighbors": {
"r1": {
"state": "Full",
"role": "DR"
},
"r2": {
"state": "Full",
"role": "DROther"
},
"r3": {
"state": "Full",
"role": "DROther"
}
}
}
}
}
result = verify_ospf_neighbor(tgen, topo, dut, input_dict, lan=True)
Returns
-------
True or False (Error Message)
"""
logger.debug("Entering lib API: verify_ospf_neighbor()")
result = False
if topo is None:
topo = tgen.json_topo
if input_dict:
for router, rnode in tgen.routers().items():
if "ospf" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF neighborship on router %s:", router)
show_ospf_json = run_frr_cmd(
rnode, "show ip ospf neighbor all json", isjson=True
)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
ospf_data_list = input_dict[router]["ospf"]
ospf_nbr_list = ospf_data_list["neighbors"]
for ospf_nbr, nbr_data in ospf_nbr_list.items():
data_ip = topo["routers"][ospf_nbr]["links"]
data_rid = topo["routers"][ospf_nbr]["ospf"]["router_id"]
if ospf_nbr in data_ip:
nbr_details = nbr_data[ospf_nbr]
elif lan:
for switch in topo["switches"]:
if "ospf" in topo["switches"][switch]["links"][router]:
neighbor_ip = data_ip[switch]["ipv4"].split("/")[0]
else:
continue
else:
neighbor_ip = data_ip[router]["ipv4"].split("/")[0]
nh_state = None
neighbor_ip = neighbor_ip.lower()
nbr_rid = data_rid
try:
nh_state = show_ospf_json[nbr_rid][0]["state"].split("/")[0]
intf_state = show_ospf_json[nbr_rid][0]["state"].split("/")[1]
except KeyError:
errormsg = "[DUT: {}] OSPF peer {} missing".format(router, nbr_rid)
return errormsg
nbr_state = nbr_data.setdefault("state", None)
nbr_role = nbr_data.setdefault("role", None)
if nbr_state:
if nbr_state == nh_state:
logger.info(
"[DUT: {}] OSPF Nbr is {}:{} State {}".format(
router, ospf_nbr, nbr_rid, nh_state
)
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF is not Converged, neighbor"
" state is {}".format(router, nh_state)
)
return errormsg
if nbr_role:
if nbr_role == intf_state:
logger.info(
"[DUT: {}] OSPF Nbr is {}: {} Role {}".format(
router, ospf_nbr, nbr_rid, nbr_role
)
)
else:
errormsg = (
"[DUT: {}] OSPF is not Converged with rid"
"{}, role is {}".format(router, nbr_rid, intf_state)
)
return errormsg
continue
else:
for router, rnode in tgen.routers().items():
if "ospf" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF neighborship on router %s:", router)
show_ospf_json = run_frr_cmd(
rnode, "show ip ospf neighbor all json", isjson=True
)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
ospf_data_list = topo["routers"][router]["ospf"]
ospf_neighbors = ospf_data_list["neighbors"]
total_peer = 0
total_peer = len(ospf_neighbors.keys())
no_of_ospf_nbr = 0
ospf_nbr_list = ospf_data_list["neighbors"]
no_of_peer = 0
for ospf_nbr, nbr_data in ospf_nbr_list.items():
if nbr_data:
data_ip = topo["routers"][nbr_data["nbr"]]["links"]
data_rid = topo["routers"][nbr_data["nbr"]]["ospf"]["router_id"]
else:
data_ip = topo["routers"][ospf_nbr]["links"]
data_rid = topo["routers"][ospf_nbr]["ospf"]["router_id"]
if ospf_nbr in data_ip:
nbr_details = nbr_data[ospf_nbr]
elif lan:
for switch in topo["switches"]:
if "ospf" in topo["switches"][switch]["links"][router]:
neighbor_ip = data_ip[switch]["ipv4"].split("/")[0]
else:
continue
else:
neighbor_ip = data_ip[router]["ipv4"].split("/")[0]
nh_state = None
neighbor_ip = neighbor_ip.lower()
nbr_rid = data_rid
try:
nh_state = show_ospf_json[nbr_rid][0]["state"].split("/")[0]
except KeyError:
errormsg = "[DUT: {}] OSPF peer {} missing,from " "{} ".format(
router, nbr_rid, ospf_nbr
)
return errormsg
if nh_state == "Full":
no_of_peer += 1
if no_of_peer == total_peer:
logger.info("[DUT: {}] OSPF is Converged".format(router))
result = True
else:
errormsg = "[DUT: {}] OSPF is not Converged".format(router)
return errormsg
logger.debug("Exiting API: verify_ospf_neighbor()")
return result
@retry(retry_timeout=50)
def verify_ospf6_neighbor(tgen, topo=None, dut=None, input_dict=None, lan=False):
"""
This API is to verify ospf neighborship by running
show ipv6 ospf neighbour command,
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
* `lan` : verify neighbors in lan topology
Usage
-----
1. To check FULL neighbors.
verify_ospf_neighbor(tgen, topo, dut=dut)
2. To check neighbors with their roles.
input_dict = {
"r0": {
"ospf6": {
"neighbors": {
"r1": {
"state": "Full",
"role": "DR"
},
"r2": {
"state": "Full",
"role": "DROther"
},
"r3": {
"state": "Full",
"role": "DROther"
}
}
}
}
}
result = verify_ospf6_neighbor(tgen, topo, dut, input_dict, lan=True)
3. To check there are no neighbors.
input_dict = {
"r0": {
"ospf6": {
"neighbors": []
}
}
}
result = verify_ospf6_neighbor(tgen, topo, dut, input_dict)
Returns
-------
True or False (Error Message)
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
if topo is None:
topo = tgen.json_topo
if input_dict:
for router, rnode in tgen.routers().items():
if "ospf6" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF neighborship on router %s:", router)
show_ospf_json = run_frr_cmd(
rnode, "show ipv6 ospf neighbor json", isjson=True
)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF6 is not running"
return errormsg
ospf_data_list = input_dict[router]["ospf6"]
ospf_nbr_list = ospf_data_list["neighbors"]
# Check if looking for no neighbors
if ospf_nbr_list == []:
if show_ospf_json["neighbors"] == []:
logger.info("[DUT: {}] OSPF6 no neighbors found".format(router))
return True
else:
errormsg = (
"[DUT: {}] OSPF6 active neighbors found, expected None".format(
router
)
)
return errormsg
for ospf_nbr, nbr_data in ospf_nbr_list.items():
try:
data_ip = data_rid = topo["routers"][ospf_nbr]["ospf6"]["router_id"]
except KeyError:
data_ip = data_rid = topo["routers"][nbr_data["nbr"]]["ospf6"][
"router_id"
]
if ospf_nbr in data_ip:
nbr_details = nbr_data[ospf_nbr]
elif lan:
for switch in topo["switches"]:
if "ospf6" in topo["switches"][switch]["links"][router]:
neighbor_ip = data_ip
else:
continue
else:
neighbor_ip = data_ip[router]["ipv6"].split("/")[0]
nh_state = None
neighbor_ip = neighbor_ip.lower()
nbr_rid = data_rid
get_index_val = dict(
(d["neighborId"], dict(d, index=index))
for (index, d) in enumerate(show_ospf_json["neighbors"])
)
try:
nh_state = get_index_val.get(neighbor_ip)["state"]
intf_state = get_index_val.get(neighbor_ip)["ifState"]
except TypeError:
errormsg = "[DUT: {}] OSPF peer {} missing,from " "{} ".format(
router, nbr_rid, ospf_nbr
)
return errormsg
nbr_state = nbr_data.setdefault("state", None)
nbr_role = nbr_data.setdefault("role", None)
if nbr_state:
if nbr_state == nh_state:
logger.info(
"[DUT: {}] OSPF6 Nbr is {}:{} State {}".format(
router, ospf_nbr, nbr_rid, nh_state
)
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF6 is not Converged, neighbor"
" state is {} , Expected state is {}".format(
router, nh_state, nbr_state
)
)
return errormsg
if nbr_role:
if nbr_role == intf_state:
logger.info(
"[DUT: {}] OSPF6 Nbr is {}: {} Role {}".format(
router, ospf_nbr, nbr_rid, nbr_role
)
)
else:
errormsg = (
"[DUT: {}] OSPF6 is not Converged with rid"
"{}, role is {}, Expected role is {}".format(
router, nbr_rid, intf_state, nbr_role
)
)
return errormsg
continue
else:
for router, rnode in tgen.routers().items():
if "ospf6" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF6 neighborship on router %s:", router)
show_ospf_json = run_frr_cmd(
rnode, "show ipv6 ospf neighbor json", isjson=True
)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF6 is not running"
return errormsg
ospf_data_list = topo["routers"][router]["ospf6"]
ospf_neighbors = ospf_data_list["neighbors"]
total_peer = 0
total_peer = len(ospf_neighbors.keys())
no_of_ospf_nbr = 0
ospf_nbr_list = ospf_data_list["neighbors"]
no_of_peer = 0
for ospf_nbr, nbr_data in ospf_nbr_list.items():
try:
data_ip = data_rid = topo["routers"][ospf_nbr]["ospf6"]["router_id"]
except KeyError:
data_ip = data_rid = topo["routers"][nbr_data["nbr"]]["ospf6"][
"router_id"
]
if ospf_nbr in data_ip:
nbr_details = nbr_data[ospf_nbr]
elif lan:
for switch in topo["switches"]:
if "ospf6" in topo["switches"][switch]["links"][router]:
neighbor_ip = data_ip
else:
continue
else:
neighbor_ip = data_ip
nh_state = None
neighbor_ip = neighbor_ip.lower()
nbr_rid = data_rid
get_index_val = dict(
(d["neighborId"], dict(d, index=index))
for (index, d) in enumerate(show_ospf_json["neighbors"])
)
try:
nh_state = get_index_val.get(neighbor_ip)["state"]
intf_state = get_index_val.get(neighbor_ip)["ifState"]
except TypeError:
errormsg = "[DUT: {}] OSPF peer {} missing,from " "{} ".format(
router, nbr_rid, ospf_nbr
)
return errormsg
if nh_state == "Full":
no_of_peer += 1
if no_of_peer == total_peer:
logger.info("[DUT: {}] OSPF6 is Converged".format(router))
result = True
else:
errormsg = "[DUT: {}] OSPF6 is not Converged".format(router)
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
@retry(retry_timeout=40)
def verify_ospf_rib(
tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None, expected=True
):
"""
This API is to verify ospf routes by running
show ip ospf route command.
Parameters
----------
* `tgen` : Topogen object
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
* `next_hop` : next to be verified
* `tag` : tag to be verified
* `metric` : metric to be verified
* `fib` : True if the route is installed in FIB.
* `expected` : expected results from API, by-default True
Usage
-----
input_dict = {
"r1": {
"static_routes": [
{
"network": ip_net,
"no_of_ip": 1,
"routeType": "N"
}
]
}
}
result = verify_ospf_rib(tgen, dut, input_dict,next_hop=nh)
Returns
-------
True or False (Error Message)
"""
logger.info("Entering lib API: verify_ospf_rib()")
result = False
router_list = tgen.routers()
additional_nexthops_in_required_nhs = []
found_hops = []
for routerInput in input_dict.keys():
for router, rnode in router_list.items():
if router != dut:
continue
logger.info("Checking router %s RIB:", router)
# Verifying RIB routes
command = "show ip ospf route"
found_routes = []
missing_routes = []
if (
"static_routes" in input_dict[routerInput]
or "prefix" in input_dict[routerInput]
):
if "prefix" in input_dict[routerInput]:
static_routes = input_dict[routerInput]["prefix"]
else:
static_routes = input_dict[routerInput]["static_routes"]
for static_route in static_routes:
cmd = "{}".format(command)
cmd = "{} json".format(cmd)
ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True)
# Verifying output dictionary ospf_rib_json is not empty
if bool(ospf_rib_json) is False:
errormsg = (
"[DUT: {}] No routes found in OSPF route "
"table".format(router)
)
return errormsg
network = static_route["network"]
no_of_ip = static_route.setdefault("no_of_ip", 1)
_tag = static_route.setdefault("tag", None)
_rtype = static_route.setdefault("routeType", None)
# Generating IPs for verification
ip_list = generate_ips(network, no_of_ip)
st_found = False
nh_found = False
for st_rt in ip_list:
st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
_addr_type = validate_ip_address(st_rt)
if _addr_type != "ipv4":
continue
if st_rt in ospf_rib_json:
st_found = True
found_routes.append(st_rt)
if fib and next_hop:
if type(next_hop) is not list:
next_hop = [next_hop]
for mnh in range(0, len(ospf_rib_json[st_rt])):
if (
"fib"
in ospf_rib_json[st_rt][mnh]["nexthops"][0]
):
found_hops.append(
[
rib_r["ip"]
for rib_r in ospf_rib_json[st_rt][mnh][
"nexthops"
]
]
)
if found_hops[0]:
missing_list_of_nexthops = set(
found_hops[0]
).difference(next_hop)
additional_nexthops_in_required_nhs = set(
next_hop
).difference(found_hops[0])
if additional_nexthops_in_required_nhs:
logger.info(
"Nexthop "
"%s is not active for route %s in "
"RIB of router %s\n",
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
errormsg = (
"Nexthop {} is not active"
" for route {} in RIB of router"
" {}\n".format(
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
)
return errormsg
else:
nh_found = True
elif next_hop and fib is None:
if type(next_hop) is not list:
next_hop = [next_hop]
found_hops = [
rib_r["ip"]
for rib_r in ospf_rib_json[st_rt]["nexthops"]
]
if found_hops:
missing_list_of_nexthops = set(
found_hops
).difference(next_hop)
additional_nexthops_in_required_nhs = set(
next_hop
).difference(found_hops)
if additional_nexthops_in_required_nhs:
logger.info(
"Missing nexthop %s for route"
" %s in RIB of router %s\n",
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
errormsg = (
"Nexthop {} is Missing for "
"route {} in RIB of router {}\n".format(
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
)
return errormsg
else:
nh_found = True
if _rtype:
if "routeType" not in ospf_rib_json[st_rt]:
errormsg = (
"[DUT: {}]: routeType missing"
" for route {} in OSPF RIB \n".format(
dut, st_rt
)
)
return errormsg
elif _rtype != ospf_rib_json[st_rt]["routeType"]:
errormsg = (
"[DUT: {}]: routeType mismatch"
" for route {} in OSPF RIB \n".format(
dut, st_rt
)
)
return errormsg
else:
logger.info(
"[DUT: {}]: Found routeType {}"
" for route {}".format(dut, _rtype, st_rt)
)
if tag:
if "tag" not in ospf_rib_json[st_rt]:
errormsg = (
"[DUT: {}]: tag is not"
" present for"
" route {} in RIB \n".format(dut, st_rt)
)
return errormsg
if _tag != ospf_rib_json[st_rt]["tag"]:
errormsg = (
"[DUT: {}]: tag value {}"
" is not matched for"
" route {} in RIB \n".format(
dut,
_tag,
st_rt,
)
)
return errormsg
if metric is not None:
if "type2cost" not in ospf_rib_json[st_rt]:
errormsg = (
"[DUT: {}]: metric is"
" not present for"
" route {} in RIB \n".format(dut, st_rt)
)
return errormsg
if metric != ospf_rib_json[st_rt]["type2cost"]:
errormsg = (
"[DUT: {}]: metric value "
"{} is not matched for "
"route {} in RIB \n".format(
dut,
metric,
st_rt,
)
)
return errormsg
else:
missing_routes.append(st_rt)
if nh_found:
logger.info(
"[DUT: {}]: Found next_hop {} for all OSPF"
" routes in RIB".format(router, next_hop)
)
if len(missing_routes) > 0:
errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
dut, missing_routes
)
return errormsg
if found_routes:
logger.info(
"[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
dut,
found_routes,
)
result = True
logger.info("Exiting lib API: verify_ospf_rib()")
return result
@retry(retry_timeout=20)
def verify_ospf_interface(
tgen, topo=None, dut=None, lan=False, input_dict=None, expected=True
):
"""
This API is to verify ospf routes by running
show ip ospf interface command.
Parameters
----------
* `tgen` : Topogen object
* `topo` : topology descriptions
* `dut`: device under test
* `lan`: if set to true this interface belongs to LAN.
* `input_dict` : Input dict data, required when configuring from testcase
* `expected` : expected results from API, by-default True
Usage
-----
input_dict= {
'r0': {
'links':{
's1': {
'ospf':{
'priority':98,
'timerDeadSecs': 4,
'area': '0.0.0.3',
'mcastMemberOspfDesignatedRouters': True,
'mcastMemberOspfAllRouters': True,
'ospfEnabled': True,
}
}
}
}
}
result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)
Returns
-------
True or False (Error Message)
"""
logger.debug("Entering lib API: verify_ospf_interface()")
result = False
if topo is None:
topo = tgen.json_topo
for router, rnode in tgen.routers().items():
if "ospf" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF interface on router %s:", router)
show_ospf_json = run_frr_cmd(rnode, "show ip ospf interface json", isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
# To find neighbor ip type
ospf_intf_data = input_dict[router]["links"]
for ospf_intf, intf_data in ospf_intf_data.items():
intf = topo["routers"][router]["links"][ospf_intf]["interface"]
if intf in show_ospf_json["interfaces"]:
for intf_attribute in intf_data["ospf"]:
if (
intf_data["ospf"][intf_attribute]
== show_ospf_json["interfaces"][intf][intf_attribute]
):
logger.info(
"[DUT: %s] OSPF interface %s: %s is %s",
router,
intf,
intf_attribute,
intf_data["ospf"][intf_attribute],
)
else:
errormsg = "[DUT: {}] OSPF interface {}: {} is {}, \
Expected is {}".format(
router,
intf,
intf_attribute,
intf_data["ospf"][intf_attribute],
show_ospf_json["interfaces"][intf][intf_attribute],
)
return errormsg
result = True
logger.debug("Exiting API: verify_ospf_interface()")
return result
@retry(retry_timeout=40)
def verify_ospf_database(
tgen, topo, dut, input_dict, vrf=None, lsatype=None, rid=None, expected=True
):
"""
This API is to verify ospf lsa's by running
show ip ospf database command.
Parameters
----------
* `tgen` : Topogen object
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
* `topo` : next to be verified
* `expected` : expected results from API, by-default True
Usage
-----
input_dict = {
"areas": {
"0.0.0.0": {
"Router Link States": {
"100.1.1.0-100.1.1.0": {
"LSID": "100.1.1.0",
"Advertised router": "100.1.1.0",
"LSA Age": 130,
"Sequence Number": "80000006",
"Checksum": "a703",
"Router links": 3
}
},
"Net Link States": {
"10.0.0.2-100.1.1.1": {
"LSID": "10.0.0.2",
"Advertised router": "100.1.1.1",
"LSA Age": 137,
"Sequence Number": "80000001",
"Checksum": "9583"
}
},
},
}
}
result = verify_ospf_database(tgen, topo, dut, input_dict)
Returns
-------
True or False (Error Message)
"""
result = False
router = dut
logger.debug("Entering lib API: verify_ospf_database()")
if "ospf" not in topo["routers"][dut]:
errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
return errormsg
rnode = tgen.routers()[dut]
logger.info("Verifying OSPF interface on router %s:", dut)
if not rid:
rid = "self-originate"
if lsatype:
if vrf is None:
command = "show ip ospf database {} {} json".format(lsatype, rid)
else:
command = "show ip ospf database {} {} vrf {} json".format(
lsatype, rid, vrf
)
else:
if vrf is None:
command = "show ip ospf database json"
else:
command = "show ip ospf database vrf {} json".format(vrf)
show_ospf_json = run_frr_cmd(rnode, command, isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
# for inter and inter lsa's
ospf_db_data = input_dict.setdefault("areas", None)
ospf_external_lsa = input_dict.setdefault("AS External Link States", None)
# import pdb; pdb.set_trace()
if ospf_db_data:
for ospf_area, area_lsa in ospf_db_data.items():
if ospf_area in show_ospf_json["routerLinkStates"]["areas"]:
if "routerLinkStates" in area_lsa:
for lsa in area_lsa["routerLinkStates"]:
_advrtr = lsa.setdefault("advertisedRouter", None)
_options = lsa.setdefault("options", None)
if (
_options
and lsa["lsaId"]
== show_ospf_json["routerLinkStates"]["areas"][ospf_area][
0
]["linkStateId"]
and lsa["options"]
== show_ospf_json["routerLinkStates"]["areas"][ospf_area][
0
]["options"]
):
result = True
break
else:
errormsg = '[DUT: {}] OSPF LSA options: expected {}, Received Options are {} lsa["options"] {} OSPF LSAID: expected lsaid {}, Received lsaid {}'.format(
dut,
show_ospf_json["routerLinkStates"]["areas"][ospf_area][
0
]["options"],
_options,
lsa["options"],
show_ospf_json["routerLinkStates"]["areas"][ospf_area][
0
]["linkStateId"],
lsa["lsaId"],
)
return errormsg
if "Net Link States" in area_lsa:
for lsa in area_lsa["Net Link States"]:
if lsa in show_ospf_json["areas"][ospf_area]["Net Link States"]:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
router,
ospf_area,
lsa,
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Network LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "Summary Link States" in area_lsa:
for lsa in area_lsa["Summary Link States"]:
if (
lsa
in show_ospf_json["areas"][ospf_area]["Summary Link States"]
):
logger.info(
"[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
router,
ospf_area,
lsa,
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Summary LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "ASBR-Summary Link States" in area_lsa:
for lsa in area_lsa["ASBR-Summary Link States"]:
if (
lsa
in show_ospf_json["areas"][ospf_area][
"ASBR-Summary Link States"
]
):
logger.info(
"[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
router,
ospf_area,
lsa,
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" ASBR Summary LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if ospf_external_lsa:
for ospf_ext_lsa, ext_lsa_data in ospf_external_lsa.items():
if ospf_ext_lsa in show_ospf_json["AS External Link States"]:
logger.info(
"[DUT: %s] OSPF LSDB:External LSA %s", router, ospf_ext_lsa
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB : expected"
" External LSA is {}".format(router, ospf_ext_lsa)
)
return errormsg
logger.debug("Exiting API: verify_ospf_database()")
return result
@retry(retry_timeout=20)
def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True):
"""
This API is to verify ospf routes by running
show ip ospf interface command.
Parameters
----------
* `tgen` : Topogen object
* `topo` : topology descriptions
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
Usage
-----
input_dict = {
"11.0.0.0/8": {
"Summary address": "11.0.0.0/8",
"Metric-type": "E2",
"Metric": 20,
"Tag": 0,
"External route count": 5
}
}
result = verify_ospf_summary(tgen, topo, dut, input_dict)
Returns
-------
True or False (Error Message)
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
router = dut
logger.info("Verifying OSPF summary on router %s:", router)
rnode = tgen.routers()[dut]
if ospf:
if "ospf6" not in topo["routers"][dut]:
errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format(router)
return errormsg
show_ospf_json = run_frr_cmd(
rnode, "show ipv6 ospf summary detail json", isjson=True
)
else:
if "ospf" not in topo["routers"][dut]:
errormsg = "[DUT: {}] OSPF is not configured on the router.".format(router)
return errormsg
show_ospf_json = run_frr_cmd(
rnode, "show ip ospf summary detail json", isjson=True
)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
# To find neighbor ip type
ospf_summary_data = input_dict
if ospf:
show_ospf_json = show_ospf_json["default"]
for ospf_summ, summ_data in ospf_summary_data.items():
if ospf_summ not in show_ospf_json:
continue
summary = ospf_summary_data[ospf_summ]["Summary address"]
if summary in show_ospf_json:
for summ in summ_data:
if summ_data[summ] == show_ospf_json[summary][summ]:
logger.info(
"[DUT: %s] OSPF summary %s:%s is %s",
router,
summary,
summ,
summ_data[summ],
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF summary {} : {} is {}, "
"Expected is {}".format(
router,
summary,
summ,
show_ospf_json[summary][summ],
summ_data[summ],
)
)
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
@retry(retry_timeout=30)
def verify_ospf6_rib(
tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None
):
"""
This API is to verify ospf routes by running
show ip ospf route command.
Parameters
----------
* `tgen` : Topogen object
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
* `next_hop` : next to be verified
* `tag` : tag to be verified
* `metric` : metric to be verified
* `fib` : True if the route is installed in FIB.
Usage
-----
input_dict = {
"r1": {
"static_routes": [
{
"network": ip_net,
"no_of_ip": 1,
"routeType": "N"
}
]
}
}
result = verify_ospf6_rib(tgen, dut, input_dict,next_hop=nh)
Returns
-------
True or False (Error Message)
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
router_list = tgen.routers()
additional_nexthops_in_required_nhs = []
found_hops = []
for routerInput in input_dict.keys():
for router, rnode in router_list.items():
if router != dut:
continue
logger.info("Checking router %s RIB:", router)
# Verifying RIB routes
command = "show ipv6 ospf route detail"
found_routes = []
missing_routes = []
if (
"static_routes" in input_dict[routerInput]
or "prefix" in input_dict[routerInput]
):
if "prefix" in input_dict[routerInput]:
static_routes = input_dict[routerInput]["prefix"]
else:
static_routes = input_dict[routerInput]["static_routes"]
for static_route in static_routes:
cmd = "{}".format(command)
cmd = "{} json".format(cmd)
ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True)
# Fix for PR 2644182
try:
ospf_rib_json = ospf_rib_json["routes"]
except KeyError:
pass
# Verifying output dictionary ospf_rib_json is not empty
if bool(ospf_rib_json) is False:
errormsg = (
"[DUT: {}] No routes found in OSPF6 route "
"table".format(router)
)
return errormsg
network = static_route["network"]
no_of_ip = static_route.setdefault("no_of_ip", 1)
_tag = static_route.setdefault("tag", None)
_rtype = static_route.setdefault("routeType", None)
# Generating IPs for verification
ip_list = generate_ips(network, no_of_ip)
if len(ip_list) == 1:
ip_list = [network]
st_found = False
nh_found = False
for st_rt in ip_list:
st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
_addr_type = validate_ip_address(st_rt)
if _addr_type != "ipv6":
continue
if st_rt in ospf_rib_json:
st_found = True
found_routes.append(st_rt)
if fib and next_hop:
if type(next_hop) is not list:
next_hop = [next_hop]
for mnh in range(0, len(ospf_rib_json[st_rt])):
if (
"fib"
in ospf_rib_json[st_rt][mnh]["nextHops"][0]
):
found_hops.append(
[
rib_r["ip"]
for rib_r in ospf_rib_json[st_rt][mnh][
"nextHops"
]
]
)
if found_hops[0]:
missing_list_of_nexthops = set(
found_hops[0]
).difference(next_hop)
additional_nexthops_in_required_nhs = set(
next_hop
).difference(found_hops[0])
if additional_nexthops_in_required_nhs:
logger.info(
"Nexthop "
"%s is not active for route %s in "
"RIB of router %s\n",
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
errormsg = (
"Nexthop {} is not active"
" for route {} in RIB of router"
" {}\n".format(
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
)
return errormsg
else:
nh_found = True
elif next_hop and fib is None:
if type(next_hop) is not list:
next_hop = [next_hop]
found_hops = [
rib_r["nextHop"]
for rib_r in ospf_rib_json[st_rt]["nextHops"]
]
if found_hops:
missing_list_of_nexthops = set(
found_hops
).difference(next_hop)
additional_nexthops_in_required_nhs = set(
next_hop
).difference(found_hops)
if additional_nexthops_in_required_nhs:
logger.info(
"Missing nexthop %s for route"
" %s in RIB of router %s\n",
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
errormsg = (
"Nexthop {} is Missing for "
"route {} in RIB of router {}\n".format(
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
)
return errormsg
else:
nh_found = True
if _rtype:
if "destinationType" not in ospf_rib_json[st_rt]:
errormsg = (
"[DUT: {}]: destinationType missing"
"for route {} in OSPF RIB \n".format(dut, st_rt)
)
return errormsg
elif _rtype != ospf_rib_json[st_rt]["destinationType"]:
errormsg = (
"[DUT: {}]: destinationType mismatch"
"for route {} in OSPF RIB \n".format(dut, st_rt)
)
return errormsg
else:
logger.info(
"DUT: {}]: Found destinationType {}"
"for route {}".format(dut, _rtype, st_rt)
)
if tag:
if "tag" not in ospf_rib_json[st_rt]:
errormsg = (
"[DUT: {}]: tag is not"
" present for"
" route {} in RIB \n".format(dut, st_rt)
)
return errormsg
if _tag != ospf_rib_json[st_rt]["tag"]:
errormsg = (
"[DUT: {}]: tag value {}"
" is not matched for"
" route {} in RIB \n".format(
dut,
_tag,
st_rt,
)
)
return errormsg
if metric is not None:
if "metricCostE2" not in ospf_rib_json[st_rt]:
errormsg = (
"[DUT: {}]: metric is"
" not present for"
" route {} in RIB \n".format(dut, st_rt)
)
return errormsg
if metric != ospf_rib_json[st_rt]["metricCostE2"]:
errormsg = (
"[DUT: {}]: metric value "
"{} is not matched for "
"route {} in RIB \n".format(
dut,
metric,
st_rt,
)
)
return errormsg
else:
missing_routes.append(st_rt)
if nh_found:
logger.info(
"[DUT: {}]: Found next_hop {} for all OSPF"
" routes in RIB".format(router, next_hop)
)
if len(missing_routes) > 0:
errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
dut, missing_routes
)
return errormsg
if found_routes:
logger.info(
"[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
dut,
found_routes,
)
result = True
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
@retry(retry_timeout=6)
def verify_ospf6_interface(tgen, topo=None, dut=None, lan=False, input_dict=None):
"""
This API is to verify ospf routes by running
show ip ospf interface command.
Parameters
----------
* `tgen` : Topogen object
* `topo` : topology descriptions
* `dut`: device under test
* `lan`: if set to true this interface belongs to LAN.
* `input_dict` : Input dict data, required when configuring from testcase
Usage
-----
input_dict= {
'r0': {
'links':{
's1': {
'ospf6':{
'priority':98,
'timerDeadSecs': 4,
'area': '0.0.0.3',
'mcastMemberOspfDesignatedRouters': True,
'mcastMemberOspfAllRouters': True,
'ospfEnabled': True,
}
}
}
}
}
result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)
Returns
-------
True or False (Error Message)
"""
logger.debug("Entering lib API: verify_ospf6_interface")
result = False
if topo is None:
topo = tgen.json_topo
for router, rnode in tgen.routers().items():
if "ospf6" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF interface on router %s:", router)
show_ospf_json = run_frr_cmd(
rnode, "show ipv6 ospf interface json", isjson=True
)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF6 is not running"
return errormsg
# To find neighbor ip type
ospf_intf_data = input_dict[router]["links"]
for ospf_intf, intf_data in ospf_intf_data.items():
intf = topo["routers"][router]["links"][ospf_intf]["interface"]
if intf in show_ospf_json:
for intf_attribute in intf_data["ospf6"]:
if intf_data["ospf6"][intf_attribute] is not list:
if (
intf_data["ospf6"][intf_attribute]
== show_ospf_json[intf][intf_attribute]
):
logger.info(
"[DUT: %s] OSPF6 interface %s: %s is %s",
router,
intf,
intf_attribute,
intf_data["ospf6"][intf_attribute],
)
elif intf_data["ospf6"][intf_attribute] is list:
for addr_list in len(show_ospf_json[intf][intf_attribute]):
if (
show_ospf_json[intf][intf_attribute][addr_list][
"address"
].split("/")[0]
== intf_data["ospf6"]["internetAddress"][0]["address"]
):
break
else:
errormsg = "[DUT: {}] OSPF6 interface {}: {} is {}, \
Expected is {}".format(
router,
intf,
intf_attribute,
intf_data["ospf6"][intf_attribute],
intf_data["ospf6"][intf_attribute],
)
return errormsg
else:
errormsg = "[DUT: {}] OSPF6 interface {}: {} is {}, \
Expected is {}".format(
router,
intf,
intf_attribute,
intf_data["ospf6"][intf_attribute],
intf_data["ospf6"][intf_attribute],
)
return errormsg
result = True
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
@retry(retry_timeout=20)
def verify_ospf6_database(tgen, topo, dut, input_dict):
"""
This API is to verify ospf lsa's by running
show ip ospf database command.
Parameters
----------
* `tgen` : Topogen object
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
* `topo` : next to be verified
Usage
-----
input_dict = {
"areas": {
"0.0.0.0": {
"routerLinkStates": {
"100.1.1.0-100.1.1.0": {
"LSID": "100.1.1.0",
"Advertised router": "100.1.1.0",
"LSA Age": 130,
"Sequence Number": "80000006",
"Checksum": "a703",
"Router links": 3
}
},
"networkLinkStates": {
"10.0.0.2-100.1.1.1": {
"LSID": "10.0.0.2",
"Advertised router": "100.1.1.1",
"LSA Age": 137,
"Sequence Number": "80000001",
"Checksum": "9583"
}
},
},
}
}
result = verify_ospf_database(tgen, topo, dut, input_dict)
Returns
-------
True or False (Error Message)
"""
result = False
router = dut
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
if "ospf" not in topo["routers"][dut]:
errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
return errormsg
rnode = tgen.routers()[dut]
logger.info("Verifying OSPF interface on router %s:", dut)
show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json", isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
# for inter and inter lsa's
ospf_db_data = input_dict.setdefault("areas", None)
ospf_external_lsa = input_dict.setdefault("asExternalLinkStates", None)
if ospf_db_data:
for ospf_area, area_lsa in ospf_db_data.items():
if ospf_area in show_ospf_json["areas"]:
if "routerLinkStates" in area_lsa:
for lsa in area_lsa["routerLinkStates"]:
for rtrlsa in show_ospf_json["areas"][ospf_area][
"routerLinkStates"
]:
if (
lsa["lsaId"] == rtrlsa["lsaId"]
and lsa["advertisedRouter"]
== rtrlsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Router " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Router LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "networkLinkStates" in area_lsa:
for lsa in area_lsa["networkLinkStates"]:
for netlsa in show_ospf_json["areas"][ospf_area][
"networkLinkStates"
]:
if (
lsa
in show_ospf_json["areas"][ospf_area][
"networkLinkStates"
]
):
if (
lsa["lsaId"] == netlsa["lsaId"]
and lsa["advertisedRouter"]
== netlsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Network LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "summaryLinkStates" in area_lsa:
for lsa in area_lsa["summaryLinkStates"]:
for t3lsa in show_ospf_json["areas"][ospf_area][
"summaryLinkStates"
]:
if (
lsa["lsaId"] == t3lsa["lsaId"]
and lsa["advertisedRouter"] == t3lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Summary LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "nssaExternalLinkStates" in area_lsa:
for lsa in area_lsa["nssaExternalLinkStates"]:
for t7lsa in show_ospf_json["areas"][ospf_area][
"nssaExternalLinkStates"
]:
if (
lsa["lsaId"] == t7lsa["lsaId"]
and lsa["advertisedRouter"] == t7lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Type7 LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "asbrSummaryLinkStates" in area_lsa:
for lsa in area_lsa["asbrSummaryLinkStates"]:
for t4lsa in show_ospf_json["areas"][ospf_area][
"asbrSummaryLinkStates"
]:
if (
lsa["lsaId"] == t4lsa["lsaId"]
and lsa["advertisedRouter"] == t4lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
router,
ospf_area,
lsa,
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" ASBR Summary LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "linkLocalOpaqueLsa" in area_lsa:
for lsa in area_lsa["linkLocalOpaqueLsa"]:
try:
for lnklsa in show_ospf_json["areas"][ospf_area][
"linkLocalOpaqueLsa"
]:
if (
lsa["lsaId"] in lnklsa["lsaId"]
and "linkLocalOpaqueLsa"
in show_ospf_json["areas"][ospf_area]
):
logger.info(
(
"[DUT: FRR] OSPF LSDB area %s:Opaque-LSA"
"%s",
ospf_area,
lsa,
)
)
result = True
else:
errormsg = (
"[DUT: FRR] OSPF LSDB area: {} "
"expected Opaque-LSA is {}, Found is {}".format(
ospf_area, lsa, show_ospf_json
)
)
raise ValueError(errormsg)
return errormsg
except KeyError:
errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present"
return errormsg
if ospf_external_lsa:
for lsa in ospf_external_lsa:
try:
for t5lsa in show_ospf_json["asExternalLinkStates"]:
if (
lsa["lsaId"] == t5lsa["lsaId"]
and lsa["advertisedRouter"] == t5lsa["advertisedRouter"]
):
result = True
break
except KeyError:
result = False
if result:
logger.info("[DUT: %s] OSPF LSDB:External LSA %s", router, lsa)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB : expected"
" External LSA is {}".format(router, lsa)
)
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
def config_ospf6_interface(
tgen, topo=None, input_dict=None, build=False, load_config=True
):
"""
API to configure ospf on router.
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `input_dict` : Input dict data, required when configuring from testcase
* `build` : Only for initial setup phase this is set as True.
* `load_config` : Loading the config to router this is set as True.
Usage
-----
r1_ospf_auth = {
"r1": {
"links": {
"r2": {
"ospf": {
"authentication": 'message-digest',
"authentication-key": "ospf",
"message-digest-key": "10"
}
}
}
}
}
result = config_ospf6_interface(tgen, topo, r1_ospf_auth)
Returns
-------
True or False
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
if topo is None:
topo = tgen.json_topo
if not input_dict:
input_dict = deepcopy(topo)
else:
input_dict = deepcopy(input_dict)
config_data_dict = {}
for router in input_dict.keys():
config_data = []
for lnk in input_dict[router]["links"].keys():
if "ospf6" not in input_dict[router]["links"][lnk]:
logger.debug(
"Router %s: ospf6 config is not present in"
"input_dict, passed input_dict %s",
router,
str(input_dict),
)
continue
ospf_data = input_dict[router]["links"][lnk]["ospf6"]
data_ospf_area = ospf_data.setdefault("area", None)
data_ospf_auth = ospf_data.setdefault("hash-algo", None)
data_ospf_keychain = ospf_data.setdefault("keychain", None)
data_ospf_dr_priority = ospf_data.setdefault("priority", None)
data_ospf_cost = ospf_data.setdefault("cost", None)
data_ospf_mtu = ospf_data.setdefault("mtu_ignore", None)
try:
intf = topo["routers"][router]["links"][lnk]["interface"]
except KeyError:
intf = topo["switches"][router]["links"][lnk]["interface"]
# interface
cmd = "interface {}".format(intf)
config_data.append(cmd)
# interface area config
if data_ospf_area:
cmd = "ipv6 ospf area {}".format(data_ospf_area)
config_data.append(cmd)
# interface ospf auth
if data_ospf_auth:
cmd = "ipv6 ospf6 authentication"
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
if "hash-algo" in ospf_data:
cmd = "{} key-id {} hash-algo {} key {}".format(
cmd,
ospf_data["key-id"],
ospf_data["hash-algo"],
ospf_data["key"],
)
config_data.append(cmd)
# interface ospf auth with keychain
if data_ospf_keychain:
cmd = "ipv6 ospf6 authentication"
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
if "keychain" in ospf_data:
cmd = "{} keychain {}".format(cmd, ospf_data["keychain"])
config_data.append(cmd)
# interface ospf dr priority
if data_ospf_dr_priority:
cmd = "ipv6 ospf priority {}".format(ospf_data["priority"])
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# interface ospf cost
if data_ospf_cost:
cmd = "ipv6 ospf cost {}".format(ospf_data["cost"])
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# interface ospf mtu
if data_ospf_mtu:
cmd = "ipv6 ospf mtu-ignore"
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if build:
return config_data
if config_data:
config_data_dict[router] = config_data
result = create_common_configurations(
tgen, config_data_dict, "interface_config", build=build
)
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
@retry(retry_timeout=20)
def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None):
"""
This API is used to vreify gr helper using command
show ip ospf graceful-restart helper
Parameters
----------
* `tgen` : Topogen object
* `topo` : topology descriptions
* 'dut' : router
* 'input_dict' - values to be verified
Usage:
-------
input_dict = {
"helperSupport":"Disabled",
"strictLsaCheck":"Enabled",
"restartSupoort":"Planned and Unplanned Restarts",
"supportedGracePeriod":1800
}
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
if "ospf" not in topo["routers"][dut]:
errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
return errormsg
rnode = tgen.routers()[dut]
logger.info("Verifying OSPF GR details on router %s:", dut)
show_ospf_json = run_frr_cmd(
rnode, "show ip ospf graceful-restart helper json", isjson=True
)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
raise ValueError(errormsg)
return errormsg
for ospf_gr, gr_data in input_dict.items():
try:
if input_dict[ospf_gr] == show_ospf_json[ospf_gr]:
logger.info(
"[DUT: FRR] OSPF GR Helper: %s is %s",
ospf_gr,
show_ospf_json[ospf_gr],
)
result = True
else:
errormsg = (
"[DUT: FRR] OSPF GR Helper: {} expected is {}, Found "
"is {}".format(
ospf_gr, input_dict[ospf_gr], show_ospf_json[ospf_gr]
)
)
raise ValueError(errormsg)
return errormsg
except KeyError:
errormsg = "[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr)
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
def get_ospf_database(tgen, topo, dut, input_dict, vrf=None, lsatype=None, rid=None):
"""
This API is to return ospf lsa's by running
show ip ospf database command.
Parameters
----------
* `tgen` : Topogen object
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
* `topo` : next to be verified
* `vrf` : vrf to be checked
* `lsatype` : type of lsa to be checked
* `rid` : router id for lsa to be checked
Usage
-----
input_dict = {
"areas": {
"0.0.0.0": {
"routerLinkStates": {
"100.1.1.0-100.1.1.0": {
"LSID": "100.1.1.0",
"Advertised router": "100.1.1.0",
"LSA Age": 130,
"Sequence Number": "80000006",
"Checksum": "a703",
"Router links": 3
}
},
"networkLinkStates": {
"10.0.0.2-100.1.1.1": {
"LSID": "10.0.0.2",
"Advertised router": "100.1.1.1",
"LSA Age": 137,
"Sequence Number": "80000001",
"Checksum": "9583"
}
},
},
}
}
result = get_ospf_database(tgen, topo, dut, input_dict)
Returns
-------
True or False (Error Message)
"""
result = False
router = dut
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
sleep(10)
if "ospf" not in topo["routers"][dut]:
errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
return errormsg
rnode = tgen.routers()[dut]
logger.info("Verifying OSPF interface on router %s:", dut)
if not rid:
rid = "self-originate"
if lsatype:
if vrf is None:
command = "show ip ospf database {} {} json".format(lsatype, rid)
else:
command = "show ip ospf database {} {} vrf {} json".format(
lsatype, rid, vrf
)
else:
if vrf is None:
command = "show ip ospf database json"
else:
command = "show ip ospf database vrf {} json".format(vrf)
show_ospf_json = run_frr_cmd(rnode, command, isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
# for inter and inter lsa's
ospf_db_data = input_dict.setdefault("areas", None)
ospf_external_lsa = input_dict.setdefault("asExternalLinkStates", None)
if ospf_db_data:
for ospf_area, area_lsa in ospf_db_data.items():
if "areas" in show_ospf_json and ospf_area in show_ospf_json["areas"]:
if "routerLinkStates" in area_lsa:
for lsa in area_lsa["routerLinkStates"]:
for rtrlsa in show_ospf_json["areas"][ospf_area][
"routerLinkStates"
]:
_advrtr = lsa.setdefault("advertisedRouter", None)
_options = lsa.setdefault("options", None)
if (
_advrtr
and lsa["lsaId"] == rtrlsa["lsaId"]
and lsa["advertisedRouter"]
== rtrlsa["advertisedRouter"]
):
result = True
break
if (
_options
and lsa["lsaId"] == rtrlsa["lsaId"]
and lsa["options"] == rtrlsa["options"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Router " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Router LSA is {}\n found Router LSA: {}".format(
router, ospf_area, lsa, rtrlsa
)
)
return errormsg
if "networkLinkStates" in area_lsa:
for lsa in area_lsa["networkLinkStates"]:
for netlsa in show_ospf_json["areas"][ospf_area][
"networkLinkStates"
]:
if (
lsa
in show_ospf_json["areas"][ospf_area][
"networkLinkStates"
]
):
if (
lsa["lsaId"] == netlsa["lsaId"]
and lsa["advertisedRouter"]
== netlsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Network LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "summaryLinkStates" in area_lsa:
for lsa in area_lsa["summaryLinkStates"]:
for t3lsa in show_ospf_json["areas"][ospf_area][
"summaryLinkStates"
]:
if (
lsa["lsaId"] == t3lsa["lsaId"]
and lsa["advertisedRouter"] == t3lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Summary LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "nssaExternalLinkStates" in area_lsa:
for lsa in area_lsa["nssaExternalLinkStates"]:
for t7lsa in show_ospf_json["areas"][ospf_area][
"nssaExternalLinkStates"
]:
if (
lsa["lsaId"] == t7lsa["lsaId"]
and lsa["advertisedRouter"] == t7lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Type7 LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "asbrSummaryLinkStates" in area_lsa:
for lsa in area_lsa["asbrSummaryLinkStates"]:
for t4lsa in show_ospf_json["areas"][ospf_area][
"asbrSummaryLinkStates"
]:
if (
lsa["lsaId"] == t4lsa["lsaId"]
and lsa["advertisedRouter"] == t4lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
router,
ospf_area,
lsa,
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" ASBR Summary LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "linkLocalOpaqueLsa" in area_lsa:
for lsa in area_lsa["linkLocalOpaqueLsa"]:
try:
for lnklsa in show_ospf_json["areas"][ospf_area][
"linkLocalOpaqueLsa"
]:
if (
lsa["lsaId"] in lnklsa["lsaId"]
and "linkLocalOpaqueLsa"
in show_ospf_json["areas"][ospf_area]
):
logger.info(
(
"[DUT: FRR] OSPF LSDB area %s:Opaque-LSA"
"%s",
ospf_area,
lsa,
)
)
result = True
else:
errormsg = (
"[DUT: FRR] OSPF LSDB area: {} "
"expected Opaque-LSA is {}, Found is {}".format(
ospf_area, lsa, show_ospf_json
)
)
raise ValueError(errormsg)
return errormsg
except KeyError:
errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present"
return errormsg
else:
if "routerLinkStates" in area_lsa:
for lsa in area_lsa["routerLinkStates"]:
for rtrlsa in show_ospf_json["routerLinkStates"]:
_advrtr = lsa.setdefault("advertisedRouter", None)
_options = lsa.setdefault("options", None)
_age = lsa.setdefault("lsaAge", None)
if (
_options
and lsa["options"]
== show_ospf_json["routerLinkStates"][rtrlsa][
ospf_area
][0]["options"]
):
result = True
break
if (
_age is not "get"
and lsa["lsaAge"]
== show_ospf_json["routerLinkStates"][rtrlsa][
ospf_area
][0]["lsaAge"]
):
result = True
break
if _age == "get":
return "{}".format(
show_ospf_json["routerLinkStates"][rtrlsa][
ospf_area
][0]["lsaAge"]
)
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Router " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Router LSA is {}\n found Router LSA: {}".format(
router,
ospf_area,
lsa,
show_ospf_json["routerLinkStates"],
)
)
return errormsg
if "networkLinkStates" in area_lsa:
for lsa in area_lsa["networkLinkStates"]:
for netlsa in show_ospf_json["areas"][ospf_area][
"networkLinkStates"
]:
if (
lsa
in show_ospf_json["areas"][ospf_area][
"networkLinkStates"
]
):
if (
lsa["lsaId"] == netlsa["lsaId"]
and lsa["advertisedRouter"]
== netlsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Network LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "summaryLinkStates" in area_lsa:
for lsa in area_lsa["summaryLinkStates"]:
for t3lsa in show_ospf_json["areas"][ospf_area][
"summaryLinkStates"
]:
if (
lsa["lsaId"] == t3lsa["lsaId"]
and lsa["advertisedRouter"] == t3lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Summary LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "nssaExternalLinkStates" in area_lsa:
for lsa in area_lsa["nssaExternalLinkStates"]:
for t7lsa in show_ospf_json["areas"][ospf_area][
"nssaExternalLinkStates"
]:
if (
lsa["lsaId"] == t7lsa["lsaId"]
and lsa["advertisedRouter"] == t7lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Type7 LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "asbrSummaryLinkStates" in area_lsa:
for lsa in area_lsa["asbrSummaryLinkStates"]:
for t4lsa in show_ospf_json["areas"][ospf_area][
"asbrSummaryLinkStates"
]:
if (
lsa["lsaId"] == t4lsa["lsaId"]
and lsa["advertisedRouter"] == t4lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
router,
ospf_area,
lsa,
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" ASBR Summary LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "linkLocalOpaqueLsa" in area_lsa:
for lsa in area_lsa["linkLocalOpaqueLsa"]:
try:
for lnklsa in show_ospf_json["areas"][ospf_area][
"linkLocalOpaqueLsa"
]:
if (
lsa["lsaId"] in lnklsa["lsaId"]
and "linkLocalOpaqueLsa"
in show_ospf_json["areas"][ospf_area]
):
logger.info(
(
"[DUT: FRR] OSPF LSDB area %s:Opaque-LSA"
"%s",
ospf_area,
lsa,
)
)
result = True
else:
errormsg = (
"[DUT: FRR] OSPF LSDB area: {} "
"expected Opaque-LSA is {}, Found is {}".format(
ospf_area, lsa, show_ospf_json
)
)
raise ValueError(errormsg)
return errormsg
except KeyError:
errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present"
return errormsg
if ospf_external_lsa:
for lsa in ospf_external_lsa:
try:
for t5lsa in show_ospf_json["asExternalLinkStates"]:
if (
lsa["lsaId"] == t5lsa["lsaId"]
and lsa["advertisedRouter"] == t5lsa["advertisedRouter"]
):
result = True
break
except KeyError:
result = False
if result:
logger.info("[DUT: %s] OSPF LSDB:External LSA %s", router, lsa)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB : expected"
" External LSA is {}".format(router, lsa)
)
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result