mirror of
				https://git.proxmox.com/git/mirror_frr
				synced 2025-11-04 01:38:20 +00:00 
			
		
		
		
	Done with a combination of regex'ing and banging my head against a wall. Signed-off-by: David Lamparter <equinox@opensourcerouting.org>
		
			
				
	
	
		
			307 lines
		
	
	
		
			8.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			307 lines
		
	
	
		
			8.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 eval: (blacken-mode 1) -*-
 | 
						|
# SPDX-License-Identifier: GPL-2.0-or-later
 | 
						|
#
 | 
						|
# July 24 2021, Christian Hopps <chopps@labn.net>
 | 
						|
#
 | 
						|
# Copyright (c) 2021, LabN Consulting, L.L.C.
 | 
						|
#
 | 
						|
import argparse
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import pty
 | 
						|
import re
 | 
						|
import readline
 | 
						|
import select
 | 
						|
import socket
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import tempfile
 | 
						|
import termios
 | 
						|
import tty
 | 
						|
 | 
						|
 | 
						|
ENDMARKER = b"\x00END\x00"
 | 
						|
 | 
						|
 | 
						|
def lineiter(sock):
 | 
						|
    s = ""
 | 
						|
    while True:
 | 
						|
        sb = sock.recv(256)
 | 
						|
        if not sb:
 | 
						|
            return
 | 
						|
 | 
						|
        s += sb.decode("utf-8")
 | 
						|
        i = s.find("\n")
 | 
						|
        if i != -1:
 | 
						|
            yield s[:i]
 | 
						|
            s = s[i + 1 :]
 | 
						|
 | 
						|
 | 
						|
def spawn(unet, host, cmd):
 | 
						|
    if sys.stdin.isatty():
 | 
						|
        old_tty = termios.tcgetattr(sys.stdin)
 | 
						|
        tty.setraw(sys.stdin.fileno())
 | 
						|
    try:
 | 
						|
        master_fd, slave_fd = pty.openpty()
 | 
						|
 | 
						|
        # use os.setsid() make it run in a new process group, or bash job
 | 
						|
        # control will not be enabled
 | 
						|
        p = unet.hosts[host].popen(
 | 
						|
            cmd,
 | 
						|
            preexec_fn=os.setsid,
 | 
						|
            stdin=slave_fd,
 | 
						|
            stdout=slave_fd,
 | 
						|
            stderr=slave_fd,
 | 
						|
            universal_newlines=True,
 | 
						|
        )
 | 
						|
 | 
						|
        while p.poll() is None:
 | 
						|
            r, w, e = select.select([sys.stdin, master_fd], [], [], 0.25)
 | 
						|
            if sys.stdin in r:
 | 
						|
                d = os.read(sys.stdin.fileno(), 10240)
 | 
						|
                os.write(master_fd, d)
 | 
						|
            elif master_fd in r:
 | 
						|
                o = os.read(master_fd, 10240)
 | 
						|
                if o:
 | 
						|
                    os.write(sys.stdout.fileno(), o)
 | 
						|
    finally:
 | 
						|
        # restore tty settings back
 | 
						|
        if sys.stdin.isatty():
 | 
						|
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
 | 
						|
 | 
						|
 | 
						|
