mirror of
https://git.proxmox.com/git/mirror_frr
synced 2025-08-07 01:40:16 +00:00
Merge pull request #4708 from opensourcerouting/topotest-common-improvement
topotest: attempt to stabilize CI system
This commit is contained in:
commit
c5cdc2b8ea
@ -385,7 +385,7 @@ def test_ip_prefix_lists_out_permit(request):
|
|||||||
assert result is True, "Testcase {} : Failed \n Error: {}".format(
|
assert result is True, "Testcase {} : Failed \n Error: {}".format(
|
||||||
tc_name, result)
|
tc_name, result)
|
||||||
|
|
||||||
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
|
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False)
|
||||||
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
|
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
|
||||||
tc_name, result)
|
tc_name, result)
|
||||||
write_test_footer(tc_name)
|
write_test_footer(tc_name)
|
||||||
@ -496,7 +496,7 @@ def test_ip_prefix_lists_in_deny_and_permit_any(request):
|
|||||||
# Verifying RIB routes
|
# Verifying RIB routes
|
||||||
dut = "r3"
|
dut = "r3"
|
||||||
protocol = "bgp"
|
protocol = "bgp"
|
||||||
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
|
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False)
|
||||||
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
|
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
|
||||||
tc_name, result)
|
tc_name, result)
|
||||||
|
|
||||||
@ -713,7 +713,7 @@ def test_ip_prefix_lists_out_deny_and_permit_any(request):
|
|||||||
# Verifying RIB routes
|
# Verifying RIB routes
|
||||||
dut = "r4"
|
dut = "r4"
|
||||||
protocol = "bgp"
|
protocol = "bgp"
|
||||||
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
|
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False)
|
||||||
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
|
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
|
||||||
tc_name, result)
|
tc_name, result)
|
||||||
|
|
||||||
@ -858,7 +858,7 @@ def test_modify_prefix_lists_in_permit_to_deny(request):
|
|||||||
# Verifying RIB routes
|
# Verifying RIB routes
|
||||||
dut = "r3"
|
dut = "r3"
|
||||||
protocol = "bgp"
|
protocol = "bgp"
|
||||||
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
|
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False)
|
||||||
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
|
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
|
||||||
tc_name, result)
|
tc_name, result)
|
||||||
|
|
||||||
@ -971,7 +971,7 @@ def test_modify_prefix_lists_in_deny_to_permit(request):
|
|||||||
# Verifying RIB routes
|
# Verifying RIB routes
|
||||||
dut = "r3"
|
dut = "r3"
|
||||||
protocol = "bgp"
|
protocol = "bgp"
|
||||||
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
|
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False)
|
||||||
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
|
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
|
||||||
tc_name, result)
|
tc_name, result)
|
||||||
|
|
||||||
@ -1151,7 +1151,7 @@ def test_modify_prefix_lists_out_permit_to_deny(request):
|
|||||||
# Verifying RIB routes
|
# Verifying RIB routes
|
||||||
dut = "r4"
|
dut = "r4"
|
||||||
protocol = "bgp"
|
protocol = "bgp"
|
||||||
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
|
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False)
|
||||||
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
|
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
|
||||||
tc_name, result)
|
tc_name, result)
|
||||||
|
|
||||||
@ -1264,7 +1264,7 @@ def test_modify_prefix_lists_out_deny_to_permit(request):
|
|||||||
# Verifying RIB routes
|
# Verifying RIB routes
|
||||||
dut = "r4"
|
dut = "r4"
|
||||||
protocol = "bgp"
|
protocol = "bgp"
|
||||||
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
|
result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False)
|
||||||
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
|
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
|
||||||
tc_name, result)
|
tc_name, result)
|
||||||
|
|
||||||
@ -1438,7 +1438,7 @@ def test_ip_prefix_lists_implicit_deny(request):
|
|||||||
# Verifying RIB routes
|
# Verifying RIB routes
|
||||||
dut = "r4"
|
dut = "r4"
|
||||||
protocol = "bgp"
|
protocol = "bgp"
|
||||||
result = verify_rib(tgen, "ipv4", dut, input_dict_1, protocol=protocol)
|
result = verify_rib(tgen, "ipv4", dut, input_dict_1, protocol=protocol, expected=False)
|
||||||
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
|
assert result is not True, "Testcase {} : Failed \n Error: {}".format(
|
||||||
tc_name, result)
|
tc_name, result)
|
||||||
|
|
||||||
|
@ -14,12 +14,12 @@ if ret != False and found != None:
|
|||||||
luCommand('ce1', 'vtysh -c "sharp remove routes 10.0.0.0 {}"'.format(num),'.','none','Removing {} routes'.format(num))
|
luCommand('ce1', 'vtysh -c "sharp remove routes 10.0.0.0 {}"'.format(num),'.','none','Removing {} routes'.format(num))
|
||||||
luCommand('ce2', 'vtysh -c "sharp remove routes 10.0.0.0 {}"'.format(num),'.','none','Removing {} routes'.format(num))
|
luCommand('ce2', 'vtysh -c "sharp remove routes 10.0.0.0 {}"'.format(num),'.','none','Removing {} routes'.format(num))
|
||||||
for rtr in rtrs:
|
for rtr in rtrs:
|
||||||
luCommand(rtr, 'vtysh -c "show bgp ipv4 uni" | grep Display',' 10 route', 'wait', 'BGP routes removed', wait)
|
luCommand(rtr, 'vtysh -c "show bgp ipv4 uni" | grep Display',' 10 route', 'wait', 'BGP routes removed', wait, wait_time=10)
|
||||||
luCommand(rtr, 'vtysh -c "show bgp ipv4 uni"','.', 'none', 'BGP routes post remove')
|
luCommand(rtr, 'vtysh -c "show bgp ipv4 uni"','.', 'none', 'BGP routes post remove')
|
||||||
for rtr in rtrs:
|
for rtr in rtrs:
|
||||||
luCommand(rtr, 'ip route show | grep -c \\^10\\.','^0$', 'wait', 'Linux routes removed', wait)
|
luCommand(rtr, 'ip route show | grep -c \\^10\\.','^0$', 'wait', 'Linux routes removed', wait, wait_time=10)
|
||||||
luCommand(rtr, 'ip route show','.', 'none', 'Linux routes post remove')
|
luCommand(rtr, 'ip route show','.', 'none', 'Linux routes post remove')
|
||||||
rtrs = ['r1', 'r3', 'r4']
|
rtrs = ['r1', 'r3', 'r4']
|
||||||
for rtr in rtrs:
|
for rtr in rtrs:
|
||||||
luCommand(rtr, 'ip route show vrf {}-cust1 | grep -c \\^10\\.'.format(rtr),'^0$','wait','VRF route removed',wait)
|
luCommand(rtr, 'ip route show vrf {}-cust1 | grep -c \\^10\\.'.format(rtr),'^0$','wait','VRF route removed',wait, wait_time=10)
|
||||||
#done
|
#done
|
||||||
|
@ -36,10 +36,10 @@ if doSharp == True:
|
|||||||
luCommand('ce2', 'vtysh -c "sharp install routes 10.0.0.0 nexthop 99.0.0.2 {}"'.format(num),'','pass','Adding {} routes'.format(num))
|
luCommand('ce2', 'vtysh -c "sharp install routes 10.0.0.0 nexthop 99.0.0.2 {}"'.format(num),'','pass','Adding {} routes'.format(num))
|
||||||
rtrs = ['ce1', 'ce2', 'ce3']
|
rtrs = ['ce1', 'ce2', 'ce3']
|
||||||
for rtr in rtrs:
|
for rtr in rtrs:
|
||||||
luCommand(rtr, 'vtysh -c "show bgp ipv4 uni 10.{}.{}.{}"'.format(b,c,d), 'Last update:', 'wait', 'RXed last route, 10.{}.{}.{}'.format(b,c,d), wait)
|
luCommand(rtr, 'vtysh -c "show bgp ipv4 uni 10.{}.{}.{}"'.format(b,c,d), 'Last update:', 'wait', 'RXed last route, 10.{}.{}.{}'.format(b,c,d), wait, wait_time=10)
|
||||||
luCommand(rtr, 'vtysh -c "show bgp ipv4 uni" | grep -c 10\\.\\*/32', str(num), 'wait', 'See all sharp routes in BGP', wait)
|
luCommand(rtr, 'vtysh -c "show bgp ipv4 uni" | grep -c 10\\.\\*/32', str(num), 'wait', 'See all sharp routes in BGP', wait, wait_time=10)
|
||||||
luCommand('r1', 'vtysh -c "show bgp vrf r1-cust1 ipv4 uni 10.{}.{}.{}"'.format(b,c,d),'99.0.0.1','wait','RXed -> 10.{}.{}.{} from CE1'.format(b,c,d), wait)
|
luCommand('r1', 'vtysh -c "show bgp vrf r1-cust1 ipv4 uni 10.{}.{}.{}"'.format(b,c,d),'99.0.0.1','wait','RXed -> 10.{}.{}.{} from CE1'.format(b,c,d), wait, wait_time=10)
|
||||||
luCommand('r3', 'vtysh -c "show bgp vrf r3-cust1 ipv4 uni 10.{}.{}.{}"'.format(b,c,d),'99.0.0.2','wait','RXed -> 10.{}.{}.{} from CE2'.format(b,c,d), wait)
|
luCommand('r3', 'vtysh -c "show bgp vrf r3-cust1 ipv4 uni 10.{}.{}.{}"'.format(b,c,d),'99.0.0.2','wait','RXed -> 10.{}.{}.{} from CE2'.format(b,c,d), wait, wait_time=10)
|
||||||
luCommand('r1', 'vtysh -c "show bgp ipv4 vpn 10.{}.{}.{}"'.format(b,c,d),'99.0.0.1','wait','see VPN safi -> 10.{}.{}.{} from CE1'.format(b,c,d))
|
luCommand('r1', 'vtysh -c "show bgp ipv4 vpn 10.{}.{}.{}"'.format(b,c,d),'99.0.0.1','wait','see VPN safi -> 10.{}.{}.{} from CE1'.format(b,c,d))
|
||||||
luCommand('r3', 'vtysh -c "show bgp ipv4 vpn 10.{}.{}.{}"'.format(b,c,d),'99.0.0.2','wait','see VPN safi -> 10.{}.{}.{} from CE2'.format(b,c,d))
|
luCommand('r3', 'vtysh -c "show bgp ipv4 vpn 10.{}.{}.{}"'.format(b,c,d),'99.0.0.2','wait','see VPN safi -> 10.{}.{}.{} from CE2'.format(b,c,d))
|
||||||
luCommand('r3', 'vtysh -c "show bgp ipv4 vpn 10.{}.{}.{}"'.format(b,c,d),'1.1.1.1','wait','see VPN safi -> 10.{}.{}.{} from CE1'.format(b,c,d))
|
luCommand('r3', 'vtysh -c "show bgp ipv4 vpn 10.{}.{}.{}"'.format(b,c,d),'1.1.1.1','wait','see VPN safi -> 10.{}.{}.{} from CE1'.format(b,c,d))
|
||||||
@ -48,13 +48,13 @@ if doSharp == True:
|
|||||||
luCommand('r4', 'vtysh -c "show bgp ipv4 vpn 10.{}.{}.{}"'.format(b,c,d),'3.3.3.3','wait','see VPN safi -> 10.{}.{}.{} from CE2'.format(b,c,d))
|
luCommand('r4', 'vtysh -c "show bgp ipv4 vpn 10.{}.{}.{}"'.format(b,c,d),'3.3.3.3','wait','see VPN safi -> 10.{}.{}.{} from CE2'.format(b,c,d))
|
||||||
rtrs = ['ce1', 'ce2', 'ce3']
|
rtrs = ['ce1', 'ce2', 'ce3']
|
||||||
for rtr in rtrs:
|
for rtr in rtrs:
|
||||||
luCommand(rtr, 'ip route get 10.{}.{}.{}'.format(b,c,d),'dev','wait','Route to 10.{}.{}.{} available'.format(b,c,d), wait)
|
luCommand(rtr, 'ip route get 10.{}.{}.{}'.format(b,c,d),'dev','wait','Route to 10.{}.{}.{} available'.format(b,c,d), wait, wait_time=10)
|
||||||
luCommand(rtr, 'ip route show | grep -c \\^10\\.', str(num), 'wait', 'See {} linux routes'.format(num), wait)
|
luCommand(rtr, 'ip route show | grep -c \\^10\\.', str(num), 'wait', 'See {} linux routes'.format(num), wait, wait_time=10)
|
||||||
|
|
||||||
rtrs = ['r1', 'r3', 'r4']
|
rtrs = ['r1', 'r3', 'r4']
|
||||||
for rtr in rtrs:
|
for rtr in rtrs:
|
||||||
luCommand(rtr, 'ip route get vrf {}-cust1 10.{}.{}.{}'.format(rtr,b,c,d),'dev','wait','VRF route available',wait)
|
luCommand(rtr, 'ip route get vrf {}-cust1 10.{}.{}.{}'.format(rtr,b,c,d),'dev','wait','VRF route available',wait, wait_time=10)
|
||||||
luCommand(rtr, 'ip route show vrf {}-cust1 | grep -c \\^10\\.'.format(rtr), str(num), 'wait','See {} linux routes'.format(num), wait)
|
luCommand(rtr, 'ip route show vrf {}-cust1 | grep -c \\^10\\.'.format(rtr), str(num), 'wait','See {} linux routes'.format(num), wait, wait_time=10)
|
||||||
rtrs = ['ce1', 'ce2', 'ce3', 'r1', 'r2', 'r3', 'r4']
|
rtrs = ['ce1', 'ce2', 'ce3', 'r1', 'r2', 'r3', 'r4']
|
||||||
for rtr in rtrs:
|
for rtr in rtrs:
|
||||||
ret = luCommand(rtr, 'vtysh -c "show memory"', 'zebra: System allocator statistics: Total heap allocated: *(\d*) ([A-Za-z]*) .*bgpd: System allocator statistics: Total heap allocated: *(\d*) ([A-Za-z]*)', 'none', 'collect bgpd memory stats')
|
ret = luCommand(rtr, 'vtysh -c "show memory"', 'zebra: System allocator statistics: Total heap allocated: *(\d*) ([A-Za-z]*) .*bgpd: System allocator statistics: Total heap allocated: *(\d*) ([A-Za-z]*)', 'none', 'collect bgpd memory stats')
|
||||||
|
@ -30,6 +30,9 @@ import traceback
|
|||||||
import socket
|
import socket
|
||||||
import ipaddr
|
import ipaddr
|
||||||
|
|
||||||
|
from lib import topotest
|
||||||
|
|
||||||
|
from functools import partial
|
||||||
from lib.topolog import logger, logger_config
|
from lib.topolog import logger, logger_config
|
||||||
from lib.topogen import TopoRouter
|
from lib.topogen import TopoRouter
|
||||||
|
|
||||||
@ -1089,7 +1092,7 @@ def create_route_maps(tgen, input_dict, build=False):
|
|||||||
#############################################
|
#############################################
|
||||||
# Verification APIs
|
# Verification APIs
|
||||||
#############################################
|
#############################################
|
||||||
def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
|
def _verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
|
||||||
"""
|
"""
|
||||||
Data will be read from input_dict or input JSON file, API will generate
|
Data will be read from input_dict or input JSON file, API will generate
|
||||||
same prefixes, which were redistributed by either create_static_routes() or
|
same prefixes, which were redistributed by either create_static_routes() or
|
||||||
@ -1258,6 +1261,26 @@ def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def verify_rib(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None, expected=True):
|
||||||
|
"""
|
||||||
|
Wrapper function for `_verify_rib` that tries multiple time to get results.
|
||||||
|
|
||||||
|
When the expected result is `False` we actually should expect for an string instead.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Use currying to hide the parameters and create a test function.
|
||||||
|
test_func = partial(_verify_rib, tgen, addr_type, dut, input_dict, next_hop, protocol)
|
||||||
|
|
||||||
|
# Call the test function and expect it to return True, otherwise try it again.
|
||||||
|
if expected is True:
|
||||||
|
_, result = topotest.run_and_expect(test_func, True, count=20, wait=6)
|
||||||
|
else:
|
||||||
|
_, result = topotest.run_and_expect_type(test_func, str, count=20, wait=6)
|
||||||
|
|
||||||
|
# Return as normal.
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
def verify_admin_distance_for_static_routes(tgen, input_dict):
|
def verify_admin_distance_for_static_routes(tgen, input_dict):
|
||||||
"""
|
"""
|
||||||
API to verify admin distance for static routes as defined in input_dict/
|
API to verify admin distance for static routes as defined in input_dict/
|
||||||
|
@ -22,6 +22,7 @@ import sys
|
|||||||
import time
|
import time
|
||||||
import datetime
|
import datetime
|
||||||
import json
|
import json
|
||||||
|
import math
|
||||||
from topolog import logger
|
from topolog import logger
|
||||||
from mininet.net import Mininet
|
from mininet.net import Mininet
|
||||||
|
|
||||||
@ -242,20 +243,27 @@ Total %-4d %-4d %d\n\
|
|||||||
return js
|
return js
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
def wait(self, target, command, regexp, op, result, wait, returnJson):
|
def wait(self, target, command, regexp, op, result, wait, returnJson, wait_time=0.5):
|
||||||
self.log('%s:%s WAIT:%s:%s:%s:%s:%s:%s:' % \
|
self.log('%s:%s WAIT:%s:%s:%s:%s:%s:%s:%s:' % \
|
||||||
(self.l_filename, self.l_line, target, command, regexp, op, result,wait))
|
(self.l_filename, self.l_line, target, command, regexp, op, result,wait,wait_time))
|
||||||
found = False
|
found = False
|
||||||
n = 0
|
n = 0
|
||||||
startt = time.time()
|
startt = time.time()
|
||||||
delta = time.time() - startt
|
|
||||||
while delta < wait and found is False:
|
# Calculate the amount of `sleep`s we are going to peform.
|
||||||
|
wait_count = int(math.ceil(wait / wait_time)) + 1
|
||||||
|
|
||||||
|
while wait_count > 0:
|
||||||
|
n += 1
|
||||||
found = self.command(target, command, regexp, op, result, returnJson)
|
found = self.command(target, command, regexp, op, result, returnJson)
|
||||||
n+=1
|
if found is not False:
|
||||||
|
break
|
||||||
|
|
||||||
|
wait_count -= 1
|
||||||
|
if wait_count > 0:
|
||||||
|
time.sleep(wait_time)
|
||||||
|
|
||||||
delta = time.time() - startt
|
delta = time.time() - startt
|
||||||
self.log('\tFound: %s n: %s delta: %s and wait: %s' % (found, n, delta, wait))
|
|
||||||
if delta < wait and found is False:
|
|
||||||
time.sleep (0.5)
|
|
||||||
self.log('Done after %d loops, time=%s, Found=%s' % (n, delta, found))
|
self.log('Done after %d loops, time=%s, Found=%s' % (n, delta, found))
|
||||||
found = self.command(target, command, regexp, 'pass', '%s +%4.2f secs' % (result, delta), returnJson)
|
found = self.command(target, command, regexp, 'pass', '%s +%4.2f secs' % (result, delta), returnJson)
|
||||||
return found
|
return found
|
||||||
@ -281,11 +289,11 @@ def luStart(baseScriptDir='.', baseLogDir='.', net='',
|
|||||||
LUtil.l_dotall_experiment = False
|
LUtil.l_dotall_experiment = False
|
||||||
LUtil.l_dotall_experiment = True
|
LUtil.l_dotall_experiment = True
|
||||||
|
|
||||||
def luCommand(target, command, regexp='.', op='none', result='', time=10, returnJson=False):
|
def luCommand(target, command, regexp='.', op='none', result='', time=10, returnJson=False, wait_time=0.5):
|
||||||
if op != 'wait':
|
if op != 'wait':
|
||||||
return LUtil.command(target, command, regexp, op, result, returnJson)
|
return LUtil.command(target, command, regexp, op, result, returnJson)
|
||||||
else:
|
else:
|
||||||
return LUtil.wait(target, command, regexp, op, result, time, returnJson)
|
return LUtil.wait(target, command, regexp, op, result, time, returnJson, wait_time)
|
||||||
|
|
||||||
def luLast(usenl=False):
|
def luLast(usenl=False):
|
||||||
if usenl:
|
if usenl:
|
||||||
|
74
tests/topotests/lib/test/test_run_and_expect.py
Executable file
74
tests/topotests/lib/test/test_run_and_expect.py
Executable file
@ -0,0 +1,74 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
#
|
||||||
|
# test_run_and_expect.py
|
||||||
|
# Tests for library function: run_and_expect(_type)().
|
||||||
|
#
|
||||||
|
# Copyright (c) 2019 by
|
||||||
|
# Network Device Education Foundation, Inc. ("NetDEF")
|
||||||
|
#
|
||||||
|
# Permission to use, copy, modify, and/or distribute this software
|
||||||
|
# for any purpose with or without fee is hereby granted, provided
|
||||||
|
# that the above copyright notice and this permission notice appear
|
||||||
|
# in all copies.
|
||||||
|
#
|
||||||
|
# THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
|
||||||
|
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||||
|
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
|
||||||
|
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
|
||||||
|
# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
|
||||||
|
# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
|
||||||
|
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
|
||||||
|
# OF THIS SOFTWARE.
|
||||||
|
#
|
||||||
|
|
||||||
|
"""
|
||||||
|
Tests for the `run_and_expect(_type)()` functions.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# Save the Current Working Directory to find lib files.
|
||||||
|
CWD = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
sys.path.append(os.path.join(CWD, '../../'))
|
||||||
|
|
||||||
|
# pylint: disable=C0413
|
||||||
|
from lib.topotest import run_and_expect_type
|
||||||
|
|
||||||
|
def test_run_and_expect_type():
|
||||||
|
"Test basic `run_and_expect_type` functionality."
|
||||||
|
|
||||||
|
def return_true():
|
||||||
|
"Test function that returns `True`."
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Test value success.
|
||||||
|
success, value = run_and_expect_type(return_true, bool, count=1, wait=0, avalue=True)
|
||||||
|
assert success is True
|
||||||
|
assert value is True
|
||||||
|
|
||||||
|
# Test value failure.
|
||||||
|
success, value = run_and_expect_type(return_true, bool, count=1, wait=0, avalue=False)
|
||||||
|
assert success is False
|
||||||
|
assert value is True
|
||||||
|
|
||||||
|
# Test type success.
|
||||||
|
success, value = run_and_expect_type(return_true, bool, count=1, wait=0)
|
||||||
|
assert success is True
|
||||||
|
assert value is True
|
||||||
|
|
||||||
|
# Test type failure.
|
||||||
|
success, value = run_and_expect_type(return_true, str, count=1, wait=0)
|
||||||
|
assert success is False
|
||||||
|
assert value is True
|
||||||
|
|
||||||
|
# Test type failure, return correct type.
|
||||||
|
success, value = run_and_expect_type(return_true, str, count=1, wait=0, avalue=True)
|
||||||
|
assert success is False
|
||||||
|
assert value is True
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
sys.exit(pytest.main())
|
@ -252,6 +252,54 @@ def run_and_expect(func, what, count=20, wait=3):
|
|||||||
return (False, result)
|
return (False, result)
|
||||||
|
|
||||||
|
|
||||||
|
def run_and_expect_type(func, etype, count=20, wait=3, avalue=None):
|
||||||
|
"""
|
||||||
|
Run `func` and compare the result with `etype`. Do it for `count` times
|
||||||
|
waiting `wait` seconds between tries. By default it tries 20 times with
|
||||||
|
3 seconds delay between tries.
|
||||||
|
|
||||||
|
This function is used when you want to test the return type and,
|
||||||
|
optionally, the return value.
|
||||||
|
|
||||||
|
Returns (True, func-return) on success or
|
||||||
|
(False, func-return) on failure.
|
||||||
|
"""
|
||||||
|
start_time = time.time()
|
||||||
|
func_name = "<unknown>"
|
||||||
|
if func.__class__ == functools.partial:
|
||||||
|
func_name = func.func.__name__
|
||||||
|
else:
|
||||||
|
func_name = func.__name__
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"'{}' polling started (interval {} secs, maximum wait {} secs)".format(
|
||||||
|
func_name, wait, int(wait * count)))
|
||||||
|
|
||||||
|
while count > 0:
|
||||||
|
result = func()
|
||||||
|
if not isinstance(result, etype):
|
||||||
|
logger.debug("Expected result type '{}' got '{}' instead".format(etype, type(result)))
|
||||||
|
time.sleep(wait)
|
||||||
|
count -= 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
if etype != type(None) and avalue != None and result != avalue:
|
||||||
|
logger.debug("Expected value '{}' got '{}' instead".format(avalue, result))
|
||||||
|
time.sleep(wait)
|
||||||
|
count -= 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
end_time = time.time()
|
||||||
|
logger.info("'{}' succeeded after {:.2f} seconds".format(
|
||||||
|
func_name, end_time - start_time))
|
||||||
|
return (True, result)
|
||||||
|
|
||||||
|
end_time = time.time()
|
||||||
|
logger.error("'{}' failed after {:.2f} seconds".format(
|
||||||
|
func_name, end_time - start_time))
|
||||||
|
return (False, result)
|
||||||
|
|
||||||
|
|
||||||
def int2dpid(dpid):
|
def int2dpid(dpid):
|
||||||
"Converting Integer to DPID"
|
"Converting Integer to DPID"
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user