mirror of
https://github.com/qemu/qemu.git
synced 2025-07-27 20:29:44 +00:00
python/qemu: Add mypy type annotations
These should all be purely annotations with no changes in behavior at all. You need to be in the python folder, but you should be able to confirm that these annotations are correct (or at least self-consistent) by running `mypy --strict qemu`. Signed-off-by: John Snow <jsnow@redhat.com> Reviewed-by: Kevin Wolf <kwolf@redhat.com> Message-id: 20201006235817.3280413-12-jsnow@redhat.com Signed-off-by: John Snow <jsnow@redhat.com>
This commit is contained in:
parent
090744d552
commit
f12a282ff4
@ -17,6 +17,7 @@
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
|
from typing import List, Optional
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -30,7 +31,7 @@
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def list_accel(qemu_bin):
|
def list_accel(qemu_bin: str) -> List[str]:
|
||||||
"""
|
"""
|
||||||
List accelerators enabled in the QEMU binary.
|
List accelerators enabled in the QEMU binary.
|
||||||
|
|
||||||
@ -50,7 +51,8 @@ def list_accel(qemu_bin):
|
|||||||
return [acc.strip() for acc in out.splitlines()[1:]]
|
return [acc.strip() for acc in out.splitlines()[1:]]
|
||||||
|
|
||||||
|
|
||||||
def kvm_available(target_arch=None, qemu_bin=None):
|
def kvm_available(target_arch: Optional[str] = None,
|
||||||
|
qemu_bin: Optional[str] = None) -> bool:
|
||||||
"""
|
"""
|
||||||
Check if KVM is available using the following heuristic:
|
Check if KVM is available using the following heuristic:
|
||||||
- Kernel module is present in the host;
|
- Kernel module is present in the host;
|
||||||
@ -73,7 +75,7 @@ def kvm_available(target_arch=None, qemu_bin=None):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def tcg_available(qemu_bin):
|
def tcg_available(qemu_bin: str) -> bool:
|
||||||
"""
|
"""
|
||||||
Check if TCG is available.
|
Check if TCG is available.
|
||||||
|
|
||||||
|
@ -23,11 +23,13 @@
|
|||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
import signal
|
import signal
|
||||||
|
import socket
|
||||||
import subprocess
|
import subprocess
|
||||||
import tempfile
|
import tempfile
|
||||||
from types import TracebackType
|
from types import TracebackType
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
|
BinaryIO,
|
||||||
Dict,
|
Dict,
|
||||||
List,
|
List,
|
||||||
Optional,
|
Optional,
|
||||||
@ -37,7 +39,7 @@
|
|||||||
)
|
)
|
||||||
|
|
||||||
from . import console_socket, qmp
|
from . import console_socket, qmp
|
||||||
from .qmp import QMPMessage, SocketAddrT
|
from .qmp import QMPMessage, QMPReturnValue, SocketAddrT
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -67,7 +69,7 @@ class AbnormalShutdown(QEMUMachineError):
|
|||||||
|
|
||||||
class QEMUMachine:
|
class QEMUMachine:
|
||||||
"""
|
"""
|
||||||
A QEMU VM
|
A QEMU VM.
|
||||||
|
|
||||||
Use this object as a context manager to ensure
|
Use this object as a context manager to ensure
|
||||||
the QEMU process terminates::
|
the QEMU process terminates::
|
||||||
@ -84,8 +86,10 @@ def __init__(self,
|
|||||||
name: Optional[str] = None,
|
name: Optional[str] = None,
|
||||||
test_dir: str = "/var/tmp",
|
test_dir: str = "/var/tmp",
|
||||||
monitor_address: Optional[SocketAddrT] = None,
|
monitor_address: Optional[SocketAddrT] = None,
|
||||||
socket_scm_helper=None, sock_dir=None,
|
socket_scm_helper: Optional[str] = None,
|
||||||
drain_console=False, console_log=None):
|
sock_dir: Optional[str] = None,
|
||||||
|
drain_console: bool = False,
|
||||||
|
console_log: Optional[str] = None):
|
||||||
'''
|
'''
|
||||||
Initialize a QEMUMachine
|
Initialize a QEMUMachine
|
||||||
|
|
||||||
@ -129,28 +133,28 @@ def __init__(self,
|
|||||||
self._drain_console = drain_console
|
self._drain_console = drain_console
|
||||||
|
|
||||||
# Runstate
|
# Runstate
|
||||||
self._qemu_log_path = None
|
self._qemu_log_path: Optional[str] = None
|
||||||
self._qemu_log_file = None
|
self._qemu_log_file: Optional[BinaryIO] = None
|
||||||
self._popen: Optional['subprocess.Popen[bytes]'] = None
|
self._popen: Optional['subprocess.Popen[bytes]'] = None
|
||||||
self._events = []
|
self._events: List[QMPMessage] = []
|
||||||
self._iolog = None
|
self._iolog: Optional[str] = None
|
||||||
self._qmp_set = True # Enable QMP monitor by default.
|
self._qmp_set = True # Enable QMP monitor by default.
|
||||||
self._qmp_connection: Optional[qmp.QEMUMonitorProtocol] = None
|
self._qmp_connection: Optional[qmp.QEMUMonitorProtocol] = None
|
||||||
self._qemu_full_args: Tuple[str, ...] = ()
|
self._qemu_full_args: Tuple[str, ...] = ()
|
||||||
self._temp_dir = None
|
self._temp_dir: Optional[str] = None
|
||||||
self._launched = False
|
self._launched = False
|
||||||
self._machine = None
|
self._machine: Optional[str] = None
|
||||||
self._console_index = 0
|
self._console_index = 0
|
||||||
self._console_set = False
|
self._console_set = False
|
||||||
self._console_device_type = None
|
self._console_device_type: Optional[str] = None
|
||||||
self._console_address = os.path.join(
|
self._console_address = os.path.join(
|
||||||
self._sock_dir, f"{self._name}-console.sock"
|
self._sock_dir, f"{self._name}-console.sock"
|
||||||
)
|
)
|
||||||
self._console_socket = None
|
self._console_socket: Optional[socket.socket] = None
|
||||||
self._remove_files = []
|
self._remove_files: List[str] = []
|
||||||
self._user_killed = False
|
self._user_killed = False
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self) -> 'QEMUMachine':
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def __exit__(self,
|
def __exit__(self,
|
||||||
@ -159,14 +163,15 @@ def __exit__(self,
|
|||||||
exc_tb: Optional[TracebackType]) -> None:
|
exc_tb: Optional[TracebackType]) -> None:
|
||||||
self.shutdown()
|
self.shutdown()
|
||||||
|
|
||||||
def add_monitor_null(self):
|
def add_monitor_null(self) -> None:
|
||||||
"""
|
"""
|
||||||
This can be used to add an unused monitor instance.
|
This can be used to add an unused monitor instance.
|
||||||
"""
|
"""
|
||||||
self._args.append('-monitor')
|
self._args.append('-monitor')
|
||||||
self._args.append('null')
|
self._args.append('null')
|
||||||
|
|
||||||
def add_fd(self, fd, fdset, opaque, opts=''):
|
def add_fd(self, fd: int, fdset: int,
|
||||||
|
opaque: str, opts: str = '') -> 'QEMUMachine':
|
||||||
"""
|
"""
|
||||||
Pass a file descriptor to the VM
|
Pass a file descriptor to the VM
|
||||||
"""
|
"""
|
||||||
@ -185,7 +190,8 @@ def add_fd(self, fd, fdset, opaque, opts=''):
|
|||||||
self._args.append(','.join(options))
|
self._args.append(','.join(options))
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def send_fd_scm(self, fd=None, file_path=None):
|
def send_fd_scm(self, fd: Optional[int] = None,
|
||||||
|
file_path: Optional[str] = None) -> int:
|
||||||
"""
|
"""
|
||||||
Send an fd or file_path to socket_scm_helper.
|
Send an fd or file_path to socket_scm_helper.
|
||||||
|
|
||||||
@ -229,7 +235,7 @@ def send_fd_scm(self, fd=None, file_path=None):
|
|||||||
return proc.returncode
|
return proc.returncode
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _remove_if_exists(path):
|
def _remove_if_exists(path: str) -> None:
|
||||||
"""
|
"""
|
||||||
Remove file object at path if it exists
|
Remove file object at path if it exists
|
||||||
"""
|
"""
|
||||||
@ -240,7 +246,7 @@ def _remove_if_exists(path):
|
|||||||
return
|
return
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def is_running(self):
|
def is_running(self) -> bool:
|
||||||
"""Returns true if the VM is running."""
|
"""Returns true if the VM is running."""
|
||||||
return self._popen is not None and self._popen.poll() is None
|
return self._popen is not None and self._popen.poll() is None
|
||||||
|
|
||||||
@ -250,19 +256,19 @@ def _subp(self) -> 'subprocess.Popen[bytes]':
|
|||||||
raise QEMUMachineError('Subprocess pipe not present')
|
raise QEMUMachineError('Subprocess pipe not present')
|
||||||
return self._popen
|
return self._popen
|
||||||
|
|
||||||
def exitcode(self):
|
def exitcode(self) -> Optional[int]:
|
||||||
"""Returns the exit code if possible, or None."""
|
"""Returns the exit code if possible, or None."""
|
||||||
if self._popen is None:
|
if self._popen is None:
|
||||||
return None
|
return None
|
||||||
return self._popen.poll()
|
return self._popen.poll()
|
||||||
|
|
||||||
def get_pid(self):
|
def get_pid(self) -> Optional[int]:
|
||||||
"""Returns the PID of the running process, or None."""
|
"""Returns the PID of the running process, or None."""
|
||||||
if not self.is_running():
|
if not self.is_running():
|
||||||
return None
|
return None
|
||||||
return self._subp.pid
|
return self._subp.pid
|
||||||
|
|
||||||
def _load_io_log(self):
|
def _load_io_log(self) -> None:
|
||||||
if self._qemu_log_path is not None:
|
if self._qemu_log_path is not None:
|
||||||
with open(self._qemu_log_path, "r") as iolog:
|
with open(self._qemu_log_path, "r") as iolog:
|
||||||
self._iolog = iolog.read()
|
self._iolog = iolog.read()
|
||||||
@ -296,7 +302,7 @@ def _base_args(self) -> List[str]:
|
|||||||
args.extend(['-device', device])
|
args.extend(['-device', device])
|
||||||
return args
|
return args
|
||||||
|
|
||||||
def _pre_launch(self):
|
def _pre_launch(self) -> None:
|
||||||
self._temp_dir = tempfile.mkdtemp(dir=self._test_dir)
|
self._temp_dir = tempfile.mkdtemp(dir=self._test_dir)
|
||||||
self._qemu_log_path = os.path.join(self._temp_dir, self._name + ".log")
|
self._qemu_log_path = os.path.join(self._temp_dir, self._name + ".log")
|
||||||
self._qemu_log_file = open(self._qemu_log_path, 'wb')
|
self._qemu_log_file = open(self._qemu_log_path, 'wb')
|
||||||
@ -314,11 +320,11 @@ def _pre_launch(self):
|
|||||||
nickname=self._name
|
nickname=self._name
|
||||||
)
|
)
|
||||||
|
|
||||||
def _post_launch(self):
|
def _post_launch(self) -> None:
|
||||||
if self._qmp_connection:
|
if self._qmp_connection:
|
||||||
self._qmp.accept()
|
self._qmp.accept()
|
||||||
|
|
||||||
def _post_shutdown(self):
|
def _post_shutdown(self) -> None:
|
||||||
"""
|
"""
|
||||||
Called to cleanup the VM instance after the process has exited.
|
Called to cleanup the VM instance after the process has exited.
|
||||||
May also be called after a failed launch.
|
May also be called after a failed launch.
|
||||||
@ -358,7 +364,7 @@ def _post_shutdown(self):
|
|||||||
self._user_killed = False
|
self._user_killed = False
|
||||||
self._launched = False
|
self._launched = False
|
||||||
|
|
||||||
def launch(self):
|
def launch(self) -> None:
|
||||||
"""
|
"""
|
||||||
Launch the VM and make sure we cleanup and expose the
|
Launch the VM and make sure we cleanup and expose the
|
||||||
command line/output in case of exception
|
command line/output in case of exception
|
||||||
@ -382,7 +388,7 @@ def launch(self):
|
|||||||
LOG.debug('Output: %r', self._iolog)
|
LOG.debug('Output: %r', self._iolog)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def _launch(self):
|
def _launch(self) -> None:
|
||||||
"""
|
"""
|
||||||
Launch the VM and establish a QMP connection
|
Launch the VM and establish a QMP connection
|
||||||
"""
|
"""
|
||||||
@ -501,7 +507,7 @@ def shutdown(self, has_quit: bool = False,
|
|||||||
finally:
|
finally:
|
||||||
self._post_shutdown()
|
self._post_shutdown()
|
||||||
|
|
||||||
def kill(self):
|
def kill(self) -> None:
|
||||||
"""
|
"""
|
||||||
Terminate the VM forcefully, wait for it to exit, and perform cleanup.
|
Terminate the VM forcefully, wait for it to exit, and perform cleanup.
|
||||||
"""
|
"""
|
||||||
@ -516,7 +522,7 @@ def wait(self, timeout: Optional[int] = 30) -> None:
|
|||||||
"""
|
"""
|
||||||
self.shutdown(has_quit=True, timeout=timeout)
|
self.shutdown(has_quit=True, timeout=timeout)
|
||||||
|
|
||||||
def set_qmp_monitor(self, enabled=True):
|
def set_qmp_monitor(self, enabled: bool = True) -> None:
|
||||||
"""
|
"""
|
||||||
Set the QMP monitor.
|
Set the QMP monitor.
|
||||||
|
|
||||||
@ -552,7 +558,9 @@ def qmp(self, cmd: str,
|
|||||||
qmp_args = self._qmp_args(conv_keys, **args)
|
qmp_args = self._qmp_args(conv_keys, **args)
|
||||||
return self._qmp.cmd(cmd, args=qmp_args)
|
return self._qmp.cmd(cmd, args=qmp_args)
|
||||||
|
|
||||||
def command(self, cmd, conv_keys=True, **args):
|
def command(self, cmd: str,
|
||||||
|
conv_keys: bool = True,
|
||||||
|
**args: Any) -> QMPReturnValue:
|
||||||
"""
|
"""
|
||||||
Invoke a QMP command.
|
Invoke a QMP command.
|
||||||
On success return the response dict.
|
On success return the response dict.
|
||||||
@ -561,7 +569,7 @@ def command(self, cmd, conv_keys=True, **args):
|
|||||||
qmp_args = self._qmp_args(conv_keys, **args)
|
qmp_args = self._qmp_args(conv_keys, **args)
|
||||||
return self._qmp.command(cmd, **qmp_args)
|
return self._qmp.command(cmd, **qmp_args)
|
||||||
|
|
||||||
def get_qmp_event(self, wait=False):
|
def get_qmp_event(self, wait: bool = False) -> Optional[QMPMessage]:
|
||||||
"""
|
"""
|
||||||
Poll for one queued QMP events and return it
|
Poll for one queued QMP events and return it
|
||||||
"""
|
"""
|
||||||
@ -569,7 +577,7 @@ def get_qmp_event(self, wait=False):
|
|||||||
return self._events.pop(0)
|
return self._events.pop(0)
|
||||||
return self._qmp.pull_event(wait=wait)
|
return self._qmp.pull_event(wait=wait)
|
||||||
|
|
||||||
def get_qmp_events(self, wait=False):
|
def get_qmp_events(self, wait: bool = False) -> List[QMPMessage]:
|
||||||
"""
|
"""
|
||||||
Poll for queued QMP events and return a list of dicts
|
Poll for queued QMP events and return a list of dicts
|
||||||
"""
|
"""
|
||||||
@ -580,7 +588,7 @@ def get_qmp_events(self, wait=False):
|
|||||||
return events
|
return events
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def event_match(event, match=None):
|
def event_match(event: Any, match: Optional[Any]) -> bool:
|
||||||
"""
|
"""
|
||||||
Check if an event matches optional match criteria.
|
Check if an event matches optional match criteria.
|
||||||
|
|
||||||
@ -610,9 +618,11 @@ def event_match(event, match=None):
|
|||||||
return True
|
return True
|
||||||
except TypeError:
|
except TypeError:
|
||||||
# either match or event wasn't iterable (not a dict)
|
# either match or event wasn't iterable (not a dict)
|
||||||
return match == event
|
return bool(match == event)
|
||||||
|
|
||||||
def event_wait(self, name, timeout=60.0, match=None):
|
def event_wait(self, name: str,
|
||||||
|
timeout: float = 60.0,
|
||||||
|
match: Optional[QMPMessage] = None) -> Optional[QMPMessage]:
|
||||||
"""
|
"""
|
||||||
event_wait waits for and returns a named event from QMP with a timeout.
|
event_wait waits for and returns a named event from QMP with a timeout.
|
||||||
|
|
||||||
@ -622,7 +632,9 @@ def event_wait(self, name, timeout=60.0, match=None):
|
|||||||
"""
|
"""
|
||||||
return self.events_wait([(name, match)], timeout)
|
return self.events_wait([(name, match)], timeout)
|
||||||
|
|
||||||
def events_wait(self, events, timeout=60.0):
|
def events_wait(self,
|
||||||
|
events: Sequence[Tuple[str, Any]],
|
||||||
|
timeout: float = 60.0) -> Optional[QMPMessage]:
|
||||||
"""
|
"""
|
||||||
events_wait waits for and returns a single named event from QMP.
|
events_wait waits for and returns a single named event from QMP.
|
||||||
In the case of multiple qualifying events, this function returns the
|
In the case of multiple qualifying events, this function returns the
|
||||||
@ -639,7 +651,7 @@ def events_wait(self, events, timeout=60.0):
|
|||||||
:return: A QMP event matching the filter criteria.
|
:return: A QMP event matching the filter criteria.
|
||||||
If timeout was 0 and no event matched, None.
|
If timeout was 0 and no event matched, None.
|
||||||
"""
|
"""
|
||||||
def _match(event):
|
def _match(event: QMPMessage) -> bool:
|
||||||
for name, match in events:
|
for name, match in events:
|
||||||
if event['event'] == name and self.event_match(event, match):
|
if event['event'] == name and self.event_match(event, match):
|
||||||
return True
|
return True
|
||||||
@ -666,20 +678,20 @@ def _match(event):
|
|||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def get_log(self):
|
def get_log(self) -> Optional[str]:
|
||||||
"""
|
"""
|
||||||
After self.shutdown or failed qemu execution, this returns the output
|
After self.shutdown or failed qemu execution, this returns the output
|
||||||
of the qemu process.
|
of the qemu process.
|
||||||
"""
|
"""
|
||||||
return self._iolog
|
return self._iolog
|
||||||
|
|
||||||
def add_args(self, *args):
|
def add_args(self, *args: str) -> None:
|
||||||
"""
|
"""
|
||||||
Adds to the list of extra arguments to be given to the QEMU binary
|
Adds to the list of extra arguments to be given to the QEMU binary
|
||||||
"""
|
"""
|
||||||
self._args.extend(args)
|
self._args.extend(args)
|
||||||
|
|
||||||
def set_machine(self, machine_type):
|
def set_machine(self, machine_type: str) -> None:
|
||||||
"""
|
"""
|
||||||
Sets the machine type
|
Sets the machine type
|
||||||
|
|
||||||
@ -688,7 +700,9 @@ def set_machine(self, machine_type):
|
|||||||
"""
|
"""
|
||||||
self._machine = machine_type
|
self._machine = machine_type
|
||||||
|
|
||||||
def set_console(self, device_type=None, console_index=0):
|
def set_console(self,
|
||||||
|
device_type: Optional[str] = None,
|
||||||
|
console_index: int = 0) -> None:
|
||||||
"""
|
"""
|
||||||
Sets the device type for a console device
|
Sets the device type for a console device
|
||||||
|
|
||||||
@ -719,7 +733,7 @@ def set_console(self, device_type=None, console_index=0):
|
|||||||
self._console_index = console_index
|
self._console_index = console_index
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def console_socket(self):
|
def console_socket(self) -> socket.socket:
|
||||||
"""
|
"""
|
||||||
Returns a socket connected to the console
|
Returns a socket connected to the console
|
||||||
"""
|
"""
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Dict,
|
Dict,
|
||||||
|
List,
|
||||||
Optional,
|
Optional,
|
||||||
TextIO,
|
TextIO,
|
||||||
Tuple,
|
Tuple,
|
||||||
@ -90,7 +91,9 @@ class QEMUMonitorProtocol:
|
|||||||
#: Logger object for debugging messages
|
#: Logger object for debugging messages
|
||||||
logger = logging.getLogger('QMP')
|
logger = logging.getLogger('QMP')
|
||||||
|
|
||||||
def __init__(self, address, server=False, nickname=None):
|
def __init__(self, address: SocketAddrT,
|
||||||
|
server: bool = False,
|
||||||
|
nickname: Optional[str] = None):
|
||||||
"""
|
"""
|
||||||
Create a QEMUMonitorProtocol class.
|
Create a QEMUMonitorProtocol class.
|
||||||
|
|
||||||
@ -102,7 +105,7 @@ def __init__(self, address, server=False, nickname=None):
|
|||||||
@note No connection is established, this is done by the connect() or
|
@note No connection is established, this is done by the connect() or
|
||||||
accept() methods
|
accept() methods
|
||||||
"""
|
"""
|
||||||
self.__events = []
|
self.__events: List[QMPMessage] = []
|
||||||
self.__address = address
|
self.__address = address
|
||||||
self.__sock = self.__get_sock()
|
self.__sock = self.__get_sock()
|
||||||
self.__sockfile: Optional[TextIO] = None
|
self.__sockfile: Optional[TextIO] = None
|
||||||
@ -114,14 +117,14 @@ def __init__(self, address, server=False, nickname=None):
|
|||||||
self.__sock.bind(self.__address)
|
self.__sock.bind(self.__address)
|
||||||
self.__sock.listen(1)
|
self.__sock.listen(1)
|
||||||
|
|
||||||
def __get_sock(self):
|
def __get_sock(self) -> socket.socket:
|
||||||
if isinstance(self.__address, tuple):
|
if isinstance(self.__address, tuple):
|
||||||
family = socket.AF_INET
|
family = socket.AF_INET
|
||||||
else:
|
else:
|
||||||
family = socket.AF_UNIX
|
family = socket.AF_UNIX
|
||||||
return socket.socket(family, socket.SOCK_STREAM)
|
return socket.socket(family, socket.SOCK_STREAM)
|
||||||
|
|
||||||
def __negotiate_capabilities(self):
|
def __negotiate_capabilities(self) -> QMPMessage:
|
||||||
greeting = self.__json_read()
|
greeting = self.__json_read()
|
||||||
if greeting is None or "QMP" not in greeting:
|
if greeting is None or "QMP" not in greeting:
|
||||||
raise QMPConnectError
|
raise QMPConnectError
|
||||||
@ -131,7 +134,7 @@ def __negotiate_capabilities(self):
|
|||||||
return greeting
|
return greeting
|
||||||
raise QMPCapabilitiesError
|
raise QMPCapabilitiesError
|
||||||
|
|
||||||
def __json_read(self, only_event=False):
|
def __json_read(self, only_event: bool = False) -> Optional[QMPMessage]:
|
||||||
assert self.__sockfile is not None
|
assert self.__sockfile is not None
|
||||||
while True:
|
while True:
|
||||||
data = self.__sockfile.readline()
|
data = self.__sockfile.readline()
|
||||||
@ -148,7 +151,7 @@ def __json_read(self, only_event=False):
|
|||||||
continue
|
continue
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
def __get_events(self, wait=False):
|
def __get_events(self, wait: Union[bool, float] = False) -> None:
|
||||||
"""
|
"""
|
||||||
Check for new events in the stream and cache them in __events.
|
Check for new events in the stream and cache them in __events.
|
||||||
|
|
||||||
@ -186,7 +189,7 @@ def __get_events(self, wait=False):
|
|||||||
raise QMPConnectError("Error while reading from socket")
|
raise QMPConnectError("Error while reading from socket")
|
||||||
self.__sock.settimeout(None)
|
self.__sock.settimeout(None)
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self) -> 'QEMUMonitorProtocol':
|
||||||
# Implement context manager enter function.
|
# Implement context manager enter function.
|
||||||
return self
|
return self
|
||||||
|
|
||||||
@ -199,7 +202,7 @@ def __exit__(self,
|
|||||||
# Implement context manager exit function.
|
# Implement context manager exit function.
|
||||||
self.close()
|
self.close()
|
||||||
|
|
||||||
def connect(self, negotiate=True):
|
def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
|
||||||
"""
|
"""
|
||||||
Connect to the QMP Monitor and perform capabilities negotiation.
|
Connect to the QMP Monitor and perform capabilities negotiation.
|
||||||
|
|
||||||
@ -214,7 +217,7 @@ def connect(self, negotiate=True):
|
|||||||
return self.__negotiate_capabilities()
|
return self.__negotiate_capabilities()
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def accept(self, timeout=15.0):
|
def accept(self, timeout: float = 15.0) -> QMPMessage:
|
||||||
"""
|
"""
|
||||||
Await connection from QMP Monitor and perform capabilities negotiation.
|
Await connection from QMP Monitor and perform capabilities negotiation.
|
||||||
|
|
||||||
@ -250,7 +253,9 @@ def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
|
|||||||
self.logger.debug("<<< %s", resp)
|
self.logger.debug("<<< %s", resp)
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
def cmd(self, name, args=None, cmd_id=None):
|
def cmd(self, name: str,
|
||||||
|
args: Optional[Dict[str, Any]] = None,
|
||||||
|
cmd_id: Optional[Any] = None) -> QMPMessage:
|
||||||
"""
|
"""
|
||||||
Build a QMP command and send it to the QMP Monitor.
|
Build a QMP command and send it to the QMP Monitor.
|
||||||
|
|
||||||
@ -258,14 +263,14 @@ def cmd(self, name, args=None, cmd_id=None):
|
|||||||
@param args: command arguments (dict)
|
@param args: command arguments (dict)
|
||||||
@param cmd_id: command id (dict, list, string or int)
|
@param cmd_id: command id (dict, list, string or int)
|
||||||
"""
|
"""
|
||||||
qmp_cmd = {'execute': name}
|
qmp_cmd: QMPMessage = {'execute': name}
|
||||||
if args:
|
if args:
|
||||||
qmp_cmd['arguments'] = args
|
qmp_cmd['arguments'] = args
|
||||||
if cmd_id:
|
if cmd_id:
|
||||||
qmp_cmd['id'] = cmd_id
|
qmp_cmd['id'] = cmd_id
|
||||||
return self.cmd_obj(qmp_cmd)
|
return self.cmd_obj(qmp_cmd)
|
||||||
|
|
||||||
def command(self, cmd, **kwds):
|
def command(self, cmd: str, **kwds: Any) -> QMPReturnValue:
|
||||||
"""
|
"""
|
||||||
Build and send a QMP command to the monitor, report errors if any
|
Build and send a QMP command to the monitor, report errors if any
|
||||||
"""
|
"""
|
||||||
@ -278,7 +283,8 @@ def command(self, cmd, **kwds):
|
|||||||
)
|
)
|
||||||
return cast(QMPReturnValue, ret['return'])
|
return cast(QMPReturnValue, ret['return'])
|
||||||
|
|
||||||
def pull_event(self, wait=False):
|
def pull_event(self,
|
||||||
|
wait: Union[bool, float] = False) -> Optional[QMPMessage]:
|
||||||
"""
|
"""
|
||||||
Pulls a single event.
|
Pulls a single event.
|
||||||
|
|
||||||
@ -298,7 +304,7 @@ def pull_event(self, wait=False):
|
|||||||
return self.__events.pop(0)
|
return self.__events.pop(0)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def get_events(self, wait=False):
|
def get_events(self, wait: bool = False) -> List[QMPMessage]:
|
||||||
"""
|
"""
|
||||||
Get a list of available QMP events.
|
Get a list of available QMP events.
|
||||||
|
|
||||||
@ -315,13 +321,13 @@ def get_events(self, wait=False):
|
|||||||
self.__get_events(wait)
|
self.__get_events(wait)
|
||||||
return self.__events
|
return self.__events
|
||||||
|
|
||||||
def clear_events(self):
|
def clear_events(self) -> None:
|
||||||
"""
|
"""
|
||||||
Clear current list of pending events.
|
Clear current list of pending events.
|
||||||
"""
|
"""
|
||||||
self.__events = []
|
self.__events = []
|
||||||
|
|
||||||
def close(self):
|
def close(self) -> None:
|
||||||
"""
|
"""
|
||||||
Close the socket and socket file.
|
Close the socket and socket file.
|
||||||
"""
|
"""
|
||||||
@ -330,7 +336,7 @@ def close(self):
|
|||||||
if self.__sockfile:
|
if self.__sockfile:
|
||||||
self.__sockfile.close()
|
self.__sockfile.close()
|
||||||
|
|
||||||
def settimeout(self, timeout):
|
def settimeout(self, timeout: float) -> None:
|
||||||
"""
|
"""
|
||||||
Set the socket timeout.
|
Set the socket timeout.
|
||||||
|
|
||||||
@ -339,7 +345,7 @@ def settimeout(self, timeout):
|
|||||||
"""
|
"""
|
||||||
self.__sock.settimeout(timeout)
|
self.__sock.settimeout(timeout)
|
||||||
|
|
||||||
def get_sock_fd(self):
|
def get_sock_fd(self) -> int:
|
||||||
"""
|
"""
|
||||||
Get the socket file descriptor.
|
Get the socket file descriptor.
|
||||||
|
|
||||||
@ -347,7 +353,7 @@ def get_sock_fd(self):
|
|||||||
"""
|
"""
|
||||||
return self.__sock.fileno()
|
return self.__sock.fileno()
|
||||||
|
|
||||||
def is_scm_available(self):
|
def is_scm_available(self) -> bool:
|
||||||
"""
|
"""
|
||||||
Check if the socket allows for SCM_RIGHTS.
|
Check if the socket allows for SCM_RIGHTS.
|
||||||
|
|
||||||
|
@ -27,6 +27,7 @@
|
|||||||
)
|
)
|
||||||
|
|
||||||
from .machine import QEMUMachine
|
from .machine import QEMUMachine
|
||||||
|
from .qmp import SocketAddrT
|
||||||
|
|
||||||
|
|
||||||
class QEMUQtestProtocol:
|
class QEMUQtestProtocol:
|
||||||
@ -43,7 +44,8 @@ class QEMUQtestProtocol:
|
|||||||
No conection is estabalished by __init__(), this is done
|
No conection is estabalished by __init__(), this is done
|
||||||
by the connect() or accept() methods.
|
by the connect() or accept() methods.
|
||||||
"""
|
"""
|
||||||
def __init__(self, address, server=False):
|
def __init__(self, address: SocketAddrT,
|
||||||
|
server: bool = False):
|
||||||
self._address = address
|
self._address = address
|
||||||
self._sock = self._get_sock()
|
self._sock = self._get_sock()
|
||||||
self._sockfile: Optional[TextIO] = None
|
self._sockfile: Optional[TextIO] = None
|
||||||
@ -51,14 +53,14 @@ def __init__(self, address, server=False):
|
|||||||
self._sock.bind(self._address)
|
self._sock.bind(self._address)
|
||||||
self._sock.listen(1)
|
self._sock.listen(1)
|
||||||
|
|
||||||
def _get_sock(self):
|
def _get_sock(self) -> socket.socket:
|
||||||
if isinstance(self._address, tuple):
|
if isinstance(self._address, tuple):
|
||||||
family = socket.AF_INET
|
family = socket.AF_INET
|
||||||
else:
|
else:
|
||||||
family = socket.AF_UNIX
|
family = socket.AF_UNIX
|
||||||
return socket.socket(family, socket.SOCK_STREAM)
|
return socket.socket(family, socket.SOCK_STREAM)
|
||||||
|
|
||||||
def connect(self):
|
def connect(self) -> None:
|
||||||
"""
|
"""
|
||||||
Connect to the qtest socket.
|
Connect to the qtest socket.
|
||||||
|
|
||||||
@ -67,7 +69,7 @@ def connect(self):
|
|||||||
self._sock.connect(self._address)
|
self._sock.connect(self._address)
|
||||||
self._sockfile = self._sock.makefile(mode='r')
|
self._sockfile = self._sock.makefile(mode='r')
|
||||||
|
|
||||||
def accept(self):
|
def accept(self) -> None:
|
||||||
"""
|
"""
|
||||||
Await connection from QEMU.
|
Await connection from QEMU.
|
||||||
|
|
||||||
@ -76,7 +78,7 @@ def accept(self):
|
|||||||
self._sock, _ = self._sock.accept()
|
self._sock, _ = self._sock.accept()
|
||||||
self._sockfile = self._sock.makefile(mode='r')
|
self._sockfile = self._sock.makefile(mode='r')
|
||||||
|
|
||||||
def cmd(self, qtest_cmd):
|
def cmd(self, qtest_cmd: str) -> str:
|
||||||
"""
|
"""
|
||||||
Send a qtest command on the wire.
|
Send a qtest command on the wire.
|
||||||
|
|
||||||
@ -87,14 +89,16 @@ def cmd(self, qtest_cmd):
|
|||||||
resp = self._sockfile.readline()
|
resp = self._sockfile.readline()
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
def close(self):
|
def close(self) -> None:
|
||||||
"""Close this socket."""
|
"""
|
||||||
|
Close this socket.
|
||||||
|
"""
|
||||||
self._sock.close()
|
self._sock.close()
|
||||||
if self._sockfile:
|
if self._sockfile:
|
||||||
self._sockfile.close()
|
self._sockfile.close()
|
||||||
self._sockfile = None
|
self._sockfile = None
|
||||||
|
|
||||||
def settimeout(self, timeout):
|
def settimeout(self, timeout: Optional[float]) -> None:
|
||||||
"""Set a timeout, in seconds."""
|
"""Set a timeout, in seconds."""
|
||||||
self._sock.settimeout(timeout)
|
self._sock.settimeout(timeout)
|
||||||
|
|
||||||
@ -118,7 +122,7 @@ def __init__(self,
|
|||||||
super().__init__(binary, args, name=name, test_dir=test_dir,
|
super().__init__(binary, args, name=name, test_dir=test_dir,
|
||||||
socket_scm_helper=socket_scm_helper,
|
socket_scm_helper=socket_scm_helper,
|
||||||
sock_dir=sock_dir)
|
sock_dir=sock_dir)
|
||||||
self._qtest = None
|
self._qtest: Optional[QEMUQtestProtocol] = None
|
||||||
self._qtest_path = os.path.join(sock_dir, name + "-qtest.sock")
|
self._qtest_path = os.path.join(sock_dir, name + "-qtest.sock")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@ -130,7 +134,7 @@ def _base_args(self) -> List[str]:
|
|||||||
])
|
])
|
||||||
return args
|
return args
|
||||||
|
|
||||||
def _pre_launch(self):
|
def _pre_launch(self) -> None:
|
||||||
super()._pre_launch()
|
super()._pre_launch()
|
||||||
self._qtest = QEMUQtestProtocol(self._qtest_path, server=True)
|
self._qtest = QEMUQtestProtocol(self._qtest_path, server=True)
|
||||||
|
|
||||||
@ -139,7 +143,7 @@ def _post_launch(self) -> None:
|
|||||||
super()._post_launch()
|
super()._post_launch()
|
||||||
self._qtest.accept()
|
self._qtest.accept()
|
||||||
|
|
||||||
def _post_shutdown(self):
|
def _post_shutdown(self) -> None:
|
||||||
super()._post_shutdown()
|
super()._post_shutdown()
|
||||||
self._remove_if_exists(self._qtest_path)
|
self._remove_if_exists(self._qtest_path)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user