diff --git a/contrib/qubes/README.md b/contrib/qubes/README.md index 4b3aa2daf..97b992cc5 100644 --- a/contrib/qubes/README.md +++ b/contrib/qubes/README.md @@ -123,7 +123,7 @@ qvm-copy fwupd-qubes-vm-whonix-_amd64.deb # qubes-dom0-update gcab fwupd python36 ``` -* Make sure that sys-firewall, sys-whonix, and sys-usb (if exists) are running. +* Make sure that sys-firewall and sys-whonix are running. * Compare the SHA sums of the package in dom0 and qubes-builder VM. If they match, install the package: @@ -132,7 +132,7 @@ qvm-copy fwupd-qubes-vm-whonix-_amd64.deb # rpm -U qubes-fwupd-dom0-0.2.0-1.fc32.x86_64.rpm ``` -* Reboot system (or reboot sys-firewall, sys-whonix, and sys-usb) +* Reboot system (or reboot sys-firewall and sys-whonix) * Run the tests to verify the installation process diff --git a/contrib/qubes/meson.build b/contrib/qubes/meson.build index 639ae9e14..688c3f4f6 100644 --- a/contrib/qubes/meson.build +++ b/contrib/qubes/meson.build @@ -28,7 +28,6 @@ install_data([ install_data([ 'src/vms/fwupd_common_vm.py', 'src/vms/fwupd_download_updates.py', - 'src/vms/fwupd_usbvm_validate.py', ], install_dir: 'libexec/qubes-fwupd', install_mode: 'rwxrwxr-x', diff --git a/contrib/qubes/src/fwupd_receive_updates.py b/contrib/qubes/src/fwupd_receive_updates.py index fcb54bee4..208464616 100644 --- a/contrib/qubes/src/fwupd_receive_updates.py +++ b/contrib/qubes/src/fwupd_receive_updates.py @@ -240,12 +240,8 @@ class FwupdReceiveUpdates: self._jcat_verification(self.metadata_file_jcat, FWUPD_DOM0_METADATA_DIR) os.umask(self.old_umask) - def clean_cache(self, usbvm=False): - """Removes updates data - - Keyword arguments: - usbvm -- usbvm support flag - """ + def clean_cache(self): + """Removes updates data""" print("Cleaning dom0 cache directories") if os.path.exists(FWUPD_DOM0_METADATA_DIR): shutil.rmtree(FWUPD_DOM0_METADATA_DIR) @@ -253,6 +249,3 @@ class FwupdReceiveUpdates: shutil.rmtree(FWUPD_DOM0_UPDATES_DIR) if os.path.exists(HEADS_UPDATES_DIR): shutil.rmtree(HEADS_UPDATES_DIR) - if usbvm: - print("Cleaning usbvm cache directories") - self._clean_usbvm() diff --git a/contrib/qubes/src/qubes_fwupd_update.py b/contrib/qubes/src/qubes_fwupd_update.py index fa06a1132..15718a5b5 100644 --- a/contrib/qubes/src/qubes_fwupd_update.py +++ b/contrib/qubes/src/qubes_fwupd_update.py @@ -35,10 +35,6 @@ run_cmd = ( ) -def usbvm_run(args, **kwargs): - return subprocess.check_call((*run_cmd, *args), **kwargs) - - def run_in_tty(updatevm, args, **kwargs): return subprocess.check_call( ( @@ -87,12 +83,12 @@ class FwupdUpdate: raise Exception("Specifying updatevm failed") def _check_updatevm(self): - """Checks if usbvm is running""" + """Checks if updatevm is running""" cmd_xl_list = ["xl", "list"] p = subprocess.Popen(cmd_xl_list, stdout=subprocess.PIPE) output = p.communicate()[0].decode() if p.returncode != 0: - raise Exception("fwupd-qubes: Firmware downgrade failed") + raise Exception("fwupd-qubes: updatevm check failed") return self.updatevm in output def _encrypt_update_url(self, url): diff --git a/contrib/qubes/src/qubes_fwupdmgr.py b/contrib/qubes/src/qubes_fwupdmgr.py index 959489854..dc9f83bff 100755 --- a/contrib/qubes/src/qubes_fwupdmgr.py +++ b/contrib/qubes/src/qubes_fwupdmgr.py @@ -19,7 +19,6 @@ from pathlib import Path from packaging import version as pversion FWUPD_QUBES_DIR = "/usr/share/qubes-fwupd" -USBVM_N = "sys-usb" # Check if script is run by tests and append sys path properly if __name__ == "__main__": @@ -29,18 +28,13 @@ else: try: from qubes_fwupd_heads import FwupdHeads - from qubes_fwupd_update import FwupdUpdate, usbvm_run, run_in_tty + from qubes_fwupd_update import FwupdUpdate, run_in_tty from fwupd_receive_updates import FwupdReceiveUpdates except ModuleNotFoundError: raise ModuleNotFoundError( "qubes-fwupd modules not found. You may need to reinstall package." ) - -def usbvm_run_in_tty(args, **kwargs): - return run_in_tty(USBVM_N, args, **kwargs) - - FWUPD_DOM0_DIR = "/var/cache/qubes-fwupd" FWUPD_DOM0_METADATA_DIR = os.path.join(FWUPD_DOM0_DIR, "metadata") FWUPD_DOM0_UPDATES_DIR = os.path.join(FWUPD_DOM0_DIR, "updates") @@ -49,32 +43,12 @@ FWUPD_DOM0_METADATA_SIGNATURE = os.path.join( ) FWUPD_DOM0_METADATA_FILE = os.path.join(FWUPD_DOM0_METADATA_DIR, "firmware.xml.gz") FWUPD_DOM0_METADATA_JCAT = os.path.join(FWUPD_DOM0_METADATA_DIR, "firmware.xml.gz.jcat") -FWUPD_VM_LOG = os.path.join(FWUPD_DOM0_DIR, "usbvm-devices.log") -FWUPD_VM_VALIDATE = "/usr/libexec/qubes-fwupd/fwupd_usbvm_validate.py" -FWUPD_VM_DIR = "/home/user/.cache/fwupd" -FWUPD_VM_UPDATES_DIR = os.path.join(FWUPD_VM_DIR, "updates") -FWUPD_VM_METADATA_DIR = os.path.join(FWUPD_VM_DIR, "metadata") -FWUPD_VM_METADATA_SIGNATURE = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz.asc") -FWUPD_VM_METADATA_FILE = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz") -FWUPD_VM_METADATA_JCAT = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz.jcat") FWUPD_DOWNLOAD_PREFIX = "https://fwupd.org/downloads/" FWUPDMGR = "/bin/fwupdmgr" - -def usbvm_create_file(fname, dest, **kwargs): - with open(fname, "rb") as stdin: - subprocess.check_call( - [*usbvm_run_cmd, "dd", "of=" + dest, "bs=128K", "conv=fsync"], - stdin=stdin, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - - BIOS_UPDATE_FLAG = os.path.join(FWUPD_DOM0_DIR, "bios_update") LVFS_TESTING_DOM0_FLAG = os.path.join(FWUPD_DOM0_DIR, "lvfs_testing") -LVFS_TESTING_USBVM_FLAG = os.path.join(FWUPD_VM_DIR, "lvfs_testing") HELP = { "Usage": [ @@ -120,114 +94,10 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): if not os.path.exists(self.metadata_file): raise FileNotFoundError("Metadata file does not exist") - def _validate_usbvm_dirs(self): - """Validates if sys-ubs updates and metadata directories exist.""" - try: - usbvm_run_in_tty((FWUPD_VM_VALIDATE, "dirs")) - except subprocess.CalledProcessError: - raise Exception("Validation of usbvm directories failed.") - - def _validate_usbvm_archive(self, arch_name, sha): - """Validates checksum and gpg signature of the archive file.""" - arch_path = os.path.join(FWUPD_VM_UPDATES_DIR, arch_name) - try: - usbvm_run_in_tty((FWUPD_VM_VALIDATE, "updates", arch_path, sha)) - except subprocess.CalledProcessError: - raise Exception("Validation of the archive file failed.") - - def _copy_usbvm_metadata(self): - """Copies metadata files to usbvm.""" - self.metadata_file_usbvm = self.metadata_file.replace( - FWUPD_DOM0_METADATA_DIR, FWUPD_VM_METADATA_DIR - ) - self.metadata_file_jcat_usbvm = self.metadata_file_usbvm + ".jcat" - usbvm_create_file(self.metadata_file, self.metadata_file_usbvm) - usbvm_create_file(self.metadata_file_jcat, self.metadata_file_jcat_usbvm) - - def _validate_usbvm_metadata(self, metadata_url=None): - """Checks GPG signature of metadata files in usbvm.""" - usbvm_cmd = f'"{FWUPD_VM_VALIDATE} metadata"' - cmd_metadata = [FWUPD_VM_DOWNLOAD, "metadata"] - if metadata_url: - cmd_metadata.append("--url=" + metadata_url) - p = subprocess.Popen(cmd_validate_metadata) - try: - usbvm_run_in_tty(cmd_metadata) - except subprocess.CalledProcessError: - raise Exception("Metadata validation failed.") - - def _refresh_usbvm_metadata(self): - """Refreshes metadata in usbvm.""" - sig_metadata_file = self.metadata_file_jcat_usbvm - usbvm_run_in_tty( - ( - FWUPDMGR, - "refresh", - self.metadata_file_usbvm, - sig_metadata_file, - self.lvfs, - ) - ) - - def _copy_firmware_updates(self, arch_name): - """Copies updates files to usbvm. - - Keywords arguments: - arch_name - name of the archive file - """ - arch_path = os.path.join(FWUPD_DOM0_UPDATES_DIR, arch_name) - output_path = os.path.join(FWUPD_VM_UPDATES_DIR, arch_name) - usbvm_create_file(arch_path, output_path) - - def _install_usbvm_firmware_update(self, arch_name): - """Installs firmware update for specified device in dom0. - - Keywords arguments: - arch_name - name of the archive file - """ - arch_path = os.path.join(FWUPD_VM_UPDATES_DIR, arch_name) - usbvm_run_in_tty((FWUPDMGR, "--", "install", arch_path)) - - def _install_usbvm_firmware_downgrade(self, arch_name): - """Installs firmware downgrades for specified device in dom0. - - Keywords arguments: - arch_name - name of the archive file - """ - arch_path = os.path.join(FWUPD_VM_UPDATES_DIR, arch_name) - usbvm_run_in_tty((FWUPDMGR, "--allow-older", "--", "install", arch_path)) - - def _clean_usbvm(self): - """Cleans usbvm directories.""" - try: - usbvm_run_in_tty((FWUPDMGR, "clean")) - except subprocess.CalledProcessError: - raise Exception("Cleaning usbvm directories failed") - - def _enable_lvfs_testing_dom0(self): - """Checks and enable lvfs-testing for custom metadata in dom0""" - cmd_lvfs_testing = [FWUPDMGR, "enable-remote", "-y", "lvfs-testing"] - if not os.path.exists(LVFS_TESTING_DOM0_FLAG): - p = subprocess.Popen(cmd_lvfs_testing) - p.wait() - if p.returncode != 0: - raise Exception("Enabling dom0 lvfs-testing failed!!") - Path(LVFS_TESTING_DOM0_FLAG).touch(mode=0o644, exist_ok=False) - - def _enable_lvfs_testing_usbvm(self, usbvm=False): - """Checks and enable lvfs-testing for custom metadata in usbvm""" - if not usbvm: - return - try: - usbvm_run_in_tty((FWUPDMGR, "enable-remote", "-y", "lvfs-testing")) - except subprocess.CalledProcessError: - raise Exception("Enabling usbvm lvfs-testing failed!!") - - def refresh_metadata(self, usbvm=False, whonix=False, metadata_url=None): + def refresh_metadata(self, whonix=False, metadata_url=None): """Updates metadata with downloaded files. Keyword arguments: - usbvm -- usbvm support flag whonix -- Flag enforces downloading the metadata updates via Tor metadata_url -- Use custom metadata from the url """ @@ -237,17 +107,11 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): self.metadata_file_jcat = self.metadata_file + ".jcat" self.lvfs = "lvfs-testing" self._enable_lvfs_testing_dom0() - self._enable_lvfs_testing_usbvm(usbvm=usbvm) else: self.metadata_file = FWUPD_DOM0_METADATA_FILE self.metadata_file_jcat = FWUPD_DOM0_METADATA_JCAT self.lvfs = "lvfs" self._download_metadata(whonix=whonix, metadata_url=metadata_url) - if usbvm: - self._validate_usbvm_dirs() - self._copy_usbvm_metadata() - self._validate_usbvm_metadata(metadata_url=metadata_url) - self._refresh_usbvm_metadata() cmd_refresh = [ FWUPDMGR, "refresh", @@ -311,7 +175,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): if not os.path.exists(update_path): raise NotADirectoryError("Firmware update files do not exist") - def _user_input(self, updates_dict, downgrade=False, usbvm=False): + def _user_input(self, updates_list, downgrade=False): """UI for update process. Keywords arguments: @@ -319,11 +183,6 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): downgrade -- downgrade flag """ decorator = "======================================================" - if usbvm: - updates_list = updates_dict["dom0"] + updates_dict["usbvm"] - else: - updates_list = updates_dict["dom0"] - dom0_updates_num = len(updates_dict["dom0"]) if len(updates_list) == 0: print("No updates available.") return EXIT_CODES["NOTHING_TO_DO"] @@ -331,11 +190,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): print("Available downgrades:") else: print("Available updates:") - self._updates_crawler(updates_dict["dom0"]) - if usbvm: - self._updates_crawler( - updates_dict["usbvm"], usbvm=True, prefix=dom0_updates_num - ) + self._updates_crawler(updates_list) while True: try: @@ -346,10 +201,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): device_num = int(choice) - 1 if 0 <= device_num < len(updates_list): if not downgrade: - if device_num >= dom0_updates_num: - return "usbvm", device_num - dom0_updates_num - else: - return "dom0", device_num + return device_num break else: raise ValueError() @@ -379,27 +231,22 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): return EXIT_CODES["NOTHING_TO_DO"] downgrade_num = int(choice) - 1 if 0 <= downgrade_num < len(releases): - if device_num >= dom0_updates_num: - device_abs_num = device_num - dom0_updates_num - return "usbvm", device_abs_num, downgrade_num - else: - return "dom0", device_num, downgrade_num + return device_num, downgrade_num else: raise ValueError() except ValueError: print("Invalid choice.") - def _parse_parameters(self, updates_dict, vm_name, choice): + def _parse_parameters(self, updates_list, choice): """Parses device name, url, version and SHA256 checksum of the file list. Keywords arguments: - updates_dict - dictionary of updates for dom0 and usbvm - vm_name - VM name + updates_list - list of updates for dom0 choice -- number of device to be updated """ - self.name = updates_dict[vm_name][choice]["Name"] - self.version = updates_dict[vm_name][choice]["Releases"][0]["Version"] - for ver_check in updates_dict[vm_name][choice]["Releases"]: + self.name = updates_list[choice]["Name"] + self.version = updates_list[choice]["Releases"][0]["Version"] + for ver_check in updates_list[choice]["Releases"]: if pversion.parse(ver_check["Version"]) >= pversion.parse(self.version): self.version = ver_check["Version"] self.url = ver_check["Url"] @@ -460,95 +307,26 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): if p.returncode != 0: raise Exception("fwupd-qubes: Getting devices info failed") - def _get_usbvm_devices(self): - """Gathers information about devices connected in usbvm.""" - if os.path.exists(FWUPD_VM_LOG): - os.remove(FWUPD_VM_LOG) - with open(FWUPD_VM_LOG, "wb") as log: - p = usbvm_run( - [FWUPDAGENT, "get-devices"], - stdout=log, - stderr=log, - stdin=subprocess.DEVNULL, - ) - if p.returncode != 0 and p.returncode != 2 and not os.path.exists(FWUPD_VM_LOG): - raise Exception("fwupd-qubes: Getting usbvm devices info failed") - if not os.path.exists(FWUPD_VM_LOG): - raise Exception("usbvm device info log does not exist") - - def _parse_usbvm_updates(self, usbvm_devices_info): - """Creates dictionary and list with information about updates. - - Keywords argument: - usbvm_devices_info - gathered usbvm information - """ - self.usbvm_updates_list = [] - if "No detected devices" in usbvm_devices_info: - return EXIT_CODES["NOTHING_TO_DO"] - usbvm_device_info_dict = json.loads(usbvm_devices_info) - for device in usbvm_device_info_dict["Devices"]: - if "Releases" in device: - self.usbvm_updates_list.append( - { - "Name": device["Name"], - "Version": device["Version"], - "Releases": [], - } - ) - current_version = device["Version"] - for update in device["Releases"]: - if pversion.parse(update["Version"]) > pversion.parse( - current_version - ): - self.usbvm_updates_list[-1]["Releases"].append( - { - "Version": update["Version"], - "Url": update["Uri"], - "Checksum": update["Checksum"][-1], - "Description": update["Description"], - } - ) - if not self.usbvm_updates_list[-1]["Releases"]: - self.usbvm_updates_list.pop() - - def update_firmware(self, usbvm=False, whonix=False): + def update_firmware(self, whonix=False): """Updates firmware of the specified device. Keyword arguments: - usbvm -- usbvm support flag whonix -- Flag enforces downloading the metadata updates via Tor """ self._get_dom0_updates() self._parse_dom0_updates_info(self.dom0_updates_info) - if usbvm: - self._get_usbvm_devices() - with open(FWUPD_VM_LOG) as usbvm_device_info: - raw = usbvm_device_info.read() - self._parse_usbvm_updates(raw) - update_dict = { - "usbvm": self.usbvm_updates_list, - "dom0": self.dom0_updates_list, - } - ret_input = self._user_input(update_dict, usbvm=True) - else: - update_dict = {"dom0": self.dom0_updates_list} - ret_input = self._user_input(update_dict) + updates_list = self.dom0_updates_list + ret_input = self._user_input(updates_list) if ret_input == EXIT_CODES["NOTHING_TO_DO"]: exit(EXIT_CODES["NOTHING_TO_DO"]) - vm_name, choice = ret_input - self._parse_parameters(update_dict, vm_name, choice) + choice = ret_input + self._parse_parameters(updates_list, choice) self._download_firmware_updates(self.url, self.sha, whonix=whonix) if self.name == "System Firmware": Path(BIOS_UPDATE_FLAG).touch(mode=0o644, exist_ok=True) extracted_path = self.arch_path.replace(".cab", "") self._verify_dmi(extracted_path, self.version) - if vm_name == "dom0": - self._install_dom0_firmware_update(self.arch_path) - if vm_name == "usbvm": - self._validate_usbvm_dirs() - self._copy_firmware_updates(self.arch_name) - self._validate_usbvm_archive(self.arch_name, self.sha) - self._install_usbvm_firmware_update(self.arch_name) + self._install_dom0_firmware_update(self.arch_path) def _parse_downgrades(self, device_list): """Parses information about possible downgrades. @@ -597,49 +375,34 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): if p.returncode != 0: raise Exception("fwupd-qubes: Firmware downgrade failed") - def downgrade_firmware(self, usbvm=False, whonix=False): + def downgrade_firmware(self, whonix=False): """Downgrades firmware of the specified device. Keyword arguments: - usbvm -- usbvm support flag whonix -- Flag enforces downloading the metadata updates via Tor """ self._get_dom0_devices() dom0_downgrades = self._parse_downgrades(self.dom0_devices_info) - if usbvm: - self._get_usbvm_devices() - with open(FWUPD_VM_LOG) as usbvm_device_info: - raw = usbvm_device_info.read() - usbvm_downgrades = self._parse_downgrades(raw) - downgrade_dict = {"usbvm": usbvm_downgrades, "dom0": dom0_downgrades} - ret_input = self._user_input(downgrade_dict, downgrade=True, usbvm=True) - else: - downgrade_dict = {"dom0": dom0_downgrades} - ret_input = self._user_input(downgrade_dict, downgrade=True) + ret_input = self._user_input(dom0_downgrades, downgrade=True) if ret_input == EXIT_CODES["NOTHING_TO_DO"]: exit(EXIT_CODES["NOTHING_TO_DO"]) - vm_name, device_choice, downgrade_choice = ret_input - releases = downgrade_dict[vm_name][device_choice]["Releases"] + device_choice, downgrade_choice = ret_input + downgrade = dom0_downgrades[device_choice] + releases = downgrade["Releases"] downgrade_url = releases[downgrade_choice]["Url"] downgrade_sha = releases[downgrade_choice]["Checksum"] self._download_firmware_updates(downgrade_url, downgrade_sha, whonix=whonix) - if downgrade_dict[vm_name][device_choice]["Name"] == "System Firmware": + if downgrade["Name"] == "System Firmware": Path(BIOS_UPDATE_FLAG).touch(mode=0o644, exist_ok=True) extracted_path = self.arch_path.replace(".cab", "") self._verify_dmi( extracted_path, - downgrade_dict[vm_name][device_choice]["Version"], + downgrade["Version"], downgrade=True, ) - if vm_name == "dom0": - self._install_dom0_firmware_downgrade(self.arch_path) - if vm_name == "usbvm": - self._validate_usbvm_dirs() - self._copy_firmware_updates(self.arch_name) - self._validate_usbvm_archive(self.arch_name, downgrade_sha) - self._install_usbvm_firmware_downgrade(self.arch_name) + self._install_dom0_firmware_downgrade(self.arch_path) - def _output_crawler(self, updev_dict, level, help_f=False, dom0=True): + def _output_crawler(self, updev_dict, level, help_f=False): """Prints device and updates information as a tree. Keywords arguments: @@ -679,29 +442,23 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): if level == 0 and help_f is True: print(output) else: - if level == 0 and dom0 is True: + if level == 0: print(f"Dom0 {output}") - elif level == 0 and dom0 is False: - print(f"{USBVM_N} {output}") for nested_dict in updev_dict[updev_key]: self._output_crawler(nested_dict, level + 1) - def _updates_crawler(self, updates_list, usbvm=False, prefix=0): - """Prints updates information for dom0 and usbvm + def _updates_crawler(self, updates_list, prefix=0): + """Prints updates information for dom0 Keywords arguments: updates_list -- list of devices updates - usbvm -- usbvm support flag prefix -- device number prefix """ available_updates = False decorator = "======================================================" print(decorator) - if usbvm: - print(f"{USBVM_N} updates:") - else: - print("Dom0 updates:") + print("Dom0 updates:") print(decorator) if len(updates_list) == 0: print("No updates available.") @@ -734,80 +491,37 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): print("No updates available.") return EXIT_CODES["NOTHING_TO_DO"] - def get_devices_qubes(self, usbvm=False): - """Gathers and prints devices information. - - Keyword arguments: - usbvm -- usbvm support flag - """ + def get_devices_qubes(self): + """Gathers and prints devices information.""" self._get_dom0_devices() dom0_devices_info_dict = json.loads(self.dom0_devices_info) self._output_crawler(dom0_devices_info_dict, 0) - if usbvm: - self._get_usbvm_devices() - with open(FWUPD_VM_LOG) as usbvm_device_info: - raw = usbvm_device_info.read() - if "No detected devices" not in raw: - usbvm_device_info_dict = json.loads(raw) - else: - print(f"No detected devices in {USBVM_N}") - return EXIT_CODES["NOTHING_TO_DO"] - self._output_crawler(usbvm_device_info_dict, 0, dom0=False) - def get_updates_qubes(self, usbvm=False): - """Gathers and prints updates information. - - Keyword arguments: - usbvm -- usbvm support flag - """ + def get_updates_qubes(self): + """Gathers and prints updates information.""" self._get_dom0_updates() self._parse_dom0_updates_info(self.dom0_updates_info) self._updates_crawler(self.dom0_updates_list) - if usbvm: - self._get_usbvm_devices() - with open(FWUPD_VM_LOG) as usbvm_device_info: - raw = usbvm_device_info.read() - self._parse_usbvm_updates(raw) - self._updates_crawler(self.usbvm_updates_list, usbvm=True) def help(self): """Prints help information""" self._output_crawler(HELP, 0, help_f=True) - def check_usbvm(self): - """Checks if usbvm is running""" - cmd_xl_list = ["xl", "list"] - p = subprocess.Popen(cmd_xl_list, stdout=subprocess.PIPE) - self.output = p.communicate()[0].decode() - if p.returncode != 0: - raise Exception("fwupd-qubes: Firmware downgrade failed") - return USBVM_N in self.output - - def trusted_cleanup(self, usbvm=False): - """Deletes trusted directory. - - Keyword arguments: - usbvm -- usbvm support flag - """ + def trusted_cleanup(self): + """Deletes trusted directory.""" trusted_path = os.path.join(FWUPD_DOM0_UPDATES_DIR, "trusted.cab") if os.path.exists(trusted_path): os.remove(trusted_path) shutil.rmtree(trusted_path.replace(".cab", "")) - if usbvm: - self._clean_usbvm() - def refresh_metadata_after_bios_update(self, usbvm=False): - """Refreshes metadata after bios update - - Keyword arguments: - usbvm -- usbvm support flag - """ + def refresh_metadata_after_bios_update(self): + """Refreshes metadata after bios update""" if os.path.exists(BIOS_UPDATE_FLAG): print("BIOS was updated. Refreshing metadata...") if "--whonix" in sys.argv: - self.refresh_metadata(usbvm=usbvm, whonix=True) + self.refresh_metadata(whonix=True) else: - self.refresh_metadata(usbvm=usbvm) + self.refresh_metadata() os.remove(BIOS_UPDATE_FLAG) def heads_update(self, device="x230", whonix=False, metadata_url=None): @@ -875,16 +589,15 @@ def main(): exit(EXIT_CODES["ERROR"]) q = QubesFwupdmgr() - sys_usb = q.check_usbvm() q.validate_dom0_dirs() - q.trusted_cleanup(usbvm=sys_usb) - q.refresh_metadata_after_bios_update(usbvm=sys_usb) + q.trusted_cleanup() + q.refresh_metadata_after_bios_update() metadata_url = None device = "x230" if not os.path.exists(FWUPD_DOM0_DIR): - q.refresh_metadata(usbvm=sys_usb) + q.refresh_metadata() if len(sys.argv) < 2: q.help() @@ -903,23 +616,23 @@ def main(): device = arg.replace("--device=", "") if sys.argv[1] == "get-updates": - q.get_updates_qubes(usbvm=sys_usb) + q.get_updates_qubes() elif sys.argv[1] == "get-devices": - q.get_devices_qubes(usbvm=sys_usb) + q.get_devices_qubes() elif sys.argv[1] == "update" and "--whonix" in sys.argv: - q.update_firmware(usbvm=sys_usb, whonix=True) + q.update_firmware(whonix=True) elif sys.argv[1] == "update" and "--whonix" not in sys.argv: - q.update_firmware(usbvm=sys_usb) + q.update_firmware() elif sys.argv[1] == "downgrade" and "--whonix" in sys.argv: - q.downgrade_firmware(usbvm=sys_usb, whonix=True) + q.downgrade_firmware(whonix=True) elif sys.argv[1] == "downgrade" and "--whonix" not in sys.argv: - q.downgrade_firmware(usbvm=sys_usb) + q.downgrade_firmware() elif sys.argv[1] == "clean": - q.clean_cache(usbvm=sys_usb) + q.clean_cache() elif sys.argv[1] == "refresh" and "--whonix" not in sys.argv: - q.refresh_metadata(usbvm=sys_usb, metadata_url=metadata_url) + q.refresh_metadata(metadata_url=metadata_url) elif sys.argv[1] == "refresh" and "--whonix" in sys.argv: - q.refresh_metadata(usbvm=sys_usb, whonix=True, metadata_url=metadata_url) + q.refresh_metadata(whonix=True, metadata_url=metadata_url) elif sys.argv[1] == "update-heads" and "--whonix" not in sys.argv: q.heads_update(device=device, metadata_url=metadata_url) elif sys.argv[1] == "update-heads" and "--whonix" in sys.argv: diff --git a/contrib/qubes/src/vms/fwupd_usbvm_validate.py b/contrib/qubes/src/vms/fwupd_usbvm_validate.py deleted file mode 100644 index f39b8e76a..000000000 --- a/contrib/qubes/src/vms/fwupd_usbvm_validate.py +++ /dev/null @@ -1,105 +0,0 @@ -#!/usr/bin/python3 -# -# The Qubes OS Project, http://www.qubes-os.org -# -# Copyright (C) 2021 Norbert KamiƄski -# -# SPDX-License-Identifier: LGPL-2.1+ -# - -import glob -import os -import shutil -import subprocess -import sys - -from fwupd_common_vm import FwupdVmCommon - -FWUPD_VM_DIR = "/home/user/.cache/fwupd" -FWUPD_VM_UPDATES_DIR = os.path.join(FWUPD_VM_DIR, "updates") -FWUPD_VM_METADATA_DIR = os.path.join(FWUPD_VM_DIR, "metadata") -FWUPD_VM_METADATA_JCAT = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz.jcat") -FWUPD_VM_METADATA_FILE = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz") -FWUPDMGR = "/bin/fwupdmgr" -FWUPD_DOWNLOAD_PREFIX = "https://fwupd.org/downloads/" - - -class FwupdUsbvmUpdates(FwupdVmCommon): - def _verify_received(self, files_path, regex_pattern): - """Checks if sent files match regex filename pattern. - - Keyword arguments: - - files_path -- absolute path to inspected directory - regex_pattern -- pattern of the expected files - """ - for untrusted_f in os.listdir(files_path): - if not regex_pattern.match(untrusted_f): - raise Exception("Dom0 sent unexpected file") - f = untrusted_f - assert "/" not in f - assert "\0" not in f - assert "\x1b" not in f - path_f = os.path.join(files_path, f) - if os.path.islink(path_f) or not os.path.isfile(path_f): - raise Exception("Dom0 sent not regular file") - - def validate_metadata(self, metadata_url=None): - """Validates received the metadata files.""" - print("Running validation of the metadata files") - if metadata_url: - metadata_name = metadata_url.replace(FWUPD_DOWNLOAD_PREFIX, "") - metadata_file = os.path.join(FWUPD_VM_METADATA_DIR, metadata_name) - else: - metadata_file = FWUPD_VM_METADATA_FILE - try: - self._jcat_verification(f"{metadata_file}.jcat", FWUPD_VM_METADATA_DIR) - except Exception as e: - print(str(e), file=sys.stderr) - self.clean_vm_cache() - exit(1) - - def validate_updates(self, archive_path, sha): - """Validates received an update file. - - Keyword arguments: - archive_path - path to the firmware update archive - sha -- SHA256 checksum of the firmware update archive - """ - print("Running validation of the update archive") - self.check_shasum(archive_path, sha) - archive_name = archive_path.replace(f"{FWUPD_VM_UPDATES_DIR}/", "") - output_path = archive_path.replace(".cab", "") - arch_temp = os.path.join(output_path, archive_name) - os.mkdir(output_path) - # jcat verification will be done by fwupd itself - shutil.copyfile(archive_path, arch_temp) - - -def main(): - f = FwupdUsbvmUpdates() - f_val = FwupdVmCommon() - metadata_url = None - if len(sys.argv) < 2: - raise Exception("Invalid number of arguments.") - for arg in sys.argv: - if "--url=" in arg: - metadata_url = arg.replace("--url=", "") - if sys.argv[1] == "metadata": - f.validate_metadata(metadata_url=metadata_url) - elif sys.argv[1] == "dirs": - f_val.validate_vm_dirs() - elif sys.argv[1] == "clean": - f.clean_vm_cache() - elif sys.argv[1] == "updates" and len(sys.argv) < 4: - raise Exception( - "Invalid number of arguments.\n" "Expected archive path and checksum." - ) - elif sys.argv[1] == "updates" and not len(sys.argv) < 4: - f.validate_updates(sys.argv[2], sys.argv[3]) - else: - raise Exception("Invalid command") - - -if __name__ == "__main__": - main() diff --git a/contrib/qubes/test/test_qubes_fwupdmgr.py b/contrib/qubes/test/test_qubes_fwupdmgr.py index d1b554c29..71070b6e2 100755 --- a/contrib/qubes/test/test_qubes_fwupdmgr.py +++ b/contrib/qubes/test/test_qubes_fwupdmgr.py @@ -33,37 +33,17 @@ elif os.path.exists(QUBES_FWUPDMGR_BINDIR): FWUPD_DOM0_DIR = "/var/cache/qubes-fwupd" FWUPD_DOM0_UPDATES_DIR = os.path.join(FWUPD_DOM0_DIR, "updates") FWUPD_DOM0_UNTRUSTED_DIR = os.path.join(FWUPD_DOM0_UPDATES_DIR, "untrusted") -FWUPD_VM_LOG = os.path.join(FWUPD_DOM0_DIR, "usbvm-devices.log") FWUPD_DOM0_METADATA_DIR = os.path.join(FWUPD_DOM0_DIR, "metadata") FWUPD_DOM0_METADATA_FILE = os.path.join(FWUPD_DOM0_METADATA_DIR, "firmware.xml.gz") FWUPD_DOM0_METADATA_FILE_JCAT = os.path.join(FWUPD_DOM0_METADATA_DIR, "firmware.xml.gz") -FWUPD_VM_DIR = "/home/user/.cache/fwupd" -FWUPD_VM_UPDATES_DIR = os.path.join(FWUPD_VM_DIR, "updates") -FWUPD_VM_METADATA_DIR = os.path.join(FWUPD_VM_DIR, "metadata") -FWUPD_VM_METADATA_FILE = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz") -FWUPD_VM_METADATA_FILE_JCAT = os.path.join( - FWUPD_VM_METADATA_DIR, "firmware.xml.gz.jcat" -) REQUIRED_DEV = "Requires device not connected" -REQUIRED_USBVM = "Requires sys-usb" XL_LIST_LOG = "Name ID Mem VCPUs State Time(s)" -USBVM_N = "sys-usb" FWUPDMGR = "/bin/fwupdmgr" BIOS_UPDATE_FLAG = os.path.join(FWUPD_DOM0_DIR, "bios_update") LVFS_TESTING_DOM0_FLAG = os.path.join(FWUPD_DOM0_DIR, "lvfs_testing") -LVFS_TESTING_USBVM_FLAG = os.path.join(FWUPD_VM_DIR, "lvfs_testing") CUSTOM_METADATA = "https://fwupd.org/downloads/firmware-3c81bfdc9db5c8a42c09d38091944bc1a05b27b0.xml.gz" -def check_usbvm(): - """Checks if sys-usb is running""" - if "qubes" not in platform.release(): - return False - q = qfwupd.QubesFwupdmgr() - q.check_usbvm() - return "sys-usb" in q.output - - def device_connected_dom0(): """Checks if the testing device is connected in dom0""" if "qubes" not in platform.release(): @@ -73,25 +53,11 @@ def device_connected_dom0(): return "ColorHug2" in q.dom0_devices_info -def device_connected_usbvm(): - """Checks if the testing device is connected in usbvm""" - if not check_usbvm(): - return False - q = qfwupd.QubesFwupdmgr() - q._validate_usbvm_dirs() - if not os.path.exists(FWUPD_DOM0_DIR): - q.refresh_metadata() - q._get_usbvm_devices() - with open(FWUPD_VM_LOG) as usbvm_device_info: - return "ColorHug2" in usbvm_device_info.read() - - def check_whonix_updatevm(): """Checks if the sys-whonix is running""" if "qubes" not in platform.release(): return False q = qfwupd.QubesFwupdmgr() - q.check_usbvm() return "sys-whonix" in q.output @@ -162,24 +128,6 @@ class TestQubesFwupdmgr(unittest.TestCase): msg="Metadata refresh failed.", ) - @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) - def test_refresh_metadata_usbvm(self): - self.q.refresh_metadata(usbvm=True) - self.assertEqual( - self.q.output, - "Successfully refreshed metadata manually\n", - msg="Metadata refresh failed.", - ) - - @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) - def test_refresh_metadata_usbvm_custom(self): - self.q.refresh_metadata(usbvm=True, metadata_url=CUSTOM_METADATA) - self.assertEqual( - self.q.output, - "Successfully refreshed metadata manually\n", - msg="Metadata refresh failed.", - ) - @unittest.skipUnless(check_whonix_updatevm(), "Requires sys-whonix") def test_refresh_metadata_whonix(self): self.q.refresh_metadata(whonix=True) @@ -245,57 +193,30 @@ class TestQubesFwupdmgr(unittest.TestCase): self.assertTrue(os.path.exists(update_path)) def test_user_input_empty_dict(self): - downgrade_dict = {"usbvm": [], "dom0": []} - self.assertEqual(self.q._user_input(downgrade_dict), 2) + self.assertEqual(self.q._user_input([]), 2) def test_user_input_n(self): user_input = ["sth", "n"] with patch("builtins.input", side_effect=user_input): self.q._parse_dom0_updates_info(UPDATE_INFO) - downgrade_dict = { - "usbvm": self.q.dom0_updates_list, - "dom0": self.q.dom0_updates_list, - } - choice = self.q._user_input(downgrade_dict, usbvm=True) + choice = self.q._user_input(self.q.dom0_updates_list) self.assertEqual(choice, 2) user_input = ["sth", "N"] with patch("builtins.input", side_effect=user_input): self.q._parse_dom0_updates_info(UPDATE_INFO) - downgrade_dict = { - "usbvm": self.q.dom0_updates_list, - "dom0": self.q.dom0_updates_list, - } - choice = self.q._user_input(downgrade_dict, usbvm=True) + choice = self.q._user_input(self.q.dom0_updates_list) self.assertEqual(choice, 2) def test_user_input_choice(self): user_input = ["6", "1"] with patch("builtins.input", side_effect=user_input): self.q._parse_dom0_updates_info(UPDATE_INFO) - updates_dict = { - "usbvm": self.q.dom0_updates_list, - "dom0": self.q.dom0_updates_list, - } - key, choice = self.q._user_input(updates_dict) - self.assertEqual(key, "dom0") - self.assertEqual(choice, 0) - - def test_user_input_choice_usbvm(self): - user_input = ["6", "2"] - with patch("builtins.input", side_effect=user_input): - self.q._parse_dom0_updates_info(UPDATE_INFO) - updates_dict = { - "usbvm": self.q.dom0_updates_list, - "dom0": self.q.dom0_updates_list, - } - key, choice = self.q._user_input(updates_dict, usbvm=True) - self.assertEqual(key, "usbvm") + choice = self.q._user_input(self.q.dom0_updates_list) self.assertEqual(choice, 0) def test_parse_parameters(self): self.q._parse_dom0_updates_info(UPDATE_INFO) - update_dict = {"dom0": self.q.dom0_updates_list} - self.q._parse_parameters(update_dict, "dom0", 0) + self.q._parse_parameters(self.q.dom0_updates_list, 0) self.assertEqual( self.q.url, "https://fwupd.org/downloads/0a29848de74d26348bc5a6e24fc9f03778eddf0e-hughski-colorhug2-2.0.7.cab", @@ -312,31 +233,6 @@ class TestQubesFwupdmgr(unittest.TestCase): self.assertFalse(os.path.exists(FWUPD_DOM0_METADATA_DIR)) self.assertFalse(os.path.exists(FWUPD_DOM0_UNTRUSTED_DIR)) - @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) - def test_clean_cache_dom0_n_usbvm(self): - self.q._validate_usbvm_dirs() - self.q.clean_cache(usbvm=True) - self.assertFalse(os.path.exists(FWUPD_DOM0_METADATA_DIR)) - self.assertFalse(os.path.exists(FWUPD_DOM0_UNTRUSTED_DIR)) - cmd_validate_metadata = [ - "qvm-run", - "--pass-io", - "sys-usb", - f"! [ -d {FWUPD_VM_METADATA_DIR} ]", - ] - p = subprocess.Popen(cmd_validate_metadata) - p.wait() - self.assertEqual(p.returncode, 0, msg="Creating metadata directory failed") - cmd_validate_udpdate = [ - "qvm-run", - "--pass-io", - "sys-usb", - f"! [ -d {FWUPD_VM_UPDATES_DIR} ]", - ] - p = subprocess.Popen(cmd_validate_udpdate) - p.wait() - self.assertEqual(p.returncode, 0, msg="Cleaning update directory failed") - def test_output_crawler(self): crawler_output = io.StringIO() sys.stdout = crawler_output @@ -360,14 +256,6 @@ class TestQubesFwupdmgr(unittest.TestCase): self.assertNotEqual(get_devices_output.getvalue().strip(), "") sys.stdout = self.captured_output - @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) - def test_get_devices_qubes_usbvm(self): - get_devices_output = io.StringIO() - sys.stdout = get_devices_output - self.q.get_devices_qubes(usbvm=True) - self.assertNotEqual(get_devices_output.getvalue().strip(), "") - sys.stdout = self.captured_output - @unittest.skipUnless(device_connected_dom0(), REQUIRED_DEV) def test_get_updates_qubes_dom0(self): get_updates_output = io.StringIO() @@ -376,14 +264,6 @@ class TestQubesFwupdmgr(unittest.TestCase): self.assertNotEqual(get_updates_output.getvalue().strip(), "") sys.stdout = self.captured_output - @unittest.skipUnless(device_connected_usbvm(), REQUIRED_DEV) - def test_get_updates_qubes_usbvm(self): - get_updates_output = io.StringIO() - sys.stdout = get_updates_output - self.q.get_updates_qubes(usbvm=True) - self.assertNotEqual(get_updates_output.getvalue().strip(), "") - sys.stdout = self.captured_output - def test_help(self): help_output = io.StringIO() sys.stdout = help_output @@ -441,95 +321,6 @@ class TestQubesFwupdmgr(unittest.TestCase): new_version = downgrades[number]["Version"] self.assertGreater(Version(old_version), Version(new_version)) - @unittest.skipUnless( - check_whonix_updatevm() and device_connected_usbvm(), REQUIRED_DEV - ) - def test_update_n_downgrade_firmware_whonix(self): - old_version = None - self.q.clean_cache(usbvm=True) - self.q._get_dom0_devices() - dom0_downgrades = self.q._parse_downgrades(self.q.dom0_devices_info) - self.q._get_usbvm_devices() - with open(FWUPD_VM_LOG) as usbvm_device_info: - raw = usbvm_device_info.read() - downgrades = self.q._parse_downgrades(raw) - for number, device in enumerate(downgrades): - if "Name" not in device: - continue - if device["Name"] == "ColorHug2": - old_version = device["Version"] - break - if old_version is None: - self.fail("Test device not found") - user_input = [str(number + 1 + len(dom0_downgrades)), "1"] - with patch("builtins.input", side_effect=user_input): - self.q.downgrade_firmware(usbvm=True, whonix=True) - self.q._get_usbvm_devices() - with open(FWUPD_VM_LOG) as usbvm_device_info: - raw = usbvm_device_info.read() - downgrades = self.q._parse_downgrades(raw) - new_version = downgrades[number]["Version"] - self.assertGreater(Version(old_version), Version(new_version)) - old_version = None - new_version = None - self.q._get_dom0_updates() - self.q._parse_dom0_updates_info(self.q.dom0_updates_info) - self.q._get_usbvm_devices() - with open(FWUPD_VM_LOG) as usbvm_device_info: - raw = usbvm_device_info.read() - self.q._parse_usbvm_updates(raw) - for number, device in enumerate(self.q.usbvm_updates_list): - if "Name" not in device: - continue - if device["Name"] == "ColorHug2": - old_version = device["Version"] - break - if old_version is None: - self.fail("Test device not found") - user_input = [str(number + 1 + len(self.q.dom0_updates_list)), "1"] - with patch("builtins.input", side_effect=user_input): - self.q.update_firmware(usbvm=True, whonix=True) - self.q._get_usbvm_devices() - with open(FWUPD_VM_LOG) as usbvm_device_info: - raw = usbvm_device_info.read() - usbvm_devices_info_dict = json.loads(raw) - for device in usbvm_devices_info_dict["Devices"]: - if "Name" not in device: - continue - if device["Name"] == "ColorHug2": - new_version = device["Version"] - break - if new_version is None: - self.fail("Test device not found") - self.assertLess(Version(old_version), Version(new_version)) - - @unittest.skipUnless(device_connected_usbvm(), REQUIRED_DEV) - def test_downgrade_firmware_usbvm(self): - old_version = None - self.q._get_dom0_devices() - dom0_downgrades = self.q._parse_downgrades(self.q.dom0_devices_info) - self.q._get_usbvm_devices() - with open(FWUPD_VM_LOG) as usbvm_device_info: - raw = usbvm_device_info.read() - downgrades = self.q._parse_downgrades(raw) - for number, device in enumerate(downgrades): - if "Name" not in device: - continue - if device["Name"] == "ColorHug2": - old_version = device["Version"] - break - if old_version is None: - self.fail("Test device not found") - user_input = [str(number + 1 + len(dom0_downgrades)), "1"] - with patch("builtins.input", side_effect=user_input): - self.q.downgrade_firmware(usbvm=True) - self.q._get_usbvm_devices() - with open(FWUPD_VM_LOG) as usbvm_device_info: - raw = usbvm_device_info.read() - downgrades = self.q._parse_downgrades(raw) - new_version = downgrades[number]["Version"] - self.assertGreater(Version(old_version), Version(new_version)) - def test_parse_downgrades(self): downgrades = self.q._parse_downgrades(GET_DEVICES) self.assertEqual(downgrades[0]["Name"], "ColorHug2") @@ -558,18 +349,6 @@ class TestQubesFwupdmgr(unittest.TestCase): "4ee9dfa38df3b810f739d8a19d13da1b3175fb87", ) - def test_user_input_downgrade_usbvm(self): - user_input = ["2", "6", "sth", "2.2.1", "", " ", "\0", "2"] - with patch("builtins.input", side_effect=user_input): - downgrade_list = self.q._parse_downgrades(GET_DEVICES) - downgrade_dict = {"usbvm": downgrade_list, "dom0": downgrade_list} - key, device_choice, downgrade_choice = self.q._user_input( - downgrade_dict, downgrade=True, usbvm=True - ) - self.assertEqual(key, "usbvm") - self.assertEqual(device_choice, 0) - self.assertEqual(downgrade_choice, 1) - def test_user_input_downgrade_dom0(self): user_input = ["1", "6", "sth", "2.2.1", "", " ", "\0", "2"] with patch("builtins.input", side_effect=user_input): @@ -586,8 +365,7 @@ class TestQubesFwupdmgr(unittest.TestCase): user_input = ["N"] with patch("builtins.input", side_effect=user_input): downgrade_list = self.q._parse_downgrades(GET_DEVICES) - downgrade_dict = {"usbvm": downgrade_list, "dom0": downgrade_list} - N_choice = self.q._user_input(downgrade_dict, downgrade=True, usbvm=True) + N_choice = self.q._user_input(downgrade_list, downgrade=True) self.assertEqual(N_choice, 2) @unittest.skipUnless(device_connected_dom0(), REQUIRED_DEV) @@ -619,126 +397,6 @@ class TestQubesFwupdmgr(unittest.TestCase): self.fail("Test device not found") self.assertLess(Version(old_version), Version(new_version)) - @unittest.skipUnless(device_connected_usbvm(), REQUIRED_DEV) - def test_update_firmware_usbvm(self): - old_version = None - new_version = None - self.q._get_dom0_updates() - self.q._parse_dom0_updates_info(self.q.dom0_updates_info) - self.q._get_usbvm_devices() - with open(FWUPD_VM_LOG) as usbvm_device_info: - raw = usbvm_device_info.read() - self.q._parse_usbvm_updates(raw) - for number, device in enumerate(self.q.usbvm_updates_list): - if "Name" not in device: - continue - if device["Name"] == "ColorHug2": - old_version = device["Version"] - break - if old_version is None: - self.fail("Test device not found") - user_input = [str(number + 1 + len(self.q.dom0_updates_list)), "1"] - with patch("builtins.input", side_effect=user_input): - self.q.update_firmware(usbvm=True) - self.q._get_usbvm_devices() - with open(FWUPD_VM_LOG) as usbvm_device_info: - raw = usbvm_device_info.read() - usbvm_devices_info_dict = json.loads(raw) - for device in usbvm_devices_info_dict["Devices"]: - if "Name" not in device: - continue - if device["Name"] == "ColorHug2": - new_version = device["Version"] - break - if new_version is None: - self.fail("Test device not found") - self.assertLess(Version(old_version), Version(new_version)) - - @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) - def test_get_usbvm_devices(self): - self.q._get_usbvm_devices() - self.assertTrue(os.path.exists(FWUPD_VM_LOG)) - - def test_parse_usbvm_updates(self): - self.q._parse_usbvm_updates(GET_DEVICES) - self.assertEqual(self.q.usbvm_updates_list[0]["Name"], "ColorHug2") - self.assertEqual(self.q.usbvm_updates_list[0]["Version"], "2.0.6") - self.assertListEqual( - self.q.usbvm_updates_list[0]["Releases"], - [ - { - "Checksum": "32c4a2c9be787cdf1d757c489d6455bd7bb14053425180b6d331c37e1ccc1cda", - "Description": "

