Drop sys-usb support

sys-usb will use the Qubes OS updates proxy and an in-VM fwupd instance.
This allows shedding a large amount of code.
This commit is contained in:
Demi Marie Obenour 2022-11-11 01:03:59 -05:00 committed by Richard Hughes
parent 0bbdc8a0b6
commit ff1f3677d9
7 changed files with 64 additions and 932 deletions

View File

@ -123,7 +123,7 @@ qvm-copy fwupd-qubes-vm-whonix-<ver>_amd64.deb
# qubes-dom0-update gcab fwupd python36
```
* Make sure that sys-firewall, sys-whonix, and sys-usb (if exists) are running.
* Make sure that sys-firewall and sys-whonix are running.
* Compare the SHA sums of the package in dom0 and qubes-builder VM.
If they match, install the package:
@ -132,7 +132,7 @@ qvm-copy fwupd-qubes-vm-whonix-<ver>_amd64.deb
# rpm -U qubes-fwupd-dom0-0.2.0-1.fc32.x86_64.rpm
```
* Reboot system (or reboot sys-firewall, sys-whonix, and sys-usb)
* Reboot system (or reboot sys-firewall and sys-whonix)
* Run the tests to verify the installation process

View File

@ -28,7 +28,6 @@ install_data([
install_data([
'src/vms/fwupd_common_vm.py',
'src/vms/fwupd_download_updates.py',
'src/vms/fwupd_usbvm_validate.py',
],
install_dir: 'libexec/qubes-fwupd',
install_mode: 'rwxrwxr-x',

View File

@ -240,12 +240,8 @@ class FwupdReceiveUpdates:
self._jcat_verification(self.metadata_file_jcat, FWUPD_DOM0_METADATA_DIR)
os.umask(self.old_umask)
def clean_cache(self, usbvm=False):
"""Removes updates data
Keyword arguments:
usbvm -- usbvm support flag
"""
def clean_cache(self):
"""Removes updates data"""
print("Cleaning dom0 cache directories")
if os.path.exists(FWUPD_DOM0_METADATA_DIR):
shutil.rmtree(FWUPD_DOM0_METADATA_DIR)
@ -253,6 +249,3 @@ class FwupdReceiveUpdates:
shutil.rmtree(FWUPD_DOM0_UPDATES_DIR)
if os.path.exists(HEADS_UPDATES_DIR):
shutil.rmtree(HEADS_UPDATES_DIR)
if usbvm:
print("Cleaning usbvm cache directories")
self._clean_usbvm()

View File

@ -35,10 +35,6 @@ run_cmd = (
)
def usbvm_run(args, **kwargs):
return subprocess.check_call((*run_cmd, *args), **kwargs)
def run_in_tty(updatevm, args, **kwargs):
return subprocess.check_call(
(
@ -87,12 +83,12 @@ class FwupdUpdate:
raise Exception("Specifying updatevm failed")
def _check_updatevm(self):
"""Checks if usbvm is running"""
"""Checks if updatevm is running"""
cmd_xl_list = ["xl", "list"]
p = subprocess.Popen(cmd_xl_list, stdout=subprocess.PIPE)
output = p.communicate()[0].decode()
if p.returncode != 0:
raise Exception("fwupd-qubes: Firmware downgrade failed")
raise Exception("fwupd-qubes: updatevm check failed")
return self.updatevm in output
def _encrypt_update_url(self, url):

View File

@ -19,7 +19,6 @@ from pathlib import Path
from packaging import version as pversion
FWUPD_QUBES_DIR = "/usr/share/qubes-fwupd"
USBVM_N = "sys-usb"
# Check if script is run by tests and append sys path properly
if __name__ == "__main__":
@ -29,18 +28,13 @@ else:
try:
from qubes_fwupd_heads import FwupdHeads
from qubes_fwupd_update import FwupdUpdate, usbvm_run, run_in_tty
from qubes_fwupd_update import FwupdUpdate, run_in_tty
from fwupd_receive_updates import FwupdReceiveUpdates
except ModuleNotFoundError:
raise ModuleNotFoundError(
"qubes-fwupd modules not found. You may need to reinstall package."
)
def usbvm_run_in_tty(args, **kwargs):
return run_in_tty(USBVM_N, args, **kwargs)
FWUPD_DOM0_DIR = "/var/cache/qubes-fwupd"
FWUPD_DOM0_METADATA_DIR = os.path.join(FWUPD_DOM0_DIR, "metadata")
FWUPD_DOM0_UPDATES_DIR = os.path.join(FWUPD_DOM0_DIR, "updates")
@ -49,32 +43,12 @@ FWUPD_DOM0_METADATA_SIGNATURE = os.path.join(
)
FWUPD_DOM0_METADATA_FILE = os.path.join(FWUPD_DOM0_METADATA_DIR, "firmware.xml.gz")
FWUPD_DOM0_METADATA_JCAT = os.path.join(FWUPD_DOM0_METADATA_DIR, "firmware.xml.gz.jcat")
FWUPD_VM_LOG = os.path.join(FWUPD_DOM0_DIR, "usbvm-devices.log")
FWUPD_VM_VALIDATE = "/usr/libexec/qubes-fwupd/fwupd_usbvm_validate.py"
FWUPD_VM_DIR = "/home/user/.cache/fwupd"
FWUPD_VM_UPDATES_DIR = os.path.join(FWUPD_VM_DIR, "updates")
FWUPD_VM_METADATA_DIR = os.path.join(FWUPD_VM_DIR, "metadata")
FWUPD_VM_METADATA_SIGNATURE = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz.asc")
FWUPD_VM_METADATA_FILE = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz")
FWUPD_VM_METADATA_JCAT = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz.jcat")
FWUPD_DOWNLOAD_PREFIX = "https://fwupd.org/downloads/"
FWUPDMGR = "/bin/fwupdmgr"
def usbvm_create_file(fname, dest, **kwargs):
with open(fname, "rb") as stdin:
subprocess.check_call(
[*usbvm_run_cmd, "dd", "of=" + dest, "bs=128K", "conv=fsync"],
stdin=stdin,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
BIOS_UPDATE_FLAG = os.path.join(FWUPD_DOM0_DIR, "bios_update")
LVFS_TESTING_DOM0_FLAG = os.path.join(FWUPD_DOM0_DIR, "lvfs_testing")
LVFS_TESTING_USBVM_FLAG = os.path.join(FWUPD_VM_DIR, "lvfs_testing")
HELP = {
"Usage": [
@ -120,114 +94,10 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates):
if not os.path.exists(self.metadata_file):
raise FileNotFoundError("Metadata file does not exist")
def _validate_usbvm_dirs(self):
"""Validates if sys-ubs updates and metadata directories exist."""
try:
usbvm_run_in_tty((FWUPD_VM_VALIDATE, "dirs"))
except subprocess.CalledProcessError:
raise Exception("Validation of usbvm directories failed.")
def _validate_usbvm_archive(self, arch_name, sha):
"""Validates checksum and gpg signature of the archive file."""
arch_path = os.path.join(FWUPD_VM_UPDATES_DIR, arch_name)
try:
usbvm_run_in_tty((FWUPD_VM_VALIDATE, "updates", arch_path, sha))
except subprocess.CalledProcessError:
raise Exception("Validation of the archive file failed.")
def _copy_usbvm_metadata(self):
"""Copies metadata files to usbvm."""
self.metadata_file_usbvm = self.metadata_file.replace(
FWUPD_DOM0_METADATA_DIR, FWUPD_VM_METADATA_DIR
)
self.metadata_file_jcat_usbvm = self.metadata_file_usbvm + ".jcat"
usbvm_create_file(self.metadata_file, self.metadata_file_usbvm)
usbvm_create_file(self.metadata_file_jcat, self.metadata_file_jcat_usbvm)
def _validate_usbvm_metadata(self, metadata_url=None):
"""Checks GPG signature of metadata files in usbvm."""
usbvm_cmd = f'"{FWUPD_VM_VALIDATE} metadata"'
cmd_metadata = [FWUPD_VM_DOWNLOAD, "metadata"]
if metadata_url:
cmd_metadata.append("--url=" + metadata_url)
p = subprocess.Popen(cmd_validate_metadata)
try:
usbvm_run_in_tty(cmd_metadata)
except subprocess.CalledProcessError:
raise Exception("Metadata validation failed.")
def _refresh_usbvm_metadata(self):
"""Refreshes metadata in usbvm."""
sig_metadata_file = self.metadata_file_jcat_usbvm
usbvm_run_in_tty(
(
FWUPDMGR,
"refresh",
self.metadata_file_usbvm,
sig_metadata_file,
self.lvfs,
)
)
def _copy_firmware_updates(self, arch_name):
"""Copies updates files to usbvm.
Keywords arguments:
arch_name - name of the archive file
"""
arch_path = os.path.join(FWUPD_DOM0_UPDATES_DIR, arch_name)
output_path = os.path.join(FWUPD_VM_UPDATES_DIR, arch_name)
usbvm_create_file(arch_path, output_path)
def _install_usbvm_firmware_update(self, arch_name):
"""Installs firmware update for specified device in dom0.
Keywords arguments:
arch_name - name of the archive file
"""
arch_path = os.path.join(FWUPD_VM_UPDATES_DIR, arch_name)
usbvm_run_in_tty((FWUPDMGR, "--", "install", arch_path))
def _install_usbvm_firmware_downgrade(self, arch_name):
"""Installs firmware downgrades for specified device in dom0.
Keywords arguments:
arch_name - name of the archive file
"""
arch_path = os.path.join(FWUPD_VM_UPDATES_DIR, arch_name)
usbvm_run_in_tty((FWUPDMGR, "--allow-older", "--", "install", arch_path))
def _clean_usbvm(self):
"""Cleans usbvm directories."""
try:
usbvm_run_in_tty((FWUPDMGR, "clean"))
except subprocess.CalledProcessError:
raise Exception("Cleaning usbvm directories failed")
def _enable_lvfs_testing_dom0(self):
"""Checks and enable lvfs-testing for custom metadata in dom0"""
cmd_lvfs_testing = [FWUPDMGR, "enable-remote", "-y", "lvfs-testing"]
if not os.path.exists(LVFS_TESTING_DOM0_FLAG):
p = subprocess.Popen(cmd_lvfs_testing)
p.wait()
if p.returncode != 0:
raise Exception("Enabling dom0 lvfs-testing failed!!")
Path(LVFS_TESTING_DOM0_FLAG).touch(mode=0o644, exist_ok=False)
def _enable_lvfs_testing_usbvm(self, usbvm=False):
"""Checks and enable lvfs-testing for custom metadata in usbvm"""
if not usbvm:
return
try:
usbvm_run_in_tty((FWUPDMGR, "enable-remote", "-y", "lvfs-testing"))
except subprocess.CalledProcessError:
raise Exception("Enabling usbvm lvfs-testing failed!!")
def refresh_metadata(self, usbvm=False, whonix=False, metadata_url=None):
def refresh_metadata(self, whonix=False, metadata_url=None):
"""Updates metadata with downloaded files.
Keyword arguments:
usbvm -- usbvm support flag
whonix -- Flag enforces downloading the metadata updates via Tor
metadata_url -- Use custom metadata from the url
"""
@ -237,17 +107,11 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates):
self.metadata_file_jcat = self.metadata_file + ".jcat"
self.lvfs = "lvfs-testing"
self._enable_lvfs_testing_dom0()
self._enable_lvfs_testing_usbvm(usbvm=usbvm)
else:
self.metadata_file = FWUPD_DOM0_METADATA_FILE
self.metadata_file_jcat = FWUPD_DOM0_METADATA_JCAT
self.lvfs = "lvfs"
self._download_metadata(whonix=whonix, metadata_url=metadata_url)
if usbvm:
self._validate_usbvm_dirs()
self._copy_usbvm_metadata()
self._validate_usbvm_metadata(metadata_url=metadata_url)
self._refresh_usbvm_metadata()
cmd_refresh = [
FWUPDMGR,
"refresh",
@ -311,7 +175,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates):
if not os.path.exists(update_path):
raise NotADirectoryError("Firmware update files do not exist")
def _user_input(self, updates_dict, downgrade=False, usbvm=False):
def _user_input(self, updates_list, downgrade=False):
"""UI for update process.
Keywords arguments:
@ -319,11 +183,6 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates):
downgrade -- downgrade flag
"""
decorator = "======================================================"
if usbvm:
updates_list = updates_dict["dom0"] + updates_dict["usbvm"]
else:
updates_list = updates_dict["dom0"]
dom0_updates_num = len(updates_dict["dom0"])
if len(updates_list) == 0:
print("No updates available.")
return EXIT_CODES["NOTHING_TO_DO"]
@ -331,11 +190,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates):
print("Available downgrades:")
else:
print("Available updates:")
self._updates_crawler(updates_dict["dom0"])
if usbvm:
self._updates_crawler(
updates_dict["usbvm"], usbvm=True, prefix=dom0_updates_num
)
self._updates_crawler(updates_list)
while True:
try:
@ -346,10 +201,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates):
device_num = int(choice) - 1
if 0 <= device_num < len(updates_list):
if not downgrade:
if device_num >= dom0_updates_num:
return "usbvm", device_num - dom0_updates_num
else:
return "dom0", device_num
return device_num
break
else:
raise ValueError()
@ -379,27 +231,22 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates):
return EXIT_CODES["NOTHING_TO_DO"]
downgrade_num = int(choice) - 1
if 0 <= downgrade_num < len(releases):
if device_num >= dom0_updates_num:
device_abs_num = device_num - dom0_updates_num
return "usbvm", device_abs_num, downgrade_num
else:
return "dom0", device_num, downgrade_num
return device_num, downgrade_num
else:
raise ValueError()
except ValueError:
print("Invalid choice.")
def _parse_parameters(self, updates_dict, vm_name, choice):
def _parse_parameters(self, updates_list, choice):
"""Parses device name, url, version and SHA256 checksum of the file list.
Keywords arguments:
updates_dict - dictionary of updates for dom0 and usbvm
vm_name - VM name
updates_list - list of updates for dom0
choice -- number of device to be updated
"""
self.name = updates_dict[vm_name][choice]["Name"]
self.version = updates_dict[vm_name][choice]["Releases"][0]["Version"]
for ver_check in updates_dict[vm_name][choice]["Releases"]:
self.name = updates_list[choice]["Name"]
self.version = updates_list[choice]["Releases"][0]["Version"]
for ver_check in updates_list[choice]["Releases"]:
if pversion.parse(ver_check["Version"]) >= pversion.parse(self.version):
self.version = ver_check["Version"]
self.url = ver_check["Url"]
@ -460,95 +307,26 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates):
if p.returncode != 0:
raise Exception("fwupd-qubes: Getting devices info failed")
def _get_usbvm_devices(self):
"""Gathers information about devices connected in usbvm."""
if os.path.exists(FWUPD_VM_LOG):
os.remove(FWUPD_VM_LOG)
with open(FWUPD_VM_LOG, "wb") as log:
p = usbvm_run(
[FWUPDAGENT, "get-devices"],
stdout=log,
stderr=log,
stdin=subprocess.DEVNULL,
)
if p.returncode != 0 and p.returncode != 2 and not os.path.exists(FWUPD_VM_LOG):
raise Exception("fwupd-qubes: Getting usbvm devices info failed")
if not os.path.exists(FWUPD_VM_LOG):
raise Exception("usbvm device info log does not exist")
def _parse_usbvm_updates(self, usbvm_devices_info):
"""Creates dictionary and list with information about updates.
Keywords argument:
usbvm_devices_info - gathered usbvm information
"""
self.usbvm_updates_list = []
if "No detected devices" in usbvm_devices_info:
return EXIT_CODES["NOTHING_TO_DO"]
usbvm_device_info_dict = json.loads(usbvm_devices_info)
for device in usbvm_device_info_dict["Devices"]:
if "Releases" in device:
self.usbvm_updates_list.append(
{
"Name": device["Name"],
"Version": device["Version"],
"Releases": [],
}
)
current_version = device["Version"]
for update in device["Releases"]:
if pversion.parse(update["Version"]) > pversion.parse(
current_version
):
self.usbvm_updates_list[-1]["Releases"].append(
{
"Version": update["Version"],
"Url": update["Uri"],
"Checksum": update["Checksum"][-1],
"Description": update["Description"],
}
)
if not self.usbvm_updates_list[-1]["Releases"]:
self.usbvm_updates_list.pop()
def update_firmware(self, usbvm=False, whonix=False):
def update_firmware(self, whonix=False):
"""Updates firmware of the specified device.
Keyword arguments:
usbvm -- usbvm support flag
whonix -- Flag enforces downloading the metadata updates via Tor
"""
self._get_dom0_updates()
self._parse_dom0_updates_info(self.dom0_updates_info)
if usbvm:
self._get_usbvm_devices()
with open(FWUPD_VM_LOG) as usbvm_device_info:
raw = usbvm_device_info.read()
self._parse_usbvm_updates(raw)
update_dict = {
"usbvm": self.usbvm_updates_list,
"dom0": self.dom0_updates_list,
}
ret_input = self._user_input(update_dict, usbvm=True)
else:
update_dict = {"dom0": self.dom0_updates_list}
ret_input = self._user_input(update_dict)
updates_list = self.dom0_updates_list
ret_input = self._user_input(updates_list)
if ret_input == EXIT_CODES["NOTHING_TO_DO"]:
exit(EXIT_CODES["NOTHING_TO_DO"])
vm_name, choice = ret_input
self._parse_parameters(update_dict, vm_name, choice)
choice = ret_input
self._parse_parameters(updates_list, choice)
self._download_firmware_updates(self.url, self.sha, whonix=whonix)
if self.name == "System Firmware":
Path(BIOS_UPDATE_FLAG).touch(mode=0o644, exist_ok=True)
extracted_path = self.arch_path.replace(".cab", "")
self._verify_dmi(extracted_path, self.version)
if vm_name == "dom0":
self._install_dom0_firmware_update(self.arch_path)
if vm_name == "usbvm":
self._validate_usbvm_dirs()
self._copy_firmware_updates(self.arch_name)
self._validate_usbvm_archive(self.arch_name, self.sha)
self._install_usbvm_firmware_update(self.arch_name)
self._install_dom0_firmware_update(self.arch_path)
def _parse_downgrades(self, device_list):
"""Parses information about possible downgrades.
@ -597,49 +375,34 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates):
if p.returncode != 0:
raise Exception("fwupd-qubes: Firmware downgrade failed")
def downgrade_firmware(self, usbvm=False, whonix=False):
def downgrade_firmware(self, whonix=False):
"""Downgrades firmware of the specified device.
Keyword arguments:
usbvm -- usbvm support flag
whonix -- Flag enforces downloading the metadata updates via Tor
"""
self._get_dom0_devices()
dom0_downgrades = self._parse_downgrades(self.dom0_devices_info)
if usbvm:
self._get_usbvm_devices()
with open(FWUPD_VM_LOG) as usbvm_device_info:
raw = usbvm_device_info.read()
usbvm_downgrades = self._parse_downgrades(raw)
downgrade_dict = {"usbvm": usbvm_downgrades, "dom0": dom0_downgrades}
ret_input = self._user_input(downgrade_dict, downgrade=True, usbvm=True)
else:
downgrade_dict = {"dom0": dom0_downgrades}
ret_input = self._user_input(downgrade_dict, downgrade=True)
ret_input = self._user_input(dom0_downgrades, downgrade=True)
if ret_input == EXIT_CODES["NOTHING_TO_DO"]:
exit(EXIT_CODES["NOTHING_TO_DO"])
vm_name, device_choice, downgrade_choice = ret_input
releases = downgrade_dict[vm_name][device_choice]["Releases"]
device_choice, downgrade_choice = ret_input
downgrade = dom0_downgrades[device_choice]
releases = downgrade["Releases"]
downgrade_url = releases[downgrade_choice]["Url"]
downgrade_sha = releases[downgrade_choice]["Checksum"]
self._download_firmware_updates(downgrade_url, downgrade_sha, whonix=whonix)
if downgrade_dict[vm_name][device_choice]["Name"] == "System Firmware":
if downgrade["Name"] == "System Firmware":
Path(BIOS_UPDATE_FLAG).touch(mode=0o644, exist_ok=True)
extracted_path = self.arch_path.replace(".cab", "")
self._verify_dmi(
extracted_path,
downgrade_dict[vm_name][device_choice]["Version"],
downgrade["Version"],
downgrade=True,
)
if vm_name == "dom0":
self._install_dom0_firmware_downgrade(self.arch_path)
if vm_name == "usbvm":
self._validate_usbvm_dirs()
self._copy_firmware_updates(self.arch_name)
self._validate_usbvm_archive(self.arch_name, downgrade_sha)
self._install_usbvm_firmware_downgrade(self.arch_name)
self._install_dom0_firmware_downgrade(self.arch_path)
def _output_crawler(self, updev_dict, level, help_f=False, dom0=True):
def _output_crawler(self, updev_dict, level, help_f=False):
"""Prints device and updates information as a tree.
Keywords arguments:
@ -679,29 +442,23 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates):
if level == 0 and help_f is True:
print(output)
else:
if level == 0 and dom0 is True:
if level == 0:
print(f"Dom0 {output}")
elif level == 0 and dom0 is False:
print(f"{USBVM_N} {output}")
for nested_dict in updev_dict[updev_key]:
self._output_crawler(nested_dict, level + 1)
def _updates_crawler(self, updates_list, usbvm=False, prefix=0):
"""Prints updates information for dom0 and usbvm
def _updates_crawler(self, updates_list, prefix=0):
"""Prints updates information for dom0
Keywords arguments:
updates_list -- list of devices updates
usbvm -- usbvm support flag
prefix -- device number prefix
"""
available_updates = False
decorator = "======================================================"
print(decorator)
if usbvm:
print(f"{USBVM_N} updates:")
else:
print("Dom0 updates:")
print("Dom0 updates:")
print(decorator)
if len(updates_list) == 0:
print("No updates available.")
@ -734,80 +491,37 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates):
print("No updates available.")
return EXIT_CODES["NOTHING_TO_DO"]
def get_devices_qubes(self, usbvm=False):
"""Gathers and prints devices information.
Keyword arguments:
usbvm -- usbvm support flag
"""
def get_devices_qubes(self):
"""Gathers and prints devices information."""
self._get_dom0_devices()
dom0_devices_info_dict = json.loads(self.dom0_devices_info)
self._output_crawler(dom0_devices_info_dict, 0)
if usbvm:
self._get_usbvm_devices()
with open(FWUPD_VM_LOG) as usbvm_device_info:
raw = usbvm_device_info.read()
if "No detected devices" not in raw:
usbvm_device_info_dict = json.loads(raw)
else:
print(f"No detected devices in {USBVM_N}")
return EXIT_CODES["NOTHING_TO_DO"]
self._output_crawler(usbvm_device_info_dict, 0, dom0=False)
def get_updates_qubes(self, usbvm=False):
"""Gathers and prints updates information.
Keyword arguments:
usbvm -- usbvm support flag
"""
def get_updates_qubes(self):
"""Gathers and prints updates information."""
self._get_dom0_updates()
self._parse_dom0_updates_info(self.dom0_updates_info)
self._updates_crawler(self.dom0_updates_list)
if usbvm:
self._get_usbvm_devices()
with open(FWUPD_VM_LOG) as usbvm_device_info:
raw = usbvm_device_info.read()
self._parse_usbvm_updates(raw)
self._updates_crawler(self.usbvm_updates_list, usbvm=True)
def help(self):
"""Prints help information"""
self._output_crawler(HELP, 0, help_f=True)
def check_usbvm(self):
"""Checks if usbvm is running"""
cmd_xl_list = ["xl", "list"]
p = subprocess.Popen(cmd_xl_list, stdout=subprocess.PIPE)
self.output = p.communicate()[0].decode()
if p.returncode != 0:
raise Exception("fwupd-qubes: Firmware downgrade failed")
return USBVM_N in self.output
def trusted_cleanup(self, usbvm=False):
"""Deletes trusted directory.
Keyword arguments:
usbvm -- usbvm support flag
"""
def trusted_cleanup(self):
"""Deletes trusted directory."""
trusted_path = os.path.join(FWUPD_DOM0_UPDATES_DIR, "trusted.cab")
if os.path.exists(trusted_path):
os.remove(trusted_path)
shutil.rmtree(trusted_path.replace(".cab", ""))
if usbvm:
self._clean_usbvm()
def refresh_metadata_after_bios_update(self, usbvm=False):
"""Refreshes metadata after bios update
Keyword arguments:
usbvm -- usbvm support flag
"""
def refresh_metadata_after_bios_update(self):
"""Refreshes metadata after bios update"""
if os.path.exists(BIOS_UPDATE_FLAG):
print("BIOS was updated. Refreshing metadata...")
if "--whonix" in sys.argv:
self.refresh_metadata(usbvm=usbvm, whonix=True)
self.refresh_metadata(whonix=True)
else:
self.refresh_metadata(usbvm=usbvm)
self.refresh_metadata()
os.remove(BIOS_UPDATE_FLAG)
def heads_update(self, device="x230", whonix=False, metadata_url=None):
@ -875,16 +589,15 @@ def main():
exit(EXIT_CODES["ERROR"])
q = QubesFwupdmgr()
sys_usb = q.check_usbvm()
q.validate_dom0_dirs()
q.trusted_cleanup(usbvm=sys_usb)
q.refresh_metadata_after_bios_update(usbvm=sys_usb)
q.trusted_cleanup()
q.refresh_metadata_after_bios_update()
metadata_url = None
device = "x230"
if not os.path.exists(FWUPD_DOM0_DIR):
q.refresh_metadata(usbvm=sys_usb)
q.refresh_metadata()
if len(sys.argv) < 2:
q.help()
@ -903,23 +616,23 @@ def main():
device = arg.replace("--device=", "")
if sys.argv[1] == "get-updates":
q.get_updates_qubes(usbvm=sys_usb)
q.get_updates_qubes()
elif sys.argv[1] == "get-devices":
q.get_devices_qubes(usbvm=sys_usb)
q.get_devices_qubes()
elif sys.argv[1] == "update" and "--whonix" in sys.argv:
q.update_firmware(usbvm=sys_usb, whonix=True)
q.update_firmware(whonix=True)
elif sys.argv[1] == "update" and "--whonix" not in sys.argv:
q.update_firmware(usbvm=sys_usb)
q.update_firmware()
elif sys.argv[1] == "downgrade" and "--whonix" in sys.argv:
q.downgrade_firmware(usbvm=sys_usb, whonix=True)
q.downgrade_firmware(whonix=True)
elif sys.argv[1] == "downgrade" and "--whonix" not in sys.argv:
q.downgrade_firmware(usbvm=sys_usb)
q.downgrade_firmware()
elif sys.argv[1] == "clean":
q.clean_cache(usbvm=sys_usb)
q.clean_cache()
elif sys.argv[1] == "refresh" and "--whonix" not in sys.argv:
q.refresh_metadata(usbvm=sys_usb, metadata_url=metadata_url)
q.refresh_metadata(metadata_url=metadata_url)
elif sys.argv[1] == "refresh" and "--whonix" in sys.argv:
q.refresh_metadata(usbvm=sys_usb, whonix=True, metadata_url=metadata_url)
q.refresh_metadata(whonix=True, metadata_url=metadata_url)
elif sys.argv[1] == "update-heads" and "--whonix" not in sys.argv:
q.heads_update(device=device, metadata_url=metadata_url)
elif sys.argv[1] == "update-heads" and "--whonix" in sys.argv:

View File

@ -1,105 +0,0 @@
#!/usr/bin/python3
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2021 Norbert Kamiński <norbert.kaminski@3mdeb.com>
#
# SPDX-License-Identifier: LGPL-2.1+
#
import glob
import os
import shutil
import subprocess
import sys
from fwupd_common_vm import FwupdVmCommon
FWUPD_VM_DIR = "/home/user/.cache/fwupd"
FWUPD_VM_UPDATES_DIR = os.path.join(FWUPD_VM_DIR, "updates")
FWUPD_VM_METADATA_DIR = os.path.join(FWUPD_VM_DIR, "metadata")
FWUPD_VM_METADATA_JCAT = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz.jcat")
FWUPD_VM_METADATA_FILE = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz")
FWUPDMGR = "/bin/fwupdmgr"
FWUPD_DOWNLOAD_PREFIX = "https://fwupd.org/downloads/"
class FwupdUsbvmUpdates(FwupdVmCommon):
def _verify_received(self, files_path, regex_pattern):
"""Checks if sent files match regex filename pattern.
Keyword arguments:
files_path -- absolute path to inspected directory
regex_pattern -- pattern of the expected files
"""
for untrusted_f in os.listdir(files_path):
if not regex_pattern.match(untrusted_f):
raise Exception("Dom0 sent unexpected file")
f = untrusted_f
assert "/" not in f
assert "\0" not in f
assert "\x1b" not in f
path_f = os.path.join(files_path, f)
if os.path.islink(path_f) or not os.path.isfile(path_f):
raise Exception("Dom0 sent not regular file")
def validate_metadata(self, metadata_url=None):
"""Validates received the metadata files."""
print("Running validation of the metadata files")
if metadata_url:
metadata_name = metadata_url.replace(FWUPD_DOWNLOAD_PREFIX, "")
metadata_file = os.path.join(FWUPD_VM_METADATA_DIR, metadata_name)
else:
metadata_file = FWUPD_VM_METADATA_FILE
try:
self._jcat_verification(f"{metadata_file}.jcat", FWUPD_VM_METADATA_DIR)
except Exception as e:
print(str(e), file=sys.stderr)
self.clean_vm_cache()
exit(1)
def validate_updates(self, archive_path, sha):
"""Validates received an update file.
Keyword arguments:
archive_path - path to the firmware update archive
sha -- SHA256 checksum of the firmware update archive
"""
print("Running validation of the update archive")
self.check_shasum(archive_path, sha)
archive_name = archive_path.replace(f"{FWUPD_VM_UPDATES_DIR}/", "")
output_path = archive_path.replace(".cab", "")
arch_temp = os.path.join(output_path, archive_name)
os.mkdir(output_path)
# jcat verification will be done by fwupd itself
shutil.copyfile(archive_path, arch_temp)
def main():
f = FwupdUsbvmUpdates()
f_val = FwupdVmCommon()
metadata_url = None
if len(sys.argv) < 2:
raise Exception("Invalid number of arguments.")
for arg in sys.argv:
if "--url=" in arg:
metadata_url = arg.replace("--url=", "")
if sys.argv[1] == "metadata":
f.validate_metadata(metadata_url=metadata_url)
elif sys.argv[1] == "dirs":
f_val.validate_vm_dirs()
elif sys.argv[1] == "clean":
f.clean_vm_cache()
elif sys.argv[1] == "updates" and len(sys.argv) < 4:
raise Exception(
"Invalid number of arguments.\n" "Expected archive path and checksum."
)
elif sys.argv[1] == "updates" and not len(sys.argv) < 4:
f.validate_updates(sys.argv[2], sys.argv[3])
else:
raise Exception("Invalid command")
if __name__ == "__main__":
main()

View File

@ -33,37 +33,17 @@ elif os.path.exists(QUBES_FWUPDMGR_BINDIR):
FWUPD_DOM0_DIR = "/var/cache/qubes-fwupd"
FWUPD_DOM0_UPDATES_DIR = os.path.join(FWUPD_DOM0_DIR, "updates")
FWUPD_DOM0_UNTRUSTED_DIR = os.path.join(FWUPD_DOM0_UPDATES_DIR, "untrusted")
FWUPD_VM_LOG = os.path.join(FWUPD_DOM0_DIR, "usbvm-devices.log")
FWUPD_DOM0_METADATA_DIR = os.path.join(FWUPD_DOM0_DIR, "metadata")
FWUPD_DOM0_METADATA_FILE = os.path.join(FWUPD_DOM0_METADATA_DIR, "firmware.xml.gz")
FWUPD_DOM0_METADATA_FILE_JCAT = os.path.join(FWUPD_DOM0_METADATA_DIR, "firmware.xml.gz")
FWUPD_VM_DIR = "/home/user/.cache/fwupd"
FWUPD_VM_UPDATES_DIR = os.path.join(FWUPD_VM_DIR, "updates")
FWUPD_VM_METADATA_DIR = os.path.join(FWUPD_VM_DIR, "metadata")
FWUPD_VM_METADATA_FILE = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz")
FWUPD_VM_METADATA_FILE_JCAT = os.path.join(
FWUPD_VM_METADATA_DIR, "firmware.xml.gz.jcat"
)
REQUIRED_DEV = "Requires device not connected"
REQUIRED_USBVM = "Requires sys-usb"
XL_LIST_LOG = "Name ID Mem VCPUs State Time(s)"
USBVM_N = "sys-usb"
FWUPDMGR = "/bin/fwupdmgr"
BIOS_UPDATE_FLAG = os.path.join(FWUPD_DOM0_DIR, "bios_update")
LVFS_TESTING_DOM0_FLAG = os.path.join(FWUPD_DOM0_DIR, "lvfs_testing")
LVFS_TESTING_USBVM_FLAG = os.path.join(FWUPD_VM_DIR, "lvfs_testing")
CUSTOM_METADATA = "https://fwupd.org/downloads/firmware-3c81bfdc9db5c8a42c09d38091944bc1a05b27b0.xml.gz"
def check_usbvm():
"""Checks if sys-usb is running"""
if "qubes" not in platform.release():
return False
q = qfwupd.QubesFwupdmgr()
q.check_usbvm()
return "sys-usb" in q.output
def device_connected_dom0():
"""Checks if the testing device is connected in dom0"""
if "qubes" not in platform.release():
@ -73,25 +53,11 @@ def device_connected_dom0():
return "ColorHug2" in q.dom0_devices_info
def device_connected_usbvm():
"""Checks if the testing device is connected in usbvm"""
if not check_usbvm():
return False
q = qfwupd.QubesFwupdmgr()
q._validate_usbvm_dirs()
if not os.path.exists(FWUPD_DOM0_DIR):
q.refresh_metadata()
q._get_usbvm_devices()
with open(FWUPD_VM_LOG) as usbvm_device_info:
return "ColorHug2" in usbvm_device_info.read()
def check_whonix_updatevm():
"""Checks if the sys-whonix is running"""
if "qubes" not in platform.release():
return False
q = qfwupd.QubesFwupdmgr()
q.check_usbvm()
return "sys-whonix" in q.output
@ -162,24 +128,6 @@ class TestQubesFwupdmgr(unittest.TestCase):
msg="Metadata refresh failed.",
)
@unittest.skipUnless(check_usbvm(), REQUIRED_USBVM)
def test_refresh_metadata_usbvm(self):
self.q.refresh_metadata(usbvm=True)
self.assertEqual(
self.q.output,
"Successfully refreshed metadata manually\n",
msg="Metadata refresh failed.",
)
@unittest.skipUnless(check_usbvm(), REQUIRED_USBVM)
def test_refresh_metadata_usbvm_custom(self):
self.q.refresh_metadata(usbvm=True, metadata_url=CUSTOM_METADATA)
self.assertEqual(
self.q.output,
"Successfully refreshed metadata manually\n",
msg="Metadata refresh failed.",
)
@unittest.skipUnless(check_whonix_updatevm(), "Requires sys-whonix")
def test_refresh_metadata_whonix(self):
self.q.refresh_metadata(whonix=True)
@ -245,57 +193,30 @@ class TestQubesFwupdmgr(unittest.TestCase):
self.assertTrue(os.path.exists(update_path))
def test_user_input_empty_dict(self):
downgrade_dict = {"usbvm": [], "dom0": []}
self.assertEqual(self.q._user_input(downgrade_dict), 2)
self.assertEqual(self.q._user_input([]), 2)
def test_user_input_n(self):
user_input = ["sth", "n"]
with patch("builtins.input", side_effect=user_input):
self.q._parse_dom0_updates_info(UPDATE_INFO)
downgrade_dict = {
"usbvm": self.q.dom0_updates_list,
"dom0": self.q.dom0_updates_list,
}
choice = self.q._user_input(downgrade_dict, usbvm=True)
choice = self.q._user_input(self.q.dom0_updates_list)
self.assertEqual(choice, 2)
user_input = ["sth", "N"]
with patch("builtins.input", side_effect=user_input):
self.q._parse_dom0_updates_info(UPDATE_INFO)
downgrade_dict = {
"usbvm": self.q.dom0_updates_list,
"dom0": self.q.dom0_updates_list,
}
choice = self.q._user_input(downgrade_dict, usbvm=True)
choice = self.q._user_input(self.q.dom0_updates_list)
self.assertEqual(choice, 2)
def test_user_input_choice(self):
user_input = ["6", "1"]
with patch("builtins.input", side_effect=user_input):
self.q._parse_dom0_updates_info(UPDATE_INFO)
updates_dict = {
"usbvm": self.q.dom0_updates_list,
"dom0": self.q.dom0_updates_list,
}
key, choice = self.q._user_input(updates_dict)
self.assertEqual(key, "dom0")
self.assertEqual(choice, 0)
def test_user_input_choice_usbvm(self):
user_input = ["6", "2"]
with patch("builtins.input", side_effect=user_input):
self.q._parse_dom0_updates_info(UPDATE_INFO)
updates_dict = {
"usbvm": self.q.dom0_updates_list,
"dom0": self.q.dom0_updates_list,
}
key, choice = self.q._user_input(updates_dict, usbvm=True)
self.assertEqual(key, "usbvm")
choice = self.q._user_input(self.q.dom0_updates_list)
self.assertEqual(choice, 0)
def test_parse_parameters(self):
self.q._parse_dom0_updates_info(UPDATE_INFO)
update_dict = {"dom0": self.q.dom0_updates_list}
self.q._parse_parameters(update_dict, "dom0", 0)
self.q._parse_parameters(self.q.dom0_updates_list, 0)
self.assertEqual(
self.q.url,
"https://fwupd.org/downloads/0a29848de74d26348bc5a6e24fc9f03778eddf0e-hughski-colorhug2-2.0.7.cab",
@ -312,31 +233,6 @@ class TestQubesFwupdmgr(unittest.TestCase):
self.assertFalse(os.path.exists(FWUPD_DOM0_METADATA_DIR))
self.assertFalse(os.path.exists(FWUPD_DOM0_UNTRUSTED_DIR))
@unittest.skipUnless(check_usbvm(), REQUIRED_USBVM)
def test_clean_cache_dom0_n_usbvm(self):
self.q._validate_usbvm_dirs()
self.q.clean_cache(usbvm=True)
self.assertFalse(os.path.exists(FWUPD_DOM0_METADATA_DIR))
self.assertFalse(os.path.exists(FWUPD_DOM0_UNTRUSTED_DIR))
cmd_validate_metadata = [
"qvm-run",
"--pass-io",
"sys-usb",
f"! [ -d {FWUPD_VM_METADATA_DIR} ]",
]
p = subprocess.Popen(cmd_validate_metadata)
p.wait()
self.assertEqual(p.returncode, 0, msg="Creating metadata directory failed")
cmd_validate_udpdate = [
"qvm-run",
"--pass-io",
"sys-usb",
f"! [ -d {FWUPD_VM_UPDATES_DIR} ]",
]
p = subprocess.Popen(cmd_validate_udpdate)
p.wait()
self.assertEqual(p.returncode, 0, msg="Cleaning update directory failed")
def test_output_crawler(self):
crawler_output = io.StringIO()
sys.stdout = crawler_output
@ -360,14 +256,6 @@ class TestQubesFwupdmgr(unittest.TestCase):
self.assertNotEqual(get_devices_output.getvalue().strip(), "")
sys.stdout = self.captured_output
@unittest.skipUnless(check_usbvm(), REQUIRED_USBVM)
def test_get_devices_qubes_usbvm(self):
get_devices_output = io.StringIO()
sys.stdout = get_devices_output
self.q.get_devices_qubes(usbvm=True)
self.assertNotEqual(get_devices_output.getvalue().strip(), "")
sys.stdout = self.captured_output
@unittest.skipUnless(device_connected_dom0(), REQUIRED_DEV)
def test_get_updates_qubes_dom0(self):
get_updates_output = io.StringIO()
@ -376,14 +264,6 @@ class TestQubesFwupdmgr(unittest.TestCase):
self.assertNotEqual(get_updates_output.getvalue().strip(), "")
sys.stdout = self.captured_output
@unittest.skipUnless(device_connected_usbvm(), REQUIRED_DEV)
def test_get_updates_qubes_usbvm(self):
get_updates_output = io.StringIO()
sys.stdout = get_updates_output
self.q.get_updates_qubes(usbvm=True)
self.assertNotEqual(get_updates_output.getvalue().strip(), "")
sys.stdout = self.captured_output
def test_help(self):
help_output = io.StringIO()
sys.stdout = help_output
@ -441,95 +321,6 @@ class TestQubesFwupdmgr(unittest.TestCase):
new_version = downgrades[number]["Version"]
self.assertGreater(Version(old_version), Version(new_version))
@unittest.skipUnless(
check_whonix_updatevm() and device_connected_usbvm(), REQUIRED_DEV
)
def test_update_n_downgrade_firmware_whonix(self):
old_version = None
self.q.clean_cache(usbvm=True)
self.q._get_dom0_devices()
dom0_downgrades = self.q._parse_downgrades(self.q.dom0_devices_info)
self.q._get_usbvm_devices()
with open(FWUPD_VM_LOG) as usbvm_device_info:
raw = usbvm_device_info.read()
downgrades = self.q._parse_downgrades(raw)
for number, device in enumerate(downgrades):
if "Name" not in device:
continue
if device["Name"] == "ColorHug2":
old_version = device["Version"]
break
if old_version is None:
self.fail("Test device not found")
user_input = [str(number + 1 + len(dom0_downgrades)), "1"]
with patch("builtins.input", side_effect=user_input):
self.q.downgrade_firmware(usbvm=True, whonix=True)
self.q._get_usbvm_devices()
with open(FWUPD_VM_LOG) as usbvm_device_info:
raw = usbvm_device_info.read()
downgrades = self.q._parse_downgrades(raw)
new_version = downgrades[number]["Version"]
self.assertGreater(Version(old_version), Version(new_version))
old_version = None
new_version = None
self.q._get_dom0_updates()
self.q._parse_dom0_updates_info(self.q.dom0_updates_info)
self.q._get_usbvm_devices()
with open(FWUPD_VM_LOG) as usbvm_device_info:
raw = usbvm_device_info.read()
self.q._parse_usbvm_updates(raw)
for number, device in enumerate(self.q.usbvm_updates_list):
if "Name" not in device:
continue
if device["Name"] == "ColorHug2":
old_version = device["Version"]
break
if old_version is None:
self.fail("Test device not found")
user_input = [str(number + 1 + len(self.q.dom0_updates_list)), "1"]
with patch("builtins.input", side_effect=user_input):
self.q.update_firmware(usbvm=True, whonix=True)
self.q._get_usbvm_devices()
with open(FWUPD_VM_LOG) as usbvm_device_info:
raw = usbvm_device_info.read()
usbvm_devices_info_dict = json.loads(raw)
for device in usbvm_devices_info_dict["Devices"]:
if "Name" not in device:
continue
if device["Name"] == "ColorHug2":
new_version = device["Version"]
break
if new_version is None:
self.fail("Test device not found")
self.assertLess(Version(old_version), Version(new_version))
@unittest.skipUnless(device_connected_usbvm(), REQUIRED_DEV)
def test_downgrade_firmware_usbvm(self):
old_version = None
self.q._get_dom0_devices()
dom0_downgrades = self.q._parse_downgrades(self.q.dom0_devices_info)
self.q._get_usbvm_devices()
with open(FWUPD_VM_LOG) as usbvm_device_info:
raw = usbvm_device_info.read()
downgrades = self.q._parse_downgrades(raw)
for number, device in enumerate(downgrades):
if "Name" not in device:
continue
if device["Name"] == "ColorHug2":
old_version = device["Version"]
break
if old_version is None:
self.fail("Test device not found")
user_input = [str(number + 1 + len(dom0_downgrades)), "1"]
with patch("builtins.input", side_effect=user_input):
self.q.downgrade_firmware(usbvm=True)
self.q._get_usbvm_devices()
with open(FWUPD_VM_LOG) as usbvm_device_info:
raw = usbvm_device_info.read()
downgrades = self.q._parse_downgrades(raw)
new_version = downgrades[number]["Version"]
self.assertGreater(Version(old_version), Version(new_version))
def test_parse_downgrades(self):
downgrades = self.q._parse_downgrades(GET_DEVICES)
self.assertEqual(downgrades[0]["Name"], "ColorHug2")
@ -558,18 +349,6 @@ class TestQubesFwupdmgr(unittest.TestCase):
"4ee9dfa38df3b810f739d8a19d13da1b3175fb87",
)
def test_user_input_downgrade_usbvm(self):
user_input = ["2", "6", "sth", "2.2.1", "", " ", "\0", "2"]
with patch("builtins.input", side_effect=user_input):
downgrade_list = self.q._parse_downgrades(GET_DEVICES)
downgrade_dict = {"usbvm": downgrade_list, "dom0": downgrade_list}
key, device_choice, downgrade_choice = self.q._user_input(
downgrade_dict, downgrade=True, usbvm=True
)
self.assertEqual(key, "usbvm")
self.assertEqual(device_choice, 0)
self.assertEqual(downgrade_choice, 1)
def test_user_input_downgrade_dom0(self):
user_input = ["1", "6", "sth", "2.2.1", "", " ", "\0", "2"]
with patch("builtins.input", side_effect=user_input):
@ -586,8 +365,7 @@ class TestQubesFwupdmgr(unittest.TestCase):
user_input = ["N"]
with patch("builtins.input", side_effect=user_input):
downgrade_list = self.q._parse_downgrades(GET_DEVICES)
downgrade_dict = {"usbvm": downgrade_list, "dom0": downgrade_list}
N_choice = self.q._user_input(downgrade_dict, downgrade=True, usbvm=True)
N_choice = self.q._user_input(downgrade_list, downgrade=True)
self.assertEqual(N_choice, 2)
@unittest.skipUnless(device_connected_dom0(), REQUIRED_DEV)
@ -619,126 +397,6 @@ class TestQubesFwupdmgr(unittest.TestCase):
self.fail("Test device not found")
self.assertLess(Version(old_version), Version(new_version))
@unittest.skipUnless(device_connected_usbvm(), REQUIRED_DEV)
def test_update_firmware_usbvm(self):
old_version = None
new_version = None
self.q._get_dom0_updates()
self.q._parse_dom0_updates_info(self.q.dom0_updates_info)
self.q._get_usbvm_devices()
with open(FWUPD_VM_LOG) as usbvm_device_info:
raw = usbvm_device_info.read()
self.q._parse_usbvm_updates(raw)
for number, device in enumerate(self.q.usbvm_updates_list):
if "Name" not in device:
continue
if device["Name"] == "ColorHug2":
old_version = device["Version"]
break
if old_version is None:
self.fail("Test device not found")
user_input = [str(number + 1 + len(self.q.dom0_updates_list)), "1"]
with patch("builtins.input", side_effect=user_input):
self.q.update_firmware(usbvm=True)
self.q._get_usbvm_devices()
with open(FWUPD_VM_LOG) as usbvm_device_info:
raw = usbvm_device_info.read()
usbvm_devices_info_dict = json.loads(raw)
for device in usbvm_devices_info_dict["Devices"]:
if "Name" not in device:
continue
if device["Name"] == "ColorHug2":
new_version = device["Version"]
break
if new_version is None:
self.fail("Test device not found")
self.assertLess(Version(old_version), Version(new_version))
@unittest.skipUnless(check_usbvm(), REQUIRED_USBVM)
def test_get_usbvm_devices(self):
self.q._get_usbvm_devices()
self.assertTrue(os.path.exists(FWUPD_VM_LOG))
def test_parse_usbvm_updates(self):
self.q._parse_usbvm_updates(GET_DEVICES)
self.assertEqual(self.q.usbvm_updates_list[0]["Name"], "ColorHug2")
self.assertEqual(self.q.usbvm_updates_list[0]["Version"], "2.0.6")
self.assertListEqual(
self.q.usbvm_updates_list[0]["Releases"],
[
{
"Checksum": "32c4a2c9be787cdf1d757c489d6455bd7bb14053425180b6d331c37e1ccc1cda",
"Description": "<p>This release fixes prevents the firmware returning an "
"error when the remote SHA1 hash was never sent.</p>",
"Url": "https://fwupd.org/downloads/0a29848de74d26348bc5a6e24fc9f03778eddf0e-hughski-colorhug2-2.0.7.cab",
"Version": "2.0.7",
}
],
)
def test_parse_usbvm_updates_no_updates_available(self):
self.q._parse_usbvm_updates(GET_DEVICES_NO_UPDATES)
self.assertListEqual(self.q.usbvm_updates_list, [])
def test_updates_crawler(self):
crawler_output = io.StringIO()
sys.stdout = crawler_output
self.q._parse_usbvm_updates(GET_DEVICES)
self.q._updates_crawler(self.q.usbvm_updates_list, usbvm=True)
with open("test/logs/get_updates.log", "r") as getupdates:
self.assertEqual(
getupdates.read(), crawler_output.getvalue().strip() + "\n"
)
sys.stdout = self.captured_output
@unittest.skipUnless(check_usbvm(), REQUIRED_USBVM)
def test_validate_usbvm_dirs(self):
self.q._validate_usbvm_dirs()
cmd_validate_metadata = [
"qvm-run",
"--pass-io",
"sys-usb",
f"[ -d {FWUPD_VM_METADATA_DIR} ]",
]
p = subprocess.Popen(cmd_validate_metadata)
p.wait()
self.assertEqual(p.returncode, 0, msg="Creating metadata directory failed")
cmd_validate_udpdate = [
"qvm-run",
"--pass-io",
"sys-usb",
f"[ -d {FWUPD_VM_UPDATES_DIR} ]",
]
p = subprocess.Popen(cmd_validate_udpdate)
p.wait()
self.assertEqual(p.returncode, 0, msg="Creating update directory failed")
@unittest.skipUnless(check_usbvm(), REQUIRED_USBVM)
def test_copy_usbvm_metadata(self):
self.q.metadata_file = FWUPD_DOM0_METADATA_FILE
self.q.metadata_file_jcat = self.q.metadata_file + ".jcat"
self.q._download_metadata()
self.q._validate_usbvm_dirs()
self.q._copy_usbvm_metadata()
cmd_validate_metadata_file = [
"qvm-run",
"--pass-io",
"sys-usb",
f"[ -f {FWUPD_VM_METADATA_FILE} ]",
]
p = subprocess.Popen(cmd_validate_metadata_file)
p.wait()
self.assertEqual(p.returncode, 0, msg="Metadata file does not exist")
cmd_validate_metadata_jcat = [
"qvm-run",
"--pass-io",
"sys-usb",
f"[ -f {FWUPD_VM_METADATA_FILE_JCAT} ]",
]
p = subprocess.Popen(cmd_validate_metadata_jcat)
p.wait()
self.assertEqual(p.returncode, 0, msg="Metadata jcat signature does not exist")
@unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS")
def test_enable_lvfs_testing_dom0(self):
if os.path.exists(LVFS_TESTING_DOM0_FLAG):
@ -746,128 +404,6 @@ class TestQubesFwupdmgr(unittest.TestCase):
self.q._enable_lvfs_testing_dom0()
self.assertTrue(os.path.exists(LVFS_TESTING_DOM0_FLAG))
@unittest.skipUnless(check_usbvm(), REQUIRED_USBVM)
def test_enable_lvfs_testing_usbvm(self):
cmd_validate_flag = [
"qvm-run",
"--pass-io",
USBVM_N,
(
"script --quiet --return --command "
f'"ls {LVFS_TESTING_USBVM_FLAG} &>/dev/null"'
),
]
cmd_rm_flag = [
"qvm-run",
"--pass-io",
USBVM_N,
("script --quiet --return --command " f'"rm {LVFS_TESTING_USBVM_FLAG}"'),
]
flag = subprocess.Popen(cmd_validate_flag)
flag.wait()
if flag.returncode == 0:
rm_flag = subprocess.Popen(cmd_rm_flag)
rm_flag.wait()
if rm_flag.returncode != 0:
raise Exception("Removing lvfs-testing flag failed!!")
self.q._enable_lvfs_testing_usbvm(usbvm=True)
flag = subprocess.Popen(cmd_validate_flag)
flag.wait()
self.assertEqual(flag.returncode, 0)
@unittest.skipUnless(check_usbvm(), REQUIRED_USBVM)
def test_validate_usbvm_metadata(self):
self.q.metadata_file = FWUPD_DOM0_METADATA_FILE
self.q.metadata_file_jcat = self.q.metadata_file + ".jcat"
self.q._download_metadata()
self.q._validate_usbvm_dirs()
self.q._copy_usbvm_metadata()
self.q._validate_usbvm_metadata()
@unittest.skipUnless(check_usbvm(), REQUIRED_USBVM)
def test_refresh_usbvm_metadata(self):
self.q.metadata_file = FWUPD_DOM0_METADATA_FILE
self.q.metadata_file_jcat = self.q.metadata_file + ".jcat"
self.q.lvfs = "lvfs"
self.q._download_metadata()
self.q._validate_usbvm_dirs()
self.q._copy_usbvm_metadata()
self.q._validate_usbvm_metadata()
self.q._refresh_usbvm_metadata()
@unittest.skipUnless(check_usbvm(), REQUIRED_USBVM)
def test_clean_usbvm(self):
self.q._validate_usbvm_dirs()
self.q._clean_usbvm()
cmd_validate_metadata = [
"qvm-run",
"--pass-io",
"sys-usb",
f"! [ -d {FWUPD_VM_METADATA_DIR} ]",
]
p = subprocess.Popen(cmd_validate_metadata)
p.wait()
self.assertEqual(p.returncode, 0, msg="Cleaning metadata directory failed")
cmd_validate_udpdate = [
"qvm-run",
"--pass-io",
"sys-usb",
f"! [ -d {FWUPD_VM_METADATA_DIR} ]",
]
p = subprocess.Popen(cmd_validate_udpdate)
p.wait()
self.assertEqual(p.returncode, 0, msg="Cleaning update directory failed")
@unittest.skipUnless(check_usbvm(), REQUIRED_USBVM)
def test_validate_usbvm_archive(self):
url = (
"https://fwupd.org/downloads/e5ad222bdbd3d3d48d8613e67c7e0a0e1"
"94f8cd828e33c554d9f05d933e482c7-hughski-colorhug2-2.0.7.cab"
)
sha = "e5ad222bdbd3d3d48d8613e67c7e0a0e194f8cd828e33c554d9f05d933e482c7"
name = url.replace("https://fwupd.org/downloads/", "")
self.q._clean_usbvm()
self.q._validate_usbvm_dirs()
self.q._download_firmware_updates(url, sha)
self.q._copy_firmware_updates(name)
self.q._validate_usbvm_archive(name, sha)
cmd_validate_udpdate = [
"qvm-run",
"--pass-io",
"sys-usb",
"[ -f %s ]" % os.path.join(FWUPD_VM_UPDATES_DIR, name),
]
p = subprocess.Popen(cmd_validate_udpdate)
p.wait()
self.assertEqual(p.returncode, 0, msg="Archive validation failed")
@unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS")
def test_check_usbvm(self):
self.q.check_usbvm()
self.assertIn(XL_LIST_LOG, self.q.output)
@unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS")
def test_bios_refresh_metadata(self):
sys_usb = self.q.check_usbvm()
Path(BIOS_UPDATE_FLAG).touch(mode=0o644, exist_ok=True)
self.q.refresh_metadata_after_bios_update(usbvm=sys_usb)
self.assertEqual(
self.q.output,
"Successfully refreshed metadata manually\n",
msg="Metadata refresh failed.",
)
@unittest.skipUnless(check_usbvm(), REQUIRED_USBVM)
def test_trusted_cleanup(self):
trusted_path = os.path.join(FWUPD_DOM0_UPDATES_DIR, "trusted.cab")
if not os.path.exists(trusted_path):
Path(FWUPD_DOM0_UPDATES_DIR).mkdir(exist_ok=True)
Path(trusted_path).touch(mode=0o644, exist_ok=True)
os.mkdir(trusted_path.replace(".cab", ""))
self.q.trusted_cleanup(usbvm=True)
self.assertFalse(os.path.exists(trusted_path))
self.assertFalse(os.path.exists(trusted_path.replace(".cab", "")))
if __name__ == "__main__":
unittest.main()