tests: Add global pim join-prune-interval json config option

- cleanup pim configuration code

Signed-off-by: Christian Hopps <chopps@labn.net>
This commit is contained in:
Christian Hopps 2021-07-29 03:32:33 +00:00
parent e8b7548c0d
commit 0a76e764c8
3 changed files with 104 additions and 102 deletions

View File

@ -55,7 +55,7 @@ def create_pim_config(tgen, topo, input_dict=None, build=False, load_config=True
input_dict = {
"r1": {
"pim": {
"disable" : ["l1-i1-eth1"],
"join-prune-interval": "5",
"rp": [{
"rp_addr" : "1.0.3.17".
"keep-alive-timer": "100"
@ -90,7 +90,7 @@ def create_pim_config(tgen, topo, input_dict=None, build=False, load_config=True
if "rp" not in input_dict[router]["pim"]:
continue
result = _create_pim_config(
result = _create_pim_rp_config(
tgen, topo, input_dict, router, build, load_config
)
if result is not True:
@ -100,9 +100,9 @@ def create_pim_config(tgen, topo, input_dict=None, build=False, load_config=True
return result
def _create_pim_config(tgen, topo, input_dict, router, build=False, load_config=False):
def _create_pim_rp_config(tgen, topo, input_dict, router, build=False, load_config=False):
"""
Helper API to create pim configuration.
Helper API to create pim RP configurations.
Parameters
----------
@ -119,96 +119,93 @@ def _create_pim_config(tgen, topo, input_dict, router, build=False, load_config=
result = False
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
try:
pim_data = input_dict[router]["pim"]
pim_data = input_dict[router]["pim"]
rp_data = pim_data["rp"]
for dut in tgen.routers():
if "pim" not in input_dict[router]:
continue
# Configure this RP on every router.
for dut in tgen.routers():
for destLink, data in topo[dut]["links"].items():
if "pim" not in data:
continue
# At least one interface must be enabled for PIM on the router
pim_if_enabled = False
for destLink, data in topo[dut]["links"].items():
if "pim" in data:
pim_if_enabled = True
if not pim_if_enabled:
continue
if "rp" in pim_data:
config_data = []
rp_data = pim_data["rp"]
config_data = []
for rp_dict in deepcopy(rp_data):
# ip address of RP
if "rp_addr" not in rp_dict and build:
logger.error(
"Router %s: 'ip address of RP' not "
"present in input_dict/JSON",
router,
)
for rp_dict in deepcopy(rp_data):
# ip address of RP
if "rp_addr" not in rp_dict and build:
logger.error(
"Router %s: 'ip address of RP' not "
"present in input_dict/JSON",
router,
)
return False
rp_addr = rp_dict.setdefault("rp_addr", None)
return False
rp_addr = rp_dict.setdefault("rp_addr", None)
# Keep alive Timer
keep_alive_timer = rp_dict.setdefault("keep_alive_timer", None)
# Keep alive Timer
keep_alive_timer = rp_dict.setdefault("keep_alive_timer", None)
# Group Address range to cover
if "group_addr_range" not in rp_dict and build:
logger.error(
"Router %s:'Group Address range to cover'"
" not present in input_dict/JSON",
router,
)
# Group Address range to cover
if "group_addr_range" not in rp_dict and build:
logger.error(
"Router %s:'Group Address range to cover'"
" not present in input_dict/JSON",
router,
)
return False
group_addr_range = rp_dict.setdefault("group_addr_range", None)
return False
group_addr_range = rp_dict.setdefault("group_addr_range", None)
# Group prefix-list filter
prefix_list = rp_dict.setdefault("prefix_list", None)
# Group prefix-list filter
prefix_list = rp_dict.setdefault("prefix_list", None)
# Delete rp config
del_action = rp_dict.setdefault("delete", False)
# Delete rp config
del_action = rp_dict.setdefault("delete", False)
if keep_alive_timer:
cmd = "ip pim rp keep-alive-timer {}".format(keep_alive_timer)
config_data.append(cmd)
if keep_alive_timer:
cmd = "ip pim rp keep-alive-timer {}".format(keep_alive_timer)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if rp_addr:
if group_addr_range:
if type(group_addr_range) is not list:
group_addr_range = [group_addr_range]
for grp_addr in group_addr_range:
cmd = "ip pim rp {} {}".format(rp_addr, grp_addr)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
config_data.append(cmd)
if rp_addr:
if group_addr_range:
if type(group_addr_range) is not list:
group_addr_range = [group_addr_range]
for grp_addr in group_addr_range:
cmd = "ip pim rp {} {}".format(rp_addr, grp_addr)
config_data.append(cmd)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if prefix_list:
cmd = "ip pim rp {} prefix-list {}".format(
rp_addr, prefix_list
)
config_data.append(cmd)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if prefix_list:
cmd = "ip pim rp {} prefix-list {}".format(
rp_addr, prefix_list
)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
try:
result = create_common_configuration(
tgen, dut, config_data, "pim", build, load_config
)
if result is not True:
logger.error("Error applying PIM config", exc_info=True)
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return False
except InvalidCLIError:
# Traceback
errormsg = traceback.format_exc()
logger.error(errormsg)
return errormsg
except InvalidCLIError as error:
logger.error("Error applying PIM config: %s", error, exc_info=error)
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return False
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
@ -338,35 +335,19 @@ def _enable_disable_pim(tgen, topo, input_dict, router, build=False):
try:
config_data = []
enable_flag = True
# Disable pim on interface
if "pim" in input_dict[router]:
if "disable" in input_dict[router]["pim"]:
enable_flag = False
interfaces = input_dict[router]["pim"]["disable"]
# Enable pim on interfaces
for destRouterLink, data in sorted(topo[router]["links"].items()):
if "pim" in data and data["pim"] == "enable":
if type(interfaces) is not list:
interfaces = [interfaces]
# Loopback interfaces
if "type" in data and data["type"] == "loopback":
interface_name = destRouterLink
else:
interface_name = data["interface"]
for interface in interfaces:
cmd = "interface {}".format(interface)
config_data.append(cmd)
config_data.append("no ip pim")
# Enable pim on interface
if enable_flag:
for destRouterLink, data in sorted(topo[router]["links"].items()):
if "pim" in data and data["pim"] == "enable":
# Loopback interfaces
if "type" in data and data["type"] == "loopback":
interface_name = destRouterLink
else:
interface_name = data["interface"]
cmd = "interface {}".format(interface_name)
config_data.append(cmd)
config_data.append("ip pim")
cmd = "interface {}".format(interface_name)
config_data.append(cmd)
config_data.append("ip pim")
result = create_common_configuration(
tgen, router, config_data, "interface_config", build=build
@ -374,6 +355,22 @@ def _enable_disable_pim(tgen, topo, input_dict, router, build=False):
if result is not True:
return False
config_data = []
if "pim" in input_dict[router]:
pim_data = input_dict[router]["pim"]
for t in [
"join-prune-interval",
"keep-alive-timer",
"register-suppress-time",
]:
if t in pim_data:
cmd = "ip pim {} {}".format(t, pim_data[t])
config_data.append(cmd)
if config_data:
result = create_common_configuration(
tgen, router, config_data, "pim", build=build
)
except InvalidCLIError:
# Traceback
errormsg = traceback.format_exc()

View File

@ -78,6 +78,7 @@ from lib.common_config import (
step,
iperfSendIGMPJoin,
addKernelRoute,
apply_raw_config,
reset_config_on_routers,
iperfSendTraffic,
kill_iperf,
@ -1553,8 +1554,10 @@ def test_modify_igmp_max_query_response_timer_p0(request):
assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
step("Delete the PIM and IGMP on FRR1")
input_dict_1 = {"l1": {"pim": {"disable": ["l1-i1-eth1"]}}}
result = create_pim_config(tgen, topo, input_dict_1)
raw_config = {
"l1": {"raw_config": ["interface l1-i1-eth1", "no ip pim"]}
}
result = apply_raw_config(tgen, raw_config)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
input_dict_2 = {

View File

@ -2231,8 +2231,10 @@ def test_verify_remove_add_pim_commands_when_igmp_configured_p1(request):
step("Remove 'no ip pim' on receiver interface on FRR1")
intf_l1_i1 = topo["routers"]["l1"]["links"]["i1"]["interface"]
input_dict_1 = {"l1": {"pim": {"disable": intf_l1_i1}}}
result = create_pim_config(tgen, topo, input_dict_1)
raw_config = {
"l1": {"raw_config": ["interface {}".format(intf_l1_i1), "no ip pim"]}
}
result = apply_raw_config(tgen, raw_config)
assert result is True, "Testcase {}: Failed Error: {}".format(tc_name, result)
step("Verify that no core is observed")