mirror of
				https://github.com/qemu/qemu.git
				synced 2025-10-20 12:20:11 +00:00 
			
		
		
		
	 fca9d72323
			
		
	
	
		fca9d72323
		
	
	
	
	
		
			
			If the user selects pretty-printing (-p) the contents of any dictionaries in the output are sorted by key. Signed-off-by: David Edmondson <david.edmondson@oracle.com> Message-Id: <20201013141414.18398-1-david.edmondson@oracle.com> Reviewed-by: Philippe Mathieu-Daudé <philmd@redhat.com> Signed-off-by: Markus Armbruster <armbru@redhat.com>
		
			
				
	
	
		
			460 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			460 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| #
 | |
| # Low-level QEMU shell on top of QMP.
 | |
| #
 | |
| # Copyright (C) 2009, 2010 Red Hat Inc.
 | |
| #
 | |
| # Authors:
 | |
| #  Luiz Capitulino <lcapitulino@redhat.com>
 | |
| #
 | |
| # This work is licensed under the terms of the GNU GPL, version 2.  See
 | |
| # the COPYING file in the top-level directory.
 | |
| #
 | |
| # Usage:
 | |
| #
 | |
| # Start QEMU with:
 | |
| #
 | |
| # # qemu [...] -qmp unix:./qmp-sock,server
 | |
| #
 | |
| # Run the shell:
 | |
| #
 | |
| # $ qmp-shell ./qmp-sock
 | |
| #
 | |
| # Commands have the following format:
 | |
| #
 | |
| #    < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
 | |
| #
 | |
| # For example:
 | |
| #
 | |
| # (QEMU) device_add driver=e1000 id=net1
 | |
| # {u'return': {}}
 | |
| # (QEMU)
 | |
| #
 | |
| # key=value pairs also support Python or JSON object literal subset notations,
 | |
| # without spaces. Dictionaries/objects {} are supported as are arrays [].
 | |
| #
 | |
| #    example-command arg-name1={'key':'value','obj'={'prop':"value"}}
 | |
| #
 | |
| # Both JSON and Python formatting should work, including both styles of
 | |
| # string literal quotes. Both paradigms of literal values should work,
 | |
| # including null/true/false for JSON and None/True/False for Python.
 | |
| #
 | |
| #
 | |
| # Transactions have the following multi-line format:
 | |
| #
 | |
| #    transaction(
 | |
| #    action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
 | |
| #    ...
 | |
| #    action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
 | |
| #    )
 | |
| #
 | |
| # One line transactions are also supported:
 | |
| #
 | |
| #    transaction( action-name1 ... )
 | |
| #
 | |
| # For example:
 | |
| #
 | |
| #     (QEMU) transaction(
 | |
| #     TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
 | |
| #     TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
 | |
| #     TRANS> )
 | |
| #     {"return": {}}
 | |
| #     (QEMU)
 | |
| #
 | |
| # Use the -v and -p options to activate the verbose and pretty-print options,
 | |
| # which will echo back the properly formatted JSON-compliant QMP that is being
 | |
| # sent to QEMU, which is useful for debugging and documentation generation.
 | |
| 
 | |
| import json
 | |
| import ast
 | |
| import readline
 | |
| import sys
 | |
| import os
 | |
| import errno
 | |
| import atexit
 | |
| import re
 | |
| 
 | |
| sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
 | |
| from qemu import qmp
 | |
| 
 | |
| class QMPCompleter(list):
 | |
|     def complete(self, text, state):
 | |
|         for cmd in self:
 | |
|             if cmd.startswith(text):
 | |
|                 if not state:
 | |
|                     return cmd
 | |
|                 else:
 | |
|                     state -= 1
 | |
| 
 | |
| class QMPShellError(Exception):
 | |
|     pass
 | |
| 
 | |
| class QMPShellBadPort(QMPShellError):
 | |
|     pass
 | |
| 
 | |
| class FuzzyJSON(ast.NodeTransformer):
 | |
