tests: Verify PIM neighbor and static routes for BSM tests

Verifying and making sure PIM neighbors are
up before sending BSM packet using Scapy.

Verifying static routes are installed before
proceeding fruther.

Signed-off-by: Kuldeep Kashyap <kashyapk@vmware.com>
This commit is contained in:
Kuldeep Kashyap 2022-06-14 01:03:09 -07:00
parent 89f76cd132
commit 697ce62f7a
4 changed files with 140 additions and 8 deletions

View File

@ -3365,8 +3365,9 @@ def verify_rib(
nh_found = False
for st_rt in ip_list:
st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
st_rt = str(
ipaddress.ip_network(frr_unicode(st_rt), strict=False)
)
_addr_type = validate_ip_address(st_rt)
if _addr_type != addr_type:
continue
@ -3528,8 +3529,8 @@ def verify_rib(
if nh_found:
logger.info(
"[DUT: {}]: Found next_hop {} for all bgp"
" routes in RIB".format(router, next_hop)
"[DUT: {}]: Found next_hop {} for"
" RIB routes: {}".format(router, next_hop, found_routes)
)
if len(missing_routes) > 0:
@ -3593,7 +3594,7 @@ def verify_rib(
nh_found = False
for st_rt in ip_list:
st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False))
_addr_type = validate_ip_address(st_rt)
if _addr_type != addr_type:
@ -3750,8 +3751,9 @@ def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
nh_found = False
for st_rt in ip_list:
st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
st_rt = str(
ipaddress.ip_network(frr_unicode(st_rt), strict=False)
)
_addr_type = validate_ip_address(st_rt)
if _addr_type != addr_type:
continue
@ -3855,7 +3857,7 @@ def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
nh_found = False
for st_rt in ip_list:
st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False))
_addr_type = validate_ip_address(st_rt)
if _addr_type != addr_type:

View File

@ -525,6 +525,9 @@ def verify_pim_neighbors(tgen, topo, dut=None, iface=None, nbr_ip=None, expected
if "pim" not in data:
continue
if "pim" in data and data["pim"] == "disable":
continue
if "pim" in data and data["pim"] == "enable":
local_interface = data["interface"]

View File

@ -84,6 +84,7 @@ from lib.common_config import (
run_frr_cmd,
required_linux_kernel_version,
topo_daemons,
verify_rib,
)
from lib.pim import (
@ -106,6 +107,7 @@ from lib.pim import (
clear_pim_interface_traffic,
get_pim_interface_traffic,
McastTesterHelper,
verify_pim_neighbors,
)
from lib.topolog import logger
from lib.topojson import build_config_from_json
@ -180,6 +182,10 @@ def setup_module(mod):
# Creating configuration from JSON
build_config_from_json(tgen, topo)
# Verify PIM neighbors
result = verify_pim_neighbors(tgen, topo)
assert result is True, " Verify PIM neighbor: Failed Error: {}".format(result)
# XXX Replace this using "with McastTesterHelper()... " in each test if possible.
global app_helper
app_helper = McastTesterHelper(tgen)
@ -306,6 +312,14 @@ def pre_config_to_bsm(tgen, topo, tc_name, bsr, sender, receiver, fhr, rp, lhr,
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
# Verifying static routes are installed
for dut, _nexthop in zip([fhr, rp, lhr], [next_hop, next_hop_rp, next_hop_lhr]):
input_routes = {dut: input_dict[dut]}
result = verify_rib(tgen, "ipv4", dut, input_routes, _nexthop)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
)
# RP Mapping
rp_mapping = topo["routers"][bsr]["bsm"]["bsr_packets"][packet]["rp_mapping"]
@ -328,11 +342,24 @@ def pre_config_to_bsm(tgen, topo, tc_name, bsr, sender, receiver, fhr, rp, lhr,
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
# Verifying static routes are installed
result = verify_rib(tgen, "ipv4", fhr, input_dict, next_hop_fhr)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
)
input_dict = {
lhr: {"static_routes": [{"network": rp_list, "next_hop": next_hop_lhr}]},
}
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
# Verifying static routes are installed
result = verify_rib(tgen, "ipv4", lhr, input_dict, next_hop_lhr)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
)
return True
@ -443,6 +470,23 @@ def test_BSR_higher_prefer_ip_p0(request):
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
# Verifying static routes are installed
for dut, _nexthop in zip(["i1", "l1"], [next_hop_rp, next_hop_lhr]):
input_routes = {dut: input_dict[dut]}
result = verify_rib(tgen, "ipv4", dut, input_routes, _nexthop)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
)
for bsr_add, next_hop in zip([BSR1_ADDR, BSR2_ADDR], [NEXT_HOP1, NEXT_HOP2]):
input_routes = {
"f1": {"static_routes": [{"network": bsr_add, "next_hop": next_hop}]}
}
result = verify_rib(tgen, "ipv4", "f1", input_routes, next_hop)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
)
# Use scapy to send pre-defined packet from senser to receiver
step("Send BSR packet from b1 to FHR")
result = scapy_send_bsr_raw_packet(tgen, topo, "b1", "f1", "packet9")
@ -626,6 +670,24 @@ def test_BSR_CRP_with_blackhole_address_p1(request):
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
# Verifying static routes are installed
for dut, _nexthop in zip(["i1", "l1"], [next_hop_rp, next_hop_lhr]):
input_routes = {dut: input_dict[dut]}
result = verify_rib(tgen, "ipv4", dut, input_routes, _nexthop)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
)
input_routes = {
"f1": {"static_routes": [{"network": CRP, "next_hop": next_hop_fhr}]}
}
result = verify_rib(tgen, "ipv4", "f1", input_routes, expected=False)
assert (
result is not True
), "Testcase {} : Failed \n " "Route is still present \n Error {}".format(
tc_name, result
)
# Use scapy to send pre-defined packet from senser to receiver
group = topo["routers"]["b1"]["bsm"]["bsr_packets"]["packet9"]["group"]
@ -642,6 +704,10 @@ def test_BSR_CRP_with_blackhole_address_p1(request):
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
# Verifying static routes are installed
result = verify_rib(tgen, "ipv4", "f1", input_dict)
assert result is True, "Testcase {} : Failed \n Error {}".format(tc_name, result)
intf_f1_i1 = topo["routers"]["f1"]["links"]["i1"]["interface"]
step("Verify bsm transit count is not increamented" "show ip pim interface traffic")
state_dict = {"f1": {intf_f1_i1: ["bsmTx"]}}
@ -694,6 +760,28 @@ def test_BSR_CRP_with_blackhole_address_p1(request):
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
# Verifying static routes are installed
input_dict = {
"f1": {"static_routes": [{"network": BSR1_ADDR, "next_hop": NEXT_HOP1}]}
}
result = verify_rib(tgen, "ipv4", "f1", input_dict, NEXT_HOP1)
assert result is True, "Testcase {} : Failed \n Error {}".format(tc_name, result)
input_dict = {
"f1": {
"static_routes": [
{"network": [BSR1_ADDR, CRP], "next_hop": "blackhole", "delete": True}
]
}
}
result = verify_rib(tgen, "ipv4", "f1", input_dict, expected=False)
assert result is not True, (
"Testcase {} : Failed \n "
"Routes:[{}, {}] are still present \n Error {}".format(
tc_name, BSR1_ADDR, CRP, result
)
)
step("Sending BSR after removing black-hole address for BSR and candidate RP")
step("Send BSR packet from b1 to FHR")
result = scapy_send_bsr_raw_packet(tgen, topo, "b1", "f1", "packet9")
@ -1614,6 +1702,14 @@ def test_iif_join_state_p0(request):
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
# Verifying static routes are installed
result = verify_rib(tgen, "ipv4", "l1", input_dict, expected=False)
assert (
result is not True
), "Testcase {} : Failed \n " "Routes:{} are still present \n Error {}".format(
tc_name, rp_ip, result
)
# Check RP unreachable
step("Check RP unreachability")
iif = "Unknown"
@ -1654,6 +1750,10 @@ def test_iif_join_state_p0(request):
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
# Verifying static routes are installed
result = verify_rib(tgen, "ipv4", "l1", input_dict, next_hop_lhr)
assert result is True, "Testcase {}:Failed \n Error: {}".format(tc_name, result)
# Verify that (*,G) installed in mroute again
iif = "l1-i1-eth0"
result = verify_mroutes(tgen, dut, src_addr, GROUP_ADDRESS, iif, oil)

