Merge pull request #14934 from LabNConsulting/chopps/new-munet-0.13.10

tests: import munet 0.13.10
This commit is contained in:
Donald Sharp 2023-12-05 13:24:39 -05:00 committed by GitHub
commit 83018e2178
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 481 additions and 138 deletions

View File

@ -513,9 +513,8 @@ class Commander: # pylint: disable=R0904
self.logger.debug('%s("%s") [no precmd]', method, shlex.join(cmd_list)) self.logger.debug('%s("%s") [no precmd]', method, shlex.join(cmd_list))
else: else:
self.logger.debug( self.logger.debug(
'%s: %s %s("%s", pre_cmd: "%s" use_pty: %s kwargs: %.120s)', '%s: %s("%s", pre_cmd: "%s" use_pty: %s kwargs: %.120s)',
self, self,
"XXX" if method == "_spawn" else "",
method, method,
cmd_list, cmd_list,
pre_cmd_list if not skip_pre_cmd else "", pre_cmd_list if not skip_pre_cmd else "",
@ -566,7 +565,7 @@ class Commander: # pylint: disable=R0904
def _spawn(self, cmd, skip_pre_cmd=False, use_pty=False, echo=False, **kwargs): def _spawn(self, cmd, skip_pre_cmd=False, use_pty=False, echo=False, **kwargs):
logging.debug( logging.debug(
'%s: XXX _spawn: cmd "%s" skip_pre_cmd %s use_pty %s echo %s kwargs %s', '%s: _spawn: cmd "%s" skip_pre_cmd %s use_pty %s echo %s kwargs %s',
self, self,
cmd, cmd,
skip_pre_cmd, skip_pre_cmd,
@ -579,7 +578,7 @@ class Commander: # pylint: disable=R0904
) )
self.logger.debug( self.logger.debug(
'%s: XXX %s("%s", use_pty %s echo %s defaults: %s)', '%s: %s("%s", use_pty %s echo %s defaults: %s)',
self, self,
"PopenSpawn" if not use_pty else "pexpect.spawn", "PopenSpawn" if not use_pty else "pexpect.spawn",
actual_cmd, actual_cmd,
@ -865,14 +864,18 @@ class Commander: # pylint: disable=R0904
else: else:
o, e = await p.communicate() o, e = await p.communicate()
self.logger.debug( self.logger.debug(
"%s: cmd_p already exited status: %s", self, proc_error(p, o, e) "%s: [cleanup_proc] proc already exited status: %s",
self,
proc_error(p, o, e),
) )
return None return None
if pid is None: if pid is None:
pid = p.pid pid = p.pid
self.logger.debug("%s: terminate process: %s (pid %s)", self, proc_str(p), pid) self.logger.debug(
"%s: [cleanup_proc] terminate process: %s (pid %s)", self, proc_str(p), pid
)
try: try:
# This will SIGHUP and wait a while then SIGKILL and return immediately # This will SIGHUP and wait a while then SIGKILL and return immediately
await self.cleanup_pid(p.pid, pid) await self.cleanup_pid(p.pid, pid)
@ -885,14 +888,19 @@ class Commander: # pylint: disable=R0904
else: else:
o, e = await asyncio.wait_for(p.communicate(), timeout=wait_secs) o, e = await asyncio.wait_for(p.communicate(), timeout=wait_secs)
self.logger.debug( self.logger.debug(
"%s: cmd_p exited after kill, status: %s", self, proc_error(p, o, e) "%s: [cleanup_proc] exited after kill, status: %s",
self,
proc_error(p, o, e),
) )
except (asyncio.TimeoutError, subprocess.TimeoutExpired): except (asyncio.TimeoutError, subprocess.TimeoutExpired):
self.logger.warning("%s: SIGKILL timeout", self) self.logger.warning("%s: [cleanup_proc] SIGKILL timeout", self)
return p return p
except Exception as error: except Exception as error:
self.logger.warning( self.logger.warning(
"%s: kill unexpected exception: %s", self, error, exc_info=True "%s: [cleanup_proc] kill unexpected exception: %s",
self,
error,
exc_info=True,
) )
return p return p
return None return None
@ -1206,7 +1214,7 @@ class Commander: # pylint: disable=R0904
# XXX need to test ssh in Xterm # XXX need to test ssh in Xterm
sudo_path = get_exec_path_host(["sudo"]) sudo_path = get_exec_path_host(["sudo"])
# This first test case seems same as last but using list instead of string? # This first test case seems same as last but using list instead of string?
if self.is_vm and self.use_ssh: # pylint: disable=E1101 if self.is_vm and self.use_ssh and not ns_only: # pylint: disable=E1101
if isinstance(cmd, str): if isinstance(cmd, str):
cmd = shlex.split(cmd) cmd = shlex.split(cmd)
cmd = ["/usr/bin/env", f"MUNET_NODENAME={self.name}"] + cmd cmd = ["/usr/bin/env", f"MUNET_NODENAME={self.name}"] + cmd
@ -1332,6 +1340,14 @@ class Commander: # pylint: disable=R0904
# Re-adjust the layout # Re-adjust the layout
if "TMUX" in os.environ: if "TMUX" in os.environ:
cmd = [
get_exec_path_host("tmux"),
"select-layout",
"-t",
pane_info if not tmux_target else tmux_target,
"even-horizontal",
]
commander.cmd_status(cmd)
cmd = [ cmd = [
get_exec_path_host("tmux"), get_exec_path_host("tmux"),
"select-layout", "select-layout",
@ -2005,8 +2021,10 @@ class LinuxNamespace(Commander, InterfaceMixin):
stdout=stdout, stdout=stdout,
stderr=stderr, stderr=stderr,
text=True, text=True,
start_new_session=not unet,
shell=False, shell=False,
# start_new_session=not unet
# preexec_fn=os.setsid if not unet else None,
preexec_fn=os.setsid,
) )
# The pid number returned is in the global pid namespace. For unshare_inline # The pid number returned is in the global pid namespace. For unshare_inline
@ -2345,14 +2363,14 @@ class LinuxNamespace(Commander, InterfaceMixin):
and self.pid != our_pid and self.pid != our_pid
): ):
self.logger.debug( self.logger.debug(
"cleanup pid on separate pid %s from proc pid %s", "cleanup separate pid %s from namespace proc pid %s",
self.pid, self.pid,
self.p.pid if self.p else None, self.p.pid if self.p else None,
) )
await self.cleanup_pid(self.pid) await self.cleanup_pid(self.pid)
if self.p is not None: if self.p is not None:
self.logger.debug("cleanup proc pid %s", self.p.pid) self.logger.debug("cleanup namespace proc pid %s", self.p.pid)
await self.async_cleanup_proc(self.p) await self.async_cleanup_proc(self.p)
# return to the previous namespace, need to do this in case anothe munet # return to the previous namespace, need to do this in case anothe munet
@ -2937,7 +2955,7 @@ if True: # pylint: disable=using-constant-test
) )
logging.debug( logging.debug(
'ShellWraper: XXX prompt "%s" will_echo %s child.echo %s', 'ShellWraper: prompt "%s" will_echo %s child.echo %s',
prompt, prompt,
will_echo, will_echo,
spawn.echo, spawn.echo,

View File

@ -325,13 +325,14 @@ def get_shcmd(unet, host, kinds, execfmt, ucmd):
if not execfmt: if not execfmt:
return "" return ""
# Do substitutions for {} in string # Do substitutions for {} and {N} in string
numfmt = len(re.findall(r"{\d*}", execfmt)) numfmt = len(re.findall(r"{\d*}", execfmt))
if numfmt > 1: if numfmt > 1:
ucmd = execfmt.format(*shlex.split(ucmd)) ucmd = execfmt.format(*shlex.split(ucmd))
elif numfmt: elif numfmt:
ucmd = execfmt.format(ucmd) ucmd = execfmt.format(ucmd)
elif len(re.findall(r"{[a-zA-Z_][0-9a-zA-Z_\.]*}", execfmt)): # look for any pair of {}s but do not count escaped {{ or }}
elif len(re.findall(r"{[^}]+}", execfmt.replace("{{", "").replace("}}", ""))):
if execfmt.endswith('"'): if execfmt.endswith('"'):
fstring = "f'''" + execfmt + "'''" fstring = "f'''" + execfmt + "'''"
else: else:

View File

@ -144,7 +144,6 @@ class TestCase:
result_logger: logging.Logger = None, result_logger: logging.Logger = None,
full_summary: bool = False, full_summary: bool = False,
): ):
self.info = TestCaseInfo(tag, name, path) self.info = TestCaseInfo(tag, name, path)
self.__saved_info = [] self.__saved_info = []
self.__short_doc_header = not full_summary self.__short_doc_header = not full_summary
@ -248,7 +247,6 @@ class TestCase:
self.rlog.info("%s. %s", tag, header) self.rlog.info("%s. %s", tag, header)
def __exec_script(self, path, print_header, add_newline): def __exec_script(self, path, print_header, add_newline):
# Below was the original method to avoid the global TestCase # Below was the original method to avoid the global TestCase
# variable; however, we need global functions so we can import them # variable; however, we need global functions so we can import them
# into test scripts. Without imports pylint will complain about undefined # into test scripts. Without imports pylint will complain about undefined
@ -393,12 +391,12 @@ class TestCase:
self, self,
target: str, target: str,
cmd: str, cmd: str,
) -> dict: ) -> Union[list, dict]:
"""Execute a json ``cmd`` and return json result. """Execute a json ``cmd`` and return json result.
Args: Args:
target: the target to execute the command on. target: the target to execute the command on.
cmd: string to execut on the target. cmd: string to execute on the target.
""" """
out = self.targets[target].cmd_nostatus(cmd, warn=False) out = self.targets[target].cmd_nostatus(cmd, warn=False)
self.last = out = out.rstrip() self.last = out = out.rstrip()
@ -420,6 +418,7 @@ class TestCase:
match: str, match: str,
expect_fail: bool, expect_fail: bool,
flags: int, flags: int,
exact_match: bool,
) -> (bool, Union[str, list]): ) -> (bool, Union[str, list]):
"""Execute a ``cmd`` and check result. """Execute a ``cmd`` and check result.
@ -429,6 +428,8 @@ class TestCase:
match: regex to ``re.search()`` for in output. match: regex to ``re.search()`` for in output.
expect_fail: if True then succeed when the regexp doesn't match. expect_fail: if True then succeed when the regexp doesn't match.
flags: python regex flags to modify matching behavior flags: python regex flags to modify matching behavior
exact_match: if True then ``match`` must be exactly matched somewhere
in the output of ``cmd`` using ``str.find()``.
Returns: Returns:
(success, matches): if the match fails then "matches" will be None, (success, matches): if the match fails then "matches" will be None,
@ -436,6 +437,17 @@ class TestCase:
``matches`` otherwise group(0) (i.e., the matching text). ``matches`` otherwise group(0) (i.e., the matching text).
""" """
out = self._command(target, cmd) out = self._command(target, cmd)
if exact_match:
if match not in out:
success = expect_fail
ret = None
else:
success = not expect_fail
ret = match
level = logging.DEBUG if success else logging.WARNING
self.olog.log(level, "exactly matched:%s:", ret)
return success, ret
search = re.search(match, out, flags) search = re.search(match, out, flags)
self.last_m = search self.last_m = search
if search is None: if search is None:
@ -455,17 +467,19 @@ class TestCase:
self, self,
target: str, target: str,
cmd: str, cmd: str,
match: Union[str, dict], match: Union[str, list, dict],
expect_fail: bool, expect_fail: bool,
) -> Union[str, dict]: exact_match: bool,
) -> (bool, Union[list, dict]):
"""Execute a json ``cmd`` and check result. """Execute a json ``cmd`` and check result.
Args: Args:
target: the target to execute the command on. target: the target to execute the command on.
cmd: string to execut on the target. cmd: string to execut on the target.
match: A json ``str`` or object (``dict``) to compare against the json match: A json ``str``, object (``dict``), or array (``list``) to
output from ``cmd``. compare against the json output from ``cmd``.
expect_fail: if True then succeed when the json doesn't match. expect_fail: if True then succeed when the json doesn't match.
exact_match: if True then the json must exactly match.
""" """
js = self._command_json(target, cmd) js = self._command_json(target, cmd)
try: try:
@ -476,7 +490,27 @@ class TestCase:
"JSON load failed. Check match value is in JSON format: %s", error "JSON load failed. Check match value is in JSON format: %s", error
) )
if json_diff := json_cmp(expect, js): if exact_match:
deep_diff = json_cmp(expect, js)
# Convert DeepDiff completely into dicts or lists at all levels
json_diff = json.loads(deep_diff.to_json())
else:
deep_diff = json_cmp(expect, js, ignore_order=True)
# Convert DeepDiff completely into dicts or lists at all levels
json_diff = json.loads(deep_diff.to_json())
# Remove new fields in json object from diff
if json_diff.get("dictionary_item_added") is not None:
del json_diff["dictionary_item_added"]
# Remove new json objects in json array from diff
if (new_items := json_diff.get("iterable_item_added")) is not None:
new_item_paths = list(new_items.keys())
for path in new_item_paths:
if type(new_items[path]) is dict:
del new_items[path]
if len(new_items) == 0:
del json_diff["iterable_item_added"]
if json_diff:
success = expect_fail success = expect_fail
if not success: if not success:
self.logf("JSON DIFF:%s:" % json_diff) self.logf("JSON DIFF:%s:" % json_diff)
@ -489,14 +523,24 @@ class TestCase:
self, self,
target: str, target: str,
cmd: str, cmd: str,
match: Union[str, dict], match: Union[str, list, dict],
is_json: bool, is_json: bool,
timeout: float, timeout: float,
interval: float, interval: float,
expect_fail: bool, expect_fail: bool,
flags: int, flags: int,
) -> Union[str, dict]: exact_match: bool,
"""Execute a command repeatedly waiting for result until timeout.""" ) -> Union[str, list, dict]:
"""Execute a command repeatedly waiting for result until timeout.
``match`` is a regular expression to search for in the output of ``cmd``
when ``is_json`` is False.
When ``is_json`` is True ``match`` must be a json object, a json array,
or a ``str`` which parses into a json object. Likewise, the ``cmd`` output
is parsed into a json object or array and then a comparison is done between
the two json objects or arrays.
"""
startt = time.time() startt = time.time()
endt = startt + timeout endt = startt + timeout
@ -504,10 +548,12 @@ class TestCase:
ret = None ret = None
while not success and time.time() < endt: while not success and time.time() < endt:
if is_json: if is_json:
success, ret = self._match_command_json(target, cmd, match, expect_fail) success, ret = self._match_command_json(
target, cmd, match, expect_fail, exact_match
)
else: else:
success, ret = self._match_command( success, ret = self._match_command(
target, cmd, match, expect_fail, flags target, cmd, match, expect_fail, flags, exact_match
) )
if not success: if not success:
time.sleep(interval) time.sleep(interval)
@ -626,7 +672,7 @@ class TestCase:
) )
return self._command(target, cmd) return self._command(target, cmd)
def step_json(self, target: str, cmd: str) -> dict: def step_json(self, target: str, cmd: str) -> Union[list, dict]:
"""See :py:func:`~munet.mutest.userapi.step_json`. """See :py:func:`~munet.mutest.userapi.step_json`.
:meta private: :meta private:
@ -649,13 +695,14 @@ class TestCase:
desc: str = "", desc: str = "",
expect_fail: bool = False, expect_fail: bool = False,
flags: int = re.DOTALL, flags: int = re.DOTALL,
exact_match: bool = False,
) -> (bool, Union[str, list]): ) -> (bool, Union[str, list]):
"""See :py:func:`~munet.mutest.userapi.match_step`. """See :py:func:`~munet.mutest.userapi.match_step`.
:meta private: :meta private:
""" """
self.logf( self.logf(
"#%s.%s:%s:MATCH_STEP:%s:%s:%s:%s:%s:%s", "#%s.%s:%s:MATCH_STEP:%s:%s:%s:%s:%s:%s:%s",
self.tag, self.tag,
self.steps + 1, self.steps + 1,
self.info.path, self.info.path,
@ -665,8 +712,11 @@ class TestCase:
desc, desc,
expect_fail, expect_fail,
flags, flags,
exact_match,
)
success, ret = self._match_command(
target, cmd, match, expect_fail, flags, exact_match
) )
success, ret = self._match_command(target, cmd, match, expect_fail, flags)
if desc: if desc:
self.__post_result(target, success, desc) self.__post_result(target, success, desc)
return success, ret return success, ret
@ -684,16 +734,17 @@ class TestCase:
self, self,
target: str, target: str,
cmd: str, cmd: str,
match: Union[str, dict], match: Union[str, list, dict],
desc: str = "", desc: str = "",
expect_fail: bool = False, expect_fail: bool = False,
) -> (bool, Union[str, dict]): exact_match: bool = False,
) -> (bool, Union[list, dict]):
"""See :py:func:`~munet.mutest.userapi.match_step_json`. """See :py:func:`~munet.mutest.userapi.match_step_json`.
:meta private: :meta private:
""" """
self.logf( self.logf(
"#%s.%s:%s:MATCH_STEP_JSON:%s:%s:%s:%s:%s", "#%s.%s:%s:MATCH_STEP_JSON:%s:%s:%s:%s:%s:%s",
self.tag, self.tag,
self.steps + 1, self.steps + 1,
self.info.path, self.info.path,
@ -702,8 +753,11 @@ class TestCase:
match, match,
desc, desc,
expect_fail, expect_fail,
exact_match,
)
success, ret = self._match_command_json(
target, cmd, match, expect_fail, exact_match
) )
success, ret = self._match_command_json(target, cmd, match, expect_fail)
if desc: if desc:
self.__post_result(target, success, desc) self.__post_result(target, success, desc)
return success, ret return success, ret
@ -718,9 +772,57 @@ class TestCase:
interval=0.5, interval=0.5,
expect_fail: bool = False, expect_fail: bool = False,
flags: int = re.DOTALL, flags: int = re.DOTALL,
exact_match: bool = False,
) -> (bool, Union[str, list]): ) -> (bool, Union[str, list]):
"""See :py:func:`~munet.mutest.userapi.wait_step`. """See :py:func:`~munet.mutest.userapi.wait_step`.
:meta private:
"""
if interval is None:
interval = min(timeout / 20, 0.25)
self.logf(
"#%s.%s:%s:WAIT_STEP:%s:%s:%s:%s:%s:%s:%s:%s:%s",
self.tag,
self.steps + 1,
self.info.path,
target,
cmd,
match,
timeout,
interval,
desc,
expect_fail,
flags,
exact_match,
)
success, ret = self._wait(
target,
cmd,
match,
False,
timeout,
interval,
expect_fail,
flags,
exact_match,
)
if desc:
self.__post_result(target, success, desc)
return success, ret
def wait_step_json(
self,
target: str,
cmd: str,
match: Union[str, list, dict],
desc: str = "",
timeout=10,
interval=None,
expect_fail: bool = False,
exact_match: bool = False,
) -> (bool, Union[list, dict]):
"""See :py:func:`~munet.mutest.userapi.wait_step_json`.
:meta private: :meta private:
""" """
if interval is None: if interval is None:
@ -737,46 +839,10 @@ class TestCase:
interval, interval,
desc, desc,
expect_fail, expect_fail,
flags, exact_match,
) )
success, ret = self._wait( success, ret = self._wait(
target, cmd, match, False, timeout, interval, expect_fail, flags target, cmd, match, True, timeout, interval, expect_fail, 0, exact_match
)
if desc:
self.__post_result(target, success, desc)
return success, ret
def wait_step_json(
self,
target: str,
cmd: str,
match: Union[str, dict],
desc: str = "",
timeout=10,
interval=None,
expect_fail: bool = False,
) -> (bool, Union[str, dict]):
"""See :py:func:`~munet.mutest.userapi.wait_step_json`.
:meta private:
"""
if interval is None:
interval = min(timeout / 20, 0.25)
self.logf(
"#%s.%s:%s:WAIT_STEP:%s:%s:%s:%s:%s:%s:%s",
self.tag,
self.steps + 1,
self.info.path,
target,
cmd,
match,
timeout,
interval,
desc,
expect_fail,
)
success, ret = self._wait(
target, cmd, match, True, timeout, interval, expect_fail, 0
) )
if desc: if desc:
self.__post_result(target, success, desc) self.__post_result(target, success, desc)
@ -864,15 +930,15 @@ def step(target: str, cmd: str) -> str:
return TestCase.g_tc.step(target, cmd) return TestCase.g_tc.step(target, cmd)
def step_json(target: str, cmd: str) -> dict: def step_json(target: str, cmd: str) -> Union[list, dict]:
"""Execute a json ``cmd`` on a ``target`` and return the json object. """Execute a json ``cmd`` on a ``target`` and return the json object or array.
Args: Args:
target: the target to execute the ``cmd`` on. target: the target to execute the ``cmd`` on.
cmd: string to execute on the target. cmd: string to execute on the target.
Returns: Returns:
Returns the json object after parsing the ``cmd`` output. Returns the json object or array after parsing the ``cmd`` output.
If json parse fails, a warning is logged and an empty ``dict`` is used. If json parse fails, a warning is logged and an empty ``dict`` is used.
""" """
@ -904,6 +970,7 @@ def match_step(
desc: str = "", desc: str = "",
expect_fail: bool = False, expect_fail: bool = False,
flags: int = re.DOTALL, flags: int = re.DOTALL,
exact_match: bool = False,
) -> (bool, Union[str, list]): ) -> (bool, Union[str, list]):
"""Execute a ``cmd`` on a ``target`` check result. """Execute a ``cmd`` on a ``target`` check result.
@ -922,44 +989,53 @@ def match_step(
desc: description of test, if no description then no result is logged. desc: description of test, if no description then no result is logged.
expect_fail: if True then succeed when the regexp doesn't match. expect_fail: if True then succeed when the regexp doesn't match.
flags: python regex flags to modify matching behavior flags: python regex flags to modify matching behavior
exact_match: if True then ``match`` must be exactly matched somewhere
in the output of ``cmd`` using ``str.find()``.
Returns: Returns:
Returns a 2-tuple. The first value is a bool indicating ``success``. Returns a 2-tuple. The first value is a bool indicating ``success``.
The second value will be a list from ``re.Match.groups()`` if non-empty, The second value will be a list from ``re.Match.groups()`` if non-empty,
otherwise ``re.Match.group(0)`` if there was a match otherwise None. otherwise ``re.Match.group(0)`` if there was a match otherwise None.
""" """
return TestCase.g_tc.match_step(target, cmd, match, desc, expect_fail, flags) return TestCase.g_tc.match_step(
target, cmd, match, desc, expect_fail, flags, exact_match
)
def match_step_json( def match_step_json(
target: str, target: str,
cmd: str, cmd: str,
match: Union[str, dict], match: Union[str, list, dict],
desc: str = "", desc: str = "",
expect_fail: bool = False, expect_fail: bool = False,
) -> (bool, Union[str, dict]): exact_match: bool = False,
) -> (bool, Union[list, dict]):
"""Execute a ``cmd`` on a ``target`` check result. """Execute a ``cmd`` on a ``target`` check result.
Execute ``cmd`` on ``target`` and check if the json object in ``match`` Execute ``cmd`` on ``target`` and check if the json object or array in ``match``
matches or doesn't match (according to the ``expect_fail`` value) the matches or doesn't match (according to the ``expect_fail`` value) the
json output from ``cmd``. json output from ``cmd``.
Args: Args:
target: the target to execute the ``cmd`` on. target: the target to execute the ``cmd`` on.
cmd: string to execut on the ``target``. cmd: string to execut on the ``target``.
match: A json ``str`` or object (``dict``) to compare against the json match: A json ``str``, object (``dict``), or array (``list``) to compare
output from ``cmd``. against the json output from ``cmd``.
desc: description of test, if no description then no result is logged. desc: description of test, if no description then no result is logged.
expect_fail: if True then succeed if the a json doesn't match. expect_fail: if True then succeed if the a json doesn't match.
exact_match: if True then the json must exactly match.
Returns: Returns:
Returns a 2-tuple. The first value is a bool indicating ``success``. The Returns a 2-tuple. The first value is a bool indicating ``success``. The
second value is a ``str`` diff if there is a difference found in the json second value is a ``dict`` of the diff if there is a difference found in
compare, otherwise the value is the json object (``dict``) from the ``cmd``. the json compare, otherwise the value is the json object (``dict``) or
array (``list``) from the ``cmd``.
If json parse fails, a warning is logged and an empty ``dict`` is used. If json parse fails, a warning is logged and an empty ``dict`` is used.
""" """
return TestCase.g_tc.match_step_json(target, cmd, match, desc, expect_fail) return TestCase.g_tc.match_step_json(
target, cmd, match, desc, expect_fail, exact_match
)
def wait_step( def wait_step(
@ -971,6 +1047,7 @@ def wait_step(
interval: float = 0.5, interval: float = 0.5,
expect_fail: bool = False, expect_fail: bool = False,
flags: int = re.DOTALL, flags: int = re.DOTALL,
exact_match: bool = False,
) -> (bool, Union[str, list]): ) -> (bool, Union[str, list]):
"""Execute a ``cmd`` on a ``target`` repeatedly, looking for a result. """Execute a ``cmd`` on a ``target`` repeatedly, looking for a result.
@ -991,6 +1068,8 @@ def wait_step(
desc: description of test, if no description then no result is logged. desc: description of test, if no description then no result is logged.
expect_fail: if True then succeed when the regexp *doesn't* match. expect_fail: if True then succeed when the regexp *doesn't* match.
flags: python regex flags to modify matching behavior flags: python regex flags to modify matching behavior
exact_match: if True then ``match`` must be exactly matched somewhere
in the output of ``cmd`` using ``str.find()``.
Returns: Returns:
Returns a 2-tuple. The first value is a bool indicating ``success``. Returns a 2-tuple. The first value is a bool indicating ``success``.
@ -998,37 +1077,31 @@ def wait_step(
otherwise ``re.Match.group(0)`` if there was a match otherwise None. otherwise ``re.Match.group(0)`` if there was a match otherwise None.
""" """
return TestCase.g_tc.wait_step( return TestCase.g_tc.wait_step(
target, cmd, match, desc, timeout, interval, expect_fail, flags target, cmd, match, desc, timeout, interval, expect_fail, flags, exact_match
) )
def wait_step_json( def wait_step_json(
target: str, target: str,
cmd: str, cmd: str,
match: Union[str, dict], match: Union[str, list, dict],
desc: str = "", desc: str = "",
timeout=10, timeout=10,
interval=None, interval=None,
expect_fail: bool = False, expect_fail: bool = False,
) -> (bool, Union[str, dict]): exact_match: bool = False,
) -> (bool, Union[list, dict]):
"""Execute a cmd repeatedly and wait for matching result. """Execute a cmd repeatedly and wait for matching result.
Execute ``cmd`` on ``target``, every ``interval`` seconds until Execute ``cmd`` on ``target``, every ``interval`` seconds until
the output of ``cmd`` matches or doesn't match (according to the the output of ``cmd`` matches or doesn't match (according to the
``expect_fail`` value) ``match``, for up to ``timeout`` seconds. ``expect_fail`` value) ``match``, for up to ``timeout`` seconds.
``match`` is a regular expression to search for in the output of ``cmd`` when
``is_json`` is False.
When ``is_json`` is True ``match`` must be a json object or a ``str`` which
parses into a json object. Likewise, the ``cmd`` output is parsed into a json
object and then a comparison is done between the two json objects.
Args: Args:
target: the target to execute the ``cmd`` on. target: the target to execute the ``cmd`` on.
cmd: string to execut on the ``target``. cmd: string to execut on the ``target``.
match: A json object or str representation of one to compare against json match: A json object, json array, or str representation of json to compare
output from ``cmd``. against json output from ``cmd``.
desc: description of test, if no description then no result is logged. desc: description of test, if no description then no result is logged.
timeout: The number of seconds to repeat the ``cmd`` looking for a match timeout: The number of seconds to repeat the ``cmd`` looking for a match
(or non-match if ``expect_fail`` is True). (or non-match if ``expect_fail`` is True).
@ -1037,17 +1110,18 @@ def wait_step_json(
average the cmd will execute 10 times. The minimum calculated interval average the cmd will execute 10 times. The minimum calculated interval
is .25s, shorter values can be passed explicitly. is .25s, shorter values can be passed explicitly.
expect_fail: if True then succeed if the a json doesn't match. expect_fail: if True then succeed if the a json doesn't match.
exact_match: if True then the json must exactly match.
Returns: Returns:
Returns a 2-tuple. The first value is a bool indicating ``success``. Returns a 2-tuple. The first value is a bool indicating ``success``.
The second value is a ``str`` diff if there is a difference found in the The second value is a ``dict`` of the diff if there is a difference
json compare, otherwise the value is a json object (dict) from the ``cmd`` found in the json compare, otherwise the value is a json object (``dict``)
output. or array (``list``) from the ``cmd`` output.
If json parse fails, a warning is logged and an empty ``dict`` is used. If json parse fails, a warning is logged and an empty ``dict`` is used.
""" """
return TestCase.g_tc.wait_step_json( return TestCase.g_tc.wait_step_json(
target, cmd, match, desc, timeout, interval, expect_fail target, cmd, match, desc, timeout, interval, expect_fail, exact_match
) )

View File

@ -20,6 +20,8 @@ import socket
import subprocess import subprocess
import time import time
from pathlib import Path
from . import cli from . import cli
from .base import BaseMunet from .base import BaseMunet
from .base import Bridge from .base import Bridge
@ -38,6 +40,7 @@ from .config import config_to_dict_with_key
from .config import find_matching_net_config from .config import find_matching_net_config
from .config import find_with_kv from .config import find_with_kv
from .config import merge_kind_config from .config import merge_kind_config
from .watchlog import WatchLog
class L3ContainerNotRunningError(MunetError): class L3ContainerNotRunningError(MunetError):
@ -455,13 +458,14 @@ class NodeMixin:
bps = self.unet.cfgopt.getoption("--gdb-breakpoints", "").split(",") bps = self.unet.cfgopt.getoption("--gdb-breakpoints", "").split(",")
for bp in bps: for bp in bps:
gdbcmd += f" '-ex=b {bp}'" if bp:
gdbcmd += f" '-ex=b {bp}'"
cmds = self.config.get("gdb-run-cmd", []) cmds = self.config.get("gdb-run-cmds", [])
for cmd in cmds: for cmd in cmds:
gdbcmd += f" '-ex={cmd}'" gdbcmd += f" '-ex={cmd}'"
self.run_in_window(gdbcmd) self.run_in_window(gdbcmd, ns_only=True)
elif should_gdb and use_emacs: elif should_gdb and use_emacs:
gdbcmd = gdbcmd.replace("gdb ", "gdb -i=mi ") gdbcmd = gdbcmd.replace("gdb ", "gdb -i=mi ")
ecbin = self.get_exec_path("emacsclient") ecbin = self.get_exec_path("emacsclient")
@ -664,6 +668,7 @@ class L3NodeMixin(NodeMixin):
self.phycount = 0 self.phycount = 0
self.phy_odrivers = {} self.phy_odrivers = {}
self.tapmacs = {} self.tapmacs = {}
self.watched_logs = {}
self.intf_tc_count = 0 self.intf_tc_count = 0
@ -723,6 +728,26 @@ ff02::2\tip6-allrouters
if hasattr(self, "bind_mount"): if hasattr(self, "bind_mount"):
self.bind_mount(hosts_file, "/etc/hosts") self.bind_mount(hosts_file, "/etc/hosts")
def add_watch_log(self, path, watchfor_re=None):
"""Add a WatchLog to this nodes watched logs.
Args:
path: If relative is relative to the nodes ``rundir``
watchfor_re: Regular expression to watch the log for and raise an exception
if found.
Return:
The watching task if request or None otherwise.
"""
path = Path(path)
if not path.is_absolute():
path = self.rundir.joinpath(path)
wl = WatchLog(path)
self.watched_logs[wl.path] = wl
task = wl.raise_if_match_task(watchfor_re) if watchfor_re else None
return task
async def console( async def console(
self, self,
concmd, concmd,
@ -938,8 +963,32 @@ ff02::2\tip6-allrouters
if hname in self.host_intfs: if hname in self.host_intfs:
return return
self.host_intfs[hname] = lname self.host_intfs[hname] = lname
self.unet.rootcmd.cmd_nostatus(f"ip link set {hname} down ")
self.unet.rootcmd.cmd_raises(f"ip link set {hname} netns {self.pid}") # See if this interace is missing and needs to be fixed
rc, o, _ = self.unet.rootcmd.cmd_status("ip -o link show")
m = re.search(rf"\d+:\s+(\S+):.*altname {re.escape(hname)}\W", o)
if m:
# need to rename
dname = m.group(1)
self.logger.info("Fixing misnamed %s to %s", dname, hname)
self.unet.rootcmd.cmd_status(
f"ip link property del dev {dname} altname {hname}"
)
self.unet.rootcmd.cmd_status(f"ip link set {dname} name {hname}")
rc, o, _ = self.unet.rootcmd.cmd_status("ip -o link show")
m = re.search(rf"\d+:\s+{re.escape(hname)}:.*", o)
if m:
self.unet.rootcmd.cmd_nostatus(f"ip link set {hname} down ")
self.unet.rootcmd.cmd_raises(f"ip link set {hname} netns {self.pid}")
# Wait for interface to show up in namespace
for retry in range(0, 10):
rc, o, _ = self.cmd_status(f"ip -o link show {hname}")
if not rc:
if re.search(rf"\d+: {re.escape(hname)}:.*", o):
break
if retry > 0:
await asyncio.sleep(1)
self.cmd_raises(f"ip link set {hname} name {lname}") self.cmd_raises(f"ip link set {hname} name {lname}")
if mtu: if mtu:
self.cmd_raises(f"ip link set {lname} mtu {mtu}") self.cmd_raises(f"ip link set {lname} mtu {mtu}")
@ -949,7 +998,12 @@ ff02::2\tip6-allrouters
lname = self.host_intfs[hname] lname = self.host_intfs[hname]
self.cmd_raises(f"ip link set {lname} down") self.cmd_raises(f"ip link set {lname} down")
self.cmd_raises(f"ip link set {lname} name {hname}") self.cmd_raises(f"ip link set {lname} name {hname}")
self.cmd_raises(f"ip link set {hname} netns 1") self.cmd_status(f"ip link set netns 1 dev {hname}")
# The above is failing sometimes and not sure why
# logging.error(
# "XXX after setns %s",
# self.unet.rootcmd.cmd_nostatus(f"ip link show {hname}"),
# )
del self.host_intfs[hname] del self.host_intfs[hname]
async def add_phy_intf(self, devaddr, lname): async def add_phy_intf(self, devaddr, lname):
@ -1019,12 +1073,13 @@ ff02::2\tip6-allrouters
"Physical PCI device %s already bound to vfio-pci", devaddr "Physical PCI device %s already bound to vfio-pci", devaddr
) )
return return
self.logger.info( self.logger.info(
"Unbinding physical PCI device %s from driver %s", devaddr, driver "Unbinding physical PCI device %s from driver %s", devaddr, driver
) )
self.phy_odrivers[devaddr] = driver self.phy_odrivers[devaddr] = driver
self.unet.rootcmd.cmd_raises( self.unet.rootcmd.cmd_raises(
f"echo {devaddr} > /sys/bus/pci/drivers/{driver}/unbind" f"echo {devaddr} | timeout 10 tee /sys/bus/pci/drivers/{driver}/unbind"
) )
# Add the device vendor and device id to vfio-pci in case it's the first time # Add the device vendor and device id to vfio-pci in case it's the first time
@ -1035,7 +1090,14 @@ ff02::2\tip6-allrouters
f"echo {vendor} {devid} > /sys/bus/pci/drivers/vfio-pci/new_id", warn=False f"echo {vendor} {devid} > /sys/bus/pci/drivers/vfio-pci/new_id", warn=False
) )
if not self.unet.rootcmd.path_exists(f"/sys/bus/pci/driver/vfio-pci/{devaddr}"): for retry in range(0, 10):
if self.unet.rootcmd.path_exists(
f"/sys/bus/pci/drivers/vfio-pci/{devaddr}"
):
break
if retry > 0:
await asyncio.sleep(1)
# Bind to vfio-pci if wasn't added with new_id # Bind to vfio-pci if wasn't added with new_id
self.logger.info("Binding physical PCI device %s to vfio-pci", devaddr) self.logger.info("Binding physical PCI device %s to vfio-pci", devaddr)
ec, _, _ = self.unet.rootcmd.cmd_status( ec, _, _ = self.unet.rootcmd.cmd_status(
@ -1066,7 +1128,7 @@ ff02::2\tip6-allrouters
"Unbinding physical PCI device %s from driver vfio-pci", devaddr "Unbinding physical PCI device %s from driver vfio-pci", devaddr
) )
self.unet.rootcmd.cmd_status( self.unet.rootcmd.cmd_status(
f"echo {devaddr} > /sys/bus/pci/drivers/vfio-pci/unbind" f"echo {devaddr} | timeout 10 tee /sys/bus/pci/drivers/vfio-pci/unbind"
) )
self.logger.info("Binding physical PCI device %s to driver %s", devaddr, driver) self.logger.info("Binding physical PCI device %s to driver %s", devaddr, driver)
@ -1085,13 +1147,13 @@ ff02::2\tip6-allrouters
for hname in list(self.host_intfs): for hname in list(self.host_intfs):
await self.rem_host_intf(hname) await self.rem_host_intf(hname)
# remove any hostintf interfaces
for devaddr in list(self.phy_intfs):
await self.rem_phy_intf(devaddr)
# delete the LinuxNamespace/InterfaceMixin # delete the LinuxNamespace/InterfaceMixin
await super()._async_delete() await super()._async_delete()
# remove any hostintf interfaces, needs to come after normal exits
for devaddr in list(self.phy_intfs):
await self.rem_phy_intf(devaddr)
class L3NamespaceNode(L3NodeMixin, LinuxNamespace): class L3NamespaceNode(L3NodeMixin, LinuxNamespace):
"""A namespace L3 node.""" """A namespace L3 node."""
@ -1123,6 +1185,7 @@ class L3ContainerNode(L3NodeMixin, LinuxNamespace):
assert self.container_image assert self.container_image
self.cmd_p = None self.cmd_p = None
self.cmd_pid = None
self.__base_cmd = [] self.__base_cmd = []
self.__base_cmd_pty = [] self.__base_cmd_pty = []
@ -1393,7 +1456,13 @@ class L3ContainerNode(L3NodeMixin, LinuxNamespace):
start_new_session=True, # keeps main tty signals away from podman start_new_session=True, # keeps main tty signals away from podman
) )
self.logger.debug("%s: async_popen => %s", self, self.cmd_p.pid) # If our process is actually the child of an nsenter fetch its pid.
if self.nsenter_fork:
self.cmd_pid = await self.get_proc_child_pid(self.cmd_p)
self.logger.debug(
"%s: async_popen => %s (%s)", self, self.cmd_p.pid, self.cmd_pid
)
self.pytest_hook_run_cmd(stdout, stderr) self.pytest_hook_run_cmd(stdout, stderr)
@ -1542,6 +1611,7 @@ class L3QemuVM(L3NodeMixin, LinuxNamespace):
"""Create a Container Node.""" """Create a Container Node."""
self.cont_exec_paths = {} self.cont_exec_paths = {}
self.launch_p = None self.launch_p = None
self.launch_pid = None
self.qemu_config = config["qemu"] self.qemu_config = config["qemu"]
self.extra_mounts = [] self.extra_mounts = []
assert self.qemu_config assert self.qemu_config
@ -1968,8 +2038,9 @@ class L3QemuVM(L3NodeMixin, LinuxNamespace):
con.cmd_raises(f"ip -6 route add default via {switch.ip6_address}") con.cmd_raises(f"ip -6 route add default via {switch.ip6_address}")
con.cmd_raises("ip link set lo up") con.cmd_raises("ip link set lo up")
if self.unet.cfgopt.getoption("--coverage"): # This is already mounted now
con.cmd_raises("mount -t debugfs none /sys/kernel/debug") # if self.unet.cfgopt.getoption("--coverage"):
# con.cmd_raises("mount -t debugfs none /sys/kernel/debug")
async def gather_coverage_data(self): async def gather_coverage_data(self):
con = self.conrepl con = self.conrepl
@ -2261,25 +2332,29 @@ class L3QemuVM(L3NodeMixin, LinuxNamespace):
stdout = open(os.path.join(self.rundir, "qemu.out"), "wb") stdout = open(os.path.join(self.rundir, "qemu.out"), "wb")
stderr = open(os.path.join(self.rundir, "qemu.err"), "wb") stderr = open(os.path.join(self.rundir, "qemu.err"), "wb")
self.launch_p = await self.async_popen( self.launch_p = await self.async_popen_nsonly(
args, args,
stdin=subprocess.DEVNULL, stdin=subprocess.DEVNULL,
stdout=stdout, stdout=stdout,
stderr=stderr, stderr=stderr,
pass_fds=pass_fds, pass_fds=pass_fds,
# We don't need this here b/c we are only ever running qemu and that's all # Don't want Keybaord interrupt etc to pass to child.
# we need to kill for cleanup # start_new_session=True,
# XXX reconcile this preexec_fn=os.setsid,
start_new_session=True, # allows us to signal all children to exit
) )
if self.nsenter_fork:
self.launch_pid = await self.get_proc_child_pid(self.launch_p)
self.pytest_hook_run_cmd(stdout, stderr) self.pytest_hook_run_cmd(stdout, stderr)
# We've passed these on, so don't need these open here anymore. # We've passed these on, so don't need these open here anymore.
for fd in pass_fds: for fd in pass_fds:
os.close(fd) os.close(fd)
self.logger.debug("%s: async_popen => %s", self, self.launch_p.pid) self.logger.debug(
"%s: popen => %s (%s)", self, self.launch_p.pid, self.launch_pid
)
confiles = ["_console"] confiles = ["_console"]
if use_cmdcon: if use_cmdcon:
@ -2307,10 +2382,10 @@ class L3QemuVM(L3NodeMixin, LinuxNamespace):
# the monitor output has super annoying ANSI escapes in it # the monitor output has super annoying ANSI escapes in it
output = self.monrepl.cmd_nostatus("info status") output = self.monrepl.cmd_nostatus("info status")
self.logger.info("VM status: %s", output) self.logger.debug("VM status: %s", output)
output = self.monrepl.cmd_nostatus("info kvm") output = self.monrepl.cmd_nostatus("info kvm")
self.logger.info("KVM status: %s", output) self.logger.debug("KVM status: %s", output)
# #
# Set thread affinity # Set thread affinity
@ -2348,11 +2423,6 @@ class L3QemuVM(L3NodeMixin, LinuxNamespace):
"%s: node launch (qemu) cmd wait() canceled: %s", future, error "%s: node launch (qemu) cmd wait() canceled: %s", future, error
) )
async def cleanup_qemu(self):
"""Launch qemu."""
if self.launch_p:
await self.async_cleanup_proc(self.launch_p)
async def async_cleanup_cmd(self): async def async_cleanup_cmd(self):
"""Run the configured cleanup commands for this node.""" """Run the configured cleanup commands for this node."""
self.cleanup_called = True self.cleanup_called = True
@ -2372,7 +2442,7 @@ class L3QemuVM(L3NodeMixin, LinuxNamespace):
# Need to cleanup early b/c it is running on the VM # Need to cleanup early b/c it is running on the VM
if self.cmd_p: if self.cmd_p:
await self.async_cleanup_proc(self.cmd_p) await self.async_cleanup_proc(self.cmd_p, self.cmd_pid)
self.cmd_p = None self.cmd_p = None
try: try:
@ -2388,9 +2458,9 @@ class L3QemuVM(L3NodeMixin, LinuxNamespace):
if not self.launch_p: if not self.launch_p:
self.logger.warning("async_delete: qemu is not running") self.logger.warning("async_delete: qemu is not running")
else: else:
await self.cleanup_qemu() await self.async_cleanup_proc(self.launch_p, self.launch_pid)
except Exception as error: except Exception as error:
self.logger.warning("%s: failued to cleanup qemu process: %s", self, error) self.logger.warning("%s: failed to cleanup qemu process: %s", self, error)
await super()._async_delete() await super()._async_delete()
@ -2814,6 +2884,8 @@ ff02::2\tip6-allrouters
logging.debug("Launching nodes") logging.debug("Launching nodes")
await asyncio.gather(*[x.launch() for x in launch_nodes]) await asyncio.gather(*[x.launch() for x in launch_nodes])
logging.debug("Launched nodes -- Queueing Waits")
# Watch for launched processes to exit # Watch for launched processes to exit
for node in launch_nodes: for node in launch_nodes:
task = asyncio.create_task( task = asyncio.create_task(
@ -2822,17 +2894,23 @@ ff02::2\tip6-allrouters
task.add_done_callback(node.launch_completed) task.add_done_callback(node.launch_completed)
tasks.append(task) tasks.append(task)
logging.debug("Wait complete queued, running cmd")
if run_nodes: if run_nodes:
# would like a info when verbose here. # would like a info when verbose here.
logging.debug("Running `cmd` on nodes") logging.debug("Running `cmd` on nodes")
await asyncio.gather(*[x.run_cmd() for x in run_nodes]) await asyncio.gather(*[x.run_cmd() for x in run_nodes])
logging.debug("Ran cmds -- Queueing Waits")
# Watch for run_cmd processes to exit # Watch for run_cmd processes to exit
for node in run_nodes: for node in run_nodes:
task = asyncio.create_task(node.cmd_p.wait(), name=f"Node-{node.name}-cmd") task = asyncio.create_task(node.cmd_p.wait(), name=f"Node-{node.name}-cmd")
task.add_done_callback(node.cmd_completed) task.add_done_callback(node.cmd_completed)
tasks.append(task) tasks.append(task)
logging.debug("Wait complete queued, waiting for ready")
# Wait for nodes to be ready # Wait for nodes to be ready
if ready_nodes: if ready_nodes:
@ -2853,6 +2931,8 @@ ff02::2\tip6-allrouters
raise asyncio.TimeoutError() raise asyncio.TimeoutError()
logging.debug("All nodes ready") logging.debug("All nodes ready")
logging.debug("All done returning tasks: %s", tasks)
return tasks return tasks
async def _async_delete(self): async def _async_delete(self):

View File

@ -95,7 +95,7 @@ def _push_log_handler(desc, logpath):
logging.debug("conftest: adding %s logging at %s", desc, logpath) logging.debug("conftest: adding %s logging at %s", desc, logpath)
root_logger = logging.getLogger() root_logger = logging.getLogger()
handler = logging.FileHandler(logpath, mode="w") handler = logging.FileHandler(logpath, mode="w")
fmt = logging.Formatter("%(asctime)s %(levelname)5s: %(message)s") fmt = logging.Formatter("%(asctime)s %(levelname)5s: %(name)s: %(message)s")
handler.setFormatter(fmt) handler.setFormatter(fmt)
root_logger.addHandler(handler) root_logger.addHandler(handler)
return handler return handler

View File

@ -196,10 +196,10 @@ def pytest_runtest_makereport(item, call):
if error: if error:
item.skip_more_pause = True item.skip_more_pause = True
# we can't asyncio.run() (which pause does) if we are unhsare_inline # we can't asyncio.run() (which pause does) if we are not unhsare_inline
# at this point, count on an autouse fixture to pause instead in this # at this point, count on an autouse fixture to pause instead in this
# case # case
if not BaseMunet.g_unet or not BaseMunet.g_unet.unshare_inline: if BaseMunet.g_unet and BaseMunet.g_unet.unshare_inline:
pause_test(f"before test '{item.nodeid}'") pause_test(f"before test '{item.nodeid}'")
# check for a result to try and catch setup (or module setup) failure # check for a result to try and catch setup (or module setup) failure

View File

@ -0,0 +1,170 @@
# -*- coding: utf-8 eval: (blacken-mode 1) -*-
# SPDX-License-Identifier: GPL-2.0-or-later
#
# August 21 2023, Christian Hopps <chopps@labn.net>
#
# Copyright (c) 2023, LabN Consulting, L.L.C.
#
"""A module supporting an object for watching a logfile."""
import asyncio
import logging
import re
from pathlib import Path
class MatchFoundError(Exception):
"""An error raised when a match is not found."""
def __init__(self, watchlog, match):
self.watchlog = watchlog
self.match = match
super().__init__(watchlog, match)
class WatchLog:
"""An object for watching a logfile."""
def __init__(self, path, encoding="utf-8"):
"""Watch a logfile.
Args:
path: that path of the logfile to watch
encoding: the encoding of the logfile
"""
# Immutable
self.path = Path(path)
self.encoding = encoding
# Mutable
self.content = ""
self.last_snap_mark = 0
self.last_user_mark = 0
self.stat = None
if self.path.exists():
self.snapshot()
def _stat_snapshot(self):
ostat = self.stat
if not self.path.exists():
self.stat = None
return ostat is not None
stat = self.path.stat()
self.stat = stat
if ostat is None:
return True
return (
stat.st_mtime_ns != ostat.st_mtime_ns
or stat.st_ctime_ns != ostat.st_ctime_ns
or stat.st_ino != ostat.st_ino
or stat.st_size != ostat.st_size
)
def reset(self):
self.content = ""
self.last_user_mark = 0
self.last_snap_mark = 0
def update_content(self):
ostat = self.stat
osize = ostat.st_size if ostat else 0
oino = ostat.st_ino if ostat else -1
if not self._stat_snapshot():
logging.debug("XXX logfile %s no stat change", self.path)
return ""
nino = self.stat.st_ino
# If the inode changed and we had content previously warn
if oino != -1 and oino != nino and self.content:
logging.warning(
"logfile %s replaced (new inode) resetting content", self.path
)
self.reset()
osize = 0
nsize = self.stat.st_size
if osize > nsize:
logging.warning("logfile %s shrunk resetting content", self.path)
self.reset()
osize = 0
if osize == nsize:
logging.debug(
"XXX watchlog: %s no update, osize == nsize == %s", self.path, osize
)
return ""
# Read non-blocking
with open(self.path, "r", encoding=self.encoding) as f:
if osize:
f.seek(osize)
logging.debug(
"XXX watchlog: %s reading new content from %s to %s",
self.path,
osize,
nsize,
)
newcontent = f.read(nsize - osize)
self.content += newcontent
return newcontent
def raise_if_match_task(self, match):
"""Start an async task that searches for a match.
This doesn't work well with pytest as the task must be awaited for the exception
to propagate.
"""
async def scan_for_match(wl, regex):
while True:
logging.debug("watchlog: %s scan for updating content", wl.path)
wl.update_content()
if m := regex.search(wl.content):
logging.error(
"XXX watchlog: %s regexp FOUND raising exception!", wl.path
)
raise MatchFoundError(wl, m)
await asyncio.sleep(2)
aw = scan_for_match(self, re.compile(match))
return asyncio.create_task(aw)
def from_mark(self, mark=None):
"""Return the file content starting from ``mark``.
If ``mark`` is None then return content since last ``set_mark`` was called.
Args:
mark: the mark in the content to return file content from.
Return:
returns the content between ``mark`` and the end of content.
"""
return self.content[mark:]
def set_mark(self):
"""Set a mark for later use."""
last_mark = self.last_user_mark
self.last_user_mark = len(self.content)
return last_mark
def snapshot(self):
"""Update the file content and return new text.
Returns any new text added since the last snapshot,
also updates the snapshot mark.
Return:
Newly added text.
"""
# Update the content which may reset marks
self.update_content()
last_mark = self.last_snap_mark
self.last_snap_mark = len(self.content)
return self.content[last_mark:]