|     '''This extension of ast.NodeTransformer filters literal "true/false/null"
 | |
|     values in an AST and replaces them by proper "True/False/None" values that
 | |
|     Python can properly evaluate.'''
 | |
|     def visit_Name(self, node):
 | |
|         if node.id == 'true':
 | |
|             node.id = 'True'
 | |
|         if node.id == 'false':
 | |
|             node.id = 'False'
 | |
|         if node.id == 'null':
 | |
|             node.id = 'None'
 | |
|         return node
 | |
| 
 | |
| # TODO: QMPShell's interface is a bit ugly (eg. _fill_completion() and
 | |
| #       _execute_cmd()). Let's design a better one.
 | |
| class QMPShell(qmp.QEMUMonitorProtocol):
 | |
|     def __init__(self, address, pretty=False):
 | |
|         super(QMPShell, self).__init__(self.__get_address(address))
 | |
|         self._greeting = None
 | |
|         self._completer = None
 | |
|         self._pretty = pretty
 | |
|         self._transmode = False
 | |
|         self._actions = list()
 | |
|         self._histfile = os.path.join(os.path.expanduser('~'),
 | |
|                                       '.qmp-shell_history')
 | |
| 
 | |
|     def __get_address(self, arg):
 | |
|         """
 | |
|         Figure out if the argument is in the port:host form, if it's not it's
 | |
|         probably a file path.
 | |
|         """
 | |
|         addr = arg.split(':')
 | |
|         if len(addr) == 2:
 | |
|             try:
 | |
|                 port = int(addr[1])
 | |
|             except ValueError:
 | |
|                 raise QMPShellBadPort
 | |
|             return ( addr[0], port )
 | |
|         # socket path
 | |
|         return arg
 | |
| 
 | |
|     def _fill_completion(self):
 | |
|         cmds = self.cmd('query-commands')
 | |
|         if 'error' in cmds:
 | |
|             return
 | |
|         for cmd in cmds['return']:
 | |
|             self._completer.append(cmd['name'])
 | |
| 
 | |
|     def __completer_setup(self):
 | |
|         self._completer = QMPCompleter()
 | |
|         self._fill_completion()
 | |
|         readline.set_history_length(1024)
 | |
|         readline.set_completer(self._completer.complete)
 | |
|         readline.parse_and_bind("tab: complete")
 | |
|         # XXX: default delimiters conflict with some command names (eg. query-),
 | |
|         # clearing everything as it doesn't seem to matter
 | |
|         readline.set_completer_delims('')
 | |
|         try:
 | |
|             readline.read_history_file(self._histfile)
 | |
|         except Exception as e:
 | |
|             if isinstance(e, IOError) and e.errno == errno.ENOENT:
 | |
|                 # File not found. No problem.
 | |
|                 pass
 | |
|             else:
 | |
|                 print("Failed to read history '%s'; %s" % (self._histfile, e))
 | |
|         atexit.register(self.__save_history)
 | |
| 
 | |
|     def __save_history(self):
 | |
|         try:
 | |
|             readline.write_history_file(self._histfile)
 | |
|         except Exception as e:
 | |
|             print("Failed to save history file '%s'; %s" % (self._histfile, e))
 | |
| 
 | |
|     def __parse_value(self, val):
 | |
|         try:
 | |
|             return int(val)
 | |
|         except ValueError:
 | |
|             pass
 | |
| 
 | |
|         if val.lower() == 'true':
 | |
|             return True
 | |
|         if val.lower() == 'false':
 | |
|             return False
 | |
|         if val.startswith(('{', '[')):
 | |
|             # Try first as pure JSON:
 | |
|             try:
 | |
|                 return json.loads(val)
 | |
|             except ValueError:
 | |
|                 pass
 | |
|             # Try once again as FuzzyJSON:
 | |
|             try:
 | |
|                 st = ast.parse(val, mode='eval')
 | |
|                 return ast.literal_eval(FuzzyJSON().visit(st))
 | |
