Merge pull request #13546 from LabNConsulting/chopps/pylint-fix

tests: fix pylint error, and update style in lib/*.py
This commit is contained in:
Donatas Abraitis 2023-05-17 22:56:03 +03:00 committed by GitHub
commit d66a297fab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 18 additions and 41 deletions

View File

@ -164,7 +164,6 @@ def create_router_bgp(tgen, topo=None, input_dict=None, build=False, load_config
router, router,
) )
else: else:
ipv4_data = bgp_addr_data.setdefault("ipv4", {}) ipv4_data = bgp_addr_data.setdefault("ipv4", {})
ipv6_data = bgp_addr_data.setdefault("ipv6", {}) ipv6_data = bgp_addr_data.setdefault("ipv6", {})
l2vpn_data = bgp_addr_data.setdefault("l2vpn", {}) l2vpn_data = bgp_addr_data.setdefault("l2vpn", {})
@ -645,7 +644,6 @@ def __create_l2vpn_evpn_address_family(
if advertise_data: if advertise_data:
for address_type, unicast_type in advertise_data.items(): for address_type, unicast_type in advertise_data.items():
if type(unicast_type) is dict: if type(unicast_type) is dict:
for key, value in unicast_type.items(): for key, value in unicast_type.items():
cmd = "advertise {} {}".format(address_type, key) cmd = "advertise {} {}".format(address_type, key)
@ -1651,7 +1649,6 @@ def modify_as_number(tgen, topo, input_dict):
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
try: try:
new_topo = deepcopy(topo["routers"]) new_topo = deepcopy(topo["routers"])
router_dict = {} router_dict = {}
for router in input_dict.keys(): for router in input_dict.keys():
@ -1953,7 +1950,6 @@ def clear_bgp_and_verify(tgen, topo, router, rid=None):
total_peer = 0 total_peer = 0
for addr_type in bgp_addr_type.keys(): for addr_type in bgp_addr_type.keys():
if not check_address_types(addr_type): if not check_address_types(addr_type):
continue continue
@ -2022,7 +2018,6 @@ def clear_bgp_and_verify(tgen, topo, router, rid=None):
peer_uptime_after_clear_bgp = {} peer_uptime_after_clear_bgp = {}
# Verifying BGP convergence after bgp clear command # Verifying BGP convergence after bgp clear command
for retry in range(50): for retry in range(50):
# Waiting for BGP to converge # Waiting for BGP to converge
logger.info( logger.info(
"Waiting for %s sec for BGP to converge on router" " %s...", "Waiting for %s sec for BGP to converge on router" " %s...",
@ -3278,7 +3273,6 @@ def verify_graceful_restart(
if lmode is None: if lmode is None:
if "graceful-restart" in input_dict[dut]["bgp"]: if "graceful-restart" in input_dict[dut]["bgp"]:
if ( if (
"graceful-restart" in input_dict[dut]["bgp"]["graceful-restart"] "graceful-restart" in input_dict[dut]["bgp"]["graceful-restart"]
and input_dict[dut]["bgp"]["graceful-restart"][ and input_dict[dut]["bgp"]["graceful-restart"][
@ -3326,7 +3320,6 @@ def verify_graceful_restart(
if rmode is None: if rmode is None:
if "graceful-restart" in input_dict[peer]["bgp"]: if "graceful-restart" in input_dict[peer]["bgp"]:
if ( if (
"graceful-restart" "graceful-restart"
in input_dict[peer]["bgp"]["graceful-restart"] in input_dict[peer]["bgp"]["graceful-restart"]
@ -3628,7 +3621,6 @@ def verify_eor(tgen, topo, addr_type, input_dict, dut, peer, expected=True):
eor_json = show_bgp_graceful_json_out[afi]["endOfRibStatus"] eor_json = show_bgp_graceful_json_out[afi]["endOfRibStatus"]
if "endOfRibSend" in eor_json: if "endOfRibSend" in eor_json:
if eor_json["endOfRibSend"]: if eor_json["endOfRibSend"]:
logger.info( logger.info(
"[DUT: %s]: EOR Send true for %s " "%s", dut, neighbor_ip, afi "[DUT: %s]: EOR Send true for %s " "%s", dut, neighbor_ip, afi
@ -3918,7 +3910,6 @@ def verify_graceful_restart_timers(tgen, topo, addr_type, input_dict, dut, peer)
"timer" "timer"
].items(): ].items():
if rs_timer == "restart-time": if rs_timer == "restart-time":
receivedTimer = value receivedTimer = value
if ( if (
show_bgp_graceful_json_out["timers"][ show_bgp_graceful_json_out["timers"][
@ -4777,7 +4768,6 @@ def get_prefix_count_route(
tgen = get_topogen() tgen = get_topogen()
for router, rnode in tgen.routers().items(): for router, rnode in tgen.routers().items():
if router == dut: if router == dut:
if vrf: if vrf:
ipv4_cmd = "sh ip bgp vrf {} summary json".format(vrf) ipv4_cmd = "sh ip bgp vrf {} summary json".format(vrf)
show_bgp_json_ipv4 = run_frr_cmd(rnode, ipv4_cmd, isjson=True) show_bgp_json_ipv4 = run_frr_cmd(rnode, ipv4_cmd, isjson=True)
@ -4871,7 +4861,6 @@ def verify_rib_default_route(
connected_routes = {} connected_routes = {}
for router, rnode in tgen.routers().items(): for router, rnode in tgen.routers().items():
if router == dut: if router == dut:
ipv4_routes = run_frr_cmd(rnode, "sh ip bgp json", isjson=True) ipv4_routes = run_frr_cmd(rnode, "sh ip bgp json", isjson=True)
ipv6_routes = run_frr_cmd(rnode, "sh ip bgp ipv6 unicast json", isjson=True) ipv6_routes = run_frr_cmd(rnode, "sh ip bgp ipv6 unicast json", isjson=True)
is_ipv4_default_attrib_found = False is_ipv4_default_attrib_found = False

View File

@ -25,6 +25,7 @@ from lib.lutil import luCommand, luResult, LUtil
import json import json
import re import re
# gpz: get rib in json form and compare against desired routes # gpz: get rib in json form and compare against desired routes
class BgpRib: class BgpRib:
def log(self, str): def log(self, str):

View File

@ -243,7 +243,6 @@ def run_frr_cmd(rnode, cmd, isjson=False):
def apply_raw_config(tgen, input_dict): def apply_raw_config(tgen, input_dict):
""" """
API to configure raw configuration on device. This can be used for any cli API to configure raw configuration on device. This can be used for any cli
which has not been implemented in JSON. which has not been implemented in JSON.
@ -447,7 +446,6 @@ def check_router_status(tgen):
try: try:
router_list = tgen.routers() router_list = tgen.routers()
for router, rnode in router_list.items(): for router, rnode in router_list.items():
result = rnode.check_router_running() result = rnode.check_router_running()
if result != "": if result != "":
daemons = [] daemons = []
@ -1914,7 +1912,6 @@ def interface_status(tgen, topo, input_dict):
rlist = [] rlist = []
for router in input_dict.keys(): for router in input_dict.keys():
interface_list = input_dict[router]["interface_list"] interface_list = input_dict[router]["interface_list"]
status = input_dict[router].setdefault("status", "up") status = input_dict[router].setdefault("status", "up")
for intf in interface_list: for intf in interface_list:
@ -2529,7 +2526,6 @@ def create_route_maps(tgen, input_dict, build=False):
continue continue
rmap_data = [] rmap_data = []
for rmap_name, rmap_value in input_dict[router]["route_maps"].items(): for rmap_name, rmap_value in input_dict[router]["route_maps"].items():
for rmap_dict in rmap_value: for rmap_dict in rmap_value:
del_action = rmap_dict.setdefault("delete", False) del_action = rmap_dict.setdefault("delete", False)
@ -3002,7 +2998,6 @@ def addKernelRoute(
group_addr_range = [group_addr_range] group_addr_range = [group_addr_range]
for grp_addr in group_addr_range: for grp_addr in group_addr_range:
addr_type = validate_ip_address(grp_addr) addr_type = validate_ip_address(grp_addr)
if addr_type == "ipv4": if addr_type == "ipv4":
if next_hop is not None: if next_hop is not None:
@ -3193,7 +3188,6 @@ def configure_brctl(tgen, topo, input_dict):
if "brctl" in input_dict[dut]: if "brctl" in input_dict[dut]:
for brctl_dict in input_dict[dut]["brctl"]: for brctl_dict in input_dict[dut]["brctl"]:
brctl_names = brctl_dict.setdefault("brctl_name", []) brctl_names = brctl_dict.setdefault("brctl_name", [])
addvxlans = brctl_dict.setdefault("addvxlan", []) addvxlans = brctl_dict.setdefault("addvxlan", [])
stp_values = brctl_dict.setdefault("stp", []) stp_values = brctl_dict.setdefault("stp", [])
@ -3203,7 +3197,6 @@ def configure_brctl(tgen, topo, input_dict):
for brctl_name, vxlan, vrf, stp in zip( for brctl_name, vxlan, vrf, stp in zip(
brctl_names, addvxlans, vrfs, stp_values brctl_names, addvxlans, vrfs, stp_values
): ):
ip_cmd_list = [] ip_cmd_list = []
cmd = "ip link add name {} type bridge stp_state {}".format( cmd = "ip link add name {} type bridge stp_state {}".format(
brctl_name, stp brctl_name, stp
@ -3614,7 +3607,6 @@ def verify_rib(
for static_route in static_routes: for static_route in static_routes:
if "vrf" in static_route and static_route["vrf"] is not None: if "vrf" in static_route and static_route["vrf"] is not None:
logger.info( logger.info(
"[DUT: {}]: Verifying routes for VRF:" "[DUT: {}]: Verifying routes for VRF:"
" {}".format(router, static_route["vrf"]) " {}".format(router, static_route["vrf"])
@ -4053,7 +4045,6 @@ def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None, protocol=
for static_route in static_routes: for static_route in static_routes:
if "vrf" in static_route and static_route["vrf"] is not None: if "vrf" in static_route and static_route["vrf"] is not None:
logger.info( logger.info(
"[DUT: {}]: Verifying routes for VRF:" "[DUT: {}]: Verifying routes for VRF:"
" {}".format(router, static_route["vrf"]) " {}".format(router, static_route["vrf"])

View File

@ -177,7 +177,17 @@ Total %-4d %-4d %d\n\
self.log("unable to read: " + tstFile) self.log("unable to read: " + tstFile)
sys.exit(1) sys.exit(1)
def command(self, target, command, regexp, op, result, returnJson, startt=None, force_result=False): def command(
self,
target,
command,
regexp,
op,
result,
returnJson,
startt=None,
force_result=False,
):
global net global net
if op == "jsoncmp_pass" or op == "jsoncmp_fail": if op == "jsoncmp_pass" or op == "jsoncmp_fail":
returnJson = True returnJson = True
@ -326,7 +336,9 @@ Total %-4d %-4d %d\n\
if strict and (wait_count == 1): if strict and (wait_count == 1):
force_result = True force_result = True
found = self.command(target, command, regexp, op, result, returnJson, startt, force_result) found = self.command(
target, command, regexp, op, result, returnJson, startt, force_result
)
if found is not False: if found is not False:
break break
@ -342,6 +354,7 @@ Total %-4d %-4d %d\n\
# initialized by luStart # initialized by luStart
LUtil = None LUtil = None
# entry calls # entry calls
def luStart( def luStart(
baseScriptDir=".", baseScriptDir=".",
@ -455,6 +468,7 @@ def luShowFail():
if printed > 0: if printed > 0:
logger.error("See %s for details of errors" % LUtil.fout_name) logger.error("See %s for details of errors" % LUtil.fout_name)
# #
# Sets default wait type for luCommand(op="wait) (may be overridden by # Sets default wait type for luCommand(op="wait) (may be overridden by
# specifying luCommand(op="wait-strict") or luCommand(op="wait-nostrict")). # specifying luCommand(op="wait-strict") or luCommand(op="wait-nostrict")).

View File

@ -593,7 +593,6 @@ def find_rp_details(tgen, topo):
topo_data = topo["routers"] topo_data = topo["routers"]
for router in router_list.keys(): for router in router_list.keys():
if "pim" not in topo_data[router]: if "pim" not in topo_data[router]:
continue continue
@ -1495,7 +1494,6 @@ def verify_mroutes(
and data["outboundInterface"] in oil and data["outboundInterface"] in oil
): ):
if return_uptime: if return_uptime:
uptime_dict[grp_addr][src_address] = data["upTime"] uptime_dict[grp_addr][src_address] = data["upTime"]
logger.info( logger.info(
@ -1917,7 +1915,6 @@ def get_pim_interface_traffic(tgen, input_dict):
for intf, data in input_dict[dut].items(): for intf, data in input_dict[dut].items():
interface_json = show_pim_intf_traffic_json[intf] interface_json = show_pim_intf_traffic_json[intf]
for state in data: for state in data:
# Verify Tx/Rx # Verify Tx/Rx
if state in interface_json: if state in interface_json:
output_dict[dut][state] = interface_json[state] output_dict[dut][state] = interface_json[state]
@ -1990,7 +1987,6 @@ def get_pim6_interface_traffic(tgen, input_dict):
for intf, data in input_dict[dut].items(): for intf, data in input_dict[dut].items():
interface_json = show_pim_intf_traffic_json[intf] interface_json = show_pim_intf_traffic_json[intf]
for state in data: for state in data:
# Verify Tx/Rx # Verify Tx/Rx
if state in interface_json: if state in interface_json:
output_dict[dut][state] = interface_json[state] output_dict[dut][state] = interface_json[state]
@ -3007,7 +3003,6 @@ def verify_pim_upstream_rpf(
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
if "pim" in topo["routers"][dut]: if "pim" in topo["routers"][dut]:
logger.info("[DUT: %s]: Verifying ip pim upstream rpf:", dut) logger.info("[DUT: %s]: Verifying ip pim upstream rpf:", dut)
rnode = tgen.routers()[dut] rnode = tgen.routers()[dut]
@ -3245,7 +3240,6 @@ def verify_pim_join(
grp_addr = grp_addr.split("/")[0] grp_addr = grp_addr.split("/")[0]
for source, data in interface_json[grp_addr].items(): for source, data in interface_json[grp_addr].items():
# Verify pim join # Verify pim join
if pim_join: if pim_join:
if data["group"] == grp_addr and data["channelJoinName"] == "JOIN": if data["group"] == grp_addr and data["channelJoinName"] == "JOIN":
@ -3338,7 +3332,6 @@ def verify_igmp_config(tgen, input_dict, stats_return=False, expected=True):
rnode = tgen.routers()[dut] rnode = tgen.routers()[dut]
for interface, data in input_dict[dut]["igmp"]["interfaces"].items(): for interface, data in input_dict[dut]["igmp"]["interfaces"].items():
statistics = False statistics = False
report = False report = False
if "statistics" in input_dict[dut]["igmp"]["interfaces"][interface]["igmp"]: if "statistics" in input_dict[dut]["igmp"]["interfaces"][interface]["igmp"]:
@ -3623,7 +3616,6 @@ def verify_pim_config(tgen, input_dict, expected=True):
rnode = tgen.routers()[dut] rnode = tgen.routers()[dut]
for interface, data in input_dict[dut]["pim"]["interfaces"].items(): for interface, data in input_dict[dut]["pim"]["interfaces"].items():
logger.info("[DUT: %s]: Verifying PIM interface %s detail:", dut, interface) logger.info("[DUT: %s]: Verifying PIM interface %s detail:", dut, interface)
show_ip_igmp_intf_json = run_frr_cmd( show_ip_igmp_intf_json = run_frr_cmd(
@ -3772,7 +3764,6 @@ def verify_multicast_traffic(tgen, input_dict, return_traffic=False, expected=Tr
elif ( elif (
interface_json["pktsIn"] != 0 and interface_json["bytesIn"] != 0 interface_json["pktsIn"] != 0 and interface_json["bytesIn"] != 0
): ):
traffic_dict[traffic_type][interface][ traffic_dict[traffic_type][interface][
"pktsIn" "pktsIn"
] = interface_json["pktsIn"] ] = interface_json["pktsIn"]
@ -3836,7 +3827,6 @@ def verify_multicast_traffic(tgen, input_dict, return_traffic=False, expected=Tr
interface_json["pktsOut"] != 0 interface_json["pktsOut"] != 0
and interface_json["bytesOut"] != 0 and interface_json["bytesOut"] != 0
): ):
traffic_dict[traffic_type][interface][ traffic_dict[traffic_type][interface][
"pktsOut" "pktsOut"
] = interface_json["pktsOut"] ] = interface_json["pktsOut"]
@ -4232,7 +4222,6 @@ def verify_local_igmp_groups(tgen, dut, interface, group_addresses):
group_addresses = [group_addresses] group_addresses = [group_addresses]
if interface not in show_ip_local_igmp_json: if interface not in show_ip_local_igmp_json:
errormsg = ( errormsg = (
"[DUT %s]: Verifying local IGMP group received" "[DUT %s]: Verifying local IGMP group received"
" from interface %s [FAILED]!! " % (dut, interface) " from interface %s [FAILED]!! " % (dut, interface)
@ -4319,7 +4308,6 @@ def verify_pim_interface_traffic(tgen, input_dict, return_stats=True, addr_type=
for intf, data in input_dict[dut].items(): for intf, data in input_dict[dut].items():
interface_json = show_pim_intf_traffic_json[intf] interface_json = show_pim_intf_traffic_json[intf]
for state in data: for state in data:
# Verify Tx/Rx # Verify Tx/Rx
if state in interface_json: if state in interface_json:
output_dict[dut][state] = interface_json[state] output_dict[dut][state] = interface_json[state]
@ -4525,7 +4513,6 @@ def verify_mld_config(tgen, input_dict, stats_return=False, expected=True):
for dut in input_dict.keys(): for dut in input_dict.keys():
rnode = tgen.routers()[dut] rnode = tgen.routers()[dut]
for interface, data in input_dict[dut]["mld"]["interfaces"].items(): for interface, data in input_dict[dut]["mld"]["interfaces"].items():
statistics = False statistics = False
report = False report = False
if "statistics" in input_dict[dut]["mld"]["interfaces"][interface]["mld"]: if "statistics" in input_dict[dut]["mld"]["interfaces"][interface]["mld"]:
@ -5040,7 +5027,6 @@ def verify_pim6_config(tgen, input_dict, expected=True):
rnode = tgen.routers()[dut] rnode = tgen.routers()[dut]
for interface, data in input_dict[dut]["pim6"]["interfaces"].items(): for interface, data in input_dict[dut]["pim6"]["interfaces"].items():
logger.info( logger.info(
"[DUT: %s]: Verifying PIM6 interface %s detail:", dut, interface "[DUT: %s]: Verifying PIM6 interface %s detail:", dut, interface
) )
@ -5158,7 +5144,6 @@ def verify_local_mld_groups(tgen, dut, interface, group_addresses):
group_addresses = [group_addresses] group_addresses = [group_addresses]
if interface not in show_ipv6_local_mld_json["default"]: if interface not in show_ipv6_local_mld_json["default"]:
errormsg = ( errormsg = (
"[DUT %s]: Verifying local MLD group received" "[DUT %s]: Verifying local MLD group received"
" from interface %s [FAILED]!! " % (dut, interface) " from interface %s [FAILED]!! " % (dut, interface)

View File

@ -616,7 +616,6 @@ def test_json_object_asterisk_matching():
def test_json_list_nested_with_objects(): def test_json_list_nested_with_objects():
dcomplete = [{"key": 1, "list": [123]}, {"key": 2, "list": [123]}] dcomplete = [{"key": 1, "list": [123]}, {"key": 2, "list": [123]}]
dsub1 = [{"key": 2, "list": [123]}, {"key": 1, "list": [123]}] dsub1 = [{"key": 2, "list": [123]}, {"key": 1, "list": [123]}]

View File

@ -206,7 +206,6 @@ def build_topo_from_json(tgen, topo=None):
for destRouterLink, data in sorted( for destRouterLink, data in sorted(
topo["switches"][curSwitch]["links"].items() topo["switches"][curSwitch]["links"].items()
): ):
# Loopback interfaces # Loopback interfaces
if "dst_node" in data: if "dst_node" in data:
destRouter = data["dst_node"] destRouter = data["dst_node"]
@ -220,7 +219,6 @@ def build_topo_from_json(tgen, topo=None):
destRouter = destRouterLink destRouter = destRouterLink
if destRouter in listAllRouters: if destRouter in listAllRouters:
topo["routers"][destRouter]["links"][curSwitch] = deepcopy( topo["routers"][destRouter]["links"][curSwitch] = deepcopy(
topo["switches"][curSwitch]["links"][destRouterLink] topo["switches"][curSwitch]["links"][destRouterLink]
) )
@ -398,7 +396,7 @@ def setup_module_from_json(testfile, json_file=None):
tgen = create_tgen_from_json(testfile, json_file) tgen = create_tgen_from_json(testfile, json_file)
# Start routers (and their daemons) # Start routers (and their daemons)
start_topology(tgen, topo_daemons(tgen)) start_topology(tgen)
# Configure routers # Configure routers
build_config_from_json(tgen) build_config_from_json(tgen)