View File

@ -68,6 +68,7 @@ from lib.common_config import (
run_frr_cmd,
required_linux_kernel_version,
topo_daemons,
verify_rib,
)
from lib.pim import (
@ -86,6 +87,7 @@ from lib.pim import (
clear_mroute,
clear_pim_interface_traffic,
McastTesterHelper,
verify_pim_neighbors,
)
from lib.topolog import logger
from lib.topojson import build_config_from_json
@ -160,6 +162,10 @@ def setup_module(mod):
# Creating configuration from JSON
build_config_from_json(tgen, topo)
# Verify PIM neighbors
result = verify_pim_neighbors(tgen, topo)
assert result is True, " Verify PIM neighbor: Failed Error: {}".format(result)
# XXX Replace this using "with McastTesterHelper()... " in each test if possible.
global app_helper
app_helper = McastTesterHelper(tgen)
@ -247,6 +253,14 @@ def pre_config_to_bsm(tgen, topo, tc_name, bsr, sender, receiver, fhr, rp, lhr,
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
# Verifying static routes are installed
for dut, _nexthop in zip([fhr, rp, lhr], [next_hop, next_hop_rp, next_hop_lhr]):
input_routes = {dut: input_dict[dut]}
result = verify_rib(tgen, "ipv4", dut, input_routes, _nexthop)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
)
# Add kernel route for source
group = topo["routers"][bsr]["bsm"]["bsr_packets"][packet]["pkt_dst"]
bsr_interface = topo["routers"][bsr]["links"][fhr]["interface"]
@ -285,11 +299,24 @@ def pre_config_to_bsm(tgen, topo, tc_name, bsr, sender, receiver, fhr, rp, lhr,
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
# Verifying static routes are installed
result = verify_rib(tgen, "ipv4", fhr, input_dict, next_hop_fhr)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
)
input_dict = {
lhr: {"static_routes": [{"network": rp_list, "next_hop": next_hop_lhr}]},
}
result = create_static_routes(tgen, input_dict)
assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
# Verifying static routes are installed
result = verify_rib(tgen, "ipv4", lhr, input_dict, next_hop_lhr)
assert result is True, "Testcase {} : Failed \n Error {}".format(
tc_name, result
)
return True