systemd/debian/tests/boot-and-services
Martin Pitt 92bc4b1755 boot-and-services autopkgtest: Fix SeccompTest.test_failing
We need to reset scfail.service which we expect to fail. Otherwise
ServicesTest.test_no_failed fails on that when it runs later on.
2015-06-10 13:59:15 +02:00

440 lines
16 KiB
Python
Executable File

#!/usr/bin/python3
# autopkgtest check: Boot with systemd and check critical desktop services
# (C) 2014 Canonical Ltd.
# Author: Martin Pitt <martin.pitt@ubuntu.com>
import sys
import os
import unittest
import subprocess
import tempfile
import shutil
import time
from glob import glob
class ServicesTest(unittest.TestCase):
'''Check that expected services are running'''
def test_0_init(self):
'''Verify that init is systemd'''
self.assertIn('systemd', os.readlink('/proc/1/exe'))
def test_no_failed(self):
'''No failed units'''
out = subprocess.check_output(['systemctl', '--failed', '--no-legend'],
universal_newlines=True)
failed = out.splitlines()
# FIXME: ignore /etc/modules failure as stuff that we put there by
# default often fails
failed = [f for f in failed if 'systemd-modules-load' not in f]
# apparmor fails if not enabled in the kernel
if not os.path.exists('/sys/kernel/security/apparmor'):
failed = [f for f in failed if 'apparmor.service' not in f]
self.assertEqual(failed, [])
def test_lightdm(self):
out = subprocess.check_output(['ps', 'u', '-C', 'lightdm'])
self.assertIn(b'lightdm --session', out)
out = subprocess.check_output(['ps', 'u', '-C', 'Xorg'])
self.assertIn(b':0', out)
self.active_unit('lightdm')
def test_dbus(self):
out = subprocess.check_output(
['dbus-send', '--print-reply', '--system',
'--dest=org.freedesktop.DBus', '/', 'org.freedesktop.DBus.GetId'])
self.assertIn(b'string "', out)
self.active_unit('dbus')
def test_network_manager(self):
# 0.9.10 changed the command name
_help = subprocess.check_output(['nmcli', '--help'],
stderr=subprocess.STDOUT)
if b' g[eneral]' in _help:
out = subprocess.check_output(['nmcli', 'general'])
else:
out = subprocess.check_output(['nmcli', 'nm'])
self.assertIn(b'enabled', out)
self.active_unit('network-manager')
def test_cron(self):
out = subprocess.check_output(['ps', 'u', '-C', 'cron'])
self.assertIn(b'root', out)
self.active_unit('cron')
def test_logind(self):
out = subprocess.check_output(['loginctl'])
self.assertNotEqual(b'', out)
self.active_unit('systemd-logind')
def test_rsyslog(self):
out = subprocess.check_output(['ps', 'u', '-C', 'rsyslogd'])
self.assertIn(b'bin/rsyslogd', out)
self.active_unit('rsyslog')
with open('/var/log/syslog') as f:
log = f.read()
# has kernel messages
self.assertRegex(log, 'kernel:.*[cC]ommand line:')
# has init messages
self.assertRegex(log, 'systemd.*Reached target Default')
# has other services
self.assertRegex(log, 'NetworkManager.*:')
def test_udev(self):
out = subprocess.check_output(['udevadm', 'info', '--export-db'])
self.assertIn(b'\nP: /devices/', out)
self.active_unit('systemd-udevd')
def test_tmp_mount(self):
# check if we want to mount /tmp in fstab
want_tmp_mount = False
with open('/etc/fstab') as f:
for l in f:
try:
if not l.startswith('#') and l.split()[1] in ('/tmp', '/tmp/'):
want_tmp_mount = True
break
except IndexError:
pass
# ensure that we actually do/don't have a /tmp mount
(status, status_out) = subprocess.getstatusoutput('systemctl status tmp.mount')
findmnt = subprocess.call(['findmnt', '-n', '/tmp'], stdout=subprocess.PIPE)
if want_tmp_mount:
self.assertEqual(status, 0, status_out)
self.assertEqual(findmnt, 0)
else:
self.assertEqual(status, 3, status_out)
self.assertNotEqual(findmnt, 0)
def test_tmp_cleanup(self):
# systemd-tmpfiles-clean.timer only runs 15 mins after boot, shortcut
# it
self.assertEqual(subprocess.call(
['systemctl', 'status', 'systemd-tmpfiles-clean.timer'],
stdout=subprocess.PIPE), 0)
subprocess.check_call(['systemctl', 'start', 'systemd-tmpfiles-clean'])
# all files in /tmp/ should get cleaned up on boot
self.assertFalse(os.path.exists('/tmp/oldfile.test'))
self.assertFalse(os.path.exists('/tmp/newfile.test'))
# files in /var/tmp/ older than 30d should get cleaned up
# XXX FIXME: /var/tmp/ cleanup was disabled in #675422
# self.assertFalse(os.path.exists('/var/tmp/oldfile.test'))
self.assertTrue(os.path.exists('/var/tmp/newfile.test'))
# next run should leave the recent ones
os.close(os.open('/tmp/newfile.test',
os.O_CREAT | os.O_EXCL | os.O_WRONLY))
subprocess.check_call(['systemctl', 'start', 'systemd-tmpfiles-clean'])
time.sleep(2)
self.assertTrue(os.path.exists('/tmp/newfile.test'))
# Helper methods
def active_unit(self, unit):
'''Check that given unit is active'''
out = subprocess.check_output(['systemctl', 'status', unit])
self.assertIn(b'active (running)', out)
class JournalTest(unittest.TestCase):
'''Check journal functionality'''
def test_no_options(self):
out = subprocess.check_output(['journalctl'])
# has kernel messages
self.assertRegex(out, b'kernel:.*[cC]ommand line:')
# has init messages
self.assertRegex(out, b'systemd.*Reached target Default')
# has other services
self.assertRegex(out, b'NetworkManager.*:.*starting')
def test_log_for_service(self):
out = subprocess.check_output(
['journalctl', '_SYSTEMD_UNIT=NetworkManager.service'])
self.assertRegex(out, b'NetworkManager.*:.*starting')
self.assertNotIn(b'kernel:', out)
self.assertNotIn(b'systemd:', out)
class NspawnTest(unittest.TestCase):
'''Check nspawn'''
@classmethod
def setUpClass(kls):
'''Build a bootable busybox mini-container'''
kls.td_c_busybox = tempfile.TemporaryDirectory(prefix='c_busybox.')
kls.c_busybox = kls.td_c_busybox.name
for d in ['etc/init.d', 'bin', 'sbin']:
os.makedirs(os.path.join(kls.c_busybox, d))
shutil.copy('/bin/busybox', os.path.join(kls.c_busybox, 'bin'))
shutil.copy('/etc/os-release', os.path.join(kls.c_busybox, 'etc'))
os.symlink('busybox', os.path.join(kls.c_busybox, 'bin', 'sh'))
os.symlink('../bin/busybox', os.path.join(kls.c_busybox, 'sbin/init'))
with open(os.path.join(kls.c_busybox, 'etc/init.d/rcS'), 'w') as f:
f.write('''#!/bin/sh
echo fake container started
ps aux
poweroff\n''')
os.fchmod(f.fileno(), 0o755)
subprocess.check_call(['systemd-machine-id-setup', '--root',
kls.c_busybox], stderr=subprocess.PIPE)
def setUp(self):
self.workdir = tempfile.TemporaryDirectory()
def test_boot(self):
cont = os.path.join(self.workdir.name, 'c1')
shutil.copytree(self.c_busybox, cont, symlinks=True)
os.sync()
nspawn = subprocess.Popen(['systemd-nspawn', '-D', cont, '-b'],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out = nspawn.communicate(timeout=60)[0]
self.assertIn(b'Spawning container c1', out)
self.assertIn(b'fake container started', out)
self.assertRegex(out, b'\n\s+1\s+0\s+init[\r\n]')
self.assertRegex(out, b'\n\s+2+\s+0\s.*rcS[\r\n]')
self.assertRegex(out, b'Container c1.*shut down')
self.assertEqual(nspawn.returncode, 0)
def test_service(self):
self.assertTrue(os.path.isdir('/var/lib/machines'))
cont = '/var/lib/machines/c1'
shutil.copytree(self.c_busybox, cont, symlinks=True)
self.addCleanup(shutil.rmtree, cont)
os.sync()
subprocess.check_call(['systemctl', 'start', 'systemd-nspawn@c1'])
time.sleep(5)
systemctl = subprocess.Popen(
['systemctl', 'status', '-l', 'systemd-nspawn@c1'],
stdout=subprocess.PIPE)
out = systemctl.communicate()[0]
self.assertEqual(systemctl.returncode, 3, out)
self.assertIn(b'Requesting system poweroff', out)
@unittest.skipUnless(os.path.exists('/sys/kernel/security/apparmor'),
'AppArmor not enabled')
class AppArmorTest(unittest.TestCase):
def test_profile(self):
'''AppArmor confined unit'''
# create AppArmor profile
aa_profile = tempfile.NamedTemporaryFile(prefix='aa_violator.')
aa_profile.write(b'''#include <tunables/global>
profile "violator-test" {
#include <abstractions/base>
/bin/** rix,
/etc/machine-id r,
}
''')
aa_profile.flush()
subprocess.check_call(['apparmor_parser', '-r', '-v', aa_profile.name])
# create confined unit
with open('/run/systemd/system/violator.service', 'w') as f:
f.write('''[Unit]
Description=AppArmor test
[Service]
ExecStart=/bin/sh -euc 'echo CP1; cat /etc/machine-id; echo CP2; if cat /etc/passwd; then exit 1; fi; echo CP3'
AppArmorProfile=violator-test
''')
self.addCleanup(os.unlink, '/run/systemd/system/violator.service')
# launch
subprocess.check_call(['systemctl', 'daemon-reload'])
subprocess.check_call(['systemctl', 'start', 'violator.service'])
time.sleep(2)
# check status
st = subprocess.Popen(['systemctl', 'status', '-l',
'violator.service'], stdout=subprocess.PIPE,
universal_newlines=True)
out = st.communicate()[0]
# unit should be stopped
self.assertEqual(st.returncode, 3)
self.assertIn('inactive', out)
self.assertIn('CP1', out)
self.assertIn('CP2', out)
self.assertIn('CP3', out)
with open('/etc/machine-id') as f:
self.assertIn(f.read().strip(), out)
self.assertNotIn('root:x', out, 'unit can read /etc/passwd')
class CgroupsTest(unittest.TestCase):
'''Check cgroup setup'''
@classmethod
def setUpClass(kls):
kls.controllers = []
for controller in glob('/sys/fs/cgroup/*'):
if not os.path.islink(controller):
kls.controllers.append(controller)
def setUp(self):
self.service = 'testsrv.service'
self.service_file = '/run/systemd/system/' + self.service
def tearDown(self):
subprocess.call(['systemctl', 'stop', self.service],
stderr=subprocess.PIPE)
try:
os.unlink(self.service_file)
except OSError:
pass
subprocess.check_call(['systemctl', 'daemon-reload'])
def create_service(self, extra_service=''):
'''Create test service unit'''
with open(self.service_file, 'w') as f:
f.write('''[Unit]
Description=test service
[Service]
ExecStart=/bin/sleep 500
%s
''' % extra_service)
subprocess.check_call(['systemctl', 'daemon-reload'])
def assertNoControllers(self):
'''Assert that no cgroup controllers exist for test service'''
cs = glob('/sys/fs/cgroup/*/system.slice/%s' % self.service)
self.assertEqual(cs, [])
def assertController(self, name):
'''Assert that cgroup controller exists for test service'''
c = '/sys/fs/cgroup/%s/system.slice/%s' % (name, self.service)
self.assertTrue(os.path.isdir(c))
def assertNoController(self, name):
'''Assert that cgroup controller does not exist for test service'''
c = '/sys/fs/cgroup/%s/system.slice/%s' % (name, self.service)
self.assertFalse(os.path.isdir(c))
def test_simple(self):
'''simple service'''
self.create_service()
self.assertNoControllers()
subprocess.check_call(['systemctl', 'start', self.service])
self.assertController('systemd')
subprocess.check_call(['systemctl', 'stop', self.service])
self.assertNoControllers()
def test_cpushares(self):
'''service with CPUShares'''
self.create_service('CPUShares=1000')
self.assertNoControllers()
subprocess.check_call(['systemctl', 'start', self.service])
self.assertController('systemd')
self.assertController('cpu,cpuacct')
subprocess.check_call(['systemctl', 'stop', self.service])
self.assertNoControllers()
def test_custom_cgroup_cleanup(self):
'''cgroup cleanup does not touch manually created cgroups'''
# reproduces https://bugs.debian.org/777601
self.create_service()
os.mkdir('/sys/fs/cgroup/blkio/aux')
os.mkdir('/sys/fs/cgroup/perf_event/aux')
self.addCleanup(os.rmdir, '/sys/fs/cgroup/blkio/aux')
self.addCleanup(os.rmdir, '/sys/fs/cgroup/perf_event/aux')
subprocess.check_call(['systemctl', 'start', self.service])
self.assertController('systemd')
self.assertTrue(os.path.exists('/sys/fs/cgroup/blkio/aux'))
self.assertTrue(os.path.exists('/sys/fs/cgroup/perf_event/aux'))
subprocess.check_call(['systemctl', 'daemon-reload'])
time.sleep(1)
subprocess.check_call(['systemctl', 'restart', self.service])
time.sleep(1)
self.assertTrue(os.path.exists('/sys/fs/cgroup/blkio/aux'))
self.assertTrue(os.path.exists('/sys/fs/cgroup/perf_event/aux'))
subprocess.check_call(['systemctl', 'stop', self.service])
self.assertNoControllers()
self.assertTrue(os.path.exists('/sys/fs/cgroup/blkio/aux'))
self.assertTrue(os.path.exists('/sys/fs/cgroup/perf_event/aux'))
@unittest.skipUnless('+SECCOMP' in subprocess.getoutput('systemctl --version'),
'seccomp support not enabled')
class SeccompTest(unittest.TestCase):
'''Check seccomp syscall filtering'''
def test_failing(self):
with open('/run/systemd/system/scfail.service', 'w') as f:
f.write('''[Unit]
Description=seccomp test
[Service]
ExecStart=/bin/cat /etc/machine-id
SystemCallFilter=access
''')
self.addCleanup(os.unlink, '/run/systemd/system/scfail.service')
# launch
subprocess.check_call(['systemctl', 'daemon-reload'])
subprocess.check_call(['systemctl', 'start', 'scfail.service'])
time.sleep(2)
# check status
st = subprocess.Popen(['systemctl', 'status', '-l',
'scfail.service'], stdout=subprocess.PIPE)
out = st.communicate()[0]
# unit should be stopped
self.assertEqual(st.returncode, 3)
subprocess.check_call(['systemctl', 'reset-failed', 'scfail.service'])
self.assertIn(b'failed', out)
self.assertIn(b'code=killed, signal=SYS', out)
with open('/etc/machine-id') as f:
self.assertNotIn(f.read().strip().encode('ASCII'), out)
def pre_boot_setup():
'''Test setup before rebooting testbed'''
# create a few temporary files to ensure that they get cleaned up on boot
os.close(os.open('/tmp/newfile.test',
os.O_CREAT | os.O_EXCL | os.O_WRONLY))
os.close(os.open('/var/tmp/newfile.test',
os.O_CREAT | os.O_EXCL | os.O_WRONLY))
# we can't use utime() here, as systemd looks for ctime
cur_time = time.clock_gettime(time.CLOCK_REALTIME)
time.clock_settime(time.CLOCK_REALTIME, cur_time - 2 * 30 * 86400)
try:
os.close(os.open('/tmp/oldfile.test',
os.O_CREAT | os.O_EXCL | os.O_WRONLY))
os.close(os.open('/var/tmp/oldfile.test',
os.O_CREAT | os.O_EXCL | os.O_WRONLY))
finally:
time.clock_settime(time.CLOCK_REALTIME, cur_time)
if __name__ == '__main__':
if not os.getenv('ADT_REBOOT_MARK'):
pre_boot_setup()
print('Rebooting...')
subprocess.check_call(['/tmp/autopkgtest-reboot', 'boot1'])
unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout,
verbosity=2))