def doline(unet, line, writef):
 | 
						|
    def host_cmd_split(unet, cmd):
 | 
						|
        csplit = cmd.split()
 | 
						|
        for i, e in enumerate(csplit):
 | 
						|
            if e not in unet.hosts:
 | 
						|
                break
 | 
						|
        hosts = csplit[:i]
 | 
						|
        if not hosts:
 | 
						|
            hosts = sorted(unet.hosts.keys())
 | 
						|
        cmd = " ".join(csplit[i:])
 | 
						|
        return hosts, cmd
 | 
						|
 | 
						|
    line = line.strip()
 | 
						|
    m = re.match(r"^(\S+)(?:\s+(.*))?$", line)
 | 
						|
    if not m:
 | 
						|
        return True
 | 
						|
 | 
						|
    cmd = m.group(1)
 | 
						|
    oargs = m.group(2) if m.group(2) else ""
 | 
						|
    if cmd == "q" or cmd == "quit":
 | 
						|
        return False
 | 
						|
    if cmd == "hosts":
 | 
						|
        writef("%% hosts: %s\n" % " ".join(sorted(unet.hosts.keys())))
 | 
						|
    elif cmd in ["term", "vtysh", "xterm"]:
 | 
						|
        args = oargs.split()
 | 
						|
        if not args or (len(args) == 1 and args[0] == "*"):
 | 
						|
            args = sorted(unet.hosts.keys())
 | 
						|
        hosts = [unet.hosts[x] for x in args if x in unet.hosts]
 | 
						|
        for host in hosts:
 | 
						|
            if cmd == "t" or cmd == "term":
 | 
						|
                host.run_in_window("bash", title="sh-%s" % host)
 | 
						|
            elif cmd == "v" or cmd == "vtysh":
 | 
						|
                host.run_in_window("vtysh", title="vt-%s" % host)
 | 
						|
            elif cmd == "x" or cmd == "xterm":
 | 
						|
                host.run_in_window("bash", title="sh-%s" % host, forcex=True)
 | 
						|
    elif cmd == "sh":
 | 
						|
        hosts, cmd = host_cmd_split(unet, oargs)
 | 
						|
        for host in hosts:
 | 
						|
            if sys.stdin.isatty():
 | 
						|
                spawn(unet, host, cmd)
 | 
						|
            else:
 | 
						|
                if len(hosts) > 1:
 | 
						|
                    writef("------ Host: %s ------\n" % host)
 | 
						|
                output = unet.hosts[host].cmd_legacy(cmd)
 | 
						|
                writef(output)
 | 
						|
                if len(hosts) > 1:
 | 
						|
                    writef("------- End: %s ------\n" % host)
 | 
						|
        writef("\n")
 | 
						|
    elif cmd == "h" or cmd == "help":
 | 
						|
        writef(
 | 
						|
            """
 | 
						|
Commands:
 | 
						|
  help                       :: this help
 | 
						|
  sh [hosts] <shell-command> :: execute <shell-command> on <host>
 | 
						|
  term [hosts]               :: open shell terminals for hosts
 | 
						|
  vtysh [hosts]              :: open vtysh terminals for hosts
 | 
						|
  [hosts] <vtysh-command>    :: execute vtysh-command on hosts\n\n"""
 | 
						|
        )
 | 
						|
    else:
 | 
						|
        hosts, cmd = host_cmd_split(unet, line)
 | 
						|
        for host in hosts:
 | 
						|
            if len(hosts) > 1:
 | 
						|
                writef("------ Host: %s ------\n" % host)
 | 
						|
            output = unet.hosts[host].cmd_legacy('vtysh -c "{}"'.format(cmd))
 | 
						|
            writef(output)
 | 
						|
            if len(hosts) > 1:
 | 
						|
                writef("------- End: %s ------\n" % host)
 | 
						|
        writef("\n")
 | 
						|
    return True
 | 
						|
 | 
						|
 | 
						|
def cli_server_setup(unet):
 | 
						|
    sockdir = tempfile.mkdtemp("-sockdir", "pyt")
 | 
						|
    sockpath = os.path.join(sockdir, "cli-server.sock")
 | 
						|
    try:
 | 
						|
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
 | 
						|
        sock.settimeout(10)
 | 
						|
        sock.bind(sockpath)
 | 
						|
        sock.listen(1)
 | 
						|
        return sock, sockdir, sockpath
 | 
						|
    except Exception:
 | 
						|
        unet.cmd_status("rm -rf " + sockdir)
 | 
						|
        raise
 | 
						|
 | 
						|
 | 
						|
def cli_server(unet, server_sock):
 | 
						|
    sock, addr = server_sock.accept()
 | 
						|
 | 
						|
    # Go into full non-blocking mode now
 | 
						|
    sock.settimeout(None)
 | 
						|
 | 
						|
    for line in lineiter(sock):
 | 
						|
        line = line.strip()
 | 
						|
 | 
						|
        def writef(x):
 | 
						|
            xb = x.encode("utf-8")
 | 
						|
            sock.send(xb)
 | 
						|
 | 
						|
        if not doline(unet, line, writef):
 | 
						|
            return
 | 
						|
        sock.send(ENDMARKER)
 | 
						|
 | 
						|
 | 
						|
def cli_client(sockpath, prompt="unet> "):
 | 
						|
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
 | 
						|
    sock.settimeout(10)
 | 
						|
    sock.connect(sockpath)
 | 
						|
 | 
						|
    # Go into full non-blocking mode now
 | 
						|
    sock.settimeout(None)
 | 
						|
 | 
						|
    print("\n--- Micronet CLI Starting ---\n\n")
 | 
						|
    while True:
 | 
						|
        if sys.version_info[0] == 2:
 | 
						|
            line = raw_input(prompt)  # pylint: disable=E0602
 | 
						|
        else:
 | 
						|
            line = input(prompt)
 | 
						|
        if line is None:
 | 
						|
            return
 | 
						|
 | 
						|
        # Need to put \n back
 | 
						|
        line += "\n"
 | 
						|
 | 
						|
        # Send the CLI command
 | 
						|
        sock.send(line.encode("utf-8"))
 | 
						|
 | 
						|
        def bendswith(b, sentinel):
 | 
						|
            slen = len(sentinel)
 | 
						|
            return len(b) >= slen and b[-slen:] == sentinel
 | 
						|
 | 
						|
        # Collect the output
 | 
						|
        rb = b""
 | 
						|
        while not bendswith(rb, ENDMARKER):
 | 
						|
            lb = sock.recv(4096)
 | 
						|
            if not lb:
 | 
						|
                return
 | 
						|
            rb += lb
 | 
						|
 | 
						|
        # Remove the marker
 | 
						|
        rb = rb[: -len(ENDMARKER)]
 | 
						|
 | 
						|
        # Write the output
 | 
						|
        sys.stdout.write(rb.decode("utf-8"))
 | 
						|
 | 
						|
 | 
						|
