mirror_frr/tests/topotests/lib/snmptest.py
Philippe Guibert 5e7bb79f11 topotests: lib, fix filter out "At line 73 in /usr/share/snmp/mibs/ietf/SNMPv2-PDU" message
When testing SNMP service on FRR, the following error message may
appear on some distros.

> # snmpwalk -v2c -c public .1.3.6 1.1.1.1 <OID>
> Bad operator (INTEGER): At line 73 in /usr/share/snmp/mibs/ietf/SNMPv2-PDU
> [..then result ..]
>

The error message is due to the /etc/snmp/snmp.conf file. By default, this
file is used by both the SNMP server and the SNMP client. The net-snmp MIB
parsing routine loads the MIBs in order to bind OIDs to the names the MIBs define.

> # cat /etc/frr/snmp.conf
> [snmp]
> mibs +ALL
>

A potential fix would consist of modifying the SNMPv2-PDU.mib file: the
problem is known on Ubuntu distros, as the snmp-mibs-downloader package
has not updated the SNMPv2-PDU.mib file.

The choice is made not to modify the distro on which the test is run.
Fix the topotests by ignoring the 'SNMPv2-PDU line 73' error message, while
keeping the other error messages that may occur, for instance when an
unknown OID name is requested.

Signed-off-by: Philippe Guibert <philippe.guibert@6wind.com>
2024-04-23 14:24:51 +02:00

287 lines
9.4 KiB
Python