|             except SyntaxError:
 | |
|                 pass
 | |
|             except ValueError:
 | |
|                 pass
 | |
|         return val
 | |
| 
 | |
|     def __cli_expr(self, tokens, parent):
 | |
|         for arg in tokens:
 | |
|             (key, sep, val) = arg.partition('=')
 | |
|             if sep != '=':
 | |
|                 raise QMPShellError("Expected a key=value pair, got '%s'" % arg)
 | |
| 
 | |
|             value = self.__parse_value(val)
 | |
|             optpath = key.split('.')
 | |
|             curpath = []
 | |
|             for p in optpath[:-1]:
 | |
|                 curpath.append(p)
 | |
|                 d = parent.get(p, {})
 | |
|                 if type(d) is not dict:
 | |
|                     raise QMPShellError('Cannot use "%s" as both leaf and non-leaf key' % '.'.join(curpath))
 | |
|                 parent[p] = d
 | |
|                 parent = d
 | |
|             if optpath[-1] in parent:
 | |
|                 if type(parent[optpath[-1]]) is dict:
 | |
|                     raise QMPShellError('Cannot use "%s" as both leaf and non-leaf key' % '.'.join(curpath))
 | |
|                 else:
 | |
|                     raise QMPShellError('Cannot set "%s" multiple times' % key)
 | |
|             parent[optpath[-1]] = value
 | |
| 
 | |
|     def __build_cmd(self, cmdline):
 | |
|         """
 | |
|         Build a QMP input object from a user provided command-line in the
 | |
|         following format:
 | |
| 
 | |
|             < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
 | |
|         """
 | |