def local_cli(unet, outf, prompt="unet> "):
 | 
						|
    print("\n--- Micronet CLI Starting ---\n\n")
 | 
						|
    while True:
 | 
						|
        if sys.version_info[0] == 2:
 | 
						|
            line = raw_input(prompt)  # pylint: disable=E0602
 | 
						|
        else:
 | 
						|
            line = input(prompt)
 | 
						|
        if line is None:
 | 
						|
            return
 | 
						|
        if not doline(unet, line, outf.write):
 | 
						|
            return
 | 
						|
 | 
						|
 | 
						|
def cli(
 | 
						|
    unet,
 | 
						|
    histfile=None,
 | 
						|
    sockpath=None,
 | 
						|
    force_window=False,
 | 
						|
    title=None,
 | 
						|
    prompt=None,
 | 
						|
    background=True,
 | 
						|
):
 | 
						|
    logger = logging.getLogger("cli-client")
 | 
						|
 | 
						|
    if prompt is None:
 | 
						|
        prompt = "unet> "
 | 
						|
 | 
						|
    if force_window or not sys.stdin.isatty():
 | 
						|
        # Run CLI in another window b/c we have no tty.
 | 
						|
        sock, sockdir, sockpath = cli_server_setup(unet)
 | 
						|
 | 
						|
        python_path = unet.get_exec_path(["python3", "python"])
 | 
						|
        us = os.path.realpath(__file__)
 | 
						|
        cmd = "{} {}".format(python_path, us)
 | 
						|
        if histfile:
 | 
						|
            cmd += " --histfile=" + histfile
 | 
						|
        if title:
 | 
						|
            cmd += " --prompt={}".format(title)
 | 
						|
        cmd += " " + sockpath
 | 
						|
 | 
						|
        try:
 | 
						|
            unet.run_in_window(cmd, new_window=True, title=title, background=background)
 | 
						|
            return cli_server(unet, sock)
 | 
						|
        finally:
 | 
						|
            unet.cmd_status("rm -rf " + sockdir)
 | 
						|
 | 
						|
    if not unet:
 | 
						|
        logger.debug("client-cli using sockpath %s", sockpath)
 | 
						|
 | 
						|
    try:
 | 
						|
        if histfile is None:
 | 
						|
            histfile = os.path.expanduser("~/.micronet-history.txt")
 | 
						|
            if not os.path.exists(histfile):
 | 
						|
                if unet:
 | 
						|
                    unet.cmd("touch " + histfile)
 | 
						|
                else:
 | 
						|
                    subprocess.run("touch " + histfile)
 | 
						|
        if histfile:
 | 
						|
            readline.read_history_file(histfile)
 | 
						|
    except Exception:
 | 
						|
        pass
 | 
						|
 | 
						|
    try:
 | 
						|
        if sockpath:
 | 
						|
            cli_client(sockpath, prompt=prompt)
 | 
						|
        else:
 | 
						|
            local_cli(unet, sys.stdout, prompt=prompt)
 | 
						|
    except EOFError:
 | 
						|
        pass
 | 
						|
    except Exception as ex:
 | 
						|
        logger.critical("cli: got exception: %s", ex, exc_info=True)
 | 
						|
        raise
 | 
						|
    finally:
 | 
						|
        readline.write_history_file(histfile)
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    logging.basicConfig(level=logging.DEBUG, filename="/tmp/topotests/cli-client.log")
 | 
						|
    logger = logging.getLogger("cli-client")
 | 
						|
    logger.info("Start logging cli-client")
 | 
						|
 | 
						|
    parser = argparse.ArgumentParser()
 | 
						|
    parser.add_argument("--histfile", help="file to user for history")
 | 
						|
    parser.add_argument("--prompt-text", help="prompt string to use")
 | 
						|
    parser.add_argument("socket", help="path to pair of sockets to communicate over")
 | 
						|
    args = parser.parse_args()
 | 
						|
 | 
						|
    prompt = "{}> ".format(args.prompt_text) if args.prompt_text else "unet> "
 | 
						|
    cli(None, args.histfile, args.socket, prompt=prompt)
 |