This release fixes prevents the firmware returning an " - "error when the remote SHA1 hash was never sent.

", - "Url": "https://fwupd.org/downloads/0a29848de74d26348bc5a6e24fc9f03778eddf0e-hughski-colorhug2-2.0.7.cab", - "Version": "2.0.7", - } - ], - ) - - def test_parse_usbvm_updates_no_updates_available(self): - self.q._parse_usbvm_updates(GET_DEVICES_NO_UPDATES) - self.assertListEqual(self.q.usbvm_updates_list, []) - - def test_updates_crawler(self): - crawler_output = io.StringIO() - sys.stdout = crawler_output - self.q._parse_usbvm_updates(GET_DEVICES) - self.q._updates_crawler(self.q.usbvm_updates_list, usbvm=True) - with open("test/logs/get_updates.log", "r") as getupdates: - self.assertEqual( - getupdates.read(), crawler_output.getvalue().strip() + "\n" - ) - sys.stdout = self.captured_output - - @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) - def test_validate_usbvm_dirs(self): - self.q._validate_usbvm_dirs() - cmd_validate_metadata = [ - "qvm-run", - "--pass-io", - "sys-usb", - f"[ -d {FWUPD_VM_METADATA_DIR} ]", - ] - p = subprocess.Popen(cmd_validate_metadata) - p.wait() - self.assertEqual(p.returncode, 0, msg="Creating metadata directory failed") - cmd_validate_udpdate = [ - "qvm-run", - "--pass-io", - "sys-usb", - f"[ -d {FWUPD_VM_UPDATES_DIR} ]", - ] - p = subprocess.Popen(cmd_validate_udpdate) - p.wait() - self.assertEqual(p.returncode, 0, msg="Creating update directory failed") - - @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) - def test_copy_usbvm_metadata(self): - self.q.metadata_file = FWUPD_DOM0_METADATA_FILE - self.q.metadata_file_jcat = self.q.metadata_file + ".jcat" - self.q._download_metadata() - self.q._validate_usbvm_dirs() - self.q._copy_usbvm_metadata() - cmd_validate_metadata_file = [ - "qvm-run", - "--pass-io", - "sys-usb", - f"[ -f {FWUPD_VM_METADATA_FILE} ]", - ] - p = subprocess.Popen(cmd_validate_metadata_file) - p.wait() - self.assertEqual(p.returncode, 0, msg="Metadata file does not exist") - cmd_validate_metadata_jcat = [ - "qvm-run", - "--pass-io", - "sys-usb", - f"[ -f {FWUPD_VM_METADATA_FILE_JCAT} ]", - ] - p = subprocess.Popen(cmd_validate_metadata_jcat) - p.wait() - self.assertEqual(p.returncode, 0, msg="Metadata jcat signature does not exist") - @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") def test_enable_lvfs_testing_dom0(self): if os.path.exists(LVFS_TESTING_DOM0_FLAG): @@ -746,128 +404,6 @@ class TestQubesFwupdmgr(unittest.TestCase): self.q._enable_lvfs_testing_dom0() self.assertTrue(os.path.exists(LVFS_TESTING_DOM0_FLAG)) - @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) - def test_enable_lvfs_testing_usbvm(self): - cmd_validate_flag = [ - "qvm-run", - "--pass-io", - USBVM_N, - ( - "script --quiet --return --command " - f'"ls {LVFS_TESTING_USBVM_FLAG} &>/dev/null"' - ), - ] - cmd_rm_flag = [ - "qvm-run", - "--pass-io", - USBVM_N, - ("script --quiet --return --command " f'"rm {LVFS_TESTING_USBVM_FLAG}"'), - ] - flag = subprocess.Popen(cmd_validate_flag) - flag.wait() - if flag.returncode == 0: - rm_flag = subprocess.Popen(cmd_rm_flag) - rm_flag.wait() - if rm_flag.returncode != 0: - raise Exception("Removing lvfs-testing flag failed!!") - self.q._enable_lvfs_testing_usbvm(usbvm=True) - flag = subprocess.Popen(cmd_validate_flag) - flag.wait() - self.assertEqual(flag.returncode, 0) - - @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) - def test_validate_usbvm_metadata(self): - self.q.metadata_file = FWUPD_DOM0_METADATA_FILE - self.q.metadata_file_jcat = self.q.metadata_file + ".jcat" - self.q._download_metadata() - self.q._validate_usbvm_dirs() - self.q._copy_usbvm_metadata() - self.q._validate_usbvm_metadata() - - @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) - def test_refresh_usbvm_metadata(self): - self.q.metadata_file = FWUPD_DOM0_METADATA_FILE - self.q.metadata_file_jcat = self.q.metadata_file + ".jcat" - self.q.lvfs = "lvfs" - self.q._download_metadata() - self.q._validate_usbvm_dirs() - self.q._copy_usbvm_metadata() - self.q._validate_usbvm_metadata() - self.q._refresh_usbvm_metadata() - - @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) - def test_clean_usbvm(self): - self.q._validate_usbvm_dirs() - self.q._clean_usbvm() - cmd_validate_metadata = [ - "qvm-run", - "--pass-io", - "sys-usb", - f"! [ -d {FWUPD_VM_METADATA_DIR} ]", - ] - p = subprocess.Popen(cmd_validate_metadata) - p.wait() - self.assertEqual(p.returncode, 0, msg="Cleaning metadata directory failed") - cmd_validate_udpdate = [ - "qvm-run", - "--pass-io", - "sys-usb", - f"! [ -d {FWUPD_VM_METADATA_DIR} ]", - ] - p = subprocess.Popen(cmd_validate_udpdate) - p.wait() - self.assertEqual(p.returncode, 0, msg="Cleaning update directory failed") - - @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) - def test_validate_usbvm_archive(self): - url = ( - "https://fwupd.org/downloads/e5ad222bdbd3d3d48d8613e67c7e0a0e1" - "94f8cd828e33c554d9f05d933e482c7-hughski-colorhug2-2.0.7.cab" - ) - sha = "e5ad222bdbd3d3d48d8613e67c7e0a0e194f8cd828e33c554d9f05d933e482c7" - name = url.replace("https://fwupd.org/downloads/", "") - self.q._clean_usbvm() - self.q._validate_usbvm_dirs() - self.q._download_firmware_updates(url, sha) - self.q._copy_firmware_updates(name) - self.q._validate_usbvm_archive(name, sha) - cmd_validate_udpdate = [ - "qvm-run", - "--pass-io", - "sys-usb", - "[ -f %s ]" % os.path.join(FWUPD_VM_UPDATES_DIR, name), - ] - p = subprocess.Popen(cmd_validate_udpdate) - p.wait() - self.assertEqual(p.returncode, 0, msg="Archive validation failed") - - @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") - def test_check_usbvm(self): - self.q.check_usbvm() - self.assertIn(XL_LIST_LOG, self.q.output) - - @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") - def test_bios_refresh_metadata(self): - sys_usb = self.q.check_usbvm() - Path(BIOS_UPDATE_FLAG).touch(mode=0o644, exist_ok=True) - self.q.refresh_metadata_after_bios_update(usbvm=sys_usb) - self.assertEqual( - self.q.output, - "Successfully refreshed metadata manually\n", - msg="Metadata refresh failed.", - ) - - @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) - def test_trusted_cleanup(self): - trusted_path = os.path.join(FWUPD_DOM0_UPDATES_DIR, "trusted.cab") - if not os.path.exists(trusted_path): - Path(FWUPD_DOM0_UPDATES_DIR).mkdir(exist_ok=True) - Path(trusted_path).touch(mode=0o644, exist_ok=True) - os.mkdir(trusted_path.replace(".cab", "")) - self.q.trusted_cleanup(usbvm=True) - self.assertFalse(os.path.exists(trusted_path)) - self.assertFalse(os.path.exists(trusted_path.replace(".cab", ""))) - if __name__ == "__main__": unittest.main()