# SPDX-License-Identifier: ISC
#
# snmptest.py
# Library of helper functions for NetDEF Topology Tests
#
# Copyright (c) 2020 by Volta Networks
#
#
"""
SNMP library to test snmp walks and gets
Basic usage instructions:
* create an SnmpTester instance giving a router, address, community and version
* use test_oid or test_oid_walk to check values in MIBS
* see tests/topotest/simple-snmp-test/test_simple_snmp.py for example
"""
from lib.topolog import logger
import re
class SnmpTester(object):
    """A helper class for testing SNMP.

    Builds net-snmp client command lines (snmpget, snmpgetnext, snmpwalk),
    runs them through ``router.cmd`` and parses the textual output. It also
    offers helpers to parse and validate BGP notifications read from a trap
    log.
    """

    def __init__(self, router, iface, community, version, options=""):
        """Store the connection parameters used to build SNMP commands.

        :param router: object exposing ``cmd(str)``, used to run the commands
        :param iface: agent address, placed last in the common options
        :param community: value passed to ``-c``
        :param version: value passed to ``-v``
        :param options: extra command line options inserted verbatim
        """
        self.community = community
        self.version = version
        self.router = router
        self.iface = iface
        self.options = options

        logger.info(
            "created SNMP tester: SNMPv{0} community:{1}".format(
                self.version, self.community
            )
        )

    def _snmp_config(self):
        """
        Helper function to build a string with SNMP
        configuration for commands.
        """
        return "-v {0} -c {1} {2} {3}".format(
            self.version, self.community, self.options, self.iface
        )

    def _run_snmp(self, tool, oid):
        """Run ``tool`` (snmpget, snmpgetnext or snmpwalk) against ``oid``.

        stderr is merged into stdout and every line mentioning SNMPv2-PDU is
        filtered out; other messages are kept. Returns the raw output of
        ``router.cmd``.
        """
        cmd = "{0} {1} {2} 2>&1 | grep -v SNMPv2-PDU".format(
            tool, self._snmp_config(), oid
        )
        return self.router.cmd(cmd)

    @staticmethod
    def _get_snmp_value(snmp_output):
        """Extract the value part of a single ``<oid> = <type> <value>`` line.

        The line is split on whitespace:

        * three tokens: the third token is returned (this copes with the
          empty string return);
        * more than three tokens: tokens from the fourth onwards are
          returned, joined by single spaces;
        * fewer than three tokens (e.g. empty output): ``None`` is returned
          instead of raising IndexError.
        """
        tokens = snmp_output.strip().split()
        if len(tokens) < 3:
            return None
        if len(tokens) == 3:
            return tokens[2]
        return " ".join(tokens[3:])

    @staticmethod
    def _get_snmp_oid(snmp_output):
        """Return the first token of the line with everything up to and
        including its first "." removed.

        Raises IndexError when the line is empty or its first token holds
        no ".".
        """
        tokens = snmp_output.strip().split()
        return tokens[0].split(".", 1)[1]

    def _parse_multiline(self, snmp_output):
        """Parse multi-line output, one ``<oid> = ...`` response per line.

        Returns ``(out_dict, out_list)``: ``out_dict`` maps each parsed OID
        to its value, ``out_list`` holds the values in output order. Blank
        lines are skipped, so empty output yields ``({}, [])``.
        """
        out_dict = {}
        out_list = []
        for response in snmp_output.strip().split("\n"):
            if not response.strip():
                continue
            # parse the value once and reuse it for both containers
            value = self._get_snmp_value(response)
            out_dict[self._get_snmp_oid(response)] = value
            out_list.append(value)
        return out_dict, out_list

    def get(self, oid):
        """Run snmpget on ``oid``.

        Returns the parsed value, or None when the output contains
        "not found".
        """
        result = self._run_snmp("snmpget", oid)
        if "not found" in result:
            return None
        return self._get_snmp_value(result)

    def get_next(self, oid):
        """Run snmpgetnext on ``oid`` and print the raw output.

        Returns the parsed value, or None when the output contains
        "not found".
        """
        result = self._run_snmp("snmpgetnext", oid)
        print("get_next: {}".format(result))
        if "not found" in result:
            return None
        return self._get_snmp_value(result)

    def walk(self, oid):
        """Run snmpwalk on ``oid``; returns ``(dict, list)`` as built by
        ``_parse_multiline``."""
        return self._parse_multiline(self._run_snmp("snmpwalk", oid))

    def parse_notif_ipv4(self, notif):
        """Turn one notification line into a list of ``(oid, value)`` tuples.

        Colons are removed and quoted one- or two-byte hex values such as
        ``"0A 0B "`` are collapsed to ``0A0B`` before the ``<oid> = <value>``
        pairs are extracted. The first extracted pair is dropped.
        """
        # normalise values
        notif = notif.replace(":", "")
        notif = re.sub(r'"([0-9]{2}) ([0-9]{2}) "', r"\1\2", notif)
        notif = re.sub(r'"([0-9]{2}) "', r"\1", notif)
        elems = re.findall(r"([0-9,\.]+) = ([0-9,\.]+)", notif)
        # remove common part
        return elems[1:]

    @staticmethod
    def _notif_oid(notif, index):
        """Return the OID of the index-th pair of ``notif``, or None when the
        notification is too short to hold it."""
        try:
            return notif[index][0]
        except IndexError:
            return None

    def is_notif_bgp4_valid(self, output_list, address):
        """Check BGP4-MIB notifications as returned by ``get_notif_bgp4``.

        Every checked notification must be an established or backward
        transition trap whose next three OIDs are the peer remote address,
        last error and state objects indexed by ``address``. A notification
        too short to hold those entries is reported as invalid.

        Returns True when all checked notifications pass (including when
        there is nothing to check), False otherwise.
        """
        oid_notif_type = ".1.3.6.1.6.3.1.1.4.1.0"
        peer_notif_established = ".1.3.6.1.2.1.15.0.1"
        peer_notif_backward = ".1.3.6.1.2.1.15.0.2"
        oid_peer_last_error = ".1.3.6.1.2.1.15.3.1.14"
        oid_peer_remote_addr = ".1.3.6.1.2.1.15.3.1.7"
        oid_peer_state = ".1.3.6.1.2.1.15.3.1.2"

        # same behavior for 2 notification type in bgp4
        expected_oids = (
            "{}.{}".format(oid_peer_remote_addr, address),
            "{}.{}".format(oid_peer_last_error, address),
            "{}.{}".format(oid_peer_state, address),
        )

        # NOTE(review): the last notification of output_list is never
        # checked, unlike in is_notif_bgp4v2_valid. This is kept as-is
        # because callers may rely on it - confirm whether it is intended.
        for notif in output_list[:-1]:
            # identify type of notification
            # established or BackwardTransition
            if self._notif_oid(notif, 0) != oid_notif_type:
                return False
            if notif[0][1] == peer_notif_established:
                logger.info("Established notification")
            elif notif[0][1] == peer_notif_backward:
                logger.info("Backward transition notification")
            else:
                return False

            for position, expected in enumerate(expected_oids, start=1):
                if self._notif_oid(notif, position) != expected:
                    return False

        return True

    def is_notif_bgp4v2_valid(self, output_list, address, type_requested):
        """Look for a valid notification of the requested type for ``address``.

        :param output_list: notifications as returned by ``get_notif_bgp4v2``
        :param address: peer address used as OID index suffix
        :param type_requested: "Estab" or "Backward"

        Notifications of another type, or whose peer state OID does not match
        ``address``, are skipped. The first remaining notification decides
        the result: True when its local/remote port OIDs (and, for backward
        transitions, the three error OIDs) match, False otherwise. Returns
        False as well when a notification is not a known trap type or when
        no notification matches.
        """
        oid_notif_type = ".1.3.6.1.6.3.1.1.4.1.0"
        peer_notif_established = ".1.3.6.1.3.5.1.0.1"
        peer_notif_backward = ".1.3.6.1.3.5.1.0.2"
        oid_peer_state = ".1.3.6.1.3.5.1.1.2.1.13"
        oid_peer_local_port = ".1.3.6.1.3.5.1.1.2.1.6"
        oid_peer_remote_port = ".1.3.6.1.3.5.1.1.2.1.9"
        oid_peer_err_code_recv = ".1.3.6.1.3.5.1.1.3.1.1"
        oid_peer_err_sub_code_recv = ".1.3.6.1.3.5.1.1.3.1.2"
        oid_peer_err_recv_text = ".1.3.6.1.3.5.1.1.3.1.4"

        def indexed(base_oid):
            return "{}.1.{}".format(base_oid, address)

        for notif in output_list:
            if self._notif_oid(notif, 0) != oid_notif_type:
                return False

            if notif[0][1] == peer_notif_established:
                logger.info("Established notification")
                notif_type = "Estab"
            elif notif[0][1] == peer_notif_backward:
                logger.info("Backward transition notification")
                notif_type = "Backward"
            else:
                return False

            if notif_type != type_requested:
                continue

            # notification about another peer: keep looking
            if self._notif_oid(notif, 1) != indexed(oid_peer_state):
                continue

            if self._notif_oid(notif, 2) != indexed(oid_peer_local_port):
                return False
            if self._notif_oid(notif, 3) != indexed(oid_peer_remote_port):
                return False

            if notif_type == "Estab":
                return True

            # backward transitions additionally carry the received error
            if self._notif_oid(notif, 4) != indexed(oid_peer_err_code_recv):
                return False
            if self._notif_oid(notif, 5) != indexed(oid_peer_err_sub_code_recv):
                return False
            if self._notif_oid(notif, 6) != indexed(oid_peer_err_recv_text):
                return False

            return True

        return False

    def _collect_notifs(self, output_file, oid_pattern):
        """Parse the lines of ``output_file`` that hold an OID matching
        ``oid_pattern`` (a regular expression).

        Tabs are replaced by spaces first. Returns one ``parse_notif_ipv4``
        result per retained line, in file order.
        """
        notif_list = []
        for line in re.sub("\t", " ", output_file).strip().split("\n"):
            # don't consider additional SNMP or application messages
            if not re.search(r"(\.([0-9]+))+\s", line):
                continue
            if re.search(oid_pattern, line) is not None:
                notif_list.append(self.parse_notif_ipv4(line))
        return notif_list

    def get_notif_bgp4(self, output_file):
        """Return the parsed notifications of ``output_file`` (trap log
        content as a string) that reference the 1.3.6.1.2.1.15 subtree."""
        return self._collect_notifs(output_file, r"1\.3\.6\.1\.2\.1\.15")

    def get_notif_bgp4v2(self, output_file):
        """Return the parsed notifications of ``output_file`` (trap log
        content as a string) that reference the 1.3.6.1.3.5.1 subtree."""
        return self._collect_notifs(output_file, r"1\.3\.6\.1\.3\.5\.1")

    def test_oid(self, oid, value):
        """Return True when the snmpgetnext result for ``oid`` equals
        ``value``.

        The agent is queried once, so the printed value is the one that is
        compared.
        """
        result = self.get_next(oid)
        print("oid: {}".format(result))
        return result == value

    def test_oid_walk(self, oid, values, oids=None):
        """Walk ``oid`` and compare the result with the expected ``values``.

        When ``oids`` is given, ``oids[i]`` must be present in the walk
        result with value ``values[i]``; IndexError is raised if ``values``
        is shorter than ``oids``. Otherwise the first ``len(values)`` walked
        values must equal ``values``.
        """
        results_dict, results_list = self.walk(oid)
        print("test_oid_walk: {} {}".format(oid, results_dict))

        if oids is not None:
            for index, expected_oid in enumerate(oids):
                # avoid key error for missing keys
                if expected_oid not in results_dict:
                    print("FAIL: missing oid key {}".format(expected_oid))
                    return False
                if results_dict[expected_oid] != values[index]:
                    print(
                        "FAIL{} {} |{}| == |{}|".format(
                            expected_oid,
                            index,
                            results_dict[expected_oid],
                            values[index],
                        )
                    )
                    return False
            return True

        # Return true if 'values' is a prefix of 'results_list'
        print("test {} == {}".format(results_list[: len(values)], values))
        return results_list[: len(values)] == values