mirror_frr/tests/topotests/mgmt_notif/test_notif.py
Igor Ryzhov d94f80fbc4 lib, mgmtd: fix processing of yang notifications
Current code assumes that a notification is always sent in stripped JSON
format and therefore that the notification xpath starts at the third
character of the notification data. Assuming JSON is more or less fine,
because this representation is internal to FRR, but the assumption about
the xpath is wrong, because it does not hold for notifications that are
not top-level. YANG allows a notification to be defined as a child of a
data node deep in the tree, and in this case the notification data
contains not only the notification node itself, but also all its parents.

To fix the issue, parse the notification data and get its xpath from its
schema node.

Signed-off-by: Igor Ryzhov <iryzhov@nfware.com>
2024-02-10 01:00:24 +02:00

103 lines
2.6 KiB
Python

# -*- coding: utf-8 eval: (blacken-mode 1) -*-
# SPDX-License-Identifier: ISC
#
# January 23 2024, Christian Hopps <chopps@labn.net>
#
# Copyright (c) 2024, LabN Consulting, L.L.C.
#
"""
Test YANG Notifications
"""
import json
import logging
import os
import pytest
from lib.topogen import Topogen
from lib.topotest import json_cmp
from oper import check_kernel_32
# Pytest markers (ripd, staticd, mgmtd) applied to every test in this module.
pytestmark = [pytest.mark.ripd, pytest.mark.staticd, pytest.mark.mgmtd]
# Directory containing this test file, with symlinks resolved by realpath().
CWD = os.path.dirname(os.path.realpath(__file__))
@pytest.fixture(scope="module")
def tgen(request):
    """Set up and tear down the environment and provide the tgen argument to tests.

    Creates a Topogen instance from a definition that maps ``s1`` to ``r1``
    and ``r2`` (presumably one switch connecting two routers -- confirm
    against Topogen), loads ``frr.conf`` on every router, starts the routers
    and yields the Topogen instance to the tests of this module.

    The topology is stopped when the module's tests are done. It is also
    stopped when loading a config or starting the routers raises, so a
    failed setup does not leave a started topology behind.
    """
    topodef = {
        "s1": ("r1", "r2"),
    }

    tgen = Topogen(topodef, request.module.__name__)
    tgen.start_topology()

    # pytest does not run the code after ``yield`` when the fixture raises
    # before reaching it, so guard everything that follows start_topology().
    try:
        # Only the router objects are needed here, not their names.
        for router in tgen.routers().values():
            router.load_frr_config("frr.conf")

        tgen.start_router()
        yield tgen
    finally:
        tgen.stop_topology()
def test_frontend_notification(tgen):
    """Check the notifications printed by the frontend client run on r1.

    The test is skipped when the topology already reports router failures,
    or when running ``fe_client.py --help`` returns a truthy status.
    Otherwise the client is run twice with ``--listen`` -- first with the
    ``frr-ripd:authentication-failure`` argument, then with no argument --
    and each JSON output is compared with ``json_cmp`` against the expected
    ``frr-ripd:authentication-failure`` notification for ``r1-eth0``.
    """
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    node = tgen.gears["r1"].net

    # NOTE(review): check_kernel_32 is imported from ``oper``; its semantics
    # are not visible here -- presumably it checks a kernel /32 route.
    check_kernel_32(node, "11.11.11.11", 1, "")

    client = CWD + "/../lib/fe_client.py"
    help_rc, _, _ = node.cmd_status(client + " --help")
    if help_rc:
        pytest.skip("No protoc or present cannot run test")

    listen_variants = (
        # The first notification is a frr-ripd:authentication-type-failure,
        # so we filter to avoid it; all the rest are
        # frr-ripd:authentication-failure, making our test deterministic.
        " --listen frr-ripd:authentication-failure",
        " --listen",
    )
    for listen_args in listen_variants:
        notification = json.loads(node.cmd_raises(client + listen_args))
        # Built fresh for every comparison, as the original code did.
        wanted = {"frr-ripd:authentication-failure": {"interface-name": "r1-eth0"}}
        assert json_cmp(notification, wanted) is None
def test_backend_notification(tgen):
    """Check the notification printed by the backend test client run on r1.

    The test is skipped when the topology already reports router failures,
    or when running ``/usr/lib/frr/mgmtd_testc --help`` returns a truthy
    status. Otherwise the client is run with ``--listen /frr-ripd`` and its
    JSON output is compared with ``json_cmp`` against the expected
    ``frr-ripd:authentication-failure`` notification for ``r1-eth0``.
    """
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    node = tgen.gears["r1"].net

    # NOTE(review): check_kernel_32 is imported from ``oper``; its semantics
    # are not visible here -- presumably it checks a kernel /32 route.
    check_kernel_32(node, "11.11.11.11", 1, "")

    testc = "/usr/lib/frr/mgmtd_testc"
    help_rc, _, _ = node.cmd_status(testc + " --help")
    if help_rc:
        pytest.skip("No mgmtd_testc")

    listen_cmd = testc + " --timeout 20 --log file:mgmt_testc.log --listen /frr-ripd"
    notification = json.loads(node.cmd_raises(listen_cmd))

    wanted = {"frr-ripd:authentication-failure": {"interface-name": "r1-eth0"}}
    assert json_cmp(notification, wanted) is None