tests: NEW micronet replacement for mininet

Signed-off-by: Christian Hopps <chopps@labn.net>
This commit is contained in:
Christian Hopps 2021-07-14 20:51:15 +00:00
parent bc51ce6810
commit 6a5433ef0b
3 changed files with 1640 additions and 0 deletions

View File

@ -0,0 +1,949 @@
# -*- coding: utf-8 eval: (blacken-mode 1) -*-
#
# July 9 2021, Christian Hopps <chopps@labn.net>
#
# Copyright (c) 2021, LabN Consulting, L.L.C.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; see the file COPYING; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
import datetime
import logging
import os
import pdb
import re
import readline
import shlex
import subprocess
import sys
import tempfile
import time as time_mod
import traceback
root_hostname = subprocess.check_output("hostname")
# This allows us to cleanup any leftovers later on
os.environ["MICRONET_PID"] = str(os.getpid())
class Timeout(object):
def __init__(self, delta):
self.started_on = datetime.datetime.now()
self.expires_on = self.started_on + datetime.timedelta(seconds=delta)
def elapsed(self):
elapsed = datetime.datetime.now() - self.started_on
return elapsed.total_seconds()
def is_expired(self):
return datetime.datetime.now() > self.expires_on
def is_string(value):
"""Return True if value is a string."""
try:
return isinstance(value, basestring) # type: ignore
except NameError:
return isinstance(value, str)
def shell_quote(command):
"""Return command wrapped in single quotes."""
if sys.version_info[0] >= 3:
return shlex.quote(command)
return "'{}'".format(command.replace("'", "'\"'\"'")) # type: ignore
def cmd_error(rc, o, e):
s = "rc {}".format(rc)
o = "\n\tstdout: " + o.strip() if o and o.strip() else ""
e = "\n\tstderr: " + e.strip() if e and e.strip() else ""
return s + o + e
def proc_error(p, o, e):
args = p.args if is_string(p.args) else " ".join(p.args)
s = "rc {} pid {}\n\targs: {}".format(p.returncode, p.pid, args)
o = "\n\tstdout: " + o.strip() if o and o.strip() else ""
e = "\n\tstderr: " + e.strip() if e and e.strip() else ""
return s + o + e
def comm_error(p):
rc = p.poll()
assert rc is not None
if not hasattr(p, "saved_output"):
p.saved_output = p.communicate()
return proc_error(p, *p.saved_output)
class Commander(object): # pylint: disable=R0205
"""
Commander.
An object that can execute commands.
"""
tmux_wait_gen = 0
def __init__(self, name, logger=None):
"""Create a Commander."""
self.name = name
self.last = None
self.exec_paths = {}
self.pre_cmd = []
self.pre_cmd_str = ""
if not logger:
self.logger = logging.getLogger(__name__ + ".commander." + name)
else:
self.logger = logger
self.cwd = self.cmd_raises("pwd").strip()
def set_logger(self, logfile):
self.logger = logging.getLogger(__name__ + ".commander." + self.name)
if is_string(logfile):
handler = logging.FileHandler(logfile, mode="w")
else:
handler = logging.StreamHandler(logfile)
fmtstr = "%(asctime)s.%(msecs)03d %(levelname)s: {}({}): %(message)s".format(
self.__class__.__name__, self.name
)
handler.setFormatter(logging.Formatter(fmt=fmtstr))
self.logger.addHandler(handler)
def set_pre_cmd(self, pre_cmd=None):
if not pre_cmd:
self.pre_cmd = []
self.pre_cmd_str = ""
else:
self.pre_cmd = pre_cmd
self.pre_cmd_str = " ".join(self.pre_cmd) + " "
def __str__(self):
return "Commander({})".format(self.name)
def get_exec_path(self, binary):
"""Return the full path to the binary executable.
`binary` :: binary name or list of binary names
"""
if is_string(binary):
bins = [binary]
else:
bins = binary
for b in bins:
if b in self.exec_paths:
return self.exec_paths[b]
rc, output, _ = self.cmd_status("which " + b, warn=False)
if not rc:
return os.path.abspath(output.strip())
return None
def get_tmp_dir(self, uniq):
return os.path.join(tempfile.mkdtemp(), uniq)
def test(self, flags, arg):
"""Run test binary, with flags and arg"""
test_path = self.get_exec_path(["test"])
rc, output, _ = self.cmd_status([test_path, flags, arg], warn=False)
return not rc
def path_exists(self, path):
"""Check if path exists."""
return self.test("-e", path)
def _get_cmd_str(self, cmd):
if is_string(cmd):
return self.pre_cmd_str + cmd
cmd = self.pre_cmd + cmd
return " ".join(cmd)
def _get_sub_args(self, cmd, defaults, **kwargs):
if is_string(cmd):
defaults["shell"] = True
pre_cmd = self.pre_cmd_str
else:
defaults["shell"] = False
pre_cmd = self.pre_cmd
cmd = [str(x) for x in cmd]
defaults.update(kwargs)
return pre_cmd, cmd, defaults
def _popen(self, method, cmd, skip_pre_cmd=False, **kwargs):
if sys.version_info[0] >= 3:
defaults = {
"encoding": "utf-8",
"stdout": subprocess.PIPE,
"stderr": subprocess.PIPE,
}
else:
defaults = {
"stdout": subprocess.PIPE,
"stderr": subprocess.PIPE,
}
pre_cmd, cmd, defaults = self._get_sub_args(cmd, defaults, **kwargs)
self.logger.debug('%s: %s("%s", kwargs: %s)', self, method, cmd, defaults)
actual_cmd = cmd if skip_pre_cmd else pre_cmd + cmd
p = subprocess.Popen(actual_cmd, **defaults)
if not hasattr(p, "args"):
p.args = actual_cmd
return p, actual_cmd
def set_cwd(self, cwd):
self.logger.warning("%s: 'cd' (%s) does not work outside namespaces", self, cwd)
self.cwd = cwd
def popen(self, cmd, **kwargs):
"""
Creates a pipe with the given `command`.
Args:
command: `str` or `list` of command to open a pipe with.
**kwargs: kwargs is eventually passed on to Popen. If `command` is a string
then will be invoked with shell=True, otherwise `command` is a list and
will be invoked with shell=False.
Returns:
a subprocess.Popen object.
"""
p, _ = self._popen("popen", cmd, **kwargs)
return p
def cmd_status(self, cmd, raises=False, warn=True, stdin=None, **kwargs):
"""Execute a command."""
# We are not a shell like mininet, so we need to intercept this
chdir = False
if not is_string(cmd):
cmds = cmd
else:
# XXX we can drop this when the code stops assuming it works
m = re.match(r"cd(\s*|\s+(\S+))$", cmd)
if m and m.group(2):
self.logger.warning(
"Bad call to 'cd' (chdir) emulating, use self.set_cwd():\n%s",
"".join(traceback.format_stack(limit=12)),
)
assert is_string(cmd)
chdir = True
cmd += " && pwd"
# If we are going to run under bash then we don't need shell=True!
cmds = ["/bin/bash", "-c", cmd]
pinput = None
if is_string(stdin) or isinstance(stdin, bytes):
pinput = stdin
stdin = subprocess.PIPE
p, actual_cmd = self._popen("cmd_status", cmds, stdin=stdin, **kwargs)
stdout, stderr = p.communicate(input=pinput)
rc = p.wait()
# For debugging purposes.
self.last = (rc, actual_cmd, cmd, stdout, stderr)
if rc:
if warn:
self.logger.warning(
"%s: proc failed: %s:", self, proc_error(p, stdout, stderr)
)
if raises:
# error = Exception("stderr: {}".format(stderr))
# This annoyingly doesnt' show stderr when printed normally
error = subprocess.CalledProcessError(rc, actual_cmd)
error.stdout, error.stderr = stdout, stderr
raise error
elif chdir:
self.set_cwd(stdout.strip())
return rc, stdout, stderr
def cmd_legacy(self, cmd, **kwargs):
"""Execute a command with stdout and stderr joined, *IGNORES ERROR*."""
defaults = {"stderr": subprocess.STDOUT}
defaults.update(kwargs)
_, stdout, _ = self.cmd_status(cmd, raises=False, **defaults)
return stdout
def cmd_raises(self, cmd, **kwargs):
"""Execute a command. Raise an exception on errors"""
rc, stdout, _ = self.cmd_status(cmd, raises=True, **kwargs)
assert rc == 0
return stdout
# Run a command in a new window (gnome-terminal, screen, tmux, xterm)
def run_in_window(
self,
cmd,
wait_for=False,
background=False,
name=None,
title=None,
forcex=False,
new_window=False,
tmux_target=None,
):
"""
Run a command in a new window (TMUX, Screen or XTerm).
Args:
wait_for: True to wait for exit from command or `str` as channel neme to signal on exit, otherwise False
background: Do not change focus to new window.
title: Title for new pane (tmux) or window (xterm).
name: Name of the new window (tmux)
forcex: Force use of X11.
new_window: Open new window (instead of pane) in TMUX
tmux_target: Target for tmux pane.
Returns:
the pane/window identifier from TMUX (depends on `new_window`)
"""
channel = None
if is_string(wait_for):
channel = wait_for
elif wait_for is True:
channel = "{}-wait-{}".format(os.getpid(), Commander.tmux_wait_gen)
Commander.tmux_wait_gen += 1
sudo_path = self.get_exec_path(["sudo"])
nscmd = sudo_path + " " + self.pre_cmd_str + cmd
if "TMUX" in os.environ and not forcex:
cmd = [self.get_exec_path("tmux")]
if new_window:
cmd.append("new-window")
cmd.append("-P")
if name:
cmd.append("-n")
cmd.append(name)
if tmux_target:
cmd.append("-t")
cmd.append(tmux_target)
else:
cmd.append("split-window")
cmd.append("-P")
cmd.append("-h")
if not tmux_target:
tmux_target = os.getenv("TMUX_PANE", "")
if background:
cmd.append("-d")
if tmux_target:
cmd.append("-t")
cmd.append(tmux_target)
if title:
nscmd = "printf '\033]2;{}\033\\'; {}".format(title, nscmd)
if channel:
nscmd = 'trap "tmux wait -S {}; exit 0" EXIT; {}'.format(channel, nscmd)
cmd.append(nscmd)
elif "STY" in os.environ and not forcex:
# wait for not supported in screen for now
channel = None
cmd = [self.get_exec_path("screen")]
if not os.path.exists(
"/run/screen/S-{}/{}".format(os.environ["USER"], os.environ["STY"])
):
cmd = ["sudo", "-u", os.environ["SUDO_USER"]] + cmd
cmd.append(nscmd)
elif "DISPLAY" in os.environ:
# We need it broken up for xterm
user_cmd = cmd
cmd = [self.get_exec_path("xterm")]
if "SUDO_USER" in os.environ:
cmd = [self.get_exec_path("sudo"), "-u", os.environ["SUDO_USER"]] + cmd
# if title:
# cmd.append("-T")
# cmd.append(title)
cmd.append("-e")
cmd.append(sudo_path)
cmd.extend(self.pre_cmd)
cmd.append(user_cmd)
# if channel:
# return self.cmd_raises(cmd, skip_pre_cmd=True)
# else:
p = self.popen(
cmd,
skip_pre_cmd=True,
stdin=None,
shell=False,
# stdout=open("/dev/null", "w"),
# stderr=open("/dev/null", "w"),
)
time_mod.sleep(2)
if p.poll() is not None:
self.logger.error("%s: Failed to launch xterm: %s", self, comm_error(p))
return ""
else:
self.logger.error(
"DISPLAY, STY, and TMUX not in environment, can't open window"
)
raise Exception("Window requestd but TMUX, Screen and X11 not available")
pane_info = self.cmd_raises(cmd, skip_pre_cmd=True).strip()
# Re-adjust the layout
if "TMUX" in os.environ:
self.cmd_status(
"tmux select-layout -t {} tiled".format(
pane_info if not tmux_target else tmux_target
),
skip_pre_cmd=True,
)
# Wait here if we weren't handed the channel to wait for
if channel and wait_for is True:
cmd = [self.get_exec_path("tmux"), "wait", channel]
self.cmd_status(cmd, skip_pre_cmd=True)
return pane_info
def delete(self):
pass
class LinuxNamespace(Commander):
"""
A linux Namespace.
An object that creates and executes commands in a linux namespace
"""
def __init__(
self,
name,
net=True,
mount=True,
uts=True,
cgroup=False,
ipc=False,
pid=False,
time=False,
user=False,
set_hostname=True,
private_mounts=None,
logger=None,
):
"""
Create a new linux namespace.
Args:
name: Internal name for the namespace.
net: Create network namespace.
mount: Create network namespace.
uts: Create UTS (hostname) namespace.
cgroup: Create cgroup namespace.
ipc: Create IPC namespace.
pid: Create PID namespace, also mounts new /proc.
time: Create time namespace.
user: Create user namespace, also keeps capabilities.
set_hostname: Set the hostname to `name`, uts must also be True.
private_mounts: List of strings of the form
"[/external/path:]/internal/path. If no external path is specified a
tmpfs is mounted on the internal path. Any paths specified are first
passed to `mkdir -p`.
logger: Passed to superclass.
"""
super(LinuxNamespace, self).__init__(name, logger)
self.logger.debug("%s: Creating", self)
self.intfs = []
nslist = []
cmd = ["/usr/bin/unshare"]
flags = "-"
self.ifnetns = {}
if cgroup:
nslist.append("cgroup")
flags += "C"
if ipc:
nslist.append("ipc")
flags += "i"
if mount:
nslist.append("mnt")
flags += "m"
if net:
nslist.append("net")
flags += "n"
if pid:
nslist.append("pid")
flags += "p"
cmd.append("--mount-proc")
if time:
# XXX this filename is probably wrong
nslist.append("time")
flags += "T"
if user:
nslist.append("user")
flags += "U"
cmd.append("--keep-caps")
if uts:
nslist.append("uts")
cmd.append("--uts")
cmd.append(flags)
cmd.append("/bin/cat")
# Using cat and a stdin PIPE is nice as it will exit when we do. However, we
# also detach it from the pgid so that signals do not propagate to it. This is
# b/c it would exit early (e.g., ^C) then, at least the main micronet proc which
# has no other processes like frr daemons running, will take the main network
# namespace with it, which will remove the bridges and the veth pair (because
# the bridge side veth is deleted).
self.logger.debug("%s: creating namespace process: %s", self, cmd)
p = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=open("/dev/null", "w"),
stderr=open("/dev/null", "w"),
preexec_fn=os.setsid, # detach from pgid so signals don't propogate
shell=False,
)
self.p = p
self.pid = p.pid
self.logger.debug("%s: namespace pid: %d", self, self.pid)
# -----------------------------------------------
# Now let's wait until unshare completes it's job
# -----------------------------------------------
timeout = Timeout(30)
while p.poll() is None and not timeout.is_expired():
for fname in tuple(nslist):
ours = os.readlink("/proc/self/ns/{}".format(fname))
theirs = os.readlink("/proc/{}/ns/{}".format(self.pid, fname))
# See if their namespace is different
if ours != theirs:
nslist.remove(fname)
if not nslist:
break
elapsed = int(timeout.elapsed())
if elapsed <= 3:
time_mod.sleep(0.1)
elif elapsed > 10:
self.logger.warning("%s: unshare taking more than %ss", self, elapsed)
time_mod.sleep(3)
else:
self.logger.info("%s: unshare taking more than %ss", self, elapsed)
time_mod.sleep(1)
assert p.poll() is None, "unshare unexpectedly exited!"
assert not nslist, "unshare never unshared!"
# Set pre-command based on our namespace proc
self.base_pre_cmd = ["/usr/bin/nsenter", "-a", "-t", str(self.pid)]
if not pid:
self.base_pre_cmd.append("-F")
self.set_pre_cmd(self.base_pre_cmd + ["--wd=" + self.cwd])
# Remount /sys to pickup any changes
self.cmd_raises("mount -t sysfs sysfs /sys")
# Set the hostname to the namespace name
if uts and set_hostname:
# Debugging get the root hostname
self.cmd_raises("hostname " + self.name)
nroot = subprocess.check_output("hostname")
if root_hostname != nroot:
result = self.p.poll()
assert root_hostname == nroot, "STATE of namespace process {}".format(
result
)
if private_mounts:
if is_string(private_mounts):
private_mounts = [private_mounts]
for m in private_mounts:
s = m.split(":", 1)
if len(s) == 1:
self.tmpfs_mount(s[0])
else:
self.bind_mount(s[0], s[1])
o = self.cmd_legacy("ls -l /proc/{}/ns".format(self.pid))
self.logger.debug("namespaces:\n %s", o)
# Doing this here messes up all_protocols ipv6 check
self.cmd_raises("ip link set lo up")
def __str__(self):
return "LinuxNamespace({})".format(self.name)
def tmpfs_mount(self, inner):
self.cmd_raises("mkdir -p " + inner)
self.cmd_raises("mount -n -t tmpfs tmpfs " + inner)
def bind_mount(self, outer, inner):
self.cmd_raises("mkdir -p " + inner)
self.cmd_raises("mount --rbind {} {} ".format(outer, inner))
def add_netns(self, ns):
self.logger.debug("Adding network namespace %s", ns)
ip_path = self.get_exec_path("ip")
assert ip_path, "XXX missing ip command!"
if os.path.exists("/run/netns/{}".format(ns)):
self.logger.warning("%s: Removing existing nsspace %s", self, ns)
try:
self.delete_netns(ns)
except Exception as ex:
self.logger.warning(
"%s: Couldn't remove existing nsspace %s: %s",
self,
ns,
str(ex),
exc_info=True,
)
self.cmd_raises([ip_path, "netns", "add", ns])
def delete_netns(self, ns):
self.logger.debug("Deleting network namespace %s", ns)
ip_path = self.get_exec_path("ip")
assert ip_path, "XXX missing ip command!"
self.cmd_raises([ip_path, "netns", "delete", ns])
def set_intf_netns(self, intf, ns, up=False):
# In case a user hard-codes 1 thinking it "resets"
ns = str(ns)
if ns == "1":
ns = str(self.pid)
self.logger.debug("Moving interface %s to namespace %s", intf, ns)
cmd = "ip link set {} netns " + ns
if up:
cmd += " up"
self.intf_ip_cmd(intf, cmd)
if ns == str(self.pid):
# If we are returning then remove from dict
if intf in self.ifnetns:
del self.ifnetns[intf]
else:
self.ifnetns[intf] = ns
def reset_intf_netns(self, intf):
self.logger.debug("Moving interface %s to default namespace", intf)
self.set_intf_netns(intf, str(self.pid))
def intf_ip_cmd(self, intf, cmd):
"""Run an ip command for considering an interfaces possible namespace.
`cmd` - format is run using the interface name on the command
"""
if intf in self.ifnetns:
assert cmd.startswith("ip ")
cmd = "ip -n " + self.ifnetns[intf] + cmd[2:]
self.cmd_raises(cmd.format(intf))
def set_cwd(self, cwd):
# Set pre-command based on our namespace proc
self.logger.debug("%s: new CWD %s", self, cwd)
self.set_pre_cmd(self.base_pre_cmd + ["--wd=" + cwd])
def register_interface(self, ifname):
if ifname not in self.intfs:
self.intfs.append(ifname)
def delete(self):
if self.p and self.p.poll() is None:
if sys.version_info[0] >= 3:
try:
self.p.terminate()
self.p.communicate(timeout=10)
except subprocess.TimeoutExpired:
self.p.kill()
self.p.communicate(timeout=2)
else:
self.p.kill()
self.p.communicate()
self.set_pre_cmd(["/bin/false"])
class SharedNamespace(Commander):
"""
Share another namespace.
An object that executes commands in an existing pid's linux namespace
"""
def __init__(self, name, pid, logger=None):
"""
Share a linux namespace.
Args:
name: Internal name for the namespace.
pid: PID of the process to share with.
"""
super(SharedNamespace, self).__init__(name, logger)
self.logger.debug("%s: Creating", self)
self.pid = pid
self.intfs = []
# Set pre-command based on our namespace proc
self.set_pre_cmd(
["/usr/bin/nsenter", "-a", "-t", str(self.pid), "--wd=" + self.cwd]
)
def __str__(self):
return "SharedNamespace({})".format(self.name)
def set_cwd(self, cwd):
# Set pre-command based on our namespace proc
self.logger.debug("%s: new CWD %s", self, cwd)
self.set_pre_cmd(["/usr/bin/nsenter", "-a", "-t", str(self.pid), "--wd=" + cwd])
def register_interface(self, ifname):
if ifname not in self.intfs:
self.intfs.append(ifname)
class Bridge(SharedNamespace):
"""
A linux bridge.
"""
next_brid_ord = 0
@classmethod
def _get_next_brid(cls):
brid_ord = cls.next_brid_ord
cls.next_brid_ord += 1
return brid_ord
def __init__(self, name=None, unet=None, logger=None):
"""Create a linux Bridge."""
self.unet = unet
self.brid_ord = self._get_next_brid()
if name:
self.brid = name
else:
self.brid = "br{}".format(self.brid_ord)
name = self.brid
super(Bridge, self).__init__(name, unet.pid, logger)
self.logger.debug("Bridge: Creating")
assert len(self.brid) <= 16 # Make sure fits in IFNAMSIZE
self.cmd_raises("ip link delete {} || true".format(self.brid))
self.cmd_raises("ip link add {} type bridge".format(self.brid))
self.cmd_raises("ip link set {} up".format(self.brid))
self.logger.debug("%s: Created, Running", self)
def __str__(self):
return "Bridge({})".format(self.brid)
def delete(self):
"""Stop the bridge (i.e., delete the linux resources)."""
rc, o, e = self.cmd_status("ip link show {}".format(self.brid), warn=False)
if not rc:
rc, o, e = self.cmd_status(
"ip link delete {}".format(self.brid), warn=False
)
if rc:
self.logger.error(
"%s: error deleting bridge %s: %s",
self,
self.brid,
cmd_error(rc, o, e),
)
else:
self.logger.debug("%s: Deleted.", self)
class Micronet(LinuxNamespace): # pylint: disable=R0205
"""
Micronet.
"""
def __init__(self):
"""Create a Micronet."""
self.hosts = {}
self.switches = {}
self.links = {}
self.macs = {}
self.rmacs = {}
super(Micronet, self).__init__("micronet", mount=True, net=True, uts=True)
self.logger.debug("%s: Creating", self)
def __str__(self):
return "Micronet()"
def __getitem__(self, key):
if key in self.switches:
return self.switches[key]
return self.hosts[key]
def add_host(self, name, cls=LinuxNamespace, **kwargs):
"""Add a host to micronet."""
self.logger.debug("%s: add_host %s", self, name)
self.hosts[name] = cls(name, **kwargs)
# Create a new mounted FS for tracking nested network namespaces creatd by the
# user with `ip netns add`
self.hosts[name].tmpfs_mount("/run/netns")
def add_link(self, name1, name2, if1, if2):
"""Add a link between switch and host to micronet."""
isp2p = False
if name1 in self.switches:
assert name2 in self.hosts
elif name2 in self.switches:
assert name1 in self.hosts
name1, name2 = name2, name1
if1, if2 = if2, if1
else:
# p2p link
assert name1 in self.hosts
assert name2 in self.hosts
isp2p = True
lname = "{}:{}-{}:{}".format(name1, if1, name2, if2)
self.logger.debug("%s: add_link %s%s", self, lname, " p2p" if isp2p else "")
self.links[lname] = (name1, if1, name2, if2)
# And create the veth now.
if isp2p:
lhost, rhost = self.hosts[name1], self.hosts[name2]
lifname = "i1{:x}".format(lhost.pid)
rifname = "i2{:x}".format(rhost.pid)
self.cmd_raises(
"ip link add {} type veth peer name {}".format(lifname, rifname)
)
self.cmd_raises("ip link set {} netns {}".format(lifname, lhost.pid))
lhost.cmd_raises("ip link set {} name {}".format(lifname, if1))
lhost.cmd_raises("ip link set {} up".format(if1))
lhost.register_interface(if1)
self.cmd_raises("ip link set {} netns {}".format(rifname, rhost.pid))
rhost.cmd_raises("ip link set {} name {}".format(rifname, if2))
rhost.cmd_raises("ip link set {} up".format(if2))
rhost.register_interface(if2)
else:
switch = self.switches[name1]
host = self.hosts[name2]
assert len(if1) <= 16 and len(if2) <= 16 # Make sure fits in IFNAMSIZE
self.logger.debug("%s: Creating veth pair for link %s", self, lname)
self.cmd_raises(
"ip link add {} type veth peer name {} netns {}".format(
if1, if2, host.pid
)
)
self.cmd_raises("ip link set {} netns {}".format(if1, switch.pid))
switch.register_interface(if1)
host.register_interface(if2)
self.cmd_raises("ip link set {} master {}".format(if1, switch.brid))
self.cmd_raises("ip link set {} up".format(if1))
host.cmd_raises("ip link set {} up".format(if2))
# Cache the MAC values, and reverse mapping
self.get_mac(name1, if1)
self.get_mac(name2, if2)
def add_switch(self, name):
"""Add a switch to micronet."""
self.logger.debug("%s: add_switch %s", self, name)
self.switches[name] = Bridge(name, self)
def get_mac(self, name, ifname):
if name in self.hosts:
dev = self.hosts[name]
else:
dev = self.switches[name]
if (name, ifname) not in self.macs:
_, output, _ = dev.cmd_status("ip -o link show " + ifname)
m = re.match(".*link/(loopback|ether) ([0-9a-fA-F:]+) .*", output)
mac = m.group(2)
self.macs[(name, ifname)] = mac
self.rmacs[mac] = (name, ifname)
return self.macs[(name, ifname)]
def delete(self):
"""Delete the micronet topology."""
self.logger.debug("%s: Deleting.", self)
for lname, (_, _, rname, rif) in self.links.items():
host = self.hosts[rname]
self.logger.debug("%s: Deleting veth pair for link %s", self, lname)
rc, o, e = host.cmd_status("ip link delete {}".format(rif), warn=False)
if rc:
self.logger.error(
"Error deleting veth pair %s: %s", lname, cmd_error(rc, o, e)
)
self.links = {}
for host in self.hosts.values():
try:
host.delete()
except Exception as error:
self.logger.error(
"%s: error while deleting host %s: %s", self, host, error
)
self.hosts = {}
for switch in self.switches.values():
try:
switch.delete()
except Exception as error:
self.logger.error(
"%s: error while deleting switch %s: %s", self, switch, error
)
self.switches = {}
self.logger.debug("%s: Deleted.", self)
super(Micronet, self).delete()
# ---------------------------
# Root level utility function
# ---------------------------
def get_exec_path(binary):
base = Commander("base")
return base.get_exec_path(binary)
commander = Commander("micronet")

