systemd/debian/tests/storage
Dan Streetman 02df3e5aa4 d/t/storage: change plaintext_name to include testname
Setting the plaintext name to include the testname makes debugging failures
easier.

Also for convenience, create service_name field in setup method, for use
in tests and/or teardown method.
2019-07-24 14:15:09 -04:00

251 lines
8.9 KiB
Python
Executable File

#!/usr/bin/env python3
# systemd integration test: Handling of storage devices
# (C) 2015 Canonical Ltd.
# Author: Martin Pitt <martin.pitt@ubuntu.com>
import os
import sys
import unittest
import subprocess
import time
import random
from glob import glob
@unittest.skipIf(os.path.isdir('/sys/module/scsi_debug'),
'The scsi_debug module is already loaded')
class FakeDriveTestBase(unittest.TestCase):
@classmethod
def setUpClass(klass):
# create a fake SCSI hard drive
subprocess.check_call(['modprobe', 'scsi_debug', 'dev_size_mb=32'])
# wait until drive got created
sys_dirs = []
while not sys_dirs:
sys_dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
time.sleep(0.1)
assert len(sys_dirs) == 1
devs = os.listdir(sys_dirs[0])
assert len(devs) == 1
klass.device = '/dev/' + devs[0]
@classmethod
def tearDownClass(klass):
# create a fake SCSI hard drive
subprocess.check_call(['rmmod', 'scsi_debug'])
def tearDown(self):
# clear drive
with open(self.device, 'wb') as f:
block = b'0' * 1048576
try:
while True:
f.write(block)
except OSError:
pass
subprocess.check_call(['udevadm', 'settle'])
subprocess.check_call(['systemctl', 'daemon-reload'])
class CryptsetupTest(FakeDriveTestBase):
def setUp(self):
testname = self.id().split('.')[-1]
self.plaintext_name = 'testcrypt_%s' % testname
self.plaintext_dev = '/dev/mapper/' + self.plaintext_name
self.service_name = 'systemd-cryptsetup@%s.service' % self.plaintext_name
if os.path.exists(self.plaintext_dev):
self.fail('%s exists already' % self.plaintext_dev)
super().setUp()
if os.path.exists('/etc/crypttab'):
os.rename('/etc/crypttab', '/etc/crypttab.systemdtest')
self.password = 'pwd%i' % random.randint(1000, 10000)
self.password_agent = None
def tearDown(self):
if self.password_agent:
os.kill(self.password_agent, 9)
os.waitpid(self.password_agent, 0)
self.password_agent = None
subprocess.call(['umount', self.plaintext_dev], stderr=subprocess.DEVNULL)
subprocess.call(['systemctl', 'start', '--no-ask-password', self.service_name],
stderr=subprocess.STDOUT)
subprocess.call(['systemctl', 'stop', self.service_name],
stderr=subprocess.STDOUT)
if os.path.exists('/etc/crypttab'):
os.unlink('/etc/crypttab')
if os.path.exists('/etc/crypttab.systemdtest'):
os.rename('/etc/crypttab.systemdtest', '/etc/crypttab')
if os.path.exists(self.plaintext_dev):
subprocess.call(['dmsetup', 'remove', self.plaintext_dev],
stderr=subprocess.STDOUT)
super().tearDown()
def format_luks(self):
'''Format test device with LUKS'''
p = subprocess.Popen(['cryptsetup', '--batch-mode', 'luksFormat', self.device, '-'],
stdin=subprocess.PIPE)
p.communicate(self.password.encode())
self.assertEqual(p.returncode, 0)
os.sync()
subprocess.check_call(['udevadm', 'settle'])
def start_password_agent(self):
'''Run password agent to answer passphrase request for crypt device'''
pid = os.fork()
if pid > 0:
self.password_agent = pid
return
# wait for incoming request
found = False
while not found:
for ask in glob('/run/systemd/ask-password/ask.*'):
with open(ask) as f:
contents = f.read()
if 'disk scsi_debug' in contents and self.plaintext_name in contents:
found = True
break
if not found:
time.sleep(0.5)
# parse Socket=
for line in contents.splitlines():
if line.startswith('Socket='):
socket = line.split('=', 1)[1]
break
# send reply
p = subprocess.Popen(['/lib/systemd/systemd-reply-password', '1', socket],
stdin=subprocess.PIPE)
p.communicate(self.password.encode())
assert p.returncode == 0
os._exit(0)
def apply(self, target):
'''Tell systemd to generate and run the cryptsetup units'''
subprocess.check_call(['systemctl', 'daemon-reload'])
self.start_password_agent()
subprocess.check_call(['systemctl', '--no-ask-password', 'restart', target])
for timeout in range(50):
if os.path.exists(self.plaintext_dev):
break
time.sleep(0.1)
else:
self.fail('timed out for %s to appear' % self.plaintext_dev)
def test_luks_by_devname(self):
'''LUKS device by plain device name, empty'''
self.format_luks()
with open('/etc/crypttab', 'w') as f:
f.write('%s %s none luks\n' % (self.plaintext_name, self.device))
self.apply('cryptsetup.target')
# should not be mounted
with open('/proc/mounts') as f:
self.assertNotIn(self.plaintext_name, f.read())
# device should not have anything on it
p = subprocess.Popen(['blkid', self.plaintext_dev], stdout=subprocess.PIPE)
out = p.communicate()[0]
self.assertEqual(out, b'')
self.assertNotEqual(p.returncode, 0)
def test_luks_by_uuid(self):
'''LUKS device by UUID, empty'''
self.format_luks()
uuid = subprocess.check_output(['blkid', '-ovalue', '-sUUID', self.device],
universal_newlines=True).strip()
with open('/etc/crypttab', 'w') as f:
f.write('%s UUID=%s none luks\n' % (self.plaintext_name, uuid))
self.apply('cryptsetup.target')
# should not be mounted
with open('/proc/mounts') as f:
self.assertNotIn(self.plaintext_name, f.read())
# device should not have anything on it
p = subprocess.Popen(['blkid', self.plaintext_dev], stdout=subprocess.PIPE)
out = p.communicate()[0]
self.assertEqual(out, b'')
self.assertNotEqual(p.returncode, 0)
def test_luks_swap(self):
'''LUKS device with "swap" option'''
self.format_luks()
with open('/etc/crypttab', 'w') as f:
f.write('%s %s none luks,swap\n' % (self.plaintext_name, self.device))
self.apply('cryptsetup.target')
# should not be mounted
with open('/proc/mounts') as f:
self.assertNotIn(self.plaintext_name, f.read())
# device should be formatted with swap
out = subprocess.check_output(['blkid', '-ovalue', '-sTYPE', self.plaintext_dev])
self.assertEqual(out, b'swap\n')
def test_luks_tmp(self):
'''LUKS device with "tmp" option'''
self.format_luks()
with open('/etc/crypttab', 'w') as f:
f.write('%s %s none luks,tmp\n' % (self.plaintext_name, self.device))
self.apply('cryptsetup.target')
# should not be mounted
with open('/proc/mounts') as f:
self.assertNotIn(self.plaintext_name, f.read())
# device should be formatted with ext2
out = subprocess.check_output(['blkid', '-ovalue', '-sTYPE', self.plaintext_dev])
self.assertEqual(out, b'ext2\n')
def test_luks_fstab(self):
'''LUKS device in /etc/fstab'''
self.format_luks()
with open('/etc/crypttab', 'w') as f:
f.write('%s %s none luks,tmp\n' % (self.plaintext_name, self.device))
mountpoint = '/run/crypt1.systemdtest'
os.mkdir(mountpoint)
self.addCleanup(os.rmdir, mountpoint)
os.rename('/etc/fstab', '/etc/fstab.systemdtest')
self.addCleanup(os.rename, '/etc/fstab.systemdtest', '/etc/fstab')
with open('/etc/fstab', 'a') as f:
with open('/etc/fstab.systemdtest') as forig:
f.write(forig.read())
f.write('%s %s ext2 defaults 0 0\n' % (self.plaintext_dev, mountpoint))
# this should now be a requirement of local-fs.target
self.apply('local-fs.target')
# should be mounted
found = False
with open('/proc/mounts') as f:
for line in f:
fields = line.split()
if fields[0] == self.plaintext_dev:
self.assertEqual(fields[1], mountpoint)
self.assertEqual(fields[2], 'ext2')
found = True
break
if not found:
self.fail('%s is not mounted' % self.plaintext_dev)
if __name__ == '__main__':
unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout,
verbosity=2))