|         cmdargs = re.findall(r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+''', cmdline)
 | |
| 
 | |
|         # Transactional CLI entry/exit:
 | |
|         if cmdargs[0] == 'transaction(':
 | |
|             self._transmode = True
 | |
|             cmdargs.pop(0)
 | |
|         elif cmdargs[0] == ')' and self._transmode:
 | |
|             self._transmode = False
 | |
|             if len(cmdargs) > 1:
 | |
|                 raise QMPShellError("Unexpected input after close of Transaction sub-shell")
 | |
|             qmpcmd = { 'execute': 'transaction',
 | |
|                        'arguments': { 'actions': self._actions } }
 | |
|             self._actions = list()
 | |
|             return qmpcmd
 | |
| 
 | |
|         # Nothing to process?
 | |
|         if not cmdargs:
 | |
|             return None
 | |
| 
 | |
|         # Parse and then cache this Transactional Action
 | |
|         if self._transmode:
 | |
|             finalize = False
 | |
|             action = { 'type': cmdargs[0], 'data': {} }
 | |
|             if cmdargs[-1] == ')':
 | |
|                 cmdargs.pop(-1)
 | |
|                 finalize = True
 | |
|             self.__cli_expr(cmdargs[1:], action['data'])
 | |
|             self._actions.append(action)
 | |
|             return self.__build_cmd(')') if finalize else None
 | |
| 
 | |
|         # Standard command: parse and return it to be executed.
 | |
|         qmpcmd = { 'execute': cmdargs[0], 'arguments': {} }
 | |
|         self.__cli_expr(cmdargs[1:], qmpcmd['arguments'])
 | |
|         return qmpcmd
 | |
| 
 | |
|     def _print(self, qmp):
 | |
|         indent = None
 | |
|         if self._pretty:
 | |
|             indent = 4
 | |
|         jsobj = json.dumps(qmp, indent=indent, sort_keys=self._pretty)
 | |
|         print(str(jsobj))
 | |
| 
 | |
|     def _execute_cmd(self, cmdline):
 | |
|         try:
 | |
|             qmpcmd = self.__build_cmd(cmdline)
 | |
|         except Exception as e:
 | |
|             print('Error while parsing command line: %s' % e)
 | |
|             print('command format: <command-name> ', end=' ')
 | |
|             print('[arg-name1=arg1] ... [arg-nameN=argN]')
 | |
|             return True
 | |
|         # For transaction mode, we may have just cached the action:
 | |
|         if qmpcmd is None:
 | |
|             return True
 | |
|         if self._verbose:
 | |
|             self._print(qmpcmd)
 | |
|         resp = self.cmd_obj(qmpcmd)
 | |
|         if resp is None:
 | |
|             print('Disconnected')
 | |
|             return False
 | |
|         self._print(resp)
 | |
|         return True
 | |
| 
 | |
|     def connect(self, negotiate):
 | |
|         self._greeting = super(QMPShell, self).connect(negotiate)
 | |
|         self.__completer_setup()
 | |
| 
 | |
|     def show_banner(self, msg='Welcome to the QMP low-level shell!'):
 | |
|         print(msg)
 | |
|         if not self._greeting:
 | |
|             print('Connected')
 | |
|             return
 | |
|         version = self._greeting['QMP']['version']['qemu']
 | |
|         print('Connected to QEMU %d.%d.%d\n' % (version['major'],version['minor'],version['micro']))
 | |
| 
 | |
|     def get_prompt(self):
 | |
|         if self._transmode:
 | |
|             return "TRANS> "
 | |
|         return "(QEMU) "
 | |
| 
 | |
|     def read_exec_command(self, prompt):
 | |
|         """
 | |
|         Read and execute a command.
 | |
| 
 | |
|         @return True if execution was ok, return False if disconnected.
 | |
|         """
 | |
|         try:
 | |
|             cmdline = input(prompt)
 | |
|         except EOFError:
 | |
|             print()
 | |
|             return False
 | |
|         if cmdline == '':
 | |
|             for ev in self.get_events():
 | |
|                 print(ev)
 | |
|             self.clear_events()
 | |
|             return True
 | |
|         else:
 | |
|             return self._execute_cmd(cmdline)
 | |
| 
 | |
|     def set_verbosity(self, verbose):
 | |
|         self._verbose = verbose
 | |
| 
 | |
| class HMPShell(QMPShell):
 | |
|     def __init__(self, address):
 | |
|         QMPShell.__init__(self, address)
 | |
|         self.__cpu_index = 0
 | |
| 
 | |
|     def __cmd_completion(self):
 | |
|         for cmd in self.__cmd_passthrough('help')['return'].split('\r\n'):
 | |
|             if cmd and cmd[0] != '[' and cmd[0] != '\t':
 | |
|                 name = cmd.split()[0] # drop help text
 | |
|                 if name == 'info':
 | |
|                     continue
 | |
|                 if name.find('|') != -1:
 | |
|                     # Command in the form 'foobar|f' or 'f|foobar', take the
 | |
|                     # full name
 | |
|                     opt = name.split('|')
 | |
|                     if len(opt[0]) == 1:
 | |
|                         name = opt[1]
 | |
|                     else:
 | |
|                         name = opt[0]
 | |
|                 self._completer.append(name)
 | |
|                 self._completer.append('help ' + name) # help completion
 | |
| 
 | |
|     def __info_completion(self):
 | |
|         for cmd in self.__cmd_passthrough('info')['return'].split('\r\n'):
 | |
|             if cmd:
 | |
|                 self._completer.append('info ' + cmd.split()[1])
 | |
| 
 | |
|     def __other_completion(self):
 | |
|         # special cases
 | |
|         self._completer.append('help info')
 | |
| 
 | |
|     def _fill_completion(self):
 | |
|         self.__cmd_completion()
 | |
|         self.__info_completion()
 | |
|         self.__other_completion()
 | |
| 
 | |
|     def __cmd_passthrough(self, cmdline, cpu_index = 0):
 | |
|         return self.cmd_obj({ 'execute': 'human-monitor-command', 'arguments':
 | |
|                               { 'command-line': cmdline,
 | |
|                                 'cpu-index': cpu_index } })
 | |
| 
 | |
|     def _execute_cmd(self, cmdline):
 | |
|         if cmdline.split()[0] == "cpu":
 | |
|             # trap the cpu command, it requires special setting
 | |
|             try:
 | |
|                 idx = int(cmdline.split()[1])
 | |
|                 if not 'return' in self.__cmd_passthrough('info version', idx):
 | |
|                     print('bad CPU index')
 | |
|                     return True
 | |
|                 self.__cpu_index = idx
 | |
|             except ValueError:
 | |
|                 print('cpu command takes an integer argument')
 | |
|                 return True
 | |
|         resp = self.__cmd_passthrough(cmdline, self.__cpu_index)
 | |
|         if resp is None:
 | |
|             print('Disconnected')
 | |
|             return False
 | |
|         assert 'return' in resp or 'error' in resp
 | |
|         if 'return' in resp:
 | |
|             # Success
 | |
|             if len(resp['return']) > 0:
 | |
|                 print(resp['return'], end=' ')
 | |
|         else:
 | |
|             # Error
 | |
|             print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
 | |
|         return True
 | |
| 
 | |
|     def show_banner(self):
 | |
|         QMPShell.show_banner(self, msg='Welcome to the HMP shell!')
 | |
| 
 | |
| def die(msg):
 | |
|     sys.stderr.write('ERROR: %s\n' % msg)
 | |
|     sys.exit(1)
 | |
| 
 | |
| def fail_cmdline(option=None):
 | |
|     if option:
 | |
|         sys.stderr.write('ERROR: bad command-line option \'%s\'\n' % option)
 | |
|     sys.stderr.write('qmp-shell [ -v ] [ -p ] [ -H ] [ -N ] < UNIX socket path> | < TCP address:port >\n')
 | |
|     sys.stderr.write('    -v     Verbose (echo command sent and received)\n')
 | |
|     sys.stderr.write('    -p     Pretty-print JSON\n')
 | |
|     sys.stderr.write('    -H     Use HMP interface\n')
 | |
|     sys.stderr.write('    -N     Skip negotiate (for qemu-ga)\n')
 | |
|     sys.exit(1)
 | |
| 
 | |
| def main():
 | |
|     addr = ''
 | |
|     qemu = None
 | |
|     hmp = False
 | |
|     pretty = False
 | |
|     verbose = False
 | |
|     negotiate = True
 | |
| 
 | |
|     try:
 | |
|         for arg in sys.argv[1:]:
 | |
|             if arg == "-H":
 | |
|                 if qemu is not None:
 | |
|                     fail_cmdline(arg)
 | |
|                 hmp = True
 | |
|             elif arg == "-p":
 | |
|                 pretty = True
 | |
|             elif arg == "-N":
 | |
|                 negotiate = False
 | |
|             elif arg == "-v":
 | |
|                 verbose = True
 | |
|             else:
 | |
|                 if qemu is not None:
 | |
|                     fail_cmdline(arg)
 | |
|                 if hmp:
 | |
|                     qemu = HMPShell(arg)
 | |
|                 else:
 | |
|                     qemu = QMPShell(arg, pretty)
 | |
|                 addr = arg
 | |
| 
 | |
|         if qemu is None:
 | |
|             fail_cmdline()
 | |
|     except QMPShellBadPort:
 | |
|         die('bad port number in command-line')
 | |
| 
 | |
|     try:
 | |
|         qemu.connect(negotiate)
 | |
|     except qmp.QMPConnectError:
 | |
|         die('Didn\'t get QMP greeting message')
 | |
|     except qmp.QMPCapabilitiesError:
 | |
|         die('Could not negotiate capabilities')
 | |
|     except qemu.error:
 | |
|         die('Could not connect to %s' % addr)
 | |
| 
 | |
|     qemu.show_banner()
 | |
|     qemu.set_verbosity(verbose)
 | |
|     while qemu.read_exec_command(qemu.get_prompt()):
 | |
|         pass
 | |
|     qemu.close()
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     main()
 |