View File

@ -0,0 +1,312 @@
# -*- coding: utf-8 eval: (blacken-mode 1) -*-
#
# July 24 2021, Christian Hopps <chopps@labn.net>
#
# Copyright (c) 2021, LabN Consulting, L.L.C.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; see the file COPYING; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
import argparse
import datetime
import logging
import os
import pdb
import pty
import re
import readline
import select
import shlex
import socket
import subprocess
import sys
import tempfile
import termios
import time as time_mod
import traceback
import tty
ENDMARKER = b"\x00END\x00"
def lineiter(sock):
s = ""
while True:
sb = sock.recv(256)
if not sb:
return
s += sb.decode("utf-8")
i = s.find("\n")
if i != -1:
yield s[:i]
s = s[i + 1 :]
def spawn(unet, host, cmd):
if sys.stdin.isatty():
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
try:
master_fd, slave_fd = pty.openpty()
# use os.setsid() make it run in a new process group, or bash job
# control will not be enabled
p = unet.hosts[host].popen(
cmd,
preexec_fn=os.setsid,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
universal_newlines=True,
)
while p.poll() is None:
r, w, e = select.select([sys.stdin, master_fd], [], [], 0.25)
if sys.stdin in r:
d = os.read(sys.stdin.fileno(), 10240)
os.write(master_fd, d)
elif master_fd in r:
o = os.read(master_fd, 10240)
if o:
os.write(sys.stdout.fileno(), o)
finally:
# restore tty settings back
if sys.stdin.isatty():
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
def doline(unet, line, writef):
def host_cmd_split(unet, cmd):
csplit = cmd.split()
for i, e in enumerate(csplit):
if e not in unet.hosts:
break
hosts = csplit[:i]
if not hosts:
hosts = sorted(unet.hosts.keys())
cmd = " ".join(csplit[i:])
return hosts, cmd
line = line.strip()
m = re.match(r"^(\S+)(?:\s+(.*))?$", line)
if not m:
return True
cmd = m.group(1)
oargs = m.group(2) if m.group(2) else ""
if cmd == "q" or cmd == "quit":
return False
if cmd == "hosts":
writef("%% hosts: %s\n" % " ".join(sorted(unet.hosts.keys())))
elif cmd in ["term", "vtysh", "xterm"]:
args = oargs.split()
if not args or (len(args) == 1 and args[0] == "*"):
args = sorted(unet.hosts.keys())
hosts = [unet.hosts[x] for x in args]
for host in hosts:
if cmd == "t" or cmd == "term":
host.run_in_window("bash")
elif cmd == "v" or cmd == "vtysh":
host.run_in_window("vtysh")
elif cmd == "x" or cmd == "xterm":
host.run_in_window("bash", forcex=True)
elif cmd == "sh":
hosts, cmd = host_cmd_split(unet, oargs)
for host in hosts:
if sys.stdin.isatty():
spawn(unet, host, cmd)
else:
if len(hosts) > 1:
writef("------ Host: %s ------\n" % host)
output = unet.hosts[host].cmd_legacy(cmd)
writef(output)
if len(hosts) > 1:
writef("------- End: %s ------\n" % host)
writef("\n")
elif cmd == "h" or cmd == "help":
writef(
"""
Commands:
help :: this help
sh [hosts] <shell-command> :: execute <shell-command> on <host>
term [hosts] :: open shell terminals for hosts
vtysh [hosts] :: open vtysh terminals for hosts
[hosts] <vtysh-command> :: execute vtysh-command on hosts\n\n"""
)
else:
hosts, cmd = host_cmd_split(unet, line)
for host in hosts:
if len(hosts) > 1:
writef("------ Host: %s ------\n" % host)
output = unet.hosts[host].cmd_legacy('vtysh -c "{}"'.format(cmd))
writef(output)
if len(hosts) > 1:
writef("------- End: %s ------\n" % host)
writef("\n")
return True
def cli_server_setup(unet):
sockdir = tempfile.mkdtemp("-sockdir", "pyt")
sockpath = os.path.join(sockdir, "cli-server.sock")
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(10)
sock.bind(sockpath)
sock.listen(1)
return sock, sockdir, sockpath
except Exception:
unet.cmd_status("rm -rf " + sockdir)
raise
def cli_server(unet, server_sock):
sock, addr = server_sock.accept()
# Go into full non-blocking mode now
sock.settimeout(None)
for line in lineiter(sock):
line = line.strip()
def writef(x):
xb = x.encode("utf-8")
sock.send(xb)
if not doline(unet, line, writef):
return
sock.send(ENDMARKER)
def cli_client(sockpath, prompt="unet> "):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(10)
sock.connect(sockpath)
# Go into full non-blocking mode now
sock.settimeout(None)
print("\n--- Micronet CLI Starting ---\n\n")
while True:
if sys.version_info[0] == 2:
line = raw_input(prompt) # pylint: disable=E0602
else:
line = input(prompt)
if line is None:
return
# Need to put \n back
line += "\n"
# Send the CLI command
sock.send(line.encode("utf-8"))
def bendswith(b, sentinel):
slen = len(sentinel)
return len(b) >= slen and b[-slen:] == sentinel
# Collect the output
rb = b""
while not bendswith(rb, ENDMARKER):
lb = sock.recv(4096)
if not lb:
return
rb += lb
# Remove the marker
rb = rb[:-len(ENDMARKER)]
# Write the output
sys.stdout.write(rb.decode("utf-8"))
def local_cli(unet, outf, prompt="unet> "):
print("\n--- Micronet CLI Starting ---\n\n")
while True:
if sys.version_info[0] == 2:
line = raw_input(prompt) # pylint: disable=E0602
else:
line = input(prompt)
if line is None:
return
if not doline(unet, line, outf.write):
return
def cli(unet, histfile=None, sockpath=None, force_window=False, title=None, prompt=None, background=True):
if prompt is None:
prompt = "unet> "
if force_window or not sys.stdin.isatty():
# Run CLI in another window b/c we have no tty.
sock, sockdir, sockpath = cli_server_setup(unet)
python_path = unet.get_exec_path(["python3", "python"])
us = os.path.realpath(__file__)
cmd = "{} {}".format(python_path, us)
if histfile:
cmd += " --histfile=" + histfile
if title:
cmd += " --prompt={}".format(title)
cmd += " " + sockpath
try:
unet.run_in_window(cmd, new_window=True, title=title, background=background)
return cli_server(unet, sock)
finally:
unet.cmd_status("rm -rf " + sockdir)
if not unet:
logger.debug("client-cli using sockpath %s", sockpath)
try:
if histfile is None:
histfile = os.path.expanduser("~/.micronet-history.txt")
if not os.path.exists(histfile):
if unet:
unet.cmd("touch " + histfile)
else:
subprocess.run("touch " + histfile)
if histfile:
readline.read_history_file(histfile)
except Exception:
pass
try:
if sockpath:
cli_client(sockpath, prompt=prompt)
else:
local_cli(unet, sys.stdout, prompt=prompt)
except EOFError:
pass
except Exception as ex:
logger.critical("cli: got exception: %s", ex, exc_info=True)
raise
finally:
readline.write_history_file(histfile)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG, filename="/tmp/topotests/cli-client.log")
logger = logging.getLogger("cli-client")
logger.info("Start logging cli-client")
parser = argparse.ArgumentParser()
parser.add_argument("--histfile", help="file to user for history")
parser.add_argument("--prompt-text", help="prompt string to use")
parser.add_argument("socket", help="path to pair of sockets to communicate over")
args = parser.parse_args()
prompt = "{}> ".format(args.prompt_text) if args.prompt_text else "unet> "
cli(None, args.histfile, args.socket, prompt=prompt)

View File

@ -0,0 +1,379 @@
# -*- coding: utf-8 eval: (blacken-mode 1) -*-
#
# July 11 2021, Christian Hopps <chopps@labn.net>
#
# Copyright (c) 2021, LabN Consulting, L.L.C
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; see the file COPYING; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
import logging
import os
import subprocess
import traceback
import glob
import logging
import os
import signal
import time
from lib.micronet import LinuxNamespace, Micronet
from lib.micronet_cli import cli
def get_pids_with_env(has_var, has_val=None):
result = {}
for pidenv in glob.iglob("/proc/*/environ"):
pid = pidenv.split("/")[2]
with open(pidenv, "rb") as rfb:
envlist = [x.decode("utf-8").split("=", 1) for x in rfb.read().split(b"\0")]
envlist = [[x[0], ""] if len(x) == 1 else x for x in envlist]
envdict = dict(envlist)
if has_var not in envdict:
continue
if has_val is None:
result[pid] = envdict
elif envdict[has_var] == str(has_val):
result[pid] = envdict
return result
def _kill_piddict(pids_by_upid, sig):
for upid, pids in pids_by_upid:
logging.info(
"Sending %s to (%s) of micronet pid %s", sig, ", ".join(pids), upid
)
for pid in pids:
try:
os.kill(int(pid), sig)
except Exception:
pass
def _get_our_pids():
ourpid = str(os.getpid())
piddict = get_pids_with_env("MICRONET_PID", ourpid)
pids = [x for x in piddict if x != ourpid]
if pids:
return {ourpid: pids}
return {}
def _get_other_pids():
piddict = get_pids_with_env("MICRONET_PID")
unet_pids = {d["MICRONET_PID"] for d in piddict.values()}
pids_by_upid = {p: set() for p in unet_pids}
for pid, envdict in piddict.items():
pids_by_upid[envdict["MICRONET_PID"]].add(pid)
# Filter out any child pid sets whos micronet pid is still running
return {x: y for x, y in pids_by_upid.items() if x not in y}
def _get_pids_by_upid(ours):
if ours:
return _get_our_pids()
return _get_other_pids()
def _cleanup_pids(ours):
pids_by_upid = _get_pids_by_upid(ours).items()
if not pids_by_upid:
return
_kill_piddict(pids_by_upid, signal.SIGTERM)
# Give them 5 second to exit cleanly
logging.info("Waiting up to 5s to allow for clean exit of abandon'd pids")
for _ in range(0, 5):
pids_by_upid = _get_pids_by_upid(ours).items()
if not pids_by_upid:
return
time.sleep(1)
pids_by_upid = _get_pids_by_upid(ours).items()
_kill_piddict(pids_by_upid, signal.SIGKILL)
def cleanup_current():
"""Attempt to cleanup preview runs.
Currently this only scans for old processes.
"""
logging.info("reaping current micronet processes")
_cleanup_pids(True)
def cleanup_previous():
"""Attempt to cleanup preview runs.
Currently this only scans for old processes.
"""
logging.info("reaping past micronet processes")
_cleanup_pids(False)
class Node(LinuxNamespace):
"""Node (mininet compat)."""
def __init__(self, name, **kwargs):
"""
Create a Node.
"""
self.params = kwargs
if "private_mounts" in kwargs:
private_mounts = kwargs["private_mounts"]
else:
private_mounts = kwargs.get("privateDirs", [])
logger = kwargs.get("logger")
super(Node, self).__init__(name, logger=logger, private_mounts=private_mounts)
def cmd(self, cmd, **kwargs):
"""Execute a command, joins stdout, stderr, ignores exit status."""
return super(Node, self).cmd_legacy(cmd, **kwargs)
def config(self, lo="up", **params):
"""Called by Micronet when topology is built (but not started)."""
# mininet brings up loopback here.
del params
del lo
def intfNames(self):
return self.intfs
def terminate(self):
return
class Topo(object): # pylint: disable=R0205
"""
Topology object passed to Micronet to build actual topology.
"""
def __init__(self, *args, **kwargs):
self.params = kwargs
self.name = kwargs["name"] if "name" in kwargs else "unnamed"
self.tgen = kwargs["tgen"] if "tgen" in kwargs else None
self.logger = logging.getLogger(__name__ + ".topo")
self.logger.debug("%s: Creating", self)
self.nodes = {}
self.hosts = {}
self.switches = {}
self.links = {}
# if "no_init_build" in kwargs and kwargs["no_init_build"]:
# return
# This needs to move outside of here. Current tests count on it being called on init;
# however, b/c of this there is lots of twisty logic to support topogen based tests where
# the build routine must get get_topogen() so topogen can then set it's topogen.topo to the
# class it's in the process of instantiating (this one) b/c build will use topogen before
# the instantiation completes.
self.build(*args, **kwargs)
def __str__(self):
return "Topo({})".format(self.name)
def build(self, *args, **kwargs):
"Overriden by real class"
del args
del kwargs
raise NotImplementedError("Needs to be overriden")
def addHost(self, name, **kwargs):
self.logger.debug("%s: addHost %s", self, name)
self.nodes[name] = dict(**kwargs)
self.hosts[name] = self.nodes[name]
return name
addNode = addHost
def addSwitch(self, name, **kwargs):
self.logger.debug("%s: addSwitch %s", self, name)
self.nodes[name] = dict(**kwargs)
if "cls" in self.nodes[name]:
self.logger.warning("Overriding Bridge class with micronet.Bridge")
del self.nodes[name]["cls"]
self.switches[name] = self.nodes[name]
return name
def addLink(self, name1, name2, **kwargs):
"""Link up switch and a router.
possible kwargs:
- intfName1 :: switch-side interface name - sometimes missing
- intfName2 :: router-side interface name
- addr1 :: switch-side MAC used by test_ldp_topo1 only
- addr2 :: router-side MAC used by test_ldp_topo1 only
"""
if1 = (
kwargs["intfName1"]
if "intfName1" in kwargs
else "{}-{}".format(name1, name2)
)
if2 = (
kwargs["intfName2"]
if "intfName2" in kwargs
else "{}-{}".format(name2, name1)
)
self.logger.debug("%s: addLink %s %s if1: %s if2: %s", self, name1, name2, if1, if2)
if name1 in self.switches:
assert name2 in self.hosts
swname, rname = name1, name2
elif name2 in self.switches:
assert name1 in self.hosts
swname, rname = name2, name1
if1, if2 = if2, if1
else:
# p2p link
assert name1 in self.hosts
assert name2 in self.hosts
swname, rname = name1, name2
if swname not in self.links:
self.links[swname] = {}
if rname not in self.links[swname]:
self.links[swname][rname] = set()
self.links[swname][rname].add((if1, if2))
class Mininet(Micronet):
"""
Mininet using Micronet.
"""
g_mnet_inst = None
def __init__(self, controller=None, topo=None):
"""
Create a Micronet.
"""
assert not controller
if Mininet.g_mnet_inst is not None:
Mininet.g_mnet_inst.stop()
Mininet.g_mnet_inst = self
self.configured_hosts = set()
self.host_params = {}
self.prefix_len = 8
# SNMPd used to require this, which was set int he mininet shell
# that all commands executed from. This is goofy default so let's not
# do it if we don't have to. The snmpd.conf files have been updated
# to set permissions to root:frr 770 to make this unneeded in that case
# os.umask(0)
super(Mininet, self).__init__()
self.logger.debug("%s: Creating", self)
if topo and topo.hosts:
self.logger.debug("Adding hosts: %s", topo.hosts.keys())
for name in topo.hosts:
self.add_host(name, **topo.hosts[name])
if topo and topo.switches:
self.logger.debug("Adding switches: %s", topo.switches.keys())
for name in topo.switches:
self.add_switch(name, **topo.switches[name])
if topo and topo.links:
self.logger.debug("Adding links: ")
for swname in sorted(topo.links):
for rname in sorted(topo.links[swname]):
for link in topo.links[swname][rname]:
self.add_link(swname, rname, link[0], link[1])
if topo:
# Now that topology is built, configure hosts
self.configure_hosts()
def __str__(self):
return "Mininet()"
def configure_hosts(self):
"""
Configure hosts once the topology has been built.
This function can be called multiple times if routers are added to the topology
later.
"""
if not self.hosts:
return
self.logger.debug("Configuring hosts: %s", self.hosts.keys())
for name in sorted(self.hosts.keys()):
if name in self.configured_hosts:
continue
host = self.hosts[name]
first_intf = host.intfs[0] if host.intfs else None
params = self.host_params[name]
if first_intf and "ip" in params:
ip = params["ip"]
i = ip.find("/")
if i == -1:
plen = self.prefix_len
else:
plen = int(ip[i + 1 :])
ip = ip[:i]
host.cmd_raises("ip addr add {}/{} dev {}".format(ip, plen, first_intf))
if "defaultRoute" in params:
host.cmd_raises(
"ip route add default {}".format(params["defaultRoute"])
)
host.config()
self.configured_hosts.add(name)
def add_host(self, name, cls=Node, **kwargs):
"""Add a host to micronet."""
self.host_params[name] = kwargs
super(Mininet, self).add_host(name, cls=cls, **kwargs)
def start(self):
"""Start the micronet topology."""
self.logger.debug("%s: Starting (no-op).", self)
def stop(self):
"""Stop the mininet topology (deletes)."""
self.logger.debug("%s: Stopping (deleting).", self)
self.delete()
self.logger.debug("%s: Stopped (deleted).", self)
if Mininet.g_mnet_inst == self:
Mininet.g_mnet_inst = None
def cli(self):
cli(self)