diff --git a/contrib/ci/flatpak.py b/contrib/ci/flatpak.py index 0b8084bb0..f63a07d08 100755 --- a/contrib/ci/flatpak.py +++ b/contrib/ci/flatpak.py @@ -7,72 +7,72 @@ import shutil def prepare(target): # clone the flatpak json - cmd = ['git', 'submodule', 'update', '--remote', 'contrib/flatpak'] + cmd = ["git", "submodule", "update", "--remote", "contrib/flatpak"] subprocess.run(cmd, check=True) # clone the submodules for that - cmd = ['git', 'submodule', 'update', '--init', '--remote', 'shared-modules/'] - subprocess.run(cmd, cwd='contrib/flatpak', check=True) + cmd = ["git", "submodule", "update", "--init", "--remote", "shared-modules/"] + subprocess.run(cmd, cwd="contrib/flatpak", check=True) # parse json - if os.path.isdir('build'): - shutil.rmtree('build') + if os.path.isdir("build"): + shutil.rmtree("build") data = {} - with open('contrib/flatpak/org.freedesktop.fwupd.json', 'r') as rfd: + with open("contrib/flatpak/org.freedesktop.fwupd.json", "r") as rfd: data = json.load(rfd, strict=False) - platform = 'runtime/%s/x86_64/%s' % (data['runtime'], data['runtime-version']) - sdk = 'runtime/%s/x86_64/%s' % (data['sdk'], data['runtime-version']) - num_modules = len(data['modules']) + platform = "runtime/%s/x86_64/%s" % (data["runtime"], data["runtime-version"]) + sdk = "runtime/%s/x86_64/%s" % (data["sdk"], data["runtime-version"]) + num_modules = len(data["modules"]) # update to build from master data["branch"] = "master" for index in range(0, num_modules): - module = data['modules'][index] - if type(module) != dict or not 'name' in module: + module = data["modules"][index] + if type(module) != dict or not "name" in module: continue - name = module['name'] - if not 'fwupd' in name: + name = module["name"] + if not "fwupd" in name: continue - data['modules'][index]['sources'][0].pop('url') - data['modules'][index]['sources'][0].pop('sha256') - data['modules'][index]['sources'][0]['type'] = 'dir' - data['modules'][index]['sources'][0]['skip'] = [".git"] - data['modules'][index]['sources'][0]['path'] = ".." + data["modules"][index]["sources"][0].pop("url") + data["modules"][index]["sources"][0].pop("sha256") + data["modules"][index]["sources"][0]["type"] = "dir" + data["modules"][index]["sources"][0]["skip"] = [".git"] + data["modules"][index]["sources"][0]["path"] = ".." # write json - os.mkdir('build') - with open(target, 'w') as wfd: + os.mkdir("build") + with open(target, "w") as wfd: json.dump(data, wfd, indent=4) - os.symlink('../contrib/flatpak/shared-modules', 'build/shared-modules') + os.symlink("../contrib/flatpak/shared-modules", "build/shared-modules") # install the runtimes (parsed from json!) - repo = 'flathub' - repo_url = 'https://dl.flathub.org/repo/flathub.flatpakrepo' + repo = "flathub" + repo_url = "https://dl.flathub.org/repo/flathub.flatpakrepo" print("Installing dependencies") - cmd = ['flatpak', 'remote-add', '--if-not-exists', repo, repo_url] + cmd = ["flatpak", "remote-add", "--if-not-exists", repo, repo_url] subprocess.run(cmd, check=True) - cmd = ['flatpak', 'install', '--assumeyes', repo, sdk] + cmd = ["flatpak", "install", "--assumeyes", repo, sdk] subprocess.run(cmd, check=True) - cmd = ['flatpak', 'install', '--assumeyes', repo, platform] + cmd = ["flatpak", "install", "--assumeyes", repo, platform] subprocess.run(cmd, check=True) def build(target): cmd = [ - 'flatpak-builder', - '--repo=repo', - '--force-clean', - '--disable-rofiles-fuse', - 'build-dir', + "flatpak-builder", + "--repo=repo", + "--force-clean", + "--disable-rofiles-fuse", + "build-dir", target, ] subprocess.run(cmd, check=True) - cmd = ['flatpak', 'build-bundle', 'repo', 'fwupd.flatpak', 'org.freedesktop.fwupd'] + cmd = ["flatpak", "build-bundle", "repo", "fwupd.flatpak", "org.freedesktop.fwupd"] subprocess.run(cmd, check=True) -if __name__ == '__main__': - t = os.path.join('build', 'org.freedesktop.fwupd.json') +if __name__ == "__main__": + t = os.path.join("build", "org.freedesktop.fwupd.json") prepare(t) build(t) diff --git a/contrib/ci/generate_debian.py b/contrib/ci/generate_debian.py index 0e06380ff..4309dd6be 100755 --- a/contrib/ci/generate_debian.py +++ b/contrib/ci/generate_debian.py @@ -10,25 +10,25 @@ import xml.etree.ElementTree as etree def parse_control_dependencies(requested_type): - TARGET = os.getenv('OS') - QUBES = os.getenv('QUBES') + TARGET = os.getenv("OS") + QUBES = os.getenv("QUBES") deps = [] - dep = '' + dep = "" - if TARGET == '': + if TARGET == "": print("Missing OS environment variable") sys.exit(1) OS = TARGET - SUBOS = '' + SUBOS = "" if TARGET: - split = TARGET.split('-') + split = TARGET.split("-") if len(split) >= 2: OS = split[0] SUBOS = split[1] else: import lsb_release - OS = lsb_release.get_distro_information()['ID'].lower() + OS = lsb_release.get_distro_information()["ID"].lower() import platform SUBOS = platform.machine() @@ -49,33 +49,33 @@ def parse_control_dependencies(requested_type): packages = distro.findall("package") for package in packages: if SUBOS: - if not 'variant' in package.attrib: + if not "variant" in package.attrib: continue - if package.attrib['variant'] != SUBOS: + if package.attrib["variant"] != SUBOS: continue if package.text: dep = package.text else: dep = child.attrib["id"] if child.attrib["type"] == requested_type and dep: - version = control.find('version') + version = control.find("version") if version is not None: dep = "%s %s" % (dep, version.text) - inclusions = control.findall('inclusive') + inclusions = control.findall("inclusive") if inclusions: for i in range(0, len(inclusions)): - prefix = '' - suffix = ' ' + prefix = "" + suffix = " " if i == 0: prefix = " [" if i == len(inclusions) - 1: suffix = "]" dep = "%s%s%s%s" % (dep, prefix, inclusions[i].text, suffix) - exclusions = control.findall('exclusive') + exclusions = control.findall("exclusive") if exclusions: for i in range(0, len(exclusions)): - prefix = '!' - suffix = ' ' + prefix = "!" + suffix = " " if i == 0: prefix = " [!" if i == len(exclusions) - 1: @@ -86,26 +86,26 @@ def parse_control_dependencies(requested_type): def update_debian_control(target): - control_in = os.path.join(target, 'control.in') - control_out = os.path.join(target, 'control') + control_in = os.path.join(target, "control.in") + control_out = os.path.join(target, "control") if not os.path.exists(control_in): print("Missing file %s" % control_in) sys.exit(1) - with open(control_in, 'r') as rfd: + with open(control_in, "r") as rfd: lines = rfd.readlines() deps, QUBES = parse_control_dependencies("build") deps.sort() if QUBES: - lines += '\n' - control_qubes_in = os.path.join(target, 'control.qubes.in') - with open(control_qubes_in, 'r') as rfd: + lines += "\n" + control_qubes_in = os.path.join(target, "control.qubes.in") + with open(control_qubes_in, "r") as rfd: lines += rfd.readlines() - with open(control_out, 'w') as wfd: + with open(control_out, "w") as wfd: for line in lines: if line.startswith("Build-Depends: %%%DYNAMIC%%%"): wfd.write("Build-Depends:\n") @@ -118,8 +118,8 @@ def update_debian_control(target): def update_debian_copyright(directory): - copyright_in = os.path.join(directory, 'copyright.in') - copyright_out = os.path.join(directory, 'copyright') + copyright_in = os.path.join(directory, "copyright.in") + copyright_out = os.path.join(directory, "copyright") if not os.path.exists(copyright_in): print("Missing file %s" % copyright_in) @@ -127,14 +127,14 @@ def update_debian_copyright(directory): # Assume all files are remaining LGPL-2.1+ copyrights = [] - for root, dirs, files in os.walk('.'): + for root, dirs, files in os.walk("."): for file in files: target = os.path.join(root, file) # skip translations and license file - if target.startswith('./po/') or file == "COPYING": + if target.startswith("./po/") or file == "COPYING": continue try: - with open(target, 'r') as rfd: + with open(target, "r") as rfd: # read about the first few lines of the file only lines = rfd.readlines(220) except UnicodeDecodeError: @@ -142,17 +142,17 @@ def update_debian_copyright(directory): except FileNotFoundError: continue for line in lines: - if 'Copyright (C) ' in line: - parts = line.split('Copyright (C)')[ + if "Copyright (C) " in line: + parts = line.split("Copyright (C)")[ 1 ].strip() # split out the copyright header - partition = parts.partition(' ')[2] # remove the year string + partition = parts.partition(" ")[2] # remove the year string copyrights += ["%s" % partition] copyrights = "\n\t ".join(sorted(set(copyrights))) - with open(copyright_in, 'r') as rfd: + with open(copyright_in, "r") as rfd: lines = rfd.readlines() - with open(copyright_out, 'w') as wfd: + with open(copyright_out, "w") as wfd: for line in lines: if line.startswith("%%%DYNAMIC%%%"): wfd.write("Files: *\n") @@ -163,6 +163,6 @@ def update_debian_copyright(directory): wfd.write(line) -directory = os.path.join(os.getcwd(), 'debian') +directory = os.path.join(os.getcwd(), "debian") update_debian_control(directory) update_debian_copyright(directory) diff --git a/contrib/ci/generate_dependencies.py b/contrib/ci/generate_dependencies.py index f7779ab3f..bc41cc98a 100755 --- a/contrib/ci/generate_dependencies.py +++ b/contrib/ci/generate_dependencies.py @@ -13,7 +13,7 @@ import xml.etree.ElementTree as etree def parse_dependencies(OS, SUBOS, requested_type): deps = [] - dep = '' + dep = "" directory = os.path.dirname(sys.argv[0]) tree = etree.parse(os.path.join(directory, "dependencies.xml")) root = tree.getroot() @@ -28,9 +28,9 @@ def parse_dependencies(OS, SUBOS, requested_type): packages = distro.findall("package") for package in packages: if SUBOS: - if 'variant' not in package.attrib: + if "variant" not in package.attrib: continue - if package.attrib['variant'] != SUBOS: + if package.attrib["variant"] != SUBOS: continue if package.text: dep = package.text @@ -41,7 +41,7 @@ def parse_dependencies(OS, SUBOS, requested_type): return deps -if __name__ == '__main__': +if __name__ == "__main__": try: import distro @@ -60,15 +60,15 @@ if __name__ == '__main__': ) args = parser.parse_args() - target = os.getenv('OS', args.os) + target = os.getenv("OS", args.os) if target is None: print("Missing OS environment variable") sys.exit(1) _os = target.lower() - _sub_os = '' - split = target.split('-') + _sub_os = "" + split = target.split("-") if len(split) >= 2: _os, _sub_os = split[:2] dependencies = parse_dependencies(_os, _sub_os, "build") - print(*dependencies, sep='\n') + print(*dependencies, sep="\n") diff --git a/contrib/ci/generate_docker.py b/contrib/ci/generate_docker.py index 58b293a8e..9f176374a 100755 --- a/contrib/ci/generate_docker.py +++ b/contrib/ci/generate_docker.py @@ -12,23 +12,23 @@ from generate_dependencies import parse_dependencies def get_container_cmd(): - '''return docker or podman as container manager''' + """return docker or podman as container manager""" - if shutil.which('docker'): - return 'docker' - if shutil.which('podman'): - return 'podman' + if shutil.which("docker"): + return "docker" + if shutil.which("podman"): + return "podman" directory = os.path.dirname(sys.argv[0]) -TARGET = os.getenv('OS') +TARGET = os.getenv("OS") if TARGET is None: print("Missing OS environment variable") sys.exit(1) OS = TARGET -SUBOS = '' -split = TARGET.split('-') +SUBOS = "" +split = TARGET.split("-") if len(split) >= 2: OS = split[0] SUBOS = split[1] @@ -40,19 +40,19 @@ if not os.path.exists(input): print("Missing input file %s for %s" % (input, OS)) sys.exit(1) -with open(input, 'r') as rfd: +with open(input, "r") as rfd: lines = rfd.readlines() -with open('Dockerfile', 'w') as wfd: +with open("Dockerfile", "w") as wfd: for line in lines: if line.startswith("FROM %%%ARCH_PREFIX%%%"): if (OS == "debian" or OS == "ubuntu") and SUBOS == "i386": replace = SUBOS + "/" else: - replace = '' + replace = "" wfd.write(line.replace("%%%ARCH_PREFIX%%%", replace)) elif line == "%%%INSTALL_DEPENDENCIES_COMMAND%%%\n": - if OS == "fedora" or OS == 'flatpak': + if OS == "fedora" or OS == "flatpak": wfd.write("RUN dnf --enablerepo=updates-testing -y install \\\n") elif OS == "centos": wfd.write("RUN yum -y install \\\n") @@ -75,19 +75,19 @@ with open('Dockerfile', 'w') as wfd: 'RUN cat /etc/apt/sources.list | sed "s/deb/deb-src/" >> /etc/apt/sources.list\n' ) # add new architecture - wfd.write('RUN dpkg --add-architecture %s\n' % SUBOS) + wfd.write("RUN dpkg --add-architecture %s\n" % SUBOS) elif line == "%%%OS%%%\n": wfd.write("ENV OS %s\n" % TARGET) else: wfd.write(line) wfd.flush() -if len(sys.argv) == 2 and sys.argv[1] == 'build': +if len(sys.argv) == 2 and sys.argv[1] == "build": cmd = get_container_cmd() args = [cmd, "build", "-t", "fwupd-%s" % TARGET] - if 'http_proxy' in os.environ: - args += ['--build-arg=http_proxy=%s' % os.environ['http_proxy']] - if 'https_proxy' in os.environ: - args += ['--build-arg=https_proxy=%s' % os.environ['https_proxy']] + if "http_proxy" in os.environ: + args += ["--build-arg=http_proxy=%s" % os.environ["http_proxy"]] + if "https_proxy" in os.environ: + args += ["--build-arg=https_proxy=%s" % os.environ["https_proxy"]] args += ["-f", "./Dockerfile", "."] subprocess.check_call(args) diff --git a/contrib/ci/oss-fuzz.py b/contrib/ci/oss-fuzz.py index 863b1eb26..8327119ec 100755 --- a/contrib/ci/oss-fuzz.py +++ b/contrib/ci/oss-fuzz.py @@ -119,14 +119,7 @@ class Builder: if not os.path.exists(fullsrc): fullsrc = os.path.join(self.builddir, src) dst = os.path.basename(src).replace(".c", ".o") - argv.extend( - [ - "-c", - fullsrc, - "-o", - os.path.join(self.builddir, dst), - ] - ) + argv.extend(["-c", fullsrc, "-o", os.path.join(self.builddir, dst)]) print("building {} into {}".format(src, dst)) try: subprocess.run(argv, cwd=self.srcdir, check=True) @@ -169,11 +162,9 @@ class Builder: def makezip(self, dst: str, globstr: str) -> None: """ create a zip file archive from a glob """ - argv = [ - "zip", - "--junk-paths", - os.path.join(self.installdir, dst), - ] + glob.glob(os.path.join(self.srcdir, globstr)) + argv = ["zip", "--junk-paths", os.path.join(self.installdir, dst)] + glob.glob( + os.path.join(self.srcdir, globstr) + ) print("assembling {}".format(dst)) subprocess.run(argv, cwd=self.srcdir, check=True) @@ -240,16 +231,10 @@ def _build(bld: Builder) -> None: # JSON-GLib src = bld.checkout_source( - "json-glib", - url="https://gitlab.gnome.org/GNOME/json-glib.git", + "json-glib", url="https://gitlab.gnome.org/GNOME/json-glib.git" ) bld.build_meson_project( - src, - [ - "-Dgtk_doc=disabled", - "-Dtests=false", - "-Dintrospection=disabled", - ], + src, ["-Dgtk_doc=disabled", "-Dtests=false", "-Dintrospection=disabled"] ) bld.add_work_includedir("include/json-glib-1.0/json-glib") bld.add_work_includedir("include/json-glib-1.0") @@ -258,12 +243,7 @@ def _build(bld: Builder) -> None: # libxmlb src = bld.checkout_source("libxmlb", url="https://github.com/hughsie/libxmlb.git") bld.build_meson_project( - src, - [ - "-Dgtkdoc=false", - "-Dintrospection=false", - "-Dtests=false", - ], + src, ["-Dgtkdoc=false", "-Dintrospection=false", "-Dtests=false"] ) bld.add_work_includedir("include/libxmlb-2") bld.add_work_includedir("include/libxmlb-2/libxmlb") diff --git a/contrib/firmware_packager/add_capsule_header.py b/contrib/firmware_packager/add_capsule_header.py index e12f4be5e..92e26c9dc 100755 --- a/contrib/firmware_packager/add_capsule_header.py +++ b/contrib/firmware_packager/add_capsule_header.py @@ -24,26 +24,26 @@ def add_header(infile, outfile, gd, fl=None): import struct try: - with open(infile, 'rb') as f: + with open(infile, "rb") as f: bin_data = f.read() except FileNotFoundError as e: print(e) return 1 # check if already has header - hdrsz = struct.calcsize('<16sIII') + hdrsz = struct.calcsize("<16sIII") if len(bin_data) >= hdrsz: - hdr = struct.unpack('<16sIII', bin_data[:hdrsz]) + hdr = struct.unpack("<16sIII", bin_data[:hdrsz]) imgsz = hdr[3] if imgsz == len(bin_data): - print('Replacing existing CAPSULE_HEADER of:') + print("Replacing existing CAPSULE_HEADER of:") guid_mixed = uuid.UUID(bytes_le=hdr[0]) hdrsz_old = hdr[1] flags = hdr[2] - print('GUID: %s' % guid_mixed) - print('HdrSz: 0x%04x' % hdrsz_old) - print('Flags: 0x%04x' % flags) - print('PayloadSz: 0x%04x' % imgsz) + print("GUID: %s" % guid_mixed) + print("HdrSz: 0x%04x" % hdrsz_old) + print("Flags: 0x%04x" % flags) + print("PayloadSz: 0x%04x" % imgsz) bin_data = bin_data[hdrsz_old:] # set header flags @@ -59,24 +59,24 @@ def add_header(infile, outfile, gd, fl=None): hdrsz = 4096 imgsz = hdrsz + len(bin_data) hdr = ctypes.create_string_buffer(hdrsz) - struct.pack_into('<16sIII', hdr, 0, guid.bytes_le, hdrsz, flags, imgsz) - with open(outfile, 'wb') as f: + struct.pack_into("<16sIII", hdr, 0, guid.bytes_le, hdrsz, flags, imgsz) + with open(outfile, "wb") as f: f.write(hdr) f.write(bin_data) - print('Wrote capsule %s' % outfile) - print('GUID: %s' % guid) - print('HdrSz: 0x%04x' % hdrsz) - print('Flags: 0x%04x' % flags) - print('PayloadSz: 0x%04x' % imgsz) + print("Wrote capsule %s" % outfile) + print("GUID: %s" % guid) + print("HdrSz: 0x%04x" % hdrsz) + print("Flags: 0x%04x" % flags) + print("PayloadSz: 0x%04x" % imgsz) return 0 -if __name__ == '__main__': - parser = argparse.ArgumentParser(description='Add capsule header on firmware') - parser.add_argument('--guid', help='GUID of the device', required=True) - parser.add_argument('--bin', help='Path to the .bin file', required=True) - parser.add_argument('--cap', help='Output capsule file path', required=True) - parser.add_argument('--flags', help='Flags, e.g. 0x40000', default=None) +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Add capsule header on firmware") + parser.add_argument("--guid", help="GUID of the device", required=True) + parser.add_argument("--bin", help="Path to the .bin file", required=True) + parser.add_argument("--cap", help="Output capsule file path", required=True) + parser.add_argument("--flags", help="Flags, e.g. 0x40000", default=None) args = parser.parse_args() sys.exit(add_header(args.bin, args.cap, args.guid, args.flags)) diff --git a/contrib/firmware_packager/add_dfu_header.py b/contrib/firmware_packager/add_dfu_header.py index a7bbaf251..749c99e75 100755 --- a/contrib/firmware_packager/add_dfu_header.py +++ b/contrib/firmware_packager/add_dfu_header.py @@ -13,43 +13,43 @@ import argparse def main(bin_fn, dfu_fn, pad, vid, pid, rev): # read binary file - with open(bin_fn, 'rb') as f: + with open(bin_fn, "rb") as f: blob = f.read() # pad blob to a specific size if pad: while len(blob) < int(pad, 16): - blob += b'\0' + blob += b"\0" # create DFU footer with checksum blob += struct.pack( - '\.\"\-]{0,128}.xml.gz.?[aj]?[sc]?[ca]?t?$" ) HEADS_UPDATES_DIR = "/boot/updates" -WARNING_COLOR = '\033[93m' +WARNING_COLOR = "\033[93m" class FwupdReceiveUpdates: @@ -58,7 +46,7 @@ class FwupdReceiveUpdates: file_path -- absolute path to the file sha -- SHA256 checksum of the file """ - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: c_sha = hashlib.sha256(f.read()).hexdigest() if c_sha != sha: self.clean_cache() @@ -71,13 +59,11 @@ class FwupdReceiveUpdates: Keyword argument: updatevm - domain to be checked """ - cmd = ['qubes-prefs', '--force-root', 'updatevm'] + cmd = ["qubes-prefs", "--force-root", "updatevm"] p = subprocess.check_output(cmd) - source = p.decode('ascii').rstrip() + source = p.decode("ascii").rstrip() if source != updatevm and "sys-whonix" != updatevm: - raise Exception( - f'Domain {updatevm} not allowed to send dom0 updates' - ) + raise Exception(f"Domain {updatevm} not allowed to send dom0 updates") def _verify_received(self, files_path, regex_pattern, updatevm): """Checks if sent files match regex filename pattern. @@ -90,14 +76,14 @@ class FwupdReceiveUpdates: """ for untrusted_f in os.listdir(files_path): if not regex_pattern.match(untrusted_f): - raise Exception(f'Domain {updatevm} sent unexpected file') + raise Exception(f"Domain {updatevm} sent unexpected file") f = untrusted_f - assert '/' not in f - assert '\0' not in f - assert '\x1b' not in f + assert "/" not in f + assert "\0" not in f + assert "\x1b" not in f path_f = os.path.join(files_path, f) if os.path.islink(path_f) or not os.path.isfile(path_f): - raise Exception(f'Domain {updatevm} sent not regular file') + raise Exception(f"Domain {updatevm} sent not regular file") def _create_dirs(self, *args): """Method creates directories. @@ -105,7 +91,7 @@ class FwupdReceiveUpdates: Keyword arguments: *args -- paths to be created """ - qubes_gid = grp.getgrnam('qubes').gr_gid + qubes_gid = grp.getgrnam("qubes").gr_gid self.old_umask = os.umask(0o002) if args is None: raise Exception("Creating directories failed, no paths given.") @@ -128,19 +114,12 @@ class FwupdReceiveUpdates: archive_path -- absolute path to archive file output_path -- absolute path to the output directory """ - cmd_extract = [ - "gcab", - "-x", - f"--directory={output_path}", - f"{archive_path}" - ] + cmd_extract = ["gcab", "-x", f"--directory={output_path}", f"{archive_path}"] shutil.copy(archive_path, FWUPD_DOM0_UPDATES_DIR) p = subprocess.Popen(cmd_extract, stdout=subprocess.PIPE) - p.communicate()[0].decode('ascii') + p.communicate()[0].decode("ascii") if p.returncode != 0: - raise Exception( - f'gcab: Error while extracting {archive_path}.' - ) + raise Exception(f"gcab: Error while extracting {archive_path}.") def _jcat_verification(self, file_path, file_directory): """Verifies sha1 and sha256 checksum, GPG signature, @@ -150,25 +129,16 @@ class FwupdReceiveUpdates: file_path -- absolute path to jcat file file_directory -- absolute path to the directory to jcat file location """ - cmd_jcat = [ - "jcat-tool", - "verify", - f"{file_path}", - "--public-keys", - FWUPD_PKI - ] + cmd_jcat = ["jcat-tool", "verify", f"{file_path}", "--public-keys", FWUPD_PKI] p = subprocess.Popen( - cmd_jcat, - cwd=file_directory, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE + cmd_jcat, cwd=file_directory, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) stdout, __ = p.communicate() - verification = stdout.decode('utf-8') + verification = stdout.decode("utf-8") print(verification) if p.returncode != 0: self.clean_cache() - raise Exception('jcat-tool: Verification failed') + raise Exception("jcat-tool: Verification failed") def handle_fw_update(self, updatevm, sha, filename): """Copies firmware update archives from the updateVM. @@ -179,45 +149,31 @@ class FwupdReceiveUpdates: filename -- name of the firmware update archive """ fwupd_firmware_file_regex = re.compile(filename) - dom0_firmware_untrusted_path = os.path.join( - FWUPD_DOM0_UNTRUSTED_DIR, - filename - ) - updatevm_firmware_file_path = os.path.join( - FWUPD_VM_UPDATES_DIR, - filename - ) + dom0_firmware_untrusted_path = os.path.join(FWUPD_DOM0_UNTRUSTED_DIR, filename) + updatevm_firmware_file_path = os.path.join(FWUPD_VM_UPDATES_DIR, filename) self._check_domain(updatevm) if os.path.exists(FWUPD_DOM0_UNTRUSTED_DIR): shutil.rmtree(FWUPD_DOM0_UNTRUSTED_DIR) self._create_dirs(FWUPD_DOM0_UPDATES_DIR, FWUPD_DOM0_UNTRUSTED_DIR) - cmd_copy = 'qvm-run --pass-io %s %s > %s' % ( + cmd_copy = "qvm-run --pass-io %s %s > %s" % ( updatevm, "'cat %s'" % updatevm_firmware_file_path, - dom0_firmware_untrusted_path + dom0_firmware_untrusted_path, ) p = subprocess.Popen(cmd_copy, shell=True) p.wait() if p.returncode != 0: - raise Exception('qvm-run: Copying firmware file failed!!') + raise Exception("qvm-run: Copying firmware file failed!!") self._verify_received( - FWUPD_DOM0_UNTRUSTED_DIR, - fwupd_firmware_file_regex, - updatevm + FWUPD_DOM0_UNTRUSTED_DIR, fwupd_firmware_file_regex, updatevm ) self._check_shasum(dom0_firmware_untrusted_path, sha) untrusted_dir_name = filename.replace(".cab", "") - self._extract_archive( - dom0_firmware_untrusted_path, - FWUPD_DOM0_UNTRUSTED_DIR - ) - signature_name = os.path.join( - FWUPD_DOM0_UNTRUSTED_DIR, - "firmware*.jcat" - ) + self._extract_archive(dom0_firmware_untrusted_path, FWUPD_DOM0_UNTRUSTED_DIR) + signature_name = os.path.join(FWUPD_DOM0_UNTRUSTED_DIR, "firmware*.jcat") file_path = glob.glob(signature_name) if not file_path: raise FileNotFoundError("jcat file not found!") @@ -227,10 +183,7 @@ class FwupdReceiveUpdates: untrusted_dir_name = "trusted" verified_file = os.path.join(FWUPD_DOM0_UPDATES_DIR, filename) self.arch_name = "trusted.cab" - self.arch_path = os.path.join( - FWUPD_DOM0_UPDATES_DIR, - self.arch_name - ) + self.arch_path = os.path.join(FWUPD_DOM0_UPDATES_DIR, self.arch_name) shutil.move(verified_file, self.arch_path) else: self.arch_path = os.path.join(FWUPD_DOM0_UPDATES_DIR, filename) @@ -245,59 +198,46 @@ class FwupdReceiveUpdates: updatevm -- update VM name """ if metadata_url: - metadata_name = metadata_url.replace( - FWUPD_DOWNLOAD_PREFIX, - "" - ) - self.metadata_file = os.path.join( - FWUPD_DOM0_METADATA_DIR, - metadata_name - ) - self.metadata_file_jcat = self.metadata_file + '.jcat' + metadata_name = metadata_url.replace(FWUPD_DOWNLOAD_PREFIX, "") + self.metadata_file = os.path.join(FWUPD_DOM0_METADATA_DIR, metadata_name) + self.metadata_file_jcat = self.metadata_file + ".jcat" else: self.metadata_file = FWUPD_DOM0_METADATA_FILE self.metadata_file_jcat = FWUPD_DOM0_METADATA_JCAT self.metadata_file_updatevm = self.metadata_file.replace( - FWUPD_DOM0_METADATA_DIR, - FWUPD_VM_METADATA_DIR + FWUPD_DOM0_METADATA_DIR, FWUPD_VM_METADATA_DIR ) self.metadata_file_jcat_updatevm = self.metadata_file_jcat.replace( - FWUPD_DOM0_METADATA_DIR, - FWUPD_VM_METADATA_DIR + FWUPD_DOM0_METADATA_DIR, FWUPD_VM_METADATA_DIR ) self._check_domain(updatevm) self._create_dirs(FWUPD_DOM0_METADATA_DIR) cmd_file = "'cat %s'" % self.metadata_file_updatevm cmd_jcat = "'cat %s'" % self.metadata_file_jcat_updatevm - cmd_copy_metadata_file = 'qvm-run --pass-io %s %s > %s' % ( + cmd_copy_metadata_file = "qvm-run --pass-io %s %s > %s" % ( updatevm, cmd_file, - self.metadata_file + self.metadata_file, ) - cmd_copy_metadata_jcat = 'qvm-run --pass-io %s %s > %s' % ( + cmd_copy_metadata_jcat = "qvm-run --pass-io %s %s > %s" % ( updatevm, cmd_jcat, - self.metadata_file_jcat + self.metadata_file_jcat, ) p = subprocess.Popen(cmd_copy_metadata_file, shell=True) p.wait() if p.returncode != 0: - raise Exception('qvm-run: Copying metadata file failed!!') + raise Exception("qvm-run: Copying metadata file failed!!") p = subprocess.Popen(cmd_copy_metadata_jcat, shell=True) p.wait() if p.returncode != 0: raise Exception('qvm-run": Copying metadata jcat failed!!') self._verify_received( - FWUPD_DOM0_METADATA_DIR, - FWUPD_METADATA_FILES_REGEX, - updatevm - ) - self._jcat_verification( - self.metadata_file_jcat, - FWUPD_DOM0_METADATA_DIR + FWUPD_DOM0_METADATA_DIR, FWUPD_METADATA_FILES_REGEX, updatevm ) + self._jcat_verification(self.metadata_file_jcat, FWUPD_DOM0_METADATA_DIR) os.umask(self.old_umask) def clean_cache(self, usbvm=False): diff --git a/contrib/qubes/src/qubes_fwupd_heads.py b/contrib/qubes/src/qubes_fwupd_heads.py index 0acd374ed..7c1837370 100644 --- a/contrib/qubes/src/qubes_fwupd_heads.py +++ b/contrib/qubes/src/qubes_fwupd_heads.py @@ -18,20 +18,13 @@ FWUPDTOOL = "/bin/fwupdtool" BOOT = "/boot" HEADS_UPDATES_DIR = os.path.join(BOOT, "updates") -EXIT_CODES = { - "ERROR": 1, - "SUCCESS": 0, - "NOTHING_TO_DO": 2, -} +EXIT_CODES = {"ERROR": 1, "SUCCESS": 0, "NOTHING_TO_DO": 2} class FwupdHeads: def _get_hwids(self): cmd_hwids = [FWUPDTOOL, "hwids"] - p = subprocess.Popen( - cmd_hwids, - stdout=subprocess.PIPE - ) + p = subprocess.Popen(cmd_hwids, stdout=subprocess.PIPE) self.dom0_hwids_info = p.communicate()[0].decode() if p.returncode != 0: raise Exception("fwudp-qubes: Getting hwids info failed") @@ -46,12 +39,8 @@ class FwupdHeads: for line in hwids: if line.startswith("BiosVersion: CBET4000 "): self.heads_version = line.replace( - "BiosVersion: CBET4000 ", - "" - ).replace( - " heads", - "" - ) + "BiosVersion: CBET4000 ", "" + ).replace(" heads", "") else: print("Device is not running under the heads firmware!!") print("Exiting...") @@ -62,10 +51,7 @@ class FwupdHeads: Parse metadata info. """ cmd_metadata = ["zcat", metadata_file] - p = subprocess.Popen( - cmd_metadata, - stdout=subprocess.PIPE - ) + p = subprocess.Popen(cmd_metadata, stdout=subprocess.PIPE) self.metadata_info = p.communicate()[0].decode() if p.returncode != 0: raise Exception("fwudp-qubes: Parsing metadata failed") @@ -90,14 +76,18 @@ class FwupdHeads: return EXIT_CODES["NOTHING_TO_DO"] for release in heads_metadata_info.find("releases").findall("release"): release_ver = release.get("version") - if (self.heads_version == "heads" or - l_ver(release_ver) > l_ver(self.heads_version)): - if (not self.heads_update_version or - l_ver(release_ver) > l_ver(self.heads_update_version)): + if self.heads_version == "heads" or l_ver(release_ver) > l_ver( + self.heads_version + ): + if not self.heads_update_version or l_ver(release_ver) > l_ver( + self.heads_update_version + ): self.heads_update_url = release.find("location").text for sha in release.findall("checksum"): - if (".cab" in sha.attrib["filename"] - and sha.attrib["type"] == "sha256"): + if ( + ".cab" in sha.attrib["filename"] + and sha.attrib["type"] == "sha256" + ): self.heads_update_sha = sha.text self.heads_update_version = release_ver if self.heads_update_url: @@ -110,23 +100,14 @@ class FwupdHeads: """ Copies heads update to the boot path """ - heads_boot_path = os.path.join( - HEADS_UPDATES_DIR, - self.heads_update_version - ) + heads_boot_path = os.path.join(HEADS_UPDATES_DIR, self.heads_update_version) update_path = arch_path.replace(".cab", "/firmware.rom") - heads_update_path = os.path.join( - heads_boot_path, - "firmware.rom" - ) + heads_update_path = os.path.join(heads_boot_path, "firmware.rom") if not os.path.exists(HEADS_UPDATES_DIR): os.mkdir(HEADS_UPDATES_DIR) if os.path.exists(heads_update_path): - print( - f"Heads Update == {self.heads_update_version} " - "already downloaded." - ) + print(f"Heads Update == {self.heads_update_version} " "already downloaded.") return EXIT_CODES["NOTHING_TO_DO"] else: os.mkdir(heads_boot_path) diff --git a/contrib/qubes/src/qubes_fwupd_update.py b/contrib/qubes/src/qubes_fwupd_update.py index c2872dd41..a62fd07d3 100644 --- a/contrib/qubes/src/qubes_fwupd_update.py +++ b/contrib/qubes/src/qubes_fwupd_update.py @@ -17,10 +17,10 @@ FWUPD_VM_DOWNLOAD = "/usr/libexec/qubes-fwupd/fwupd_download_updates.py" FWUPD_DOM0_UPDATES_DIR = os.path.join(FWUPD_DOM0_DIR, "updates") FWUPD_DOWNLOAD_PREFIX = "https://fwupd.org/downloads/" -SPECIAL_CHAR_REGEX = re.compile(r'%20|&|\||#') -UPDATEVM_REGEX = re.compile(r'^sys-') +SPECIAL_CHAR_REGEX = re.compile(r"%20|&|\||#") +UPDATEVM_REGEX = re.compile(r"^sys-") -WARNING_COLOR = '\033[93m' +WARNING_COLOR = "\033[93m" class FwupdUpdate: @@ -30,7 +30,7 @@ class FwupdUpdate: Keyword arguments: *args -- paths to be created """ - qubes_gid = grp.getgrnam('qubes').gr_gid + qubes_gid = grp.getgrnam("qubes").gr_gid self.old_umask = os.umask(0o002) if args is None: raise Exception("Creating directories failed, no paths given.") @@ -46,15 +46,8 @@ class FwupdUpdate: ) def _specify_updatevm(self): - cmd_updatevm = [ - "qubes-prefs", - "--force-root", - "updatevm", - ] - p = subprocess.Popen( - cmd_updatevm, - stdout=subprocess.PIPE - ) + cmd_updatevm = ["qubes-prefs", "--force-root", "updatevm"] + p = subprocess.Popen(cmd_updatevm, stdout=subprocess.PIPE) self.updatevm = p.communicate()[0].decode().split("\n")[0] if p.returncode != 0 and not UPDATEVM_REGEX.match(self.updatevm): self.updatevm = None @@ -62,14 +55,8 @@ class FwupdUpdate: def _check_updatevm(self): """Checks if usbvm is running""" - cmd_xl_list = [ - "xl", - "list" - ] - p = subprocess.Popen( - cmd_xl_list, - stdout=subprocess.PIPE - ) + cmd_xl_list = ["xl", "list"] + p = subprocess.Popen(cmd_xl_list, stdout=subprocess.PIPE) output = p.communicate()[0].decode() if p.returncode != 0: raise Exception("fwudp-qubes: Firmware downgrade failed") @@ -112,10 +99,10 @@ class FwupdUpdate: "--pass-io", self.updatevm, ( - 'script --quiet --return --command ' + "script --quiet --return --command " f'"{FWUPD_VM_DOWNLOAD} --metadata' f' --url={metadata_url}"' - ) + ), ] else: cmd_metadata = [ @@ -123,9 +110,9 @@ class FwupdUpdate: "--pass-io", self.updatevm, ( - 'script --quiet --return --command ' + "script --quiet --return --command " f'"{FWUPD_VM_DOWNLOAD} --metadata"' - ) + ), ] p = subprocess.Popen(cmd_metadata) p.wait() @@ -155,10 +142,10 @@ class FwupdUpdate: "--pass-io", self.updatevm, ( - 'script --quiet --return --command ' + "script --quiet --return --command " f'"{FWUPD_VM_DOWNLOAD} --url={self.enc_url}' f' --sha={sha}"' - ) + ), ] p = subprocess.Popen(cmd_firmware_download) p.wait() diff --git a/contrib/qubes/src/qubes_fwupdmgr.py b/contrib/qubes/src/qubes_fwupdmgr.py index 9b83c0fbc..a9a8ce21d 100755 --- a/contrib/qubes/src/qubes_fwupdmgr.py +++ b/contrib/qubes/src/qubes_fwupdmgr.py @@ -32,42 +32,25 @@ try: from fwupd_receive_updates import FwupdReceiveUpdates except ModuleNotFoundError: raise ModuleNotFoundError( - "qubes-fwupd modules not found. " - "You may need to reinstall package." + "qubes-fwupd modules not found. " "You may need to reinstall package." ) FWUPD_DOM0_DIR = "/root/.cache/fwupd" FWUPD_DOM0_METADATA_DIR = os.path.join(FWUPD_DOM0_DIR, "metadata") FWUPD_DOM0_UPDATES_DIR = os.path.join(FWUPD_DOM0_DIR, "updates") FWUPD_DOM0_METADATA_SIGNATURE = os.path.join( - FWUPD_DOM0_METADATA_DIR, - "firmware.xml.gz.asc" -) -FWUPD_DOM0_METADATA_FILE = os.path.join( - FWUPD_DOM0_METADATA_DIR, - "firmware.xml.gz" -) -FWUPD_DOM0_METADATA_JCAT = os.path.join( - FWUPD_DOM0_METADATA_DIR, - "firmware.xml.gz.jcat" + FWUPD_DOM0_METADATA_DIR, "firmware.xml.gz.asc" ) +FWUPD_DOM0_METADATA_FILE = os.path.join(FWUPD_DOM0_METADATA_DIR, "firmware.xml.gz") +FWUPD_DOM0_METADATA_JCAT = os.path.join(FWUPD_DOM0_METADATA_DIR, "firmware.xml.gz.jcat") FWUPD_VM_LOG = os.path.join(FWUPD_DOM0_DIR, "usbvm-devices.log") FWUPD_VM_VALIDATE = "/usr/libexec/qubes-fwupd/fwupd_usbvm_validate.py" FWUPD_VM_DIR = "/home/user/.cache/fwupd" FWUPD_VM_UPDATES_DIR = os.path.join(FWUPD_VM_DIR, "updates") FWUPD_VM_METADATA_DIR = os.path.join(FWUPD_VM_DIR, "metadata") -FWUPD_VM_METADATA_SIGNATURE = os.path.join( - FWUPD_VM_METADATA_DIR, - "firmware.xml.gz.asc" -) -FWUPD_VM_METADATA_FILE = os.path.join( - FWUPD_VM_METADATA_DIR, - "firmware.xml.gz" -) -FWUPD_VM_METADATA_JCAT = os.path.join( - FWUPD_VM_METADATA_DIR, - "firmware.xml.gz.jcat" -) +FWUPD_VM_METADATA_SIGNATURE = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz.asc") +FWUPD_VM_METADATA_FILE = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz") +FWUPD_VM_METADATA_JCAT = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz.jcat") FWUPD_DOWNLOAD_PREFIX = "https://fwupd.org/downloads/" FWUPDMGR = "/bin/fwupdmgr" @@ -78,11 +61,9 @@ USBVM_N = "sys-usb" BIOS_UPDATE_FLAG = os.path.join(FWUPD_DOM0_DIR, "bios_update") LVFS_TESTING_DOM0_FLAG = os.path.join(FWUPD_DOM0_DIR, "lvfs_testing") LVFS_TESTING_USBVM_FLAG = os.path.join(FWUPD_VM_DIR, "lvfs_testing") -METADATA_REFRESH_REGEX = re.compile( - r"^Successfully refreshed metadata manually$" -) +METADATA_REFRESH_REGEX = re.compile(r"^Successfully refreshed metadata manually$") -SPECIAL_CHAR_REGEX = re.compile(r'%20|&|\||#') +SPECIAL_CHAR_REGEX = re.compile(r"%20|&|\||#") HELP = { @@ -100,28 +81,20 @@ HELP = { "update": "Update chosen device to latest firmware version", "update-heads": "Updates heads firmware to the latest version", "downgrade": "Downgrade chosen device to chosen firmware version", - "clean": "Delete all cached update files\n" + "clean": "Delete all cached update files\n", } ], "Flags": [ { "--whonix": "Download firmware updates via Tor", "--device": "Specify device for heads update (default - x230)", - "--url": "Address of the custom metadata remote server\n" - } - ], - "Help": [ - { - "-h --help": "Show help options\n" + "--url": "Address of the custom metadata remote server\n", } ], + "Help": [{"-h --help": "Show help options\n"}], } -EXIT_CODES = { - "ERROR": 1, - "SUCCESS": 0, - "NOTHING_TO_DO": 2, -} +EXIT_CODES = {"ERROR": 1, "SUCCESS": 0, "NOTHING_TO_DO": 2} class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): @@ -143,7 +116,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): "qvm-run", "--pass-io", USBVM_N, - f'script --quiet --return --command "{FWUPD_VM_VALIDATE} dirs"' + f'script --quiet --return --command "{FWUPD_VM_VALIDATE} dirs"', ] p = subprocess.Popen(cmd_validate_dirs) p.wait() @@ -158,7 +131,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): "qvm-run", "--pass-io", USBVM_N, - f'script --quiet --return --command "{arch_validate}"' + f'script --quiet --return --command "{arch_validate}"', ] p = subprocess.Popen(cmd_validate_arch) p.wait() @@ -168,18 +141,17 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): def _copy_usbvm_metadata(self): """Copies metadata files to usbvm.""" self.metadata_file_usbvm = self.metadata_file.replace( - FWUPD_DOM0_METADATA_DIR, - FWUPD_VM_METADATA_DIR + FWUPD_DOM0_METADATA_DIR, FWUPD_VM_METADATA_DIR ) self.metadata_file_jcat_usbvm = self.metadata_file_usbvm + ".jcat" cat_file = f"cat > {self.metadata_file_usbvm}" cmd_copy_file = ( - f'cat {self.metadata_file} | ' + f"cat {self.metadata_file} | " f'qvm-run --nogui --pass-io {USBVM_N} "{cat_file}"' ) cat_jcat = f"cat > {self.metadata_file_jcat_usbvm}" cmd_copy_jcat = ( - f'cat {self.metadata_file_jcat} | ' + f"cat {self.metadata_file_jcat} | " f'qvm-run --nogui --pass-io {USBVM_N} "{cat_jcat}"' ) p = subprocess.Popen(cmd_copy_file, shell=True) @@ -195,15 +167,12 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): """Checks GPG signature of metadata files in usbvm.""" usbvm_cmd = f'"{FWUPD_VM_VALIDATE} metadata"' if metadata_url: - usbvm_cmd = ( - f'"{FWUPD_VM_VALIDATE} metadata --url={metadata_url}"' - ) + usbvm_cmd = f'"{FWUPD_VM_VALIDATE} metadata --url={metadata_url}"' cmd_validate_metadata = [ "qvm-run", "--pass-io", USBVM_N, - 'script --quiet --return --command' - f' {usbvm_cmd}' + "script --quiet --return --command" f" {usbvm_cmd}", ] p = subprocess.Popen(cmd_validate_metadata) p.wait() @@ -218,10 +187,10 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): "--pass-io", USBVM_N, ( - 'script --quiet --return --command ' + "script --quiet --return --command " f'"{FWUPDMGR} refresh {self.metadata_file_usbvm} ' f'{sig_metadata_file} {self.lvfs}"' - ) + ), ] p = subprocess.Popen(cmd_refresh_metadata) p.wait() @@ -238,8 +207,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): output_path = os.path.join(FWUPD_VM_UPDATES_DIR, arch_name) cat_file = f"cat > {output_path}" cmd_copy_file = ( - f'cat {arch_path} | ' - f'qvm-run --nogui --pass-io {USBVM_N} "{cat_file}"' + f"cat {arch_path} | " f'qvm-run --nogui --pass-io {USBVM_N} "{cat_file}"' ) p = subprocess.Popen(cmd_copy_file, shell=True) p.wait() @@ -257,8 +225,8 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): "qvm-run", "--pass-io", USBVM_N, - f'script --quiet --return --command' - f' "{FWUPDMGR} install {arch_path}" /dev/null' + f"script --quiet --return --command" + f' "{FWUPDMGR} install {arch_path}" /dev/null', ] p = subprocess.Popen(CMD_update) p.wait() @@ -276,8 +244,8 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): "qvm-run", "--pass-io", USBVM_N, - f'script --quiet --return --command' - f' "{FWUPDMGR} --allow-older install {arch_path}" /dev/null' + f"script --quiet --return --command" + f' "{FWUPDMGR} --allow-older install {arch_path}" /dev/null', ] p = subprocess.Popen(CMD_downgrade) p.wait() @@ -290,7 +258,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): "qvm-run", "--pass-io", USBVM_N, - f'script --quiet --return --command "{FWUPD_VM_VALIDATE} clean"' + f'script --quiet --return --command "{FWUPD_VM_VALIDATE} clean"', ] p = subprocess.Popen(cmd_clean) p.wait() @@ -299,12 +267,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): def _enable_lvfs_testing_dom0(self): """Checks and enable lvfs-testing for custom metadata in dom0""" - cmd_lvfs_testing = [ - FWUPDMGR, - "enable-remote", - "-y", - "lvfs-testing" - ] + cmd_lvfs_testing = [FWUPDMGR, "enable-remote", "-y", "lvfs-testing"] if not os.path.exists(LVFS_TESTING_DOM0_FLAG): p = subprocess.Popen(cmd_lvfs_testing) p.wait() @@ -321,27 +284,24 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): "--pass-io", USBVM_N, ( - 'script --quiet --return --command ' + "script --quiet --return --command " f'"{FWUPDMGR} enable-remote -y lvfs-testing"' - ) + ), ] cmd_validate_flag = [ "qvm-run", "--pass-io", USBVM_N, ( - 'script --quiet --return --command ' + "script --quiet --return --command " f'"ls {LVFS_TESTING_USBVM_FLAG} &>/dev/null"' - ) + ), ] cmd_touch_flag = [ "qvm-run", "--pass-io", USBVM_N, - ( - 'script --quiet --return --command ' - f'"touch {LVFS_TESTING_USBVM_FLAG}"' - ) + ("script --quiet --return --command " f'"touch {LVFS_TESTING_USBVM_FLAG}"'), ] flag = subprocess.Popen(cmd_validate_flag) flag.wait() @@ -365,11 +325,8 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): """ if metadata_url: metadata_name = metadata_url.replace(FWUPD_DOWNLOAD_PREFIX, "") - self.metadata_file = os.path.join( - FWUPD_DOM0_METADATA_DIR, - metadata_name - ) - self.metadata_file_jcat = self.metadata_file + '.jcat' + self.metadata_file = os.path.join(FWUPD_DOM0_METADATA_DIR, metadata_name) + self.metadata_file_jcat = self.metadata_file + ".jcat" self.lvfs = "lvfs-testing" self._enable_lvfs_testing_dom0() self._enable_lvfs_testing_usbvm(usbvm=usbvm) @@ -388,12 +345,9 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): "refresh", self.metadata_file, self.metadata_file_jcat, - self.lvfs + self.lvfs, ] - p = subprocess.Popen( - cmd_refresh, - stdout=subprocess.PIPE - ) + p = subprocess.Popen(cmd_refresh, stdout=subprocess.PIPE) self.output = p.communicate()[0].decode() print(self.output) if p.returncode != 0: @@ -403,14 +357,8 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): def _get_dom0_updates(self): """Gathers infromations about available updates.""" - cmd_get_dom0_updates = [ - FWUPDAGENT, - "get-updates" - ] - p = subprocess.Popen( - cmd_get_dom0_updates, - stdout=subprocess.PIPE - ) + cmd_get_dom0_updates = [FWUPDAGENT, "get-updates"] + p = subprocess.Popen(cmd_get_dom0_updates, stdout=subprocess.PIPE) self.dom0_updates_info = p.communicate()[0].decode() if p.returncode != 0 and p.returncode != 2: raise Exception("fwudp-qubes: Getting available updates failed") @@ -424,17 +372,19 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): self.dom0_updates_info_dict = json.loads(updates_info) self.dom0_updates_list = [ { - "Name": device["Name"], - "Version": device["Version"], - "Releases": [ - { - "Version": update["Version"], - "Url": update["Uri"], - "Checksum": update["Checksum"][-1], - "Description": update["Description"] - } for update in device["Releases"] - ] - } for device in self.dom0_updates_info_dict["Devices"] + "Name": device["Name"], + "Version": device["Version"], + "Releases": [ + { + "Version": update["Version"], + "Url": update["Uri"], + "Checksum": update["Checksum"][-1], + "Description": update["Description"], + } + for update in device["Releases"] + ], + } + for device in self.dom0_updates_info_dict["Devices"] ] def _download_firmware_updates(self, url, sha, whonix=False): @@ -476,22 +426,20 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): self._updates_crawler(updates_dict["dom0"]) if usbvm: self._updates_crawler( - updates_dict["usbvm"], - usbvm=True, - prefix=dom0_updates_num + updates_dict["usbvm"], usbvm=True, prefix=dom0_updates_num ) while True: try: print("If you want to abandon process press 'N'.") choice = input("Otherwise choose a device number: ") - if choice == 'N' or choice == 'n': + if choice == "N" or choice == "n": return EXIT_CODES["NOTHING_TO_DO"] - device_num = int(choice)-1 + device_num = int(choice) - 1 if 0 <= device_num < len(updates_list): if not downgrade: if device_num >= dom0_updates_num: - return "usbvm", device_num-dom0_updates_num + return "usbvm", device_num - dom0_updates_num else: return "dom0", device_num break @@ -519,9 +467,9 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): print(f" Description:{description}") print("If you want to abandon downgrade process press N.") choice = input("Otherwise choose downgrade number: ") - if choice == 'N' or choice == 'n': + if choice == "N" or choice == "n": return EXIT_CODES["NOTHING_TO_DO"] - downgrade_num = int(choice)-1 + downgrade_num = int(choice) - 1 if 0 <= downgrade_num < len(releases): if device_num >= dom0_updates_num: device_abs_num = device_num - dom0_updates_num @@ -555,11 +503,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): Keywords arguments: arch_path - absolute path to firmware update archive """ - cmd_install = [ - FWUPDMGR, - "install", - arch_path - ] + cmd_install = [FWUPDMGR, "install", arch_path] p = subprocess.Popen(cmd_install) p.wait() if p.returncode != 0: @@ -567,19 +511,11 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): def _read_dmi(self): """Reads BIOS information from DMI.""" - cmd_dmidecode_version = [ - "dmidecode", - "-s", - "bios-version" - ] + cmd_dmidecode_version = ["dmidecode", "-s", "bios-version"] p = subprocess.Popen(cmd_dmidecode_version, stdout=subprocess.PIPE) p.wait() self.dmi_version = p.communicate()[0].decode() - cmd_dmidecode = [ - "dmidecode", - "-t", - "bios" - ] + cmd_dmidecode = ["dmidecode", "-t", "bios"] p = subprocess.Popen(cmd_dmidecode, stdout=subprocess.PIPE) p.wait() if p.returncode != 0: @@ -604,20 +540,12 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): if vendor not in dmi_info: raise ValueError("Wrong firmware provider.") if not downgrade and l_ver(version) <= l_ver(self.dmi_version): - raise ValueError( - f"{version} < {self.dmi_version} Downgrade not allowed" - ) + raise ValueError(f"{version} < {self.dmi_version} Downgrade not allowed") def _get_dom0_devices(self): """Gathers information about devices connected in dom0.""" - cmd_get_dom0_devices = [ - FWUPDAGENT, - "get-devices" - ] - p = subprocess.Popen( - cmd_get_dom0_devices, - stdout=subprocess.PIPE - ) + cmd_get_dom0_devices = [FWUPDAGENT, "get-devices"] + p = subprocess.Popen(cmd_get_dom0_devices, stdout=subprocess.PIPE) self.dom0_devices_info = p.communicate()[0].decode() if p.returncode != 0: raise Exception("fwudp-qubes: Getting devices info failed") @@ -629,15 +557,11 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): usbvm_cmd = f'"{FWUPDAGENT} get-devices"' log_file = f" > {FWUPD_VM_LOG}" cmd_get_usbvm_devices = ( - f'qvm-run --nogui --pass-io {USBVM_N} {usbvm_cmd}{log_file}' - ) - p = subprocess.Popen( - cmd_get_usbvm_devices, - shell=True + f"qvm-run --nogui --pass-io {USBVM_N} {usbvm_cmd}{log_file}" ) + p = subprocess.Popen(cmd_get_usbvm_devices, shell=True) p.wait() - if (p.returncode != 0 and p.returncode != 2 - and not os.path.exists(FWUPD_VM_LOG)): + if p.returncode != 0 and p.returncode != 2 and not os.path.exists(FWUPD_VM_LOG): raise Exception("fwudp-qubes: Getting usbvm devices info failed") if not os.path.exists(FWUPD_VM_LOG): raise Exception("usbvm device info log does not exist") @@ -658,7 +582,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): { "Name": device["Name"], "Version": device["Version"], - "Releases": [] + "Releases": [], } ) current_version = device["Version"] @@ -669,7 +593,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): "Version": update["Version"], "Url": update["Uri"], "Checksum": update["Checksum"][-1], - "Description": update["Description"] + "Description": update["Description"], } ) if not self.usbvm_updates_list[-1]["Releases"]: @@ -691,13 +615,11 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): self._parse_usbvm_updates(raw) update_dict = { "usbvm": self.usbvm_updates_list, - "dom0": self.dom0_updates_list + "dom0": self.dom0_updates_list, } ret_input = self._user_input(update_dict, usbvm=True) else: - update_dict = { - "dom0": self.dom0_updates_list - } + update_dict = {"dom0": self.dom0_updates_list} ret_input = self._user_input(update_dict) if ret_input == EXIT_CODES["NOTHING_TO_DO"]: exit(EXIT_CODES["NOTHING_TO_DO"]) @@ -741,10 +663,11 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): "Version": downgrade["Version"], "Description": downgrade["Description"], "Url": downgrade["Uri"], - "Checksum": downgrade["Checksum"][-1] - } for downgrade in device["Releases"] + "Checksum": downgrade["Checksum"][-1], + } + for downgrade in device["Releases"] if l_ver(downgrade["Version"]) < l_ver(version) - ] + ], } ) return downgrades @@ -755,12 +678,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): Keywords arguments: arch_path - absolute path to firmware downgrade archive """ - cmd_install = [ - FWUPDMGR, - "--allow-older", - "install", - arch_path - ] + cmd_install = [FWUPDMGR, "--allow-older", "install", arch_path] p = subprocess.Popen(cmd_install) p.wait() if p.returncode != 0: @@ -780,19 +698,10 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): with open(FWUPD_VM_LOG) as usbvm_device_info: raw = usbvm_device_info.read() usbvm_downgrades = self._parse_downgrades(raw) - downgrade_dict = { - "usbvm": usbvm_downgrades, - "dom0": dom0_downgrades - } - ret_input = self._user_input( - downgrade_dict, - downgrade=True, - usbvm=True - ) + downgrade_dict = {"usbvm": usbvm_downgrades, "dom0": dom0_downgrades} + ret_input = self._user_input(downgrade_dict, downgrade=True, usbvm=True) else: - downgrade_dict = { - "dom0": dom0_downgrades - } + downgrade_dict = {"dom0": dom0_downgrades} ret_input = self._user_input(downgrade_dict, downgrade=True) if ret_input == EXIT_CODES["NOTHING_TO_DO"]: exit(EXIT_CODES["NOTHING_TO_DO"]) @@ -800,18 +709,14 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): releases = downgrade_dict[vm_name][device_choice]["Releases"] downgrade_url = releases[downgrade_choice]["Url"] downgrade_sha = releases[downgrade_choice]["Checksum"] - self._download_firmware_updates( - downgrade_url, - downgrade_sha, - whonix=whonix - ) + self._download_firmware_updates(downgrade_url, downgrade_sha, whonix=whonix) if downgrade_dict[vm_name][device_choice]["Name"] == "System Firmware": Path(BIOS_UPDATE_FLAG).touch(mode=0o644, exist_ok=True) extracted_path = self.arch_path.replace(".cab", "") self._verify_dmi( extracted_path, downgrade_dict[vm_name][device_choice]["Version"], - downgrade=True + downgrade=True, ) if vm_name == "dom0": self._install_dom0_firmware_downgrade(self.arch_path) @@ -828,13 +733,14 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): updev_dict -- update/device information dictionary level -- level of the tree """ + def _tabs(key_word): - return key_word + '\t'*(4 - int(len(key_word)/8)) + return key_word + "\t" * (4 - int(len(key_word) / 8)) decorator = "===================================" - print(2*decorator) + print(2 * decorator) for updev_key in updev_dict: - style = '\t'*level + style = "\t" * level output = style + _tabs(updev_key + ":") if len(updev_key) > 12: continue @@ -844,7 +750,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): continue if updev_key == "Name": print(style + updev_dict["Name"]) - print(2*decorator) + print(2 * decorator) continue if isinstance(updev_dict[updev_key], str): print(output + updev_dict[updev_key]) @@ -853,9 +759,9 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): elif isinstance(updev_dict[updev_key][0], str): for i, data in enumerate(updev_dict[updev_key]): if i == 0: - print(output + u'\u00B7' + data) + print(output + "\u00B7" + data) continue - print(style + _tabs(' ') + u'\u00B7' + data) + print(style + _tabs(" ") + "\u00B7" + data) elif isinstance(updev_dict[updev_key][0], dict): if level == 0 and help_f is True: print(output) @@ -866,7 +772,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): print(f"{USBVM_N} {output}") for nested_dict in updev_dict[updev_key]: - self._output_crawler(nested_dict, level+1) + self._output_crawler(nested_dict, level + 1) def _updates_crawler(self, updates_list, usbvm=False, prefix=0): """Prints updates information for dom0 and usbvm @@ -899,10 +805,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): print(f" Current firmware version:\t {device['Version']}") for update in device["Releases"]: print(decorator) - print( - " Firmware update " - f"version:\t {update['Version']}" - ) + print(" Firmware update " f"version:\t {update['Version']}") print(f" URL:\t {update['Url']}") print(f" SHA256 checksum:\t {update['Checksum']}") description = update["Description"].replace("

", "") @@ -960,14 +863,8 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): def check_usbvm(self): """Checks if usbvm is running""" - cmd_xl_list = [ - "xl", - "list" - ] - p = subprocess.Popen( - cmd_xl_list, - stdout=subprocess.PIPE - ) + cmd_xl_list = ["xl", "list"] + p = subprocess.Popen(cmd_xl_list, stdout=subprocess.PIPE) self.output = p.communicate()[0].decode() if p.returncode != 0: raise Exception("fwudp-qubes: Firmware downgrade failed") @@ -1011,13 +908,9 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): """ self._check_fwupdtool_version() if metadata_url: - custom_metadata_name = metadata_url.replace( - FWUPD_DOWNLOAD_PREFIX, - "" - ) + custom_metadata_name = metadata_url.replace(FWUPD_DOWNLOAD_PREFIX, "") self.metadata_file = os.path.join( - FWUPD_DOM0_METADATA_DIR, - custom_metadata_name + FWUPD_DOM0_METADATA_DIR, custom_metadata_name ) else: self.metadata_file = FWUPD_DOM0_METADATA_FILE @@ -1028,10 +921,7 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): return EXIT_CODES["NOTHING_TO_DO"] if self._parse_heads_updates(device) == EXIT_CODES["NOTHING_TO_DO"]: return EXIT_CODES["NOTHING_TO_DO"] - self._download_firmware_updates( - self.heads_update_url, - self.heads_update_sha - ) + self._download_firmware_updates(self.heads_update_url, self.heads_update_sha) return_code = self._copy_heads_firmware(self.arch_path) if return_code == EXIT_CODES["NOTHING_TO_DO"]: exit(EXIT_CODES["NOTHING_TO_DO"]) @@ -1041,9 +931,9 @@ class QubesFwupdmgr(FwupdHeads, FwupdUpdate, FwupdReceiveUpdates): try: print("An update requires a reboot to complete.") choice = input("Do you want to restart now? (Y|N)\n") - if choice == 'N' or choice == 'n': + if choice == "N" or choice == "n": return EXIT_CODES["SUCCESS"] - elif choice == 'Y' or choice == 'y': + elif choice == "Y" or choice == "y": print("Rebooting...") os.system("reboot") else: @@ -1117,11 +1007,7 @@ def main(): elif sys.argv[1] == "refresh" and "--whonix" not in sys.argv: q.refresh_metadata(usbvm=sys_usb, metadata_url=metadata_url) elif sys.argv[1] == "refresh" and "--whonix" in sys.argv: - q.refresh_metadata( - usbvm=sys_usb, - whonix=True, - metadata_url=metadata_url - ) + q.refresh_metadata(usbvm=sys_usb, whonix=True, metadata_url=metadata_url) elif sys.argv[1] == "update-heads" and "--whonix" not in sys.argv: q.heads_update(device=device, metadata_url=metadata_url) elif sys.argv[1] == "update-heads" and "--whonix" in sys.argv: @@ -1131,5 +1017,5 @@ def main(): exit(1) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/contrib/qubes/src/vms/fwupd_common_vm.py b/contrib/qubes/src/vms/fwupd_common_vm.py index 473d85da5..c8eda536d 100644 --- a/contrib/qubes/src/vms/fwupd_common_vm.py +++ b/contrib/qubes/src/vms/fwupd_common_vm.py @@ -16,7 +16,7 @@ import subprocess FWUPD_VM_DIR = "/home/user/.cache/fwupd" FWUPD_VM_UPDATES_DIR = os.path.join(FWUPD_VM_DIR, "updates") FWUPD_VM_METADATA_DIR = os.path.join(FWUPD_VM_DIR, "metadata") -WARNING_COLOR = '\033[93m' +WARNING_COLOR = "\033[93m" FWUPD_PKI = "/etc/pki/fwupd" @@ -27,7 +27,7 @@ class FwupdVmCommon: Keyword arguments: *args -- paths to be created """ - qubes_gid = grp.getgrnam('qubes').gr_gid + qubes_gid = grp.getgrnam("qubes").gr_gid self.old_umask = os.umask(0o002) if args is None: raise Exception("Creating directories failed, no paths given.") @@ -49,14 +49,11 @@ class FwupdVmCommon: file_path -- absolute path to the file sha -- SHA256 checksum of the file """ - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: c_sha = hashlib.sha256(f.read()).hexdigest() if c_sha != sha: self.clean_vm_cache() - raise ValueError( - "Computed checksum %s did NOT match %s. " % - (c_sha, sha) - ) + raise ValueError("Computed checksum %s did NOT match %s. " % (c_sha, sha)) def validate_vm_dirs(self): """Validates and creates directories""" @@ -80,25 +77,16 @@ class FwupdVmCommon: file_path -- absolute path to jcat file file_directory -- absolute path to the directory to jcat file location """ - cmd_jcat = [ - "jcat-tool", - "verify", - f"{file_path}", - "--public-keys", - FWUPD_PKI - ] + cmd_jcat = ["jcat-tool", "verify", f"{file_path}", "--public-keys", FWUPD_PKI] p = subprocess.Popen( - cmd_jcat, - cwd=file_directory, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE + cmd_jcat, cwd=file_directory, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) stdout, __ = p.communicate() - verification = stdout.decode('utf-8') + verification = stdout.decode("utf-8") print(verification) if p.returncode != 0: self.clean_vm_cache() - raise Exception('jcat-tool: Verification failed') + raise Exception("jcat-tool: Verification failed") def clean_vm_cache(self): """Removes updates data""" diff --git a/contrib/qubes/src/vms/fwupd_download_updates.py b/contrib/qubes/src/vms/fwupd_download_updates.py index a6505174f..d4596c8a6 100644 --- a/contrib/qubes/src/vms/fwupd_download_updates.py +++ b/contrib/qubes/src/vms/fwupd_download_updates.py @@ -41,12 +41,7 @@ class DownloadData(FwupdVmCommon): metadata_url = METADATA_URL else: metadata_url = self.custom_url - cmd_metadata = [ - "wget", - "-P", - FWUPD_VM_METADATA_DIR, - metadata_url - ] + cmd_metadata = ["wget", "-P", FWUPD_VM_METADATA_DIR, metadata_url] p = subprocess.Popen(cmd_metadata) p.wait() if p.returncode != 0: @@ -62,12 +57,7 @@ class DownloadData(FwupdVmCommon): metadata_url = METADATA_URL else: metadata_url = self.custom_url - cmd_metadata = [ - "wget", - "-P", - FWUPD_VM_METADATA_DIR, - f"{metadata_url}.jcat" - ] + cmd_metadata = ["wget", "-P", FWUPD_VM_METADATA_DIR, f"{metadata_url}.jcat"] p = subprocess.Popen(cmd_metadata) p.wait() if p.returncode != 0: @@ -81,20 +71,13 @@ class DownloadData(FwupdVmCommon): """Downloads default metadata and its signatures""" if url is not None: self.custom_url = url - custom_metadata_name = url.replace( - FWUPD_DOWNLOAD_PREFIX, - "" - ) + custom_metadata_name = url.replace(FWUPD_DOWNLOAD_PREFIX, "") self.metadata_file = os.path.join( - FWUPD_VM_METADATA_DIR, - custom_metadata_name + FWUPD_VM_METADATA_DIR, custom_metadata_name ) else: self.custom_url = None - self.metadata_file = os.path.join( - FWUPD_VM_METADATA_DIR, - "firmware.xml.gz" - ) + self.metadata_file = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz") self.validate_vm_dirs() self._download_metadata_file() self._download_metadata_jcat() @@ -110,12 +93,7 @@ class DownloadData(FwupdVmCommon): self.arch_name = url.replace("https://fwupd.org/downloads/", "") self._decrypt_update_url(url) update_path = os.path.join(FWUPD_VM_UPDATES_DIR, self.arch_name) - cmd_update = [ - "wget", - "-O", - update_path, - self.dec_url - ] + cmd_update = ["wget", "-O", update_path, self.dec_url] p = subprocess.Popen(cmd_update) p.wait() if p.returncode != 0: @@ -145,5 +123,5 @@ def main(): raise Exception("Invalid command!!!") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/contrib/qubes/src/vms/fwupd_usbvm_validate.py b/contrib/qubes/src/vms/fwupd_usbvm_validate.py index d8435b46d..973cd0f14 100644 --- a/contrib/qubes/src/vms/fwupd_usbvm_validate.py +++ b/contrib/qubes/src/vms/fwupd_usbvm_validate.py @@ -18,14 +18,8 @@ from fwupd_common_vm import FwupdVmCommon FWUPD_VM_DIR = "/home/user/.cache/fwupd" FWUPD_VM_UPDATES_DIR = os.path.join(FWUPD_VM_DIR, "updates") FWUPD_VM_METADATA_DIR = os.path.join(FWUPD_VM_DIR, "metadata") -FWUPD_VM_METADATA_JCAT = os.path.join( - FWUPD_VM_METADATA_DIR, - "firmware.xml.gz.jcat" -) -FWUPD_VM_METADATA_FILE = os.path.join( - FWUPD_VM_METADATA_DIR, - "firmware.xml.gz" -) +FWUPD_VM_METADATA_JCAT = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz.jcat") +FWUPD_VM_METADATA_FILE = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz") FWUPDMGR = "/bin/fwupdmgr" FWUPD_DOWNLOAD_PREFIX = "https://fwupd.org/downloads/" @@ -41,18 +35,14 @@ class FwupdUsbvmUpdates(FwupdVmCommon): """ for untrusted_f in os.listdir(files_path): if not regex_pattern.match(untrusted_f): - raise Exception( - 'Dom0 sent unexpected file' - ) + raise Exception("Dom0 sent unexpected file") f = untrusted_f - assert '/' not in f - assert '\0' not in f - assert '\x1b' not in f + assert "/" not in f + assert "\0" not in f + assert "\x1b" not in f path_f = os.path.join(files_path, f) if os.path.islink(path_f) or not os.path.isfile(path_f): - raise Exception( - 'Dom0 sent not regular file' - ) + raise Exception("Dom0 sent not regular file") def _extract_archive(self, archive_path, output_path): """Extracts archive file to the specified directory. @@ -61,39 +51,22 @@ class FwupdUsbvmUpdates(FwupdVmCommon): archive_path -- absolute path to archive file output_path -- absolute path to the output directory """ - cmd_extract = [ - "gcab", - "-x", - f"--directory={output_path}", - f"{archive_path}" - ] + cmd_extract = ["gcab", "-x", f"--directory={output_path}", f"{archive_path}"] p = subprocess.Popen(cmd_extract, stdout=subprocess.PIPE) - p.communicate()[0].decode('ascii') + p.communicate()[0].decode("ascii") if p.returncode != 0: - raise Exception( - 'gcab: Error while extracting %s.' % - archive_path - ) + raise Exception("gcab: Error while extracting %s." % archive_path) def validate_metadata(self, metadata_url=None): """Validates received the metadata files.""" print("Running validation of the metadata files") if metadata_url: - metadata_name = metadata_url.replace( - FWUPD_DOWNLOAD_PREFIX, - "" - ) - metadata_file = os.path.join( - FWUPD_VM_METADATA_DIR, - metadata_name - ) + metadata_name = metadata_url.replace(FWUPD_DOWNLOAD_PREFIX, "") + metadata_file = os.path.join(FWUPD_VM_METADATA_DIR, metadata_name) else: metadata_file = FWUPD_VM_METADATA_FILE try: - self._jcat_verification( - f"{metadata_file}.jcat", - FWUPD_VM_METADATA_DIR - ) + self._jcat_verification(f"{metadata_file}.jcat", FWUPD_VM_METADATA_DIR) except Exception as e: print(str(e), file=sys.stderr) self.clean_vm_cache() @@ -113,10 +86,7 @@ class FwupdUsbvmUpdates(FwupdVmCommon): arch_temp = os.path.join(output_path, archive_name) os.mkdir(output_path) shutil.copyfile(archive_path, arch_temp) - self._extract_archive( - arch_temp, - output_path - ) + self._extract_archive(arch_temp, output_path) signature_name = os.path.join(output_path, "firmware*.jcat") file_path = glob.glob(signature_name) try: @@ -145,8 +115,7 @@ def main(): f.clean_vm_cache() elif sys.argv[1] == "updates" and len(sys.argv) < 4: raise Exception( - "Invalid number of arguments.\n" - "Expected archive path and checksum." + "Invalid number of arguments.\n" "Expected archive path and checksum." ) elif sys.argv[1] == "updates" and not len(sys.argv) < 4: f.validate_updates(sys.argv[2], sys.argv[3]) @@ -154,5 +123,5 @@ def main(): raise Exception("Invalid command") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/contrib/qubes/test/test_qubes_fwupd_heads.py b/contrib/qubes/test/test_qubes_fwupd_heads.py index a3cccd57b..248b52971 100644 --- a/contrib/qubes/test/test_qubes_fwupd_heads.py +++ b/contrib/qubes/test/test_qubes_fwupd_heads.py @@ -26,21 +26,15 @@ QUBES_FWUPDMGR_BINDIR = "/usr/sbin/qubes-fwupdmgr" class TestQubesFwupdHeads(unittest.TestCase): def setUp(self): if os.path.exists(QUBES_FWUPDMGR_REPO): - self.qfwupd = imp.load_source( - "qubes_fwupdmgr", - QUBES_FWUPDMGR_REPO - ) + self.qfwupd = imp.load_source("qubes_fwupdmgr", QUBES_FWUPDMGR_REPO) elif os.path.exists(QUBES_FWUPDMGR_BINDIR): - self.qfwupd = imp.load_source( - "qubes_fwupdmgr", - QUBES_FWUPDMGR_BINDIR - ) + self.qfwupd = imp.load_source("qubes_fwupdmgr", QUBES_FWUPDMGR_BINDIR) self.q = qf_heads.FwupdHeads() self.maxDiff = 2000 self.captured_output = io.StringIO() sys.stdout = self.captured_output - @unittest.skipUnless('qubes' in platform.release(), "Requires Qubes OS") + @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") def test_get_hwids(self): self.q._check_fwupdtool_version() self.q._get_hwids() @@ -56,12 +50,11 @@ class TestQubesFwupdHeads(unittest.TestCase): self.q._gather_firmware_version() self.assertEqual(self.q.heads_version, "0.2.2") - @unittest.skipUnless('qubes' in platform.release(), "Requires Qubes OS") + @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") def test_parse_metadata(self): qmgr = self.qfwupd.QubesFwupdmgr() qmgr.metadata_file = CUSTOM_METADATA.replace( - "https://fwupd.org/downloads", - self.qfwupd.FWUPD_DOM0_METADATA_DIR + "https://fwupd.org/downloads", self.qfwupd.FWUPD_DOM0_METADATA_DIR ) qmgr._download_metadata(metadata_url=CUSTOM_METADATA) self.q._parse_metadata(qmgr.metadata_file) @@ -74,16 +67,12 @@ class TestQubesFwupdHeads(unittest.TestCase): self.assertEqual(return_code, 0) self.assertEqual( self.q.heads_update_url, - "https://fwupd.org/downloads/e747a435bf24fd6081b77b6704b39cec5fa2dcf62e0ca6b86d8a6460121a1d07-heads_coreboot_x230-v0_2_3.cab" + "https://fwupd.org/downloads/e747a435bf24fd6081b77b6704b39cec5fa2dcf62e0ca6b86d8a6460121a1d07-heads_coreboot_x230-v0_2_3.cab", ) self.assertEqual( - self.q.heads_update_sha, - "1a54e69ca2b58d1218035115d481480eaf4c66e4" - ) - self.assertEqual( - self.q.heads_update_version, - "0.2.3" + self.q.heads_update_sha, "1a54e69ca2b58d1218035115d481480eaf4c66e4" ) + self.assertEqual(self.q.heads_update_version, "0.2.3") def test_check_heads_updates_no_updates(self): self.q.metadata_info = HEADS_XML @@ -98,30 +87,24 @@ class TestQubesFwupdHeads(unittest.TestCase): self.assertEqual(return_code, 0) self.assertEqual( self.q.heads_update_url, - "https://fwupd.org/downloads/e747a435bf24fd6081b77b6704b39cec5fa2dcf62e0ca6b86d8a6460121a1d07-heads_coreboot_x230-v0_2_3.cab" + "https://fwupd.org/downloads/e747a435bf24fd6081b77b6704b39cec5fa2dcf62e0ca6b86d8a6460121a1d07-heads_coreboot_x230-v0_2_3.cab", ) self.assertEqual( - self.q.heads_update_sha, - "1a54e69ca2b58d1218035115d481480eaf4c66e4" - ) - self.assertEqual( - self.q.heads_update_version, - "0.2.3" + self.q.heads_update_sha, "1a54e69ca2b58d1218035115d481480eaf4c66e4" ) + self.assertEqual(self.q.heads_update_version, "0.2.3") - @unittest.skipUnless('qubes' in platform.release(), "Requires Qubes OS") + @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") def test_copy_heads_firmware(self): qmgr = self.qfwupd.QubesFwupdmgr() self.q.heads_update_url = "https://fwupd.org/downloads/e747a435bf24fd6081b77b6704b39cec5fa2dcf62e0ca6b86d8a6460121a1d07-heads_coreboot_x230-v0_2_3.cab" self.q.heads_update_sha = "1a54e69ca2b58d1218035115d481480eaf4c66e4" self.q.heads_update_version = "0.2.3" qmgr._download_firmware_updates( - self.q.heads_update_url, - self.q.heads_update_sha + self.q.heads_update_url, self.q.heads_update_sha ) heads_boot_path = os.path.join( - qf_heads.HEADS_UPDATES_DIR, - self.q.heads_update_version + qf_heads.HEADS_UPDATES_DIR, self.q.heads_update_version ) if os.path.exists(heads_boot_path): shutil.rmtree(heads_boot_path) @@ -131,5 +114,5 @@ class TestQubesFwupdHeads(unittest.TestCase): self.assertTrue(os.path.exists(firmware_path)) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/contrib/qubes/test/test_qubes_fwupdmgr.py b/contrib/qubes/test/test_qubes_fwupdmgr.py index cede254d7..104d29fff 100755 --- a/contrib/qubes/test/test_qubes_fwupdmgr.py +++ b/contrib/qubes/test/test_qubes_fwupdmgr.py @@ -26,39 +26,23 @@ QUBES_FWUPDMGR_REPO = "./src/qubes_fwupdmgr.py" QUBES_FWUPDMGR_BINDIR = "/usr/sbin/qubes-fwupdmgr" if os.path.exists(QUBES_FWUPDMGR_REPO): - qfwupd = imp.load_source( - "qubes_fwupdmgr", - QUBES_FWUPDMGR_REPO - ) + qfwupd = imp.load_source("qubes_fwupdmgr", QUBES_FWUPDMGR_REPO) elif os.path.exists(QUBES_FWUPDMGR_BINDIR): - qfwupd = imp.load_source( - "qubes_fwupdmgr", - QUBES_FWUPDMGR_BINDIR - ) + qfwupd = imp.load_source("qubes_fwupdmgr", QUBES_FWUPDMGR_BINDIR) FWUPD_DOM0_DIR = "/root/.cache/fwupd" FWUPD_DOM0_UPDATES_DIR = os.path.join(FWUPD_DOM0_DIR, "updates") FWUPD_DOM0_UNTRUSTED_DIR = os.path.join(FWUPD_DOM0_UPDATES_DIR, "untrusted") FWUPD_VM_LOG = os.path.join(FWUPD_DOM0_DIR, "usbvm-devices.log") FWUPD_DOM0_METADATA_DIR = os.path.join(FWUPD_DOM0_DIR, "metadata") -FWUPD_DOM0_METADATA_FILE = os.path.join( - FWUPD_DOM0_METADATA_DIR, - "firmware.xml.gz" -) -FWUPD_DOM0_METADATA_FILE_JCAT = os.path.join( - FWUPD_DOM0_METADATA_DIR, - "firmware.xml.gz" -) +FWUPD_DOM0_METADATA_FILE = os.path.join(FWUPD_DOM0_METADATA_DIR, "firmware.xml.gz") +FWUPD_DOM0_METADATA_FILE_JCAT = os.path.join(FWUPD_DOM0_METADATA_DIR, "firmware.xml.gz") FWUPD_VM_DIR = "/home/user/.cache/fwupd" FWUPD_VM_UPDATES_DIR = os.path.join(FWUPD_VM_DIR, "updates") FWUPD_VM_METADATA_DIR = os.path.join(FWUPD_VM_DIR, "metadata") -FWUPD_VM_METADATA_FILE = os.path.join( - FWUPD_VM_METADATA_DIR, - "firmware.xml.gz" -) +FWUPD_VM_METADATA_FILE = os.path.join(FWUPD_VM_METADATA_DIR, "firmware.xml.gz") FWUPD_VM_METADATA_FILE_JCAT = os.path.join( - FWUPD_VM_METADATA_DIR, - "firmware.xml.gz.jcat" + FWUPD_VM_METADATA_DIR, "firmware.xml.gz.jcat" ) REQUIRED_DEV = "Requires device not connected" REQUIRED_USBVM = "Requires sys-usb" @@ -73,7 +57,7 @@ CUSTOM_METADATA = "https://fwupd.org/downloads/firmware-3c81bfdc9db5c8a42c09d380 def check_usbvm(): """Checks if sys-usb is running""" - if 'qubes' not in platform.release(): + if "qubes" not in platform.release(): return False q = qfwupd.QubesFwupdmgr() q.check_usbvm() @@ -82,7 +66,7 @@ def check_usbvm(): def device_connected_dom0(): """Checks if the testing device is connected in dom0""" - if 'qubes' not in platform.release(): + if "qubes" not in platform.release(): return False q = qfwupd.QubesFwupdmgr() q._get_dom0_devices() @@ -104,7 +88,7 @@ def device_connected_usbvm(): def check_whonix_updatevm(): """Checks if the sys-whonix is running""" - if 'qubes' not in platform.release(): + if "qubes" not in platform.release(): return False q = qfwupd.QubesFwupdmgr() q.check_usbvm() @@ -118,7 +102,7 @@ class TestQubesFwupdmgr(unittest.TestCase): self.captured_output = io.StringIO() sys.stdout = self.captured_output - @unittest.skipUnless('qubes' in platform.release(), "Requires Qubes OS") + @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") def test_download_metadata(self): self.q.metadata_file = FWUPD_DOM0_METADATA_FILE self.q._download_metadata() @@ -144,13 +128,12 @@ class TestQubesFwupdmgr(unittest.TestCase): msg="Metadata signature does not exist", ) - @unittest.skipUnless('qubes' in platform.release(), "Requires Qubes OS") + @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") def test_download_custom_metadata(self): self.q.metadata_file = CUSTOM_METADATA.replace( - "https://fwupd.org/downloads", - FWUPD_DOM0_METADATA_DIR + "https://fwupd.org/downloads", FWUPD_DOM0_METADATA_DIR ) - self.q.metadata_file_jcat = self.q.metadata_file + '.jcat' + self.q.metadata_file_jcat = self.q.metadata_file + ".jcat" self.q._download_metadata(metadata_url=CUSTOM_METADATA) self.assertTrue( os.path.exists(self.q.metadata_file), @@ -161,22 +144,22 @@ class TestQubesFwupdmgr(unittest.TestCase): msg="Metadata signature does not exist", ) - @unittest.skipUnless('qubes' in platform.release(), "Requires Qubes OS") + @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") def test_refresh_metadata_dom0(self): self.q.refresh_metadata() self.assertEqual( self.q.output, - 'Successfully refreshed metadata manually\n', - msg="Metadata refresh failed." + "Successfully refreshed metadata manually\n", + msg="Metadata refresh failed.", ) - @unittest.skipUnless('qubes' in platform.release(), "Requires Qubes OS") + @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") def test_refresh_metadata_dom0_custom(self): self.q.refresh_metadata(metadata_url=CUSTOM_METADATA) self.assertEqual( self.q.output, - 'Successfully refreshed metadata manually\n', - msg="Metadata refresh failed." + "Successfully refreshed metadata manually\n", + msg="Metadata refresh failed.", ) @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) @@ -184,8 +167,8 @@ class TestQubesFwupdmgr(unittest.TestCase): self.q.refresh_metadata(usbvm=True) self.assertEqual( self.q.output, - 'Successfully refreshed metadata manually\n', - msg="Metadata refresh failed." + "Successfully refreshed metadata manually\n", + msg="Metadata refresh failed.", ) @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) @@ -193,8 +176,8 @@ class TestQubesFwupdmgr(unittest.TestCase): self.q.refresh_metadata(usbvm=True, metadata_url=CUSTOM_METADATA) self.assertEqual( self.q.output, - 'Successfully refreshed metadata manually\n', - msg="Metadata refresh failed." + "Successfully refreshed metadata manually\n", + msg="Metadata refresh failed.", ) @unittest.skipUnless(check_whonix_updatevm(), "Requires sys-whonix") @@ -202,64 +185,55 @@ class TestQubesFwupdmgr(unittest.TestCase): self.q.refresh_metadata(whonix=True) self.assertEqual( self.q.output, - 'Successfully refreshed metadata manually\n', - msg="Metadata refresh failed." + "Successfully refreshed metadata manually\n", + msg="Metadata refresh failed.", ) - @unittest.skipUnless('qubes' in platform.release(), "Requires Qubes OS") + @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") def test_get_dom0_updates(self): self.q._get_dom0_updates() self.assertIn( - "Devices", - self.q.dom0_updates_info, - msg="Getting available updates failed" + "Devices", self.q.dom0_updates_info, msg="Getting available updates failed" ) def test_parse_updates_info(self): self.q._parse_dom0_updates_info(UPDATE_INFO) self.assertEqual( - self.q.dom0_updates_list[0]["Name"], - "ColorHug2", - msg="Wrong device name" + self.q.dom0_updates_list[0]["Name"], "ColorHug2", msg="Wrong device name" ) self.assertEqual( - self.q.dom0_updates_list[0]["Version"], - "2.0.6", - msg="Wrong update version" + self.q.dom0_updates_list[0]["Version"], "2.0.6", msg="Wrong update version" ) self.assertEqual( self.q.dom0_updates_list[0]["Releases"][0]["Url"], "https://fwupd.org/downloads/0a29848de74d26348bc5a6e24fc9f03778eddf0e-hughski-colorhug2-2.0.7.cab", - msg="Wrong update URL" + msg="Wrong update URL", ) self.assertEqual( self.q.dom0_updates_list[0]["Releases"][0]["Checksum"], "32c4a2c9be787cdf1d757c489d6455bd7bb14053425180b6d331c37e1ccc1cda", - msg="Wrong checksum" + msg="Wrong checksum", ) - @unittest.skipUnless('qubes' in platform.release(), "Requires Qubes OS") + @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") def test_download_firmware_updates(self): self.q._download_firmware_updates( "https://fwupd.org/downloads/0a29848de74d26348bc5a6e24fc9f03778eddf0e-hughski-colorhug2-2.0.7.cab", - "32c4a2c9be787cdf1d757c489d6455bd7bb14053425180b6d331c37e1ccc1cda" + "32c4a2c9be787cdf1d757c489d6455bd7bb14053425180b6d331c37e1ccc1cda", ) update_path = os.path.join( FWUPD_DOM0_UPDATES_DIR, - "0a29848de74d26348bc5a6e24fc9f03778eddf0e-hughski-colorhug2-2.0.7" + "0a29848de74d26348bc5a6e24fc9f03778eddf0e-hughski-colorhug2-2.0.7", ) self.assertTrue(os.path.exists(update_path)) - @unittest.skipUnless('qubes' in platform.release(), "Requires Qubes OS") + @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") def test_download_firmware_special_char(self): self.q._download_firmware_updates( "https://fwupd.org/downloads/bc334d8b098f2e91603c5f7dfdc837fb01797bbe-Dell%20XPS%2015%209560&Precision%205520%20System%20BIOS_Ver.1.18.0.cab", - "86d9e5e35b0b264be1bb1e49ec16ccd1330390423bfe962267a58c27be7712b8" - ) - update_path = os.path.join( - FWUPD_DOM0_UPDATES_DIR, - "trusted" + "86d9e5e35b0b264be1bb1e49ec16ccd1330390423bfe962267a58c27be7712b8", ) + update_path = os.path.join(FWUPD_DOM0_UPDATES_DIR, "trusted") self.assertTrue(os.path.exists(update_path)) @unittest.skipUnless(check_whonix_updatevm(), "Requires sys-whonix") @@ -271,62 +245,53 @@ class TestQubesFwupdmgr(unittest.TestCase): ) update_path = os.path.join( FWUPD_DOM0_UPDATES_DIR, - "0a29848de74d26348bc5a6e24fc9f03778eddf0e-hughski-colorhug2-2.0.7" + "0a29848de74d26348bc5a6e24fc9f03778eddf0e-hughski-colorhug2-2.0.7", ) self.assertTrue(os.path.exists(update_path)) def test_user_input_empty_dict(self): - downgrade_dict = { - "usbvm": [], - "dom0": [] - } + downgrade_dict = {"usbvm": [], "dom0": []} self.assertEqual(self.q._user_input(downgrade_dict), 2) def test_user_input_n(self): - user_input = ['sth', 'n'] - with patch('builtins.input', side_effect=user_input): + user_input = ["sth", "n"] + with patch("builtins.input", side_effect=user_input): self.q._parse_dom0_updates_info(UPDATE_INFO) downgrade_dict = { "usbvm": self.q.dom0_updates_list, - "dom0": self.q.dom0_updates_list + "dom0": self.q.dom0_updates_list, } - choice = self.q._user_input( - downgrade_dict, - usbvm=True - ) + choice = self.q._user_input(downgrade_dict, usbvm=True) self.assertEqual(choice, 2) - user_input = ['sth', 'N'] - with patch('builtins.input', side_effect=user_input): + user_input = ["sth", "N"] + with patch("builtins.input", side_effect=user_input): self.q._parse_dom0_updates_info(UPDATE_INFO) downgrade_dict = { "usbvm": self.q.dom0_updates_list, - "dom0": self.q.dom0_updates_list + "dom0": self.q.dom0_updates_list, } - choice = self.q._user_input( - downgrade_dict, - usbvm=True - ) + choice = self.q._user_input(downgrade_dict, usbvm=True) self.assertEqual(choice, 2) def test_user_input_choice(self): - user_input = ['6', '1'] - with patch('builtins.input', side_effect=user_input): + user_input = ["6", "1"] + with patch("builtins.input", side_effect=user_input): self.q._parse_dom0_updates_info(UPDATE_INFO) updates_dict = { "usbvm": self.q.dom0_updates_list, - "dom0": self.q.dom0_updates_list + "dom0": self.q.dom0_updates_list, } key, choice = self.q._user_input(updates_dict) self.assertEqual(key, "dom0") self.assertEqual(choice, 0) def test_user_input_choice_usbvm(self): - user_input = ['6', '2'] - with patch('builtins.input', side_effect=user_input): + user_input = ["6", "2"] + with patch("builtins.input", side_effect=user_input): self.q._parse_dom0_updates_info(UPDATE_INFO) updates_dict = { "usbvm": self.q.dom0_updates_list, - "dom0": self.q.dom0_updates_list + "dom0": self.q.dom0_updates_list, } key, choice = self.q._user_input(updates_dict, usbvm=True) self.assertEqual(key, "usbvm") @@ -338,18 +303,15 @@ class TestQubesFwupdmgr(unittest.TestCase): self.q._parse_parameters(update_dict, "dom0", 0) self.assertEqual( self.q.url, - "https://fwupd.org/downloads/0a29848de74d26348bc5a6e24fc9f03778eddf0e-hughski-colorhug2-2.0.7.cab" + "https://fwupd.org/downloads/0a29848de74d26348bc5a6e24fc9f03778eddf0e-hughski-colorhug2-2.0.7.cab", ) self.assertEqual( self.q.sha, - "32c4a2c9be787cdf1d757c489d6455bd7bb14053425180b6d331c37e1ccc1cda" - ) - self.assertEqual( - self.q.version, - "2.0.7" + "32c4a2c9be787cdf1d757c489d6455bd7bb14053425180b6d331c37e1ccc1cda", ) + self.assertEqual(self.q.version, "2.0.7") - @unittest.skipUnless('qubes' in platform.release(), "Requires Qubes OS") + @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") def test_clean_cache_dom0(self): self.q.clean_cache() self.assertFalse(os.path.exists(FWUPD_DOM0_METADATA_DIR)) @@ -365,28 +327,20 @@ class TestQubesFwupdmgr(unittest.TestCase): "qvm-run", "--pass-io", "sys-usb", - f"! [ -d {FWUPD_VM_METADATA_DIR} ]" + f"! [ -d {FWUPD_VM_METADATA_DIR} ]", ] p = subprocess.Popen(cmd_validate_metadata) p.wait() - self.assertEqual( - p.returncode, - 0, - msg="Creating metadata directory failed" - ) + self.assertEqual(p.returncode, 0, msg="Creating metadata directory failed") cmd_validate_udpdate = [ "qvm-run", "--pass-io", "sys-usb", - f"! [ -d {FWUPD_VM_UPDATES_DIR} ]" + f"! [ -d {FWUPD_VM_UPDATES_DIR} ]", ] p = subprocess.Popen(cmd_validate_udpdate) p.wait() - self.assertEqual( - p.returncode, - 0, - msg="Cleaning update directory failed" - ) + self.assertEqual(p.returncode, 0, msg="Cleaning update directory failed") def test_output_crawler(self): crawler_output = io.StringIO() @@ -394,17 +348,16 @@ class TestQubesFwupdmgr(unittest.TestCase): self.q._output_crawler(json.loads(UPDATE_INFO), 0) with open("test/logs/get_devices.log", "r") as get_devices: self.assertEqual( - get_devices.read(), - crawler_output.getvalue().strip() + "\n" + get_devices.read(), crawler_output.getvalue().strip() + "\n" ) sys.stdout = self.captured_output - @unittest.skipUnless('qubes' in platform.release(), "Requires Qubes OS") + @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") def test_get_dom0_devices(self): self.q._get_dom0_devices() self.assertIsNotNone(self.q.dom0_devices_info) - @unittest.skipUnless('qubes' in platform.release(), "Requires Qubes OS") + @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") def test_get_devices_qubes_dom0(self): get_devices_output = io.StringIO() sys.stdout = get_devices_output @@ -441,45 +394,36 @@ class TestQubesFwupdmgr(unittest.TestCase): sys.stdout = help_output self.q.help() with open("test/logs/help.log", "r") as help_log: - self.assertEqual( - help_log.read(), - help_output.getvalue().strip() + "\n" - ) + self.assertEqual(help_log.read(), help_output.getvalue().strip() + "\n") sys.stdout = self.captured_output @patch( - 'test.test_qubes_fwupdmgr.qfwupd.QubesFwupdmgr._read_dmi', - return_value=DMI_DECODE + "test.test_qubes_fwupdmgr.qfwupd.QubesFwupdmgr._read_dmi", + return_value=DMI_DECODE, ) def test_verify_dmi(self, output): self.q.dmi_version = "P.1.0" self.q._verify_dmi("test/logs/", "P1.1") @patch( - 'test.test_qubes_fwupdmgr.qfwupd.QubesFwupdmgr._read_dmi', - return_value=DMI_DECODE + "test.test_qubes_fwupdmgr.qfwupd.QubesFwupdmgr._read_dmi", + return_value=DMI_DECODE, ) def test_verify_dmi_wrong_vendor(self, output): with self.assertRaises(ValueError) as wrong_vendor: self.q.dmi_version = "P.1.0" self.q._verify_dmi("test/logs/metainfo_name/", "P1.1") - self.assertIn( - "Wrong firmware provider.", - str(wrong_vendor.exception) - ) + self.assertIn("Wrong firmware provider.", str(wrong_vendor.exception)) @patch( - 'test.test_qubes_fwupdmgr.qfwupd.QubesFwupdmgr._read_dmi', - return_value=DMI_DECODE + "test.test_qubes_fwupdmgr.qfwupd.QubesFwupdmgr._read_dmi", + return_value=DMI_DECODE, ) def test_verify_dmi_version(self, output): self.q.dmi_version = "P1.0" with self.assertRaises(ValueError) as downgrade: self.q._verify_dmi("test/logs/metainfo_version/", "P0.1") - self.assertIn( - "P0.1 < P1.0 Downgrade not allowed", - str(downgrade.exception) - ) + self.assertIn("P0.1 < P1.0 Downgrade not allowed", str(downgrade.exception)) @unittest.skipUnless(device_connected_dom0(), REQUIRED_DEV) def test_downgrade_firmware_dom0(self): @@ -494,8 +438,8 @@ class TestQubesFwupdmgr(unittest.TestCase): break if old_version is None: self.fail("Test device not found") - user_input = [str(number+1), '1'] - with patch('builtins.input', side_effect=user_input): + user_input = [str(number + 1), "1"] + with patch("builtins.input", side_effect=user_input): self.q.downgrade_firmware() self.q._get_dom0_devices() downgrades = self.q._parse_downgrades(self.q.dom0_devices_info) @@ -503,16 +447,13 @@ class TestQubesFwupdmgr(unittest.TestCase): self.assertGreater(l_ver(old_version), l_ver(new_version)) @unittest.skipUnless( - check_whonix_updatevm() and device_connected_usbvm(), - REQUIRED_DEV + check_whonix_updatevm() and device_connected_usbvm(), REQUIRED_DEV ) def test_update_n_downgrade_firmware_whonix(self): old_version = None self.q.clean_cache(usbvm=True) self.q._get_dom0_devices() - dom0_downgrades = self.q._parse_downgrades( - self.q.dom0_devices_info - ) + dom0_downgrades = self.q._parse_downgrades(self.q.dom0_devices_info) self.q._get_usbvm_devices() with open(FWUPD_VM_LOG) as usbvm_device_info: raw = usbvm_device_info.read() @@ -525,8 +466,8 @@ class TestQubesFwupdmgr(unittest.TestCase): break if old_version is None: self.fail("Test device not found") - user_input = [str(number+1+len(dom0_downgrades)), '1'] - with patch('builtins.input', side_effect=user_input): + user_input = [str(number + 1 + len(dom0_downgrades)), "1"] + with patch("builtins.input", side_effect=user_input): self.q.downgrade_firmware(usbvm=True, whonix=True) self.q._get_usbvm_devices() with open(FWUPD_VM_LOG) as usbvm_device_info: @@ -550,8 +491,8 @@ class TestQubesFwupdmgr(unittest.TestCase): break if old_version is None: self.fail("Test device not found") - user_input = [str(number+1+len(self.q.dom0_updates_list)), '1'] - with patch('builtins.input', side_effect=user_input): + user_input = [str(number + 1 + len(self.q.dom0_updates_list)), "1"] + with patch("builtins.input", side_effect=user_input): self.q.update_firmware(usbvm=True, whonix=True) self.q._get_usbvm_devices() with open(FWUPD_VM_LOG) as usbvm_device_info: @@ -571,9 +512,7 @@ class TestQubesFwupdmgr(unittest.TestCase): def test_downgrade_firmware_usbvm(self): old_version = None self.q._get_dom0_devices() - dom0_downgrades = self.q._parse_downgrades( - self.q.dom0_devices_info - ) + dom0_downgrades = self.q._parse_downgrades(self.q.dom0_devices_info) self.q._get_usbvm_devices() with open(FWUPD_VM_LOG) as usbvm_device_info: raw = usbvm_device_info.read() @@ -586,8 +525,8 @@ class TestQubesFwupdmgr(unittest.TestCase): break if old_version is None: self.fail("Test device not found") - user_input = [str(number+1+len(dom0_downgrades)), '1'] - with patch('builtins.input', side_effect=user_input): + user_input = [str(number + 1 + len(dom0_downgrades)), "1"] + with patch("builtins.input", side_effect=user_input): self.q.downgrade_firmware(usbvm=True) self.q._get_usbvm_devices() with open(FWUPD_VM_LOG) as usbvm_device_info: @@ -598,95 +537,62 @@ class TestQubesFwupdmgr(unittest.TestCase): def test_parse_downgrades(self): downgrades = self.q._parse_downgrades(GET_DEVICES) - self.assertEqual( - downgrades[0]["Name"], - "ColorHug2" - ) - self.assertEqual( - downgrades[0]["Version"], - "2.0.6" - ) - self.assertEqual( - downgrades[0]["Releases"][0]["Version"], - "2.0.5" - ) + self.assertEqual(downgrades[0]["Name"], "ColorHug2") + self.assertEqual(downgrades[0]["Version"], "2.0.6") + self.assertEqual(downgrades[0]["Releases"][0]["Version"], "2.0.5") self.assertEqual( downgrades[0]["Releases"][0]["Url"], - "https://fwupd.org/downloads/f7dd4ab29fa610438571b8b62b26b0b0e57bb35b-hughski-colorhug2-2.0.5.cab" + "https://fwupd.org/downloads/f7dd4ab29fa610438571b8b62b26b0b0e57bb35b-hughski-colorhug2-2.0.5.cab", ) self.assertEqual( downgrades[0]["Releases"][0]["Checksum"], - "8cd379eb2e1467e4fda92c20650306dc7e598b1d421841bbe19d9ed6ea01e3ee" + "8cd379eb2e1467e4fda92c20650306dc7e598b1d421841bbe19d9ed6ea01e3ee", ) def test_parse_downgrades_no_version(self): downgrades = self.q._parse_downgrades(GET_DEVICES_NO_VERSION) - self.assertEqual( - downgrades[0]["Name"], - "ColorHug2" - ) - self.assertEqual( - downgrades[0]["Version"], - "2.0.6" - ) - self.assertEqual( - downgrades[0]["Releases"][0]["Version"], - "2.0.5" - ) + self.assertEqual(downgrades[0]["Name"], "ColorHug2") + self.assertEqual(downgrades[0]["Version"], "2.0.6") + self.assertEqual(downgrades[0]["Releases"][0]["Version"], "2.0.5") self.assertEqual( downgrades[0]["Releases"][0]["Url"], - "https://fwupd.org/downloads/f7dd4ab29fa610438571b8b62b26b0b0e57bb35b-hughski-colorhug2-2.0.5.cab" + "https://fwupd.org/downloads/f7dd4ab29fa610438571b8b62b26b0b0e57bb35b-hughski-colorhug2-2.0.5.cab", ) self.assertEqual( downgrades[0]["Releases"][0]["Checksum"], - "4ee9dfa38df3b810f739d8a19d13da1b3175fb87" + "4ee9dfa38df3b810f739d8a19d13da1b3175fb87", ) def test_user_input_downgrade_usbvm(self): - user_input = ['2', '6', 'sth', '2.2.1', '', ' ', '\0', '2'] - with patch('builtins.input', side_effect=user_input): + user_input = ["2", "6", "sth", "2.2.1", "", " ", "\0", "2"] + with patch("builtins.input", side_effect=user_input): downgrade_list = self.q._parse_downgrades(GET_DEVICES) - downgrade_dict = { - "usbvm": downgrade_list, - "dom0": downgrade_list - } + downgrade_dict = {"usbvm": downgrade_list, "dom0": downgrade_list} key, device_choice, downgrade_choice = self.q._user_input( - downgrade_dict, - downgrade=True, - usbvm=True + downgrade_dict, downgrade=True, usbvm=True ) self.assertEqual(key, "usbvm") self.assertEqual(device_choice, 0) self.assertEqual(downgrade_choice, 1) def test_user_input_downgrade_dom0(self): - user_input = ['1', '6', 'sth', '2.2.1', '', ' ', '\0', '2'] - with patch('builtins.input', side_effect=user_input): + user_input = ["1", "6", "sth", "2.2.1", "", " ", "\0", "2"] + with patch("builtins.input", side_effect=user_input): downgrade_list = self.q._parse_downgrades(GET_DEVICES) - downgrade_dict = { - "dom0": downgrade_list - } + downgrade_dict = {"dom0": downgrade_list} key, device_choice, downgrade_choice = self.q._user_input( - downgrade_dict, - downgrade=True, + downgrade_dict, downgrade=True ) self.assertEqual(key, "dom0") self.assertEqual(device_choice, 0) self.assertEqual(downgrade_choice, 1) def test_user_input_downgrade_N(self): - user_input = ['N'] - with patch('builtins.input', side_effect=user_input): + user_input = ["N"] + with patch("builtins.input", side_effect=user_input): downgrade_list = self.q._parse_downgrades(GET_DEVICES) - downgrade_dict = { - "usbvm": downgrade_list, - "dom0": downgrade_list - } - N_choice = self.q._user_input( - downgrade_dict, - downgrade=True, - usbvm=True - ) + downgrade_dict = {"usbvm": downgrade_list, "dom0": downgrade_list} + N_choice = self.q._user_input(downgrade_dict, downgrade=True, usbvm=True) self.assertEqual(N_choice, 2) @unittest.skipUnless(device_connected_dom0(), REQUIRED_DEV) @@ -703,8 +609,8 @@ class TestQubesFwupdmgr(unittest.TestCase): break if old_version is None: self.fail("Test device not found") - user_input = [str(number+1)] - with patch('builtins.input', side_effect=user_input): + user_input = [str(number + 1)] + with patch("builtins.input", side_effect=user_input): self.q.update_firmware() self.q._get_dom0_devices() dom0_devices_info_dict = json.loads(self.q.dom0_devices_info) @@ -736,8 +642,8 @@ class TestQubesFwupdmgr(unittest.TestCase): break if old_version is None: self.fail("Test device not found") - user_input = [str(number+1+len(self.q.dom0_updates_list)), '1'] - with patch('builtins.input', side_effect=user_input): + user_input = [str(number + 1 + len(self.q.dom0_updates_list)), "1"] + with patch("builtins.input", side_effect=user_input): self.q.update_firmware(usbvm=True) self.q._get_usbvm_devices() with open(FWUPD_VM_LOG) as usbvm_device_info: @@ -766,13 +672,13 @@ class TestQubesFwupdmgr(unittest.TestCase): self.q.usbvm_updates_list[0]["Releases"], [ { - 'Checksum': '32c4a2c9be787cdf1d757c489d6455bd7bb14053425180b6d331c37e1ccc1cda', - 'Description': '

This release fixes prevents the firmware returning an ' - 'error when the remote SHA1 hash was never sent.

', - 'Url': 'https://fwupd.org/downloads/0a29848de74d26348bc5a6e24fc9f03778eddf0e-hughski-colorhug2-2.0.7.cab', - 'Version': '2.0.7' + "Checksum": "32c4a2c9be787cdf1d757c489d6455bd7bb14053425180b6d331c37e1ccc1cda", + "Description": "

This release fixes prevents the firmware returning an " + "error when the remote SHA1 hash was never sent.

", + "Url": "https://fwupd.org/downloads/0a29848de74d26348bc5a6e24fc9f03778eddf0e-hughski-colorhug2-2.0.7.cab", + "Version": "2.0.7", } - ] + ], ) def test_parse_usbvm_updates_no_updates_available(self): @@ -786,8 +692,7 @@ class TestQubesFwupdmgr(unittest.TestCase): self.q._updates_crawler(self.q.usbvm_updates_list, usbvm=True) with open("test/logs/get_updates.log", "r") as getupdates: self.assertEqual( - getupdates.read(), - crawler_output.getvalue().strip() + "\n" + getupdates.read(), crawler_output.getvalue().strip() + "\n" ) sys.stdout = self.captured_output @@ -798,33 +703,25 @@ class TestQubesFwupdmgr(unittest.TestCase): "qvm-run", "--pass-io", "sys-usb", - f"[ -d {FWUPD_VM_METADATA_DIR} ]" + f"[ -d {FWUPD_VM_METADATA_DIR} ]", ] p = subprocess.Popen(cmd_validate_metadata) p.wait() - self.assertEqual( - p.returncode, - 0, - msg="Creating metadata directory failed" - ) + self.assertEqual(p.returncode, 0, msg="Creating metadata directory failed") cmd_validate_udpdate = [ "qvm-run", "--pass-io", "sys-usb", - f"[ -d {FWUPD_VM_UPDATES_DIR} ]" + f"[ -d {FWUPD_VM_UPDATES_DIR} ]", ] p = subprocess.Popen(cmd_validate_udpdate) p.wait() - self.assertEqual( - p.returncode, - 0, - msg="Creating update directory failed" - ) + self.assertEqual(p.returncode, 0, msg="Creating update directory failed") @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) def test_copy_usbvm_metadata(self): self.q.metadata_file = FWUPD_DOM0_METADATA_FILE - self.q.metadata_file_jcat = self.q.metadata_file + '.jcat' + self.q.metadata_file_jcat = self.q.metadata_file + ".jcat" self.q._download_metadata() self.q._validate_usbvm_dirs() self.q._copy_usbvm_metadata() @@ -832,30 +729,22 @@ class TestQubesFwupdmgr(unittest.TestCase): "qvm-run", "--pass-io", "sys-usb", - f"[ -f {FWUPD_VM_METADATA_FILE} ]" + f"[ -f {FWUPD_VM_METADATA_FILE} ]", ] p = subprocess.Popen(cmd_validate_metadata_file) p.wait() - self.assertEqual( - p.returncode, - 0, - msg="Metadata file does not exist" - ) + self.assertEqual(p.returncode, 0, msg="Metadata file does not exist") cmd_validate_metadata_jcat = [ "qvm-run", "--pass-io", "sys-usb", - f"[ -f {FWUPD_VM_METADATA_FILE_JCAT} ]" + f"[ -f {FWUPD_VM_METADATA_FILE_JCAT} ]", ] p = subprocess.Popen(cmd_validate_metadata_jcat) p.wait() - self.assertEqual( - p.returncode, - 0, - msg="Metadata jcat signature does not exist" - ) + self.assertEqual(p.returncode, 0, msg="Metadata jcat signature does not exist") - @unittest.skipUnless('qubes' in platform.release(), "Requires Qubes OS") + @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") def test_enable_lvfs_testing_dom0(self): if os.path.exists(LVFS_TESTING_DOM0_FLAG): os.remove(LVFS_TESTING_DOM0_FLAG) @@ -869,18 +758,15 @@ class TestQubesFwupdmgr(unittest.TestCase): "--pass-io", USBVM_N, ( - 'script --quiet --return --command ' + "script --quiet --return --command " f'"ls {LVFS_TESTING_USBVM_FLAG} &>/dev/null"' - ) + ), ] cmd_rm_flag = [ "qvm-run", "--pass-io", USBVM_N, - ( - 'script --quiet --return --command ' - f'"rm {LVFS_TESTING_USBVM_FLAG}"' - ) + ("script --quiet --return --command " f'"rm {LVFS_TESTING_USBVM_FLAG}"'), ] flag = subprocess.Popen(cmd_validate_flag) flag.wait() @@ -897,7 +783,7 @@ class TestQubesFwupdmgr(unittest.TestCase): @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) def test_validate_usbvm_metadata(self): self.q.metadata_file = FWUPD_DOM0_METADATA_FILE - self.q.metadata_file_jcat = self.q.metadata_file + '.jcat' + self.q.metadata_file_jcat = self.q.metadata_file + ".jcat" self.q._download_metadata() self.q._validate_usbvm_dirs() self.q._copy_usbvm_metadata() @@ -906,7 +792,7 @@ class TestQubesFwupdmgr(unittest.TestCase): @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) def test_refresh_usbvm_metadata(self): self.q.metadata_file = FWUPD_DOM0_METADATA_FILE - self.q.metadata_file_jcat = self.q.metadata_file + '.jcat' + self.q.metadata_file_jcat = self.q.metadata_file + ".jcat" self.q.lvfs = "lvfs" self.q._download_metadata() self.q._validate_usbvm_dirs() @@ -922,28 +808,20 @@ class TestQubesFwupdmgr(unittest.TestCase): "qvm-run", "--pass-io", "sys-usb", - f"! [ -d {FWUPD_VM_METADATA_DIR} ]" + f"! [ -d {FWUPD_VM_METADATA_DIR} ]", ] p = subprocess.Popen(cmd_validate_metadata) p.wait() - self.assertEqual( - p.returncode, - 0, - msg="Cleaning metadata directory failed" - ) + self.assertEqual(p.returncode, 0, msg="Cleaning metadata directory failed") cmd_validate_udpdate = [ "qvm-run", "--pass-io", "sys-usb", - f"! [ -d {FWUPD_VM_METADATA_DIR} ]" + f"! [ -d {FWUPD_VM_METADATA_DIR} ]", ] p = subprocess.Popen(cmd_validate_udpdate) p.wait() - self.assertEqual( - p.returncode, - 0, - msg="Cleaning update directory failed" - ) + self.assertEqual(p.returncode, 0, msg="Cleaning update directory failed") @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) def test_validate_usbvm_archive(self): @@ -952,46 +830,33 @@ class TestQubesFwupdmgr(unittest.TestCase): name = url.replace("https://fwupd.org/downloads/", "") self.q._clean_usbvm() self.q._validate_usbvm_dirs() - self.q._download_firmware_updates( - url, - sha - ) - self.q._copy_firmware_updates( - name - ) - self.q._validate_usbvm_archive( - name, - sha - ) + self.q._download_firmware_updates(url, sha) + self.q._copy_firmware_updates(name) + self.q._validate_usbvm_archive(name, sha) cmd_validate_udpdate = [ "qvm-run", "--pass-io", "sys-usb", - "[ -f %s ]" % - os.path.join(FWUPD_VM_UPDATES_DIR, name) + "[ -f %s ]" % os.path.join(FWUPD_VM_UPDATES_DIR, name), ] p = subprocess.Popen(cmd_validate_udpdate) p.wait() - self.assertEqual( - p.returncode, - 0, - msg="Archive validation failed" - ) + self.assertEqual(p.returncode, 0, msg="Archive validation failed") - @unittest.skipUnless('qubes' in platform.release(), "Requires Qubes OS") + @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") def test_check_usbvm(self): self.q.check_usbvm() self.assertIn(XL_LIST_LOG, self.q.output) - @unittest.skipUnless('qubes' in platform.release(), "Requires Qubes OS") + @unittest.skipUnless("qubes" in platform.release(), "Requires Qubes OS") def test_bios_refresh_metadata(self): sys_usb = self.q.check_usbvm() Path(BIOS_UPDATE_FLAG).touch(mode=0o644, exist_ok=True) self.q.refresh_metadata_after_bios_update(usbvm=sys_usb) self.assertEqual( self.q.output, - 'Successfully refreshed metadata manually\n', - msg="Metadata refresh failed." + "Successfully refreshed metadata manually\n", + msg="Metadata refresh failed.", ) @unittest.skipUnless(check_usbvm(), REQUIRED_USBVM) @@ -1006,5 +871,5 @@ class TestQubesFwupdmgr(unittest.TestCase): self.assertFalse(os.path.exists(trusted_path.replace(".cab", ""))) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/contrib/standalone-installer/assets/header.py b/contrib/standalone-installer/assets/header.py index 558cf333e..1a47d61f7 100644 --- a/contrib/standalone-installer/assets/header.py +++ b/contrib/standalone-installer/assets/header.py @@ -13,7 +13,7 @@ import shutil import tempfile import zipfile -TAG = b'#\x00' +TAG = b"#\x00" def parse_args(): @@ -23,19 +23,19 @@ def parse_args(): parser.add_argument("--directory", help="Directory to extract to") parser.add_argument( "--cleanup", - action='store_true', + action="store_true", help="Remove tools when done with installation", ) parser.add_argument( - "--verbose", action='store_true', help="Run the tool in verbose mode" + "--verbose", action="store_true", help="Run the tool in verbose mode" ) parser.add_argument( "--allow-reinstall", - action='store_true', + action="store_true", help="Allow re-installing existing firmware versions", ) parser.add_argument( - "--allow-older", action='store_true', help="Allow downgrading firmware versions" + "--allow-older", action="store_true", help="Allow downgrading firmware versions" ) parser.add_argument( "command", choices=["install", "extract"], help="Command to run" @@ -61,7 +61,7 @@ def bytes_slicer(length, source): def get_zip(): script = os.path.realpath(__file__) bytes_out = io.BytesIO() - with open(script, 'rb') as source: + with open(script, "rb") as source: for line in source: if not line.startswith(TAG): continue @@ -71,7 +71,7 @@ def get_zip(): def unzip(destination): zipf = get_zip() - source = zipfile.ZipFile(zipf, 'r') + source = zipfile.ZipFile(zipf, "r") for item in source.namelist(): # extract handles the sanitization source.extract(item, destination) @@ -83,7 +83,7 @@ def copy_cabs(source, target): cabs = [] for root, dirs, files in os.walk(source): for f in files: - if f.endswith('.cab'): + if f.endswith(".cab"): origf = os.path.join(root, f) shutil.copy(origf, target) cabs.append(os.path.join(target, f)) @@ -91,31 +91,31 @@ def copy_cabs(source, target): def install_snap(directory, verbose, allow_reinstall, allow_older, uninstall): - app = 'fwupd' - common = '/root/snap/%s/common' % app + app = "fwupd" + common = "/root/snap/%s/common" % app # check if snap is installed - with open(os.devnull, 'w') as devnull: - subprocess.run(['snap'], check=True, stdout=devnull, stderr=devnull) + with open(os.devnull, "w") as devnull: + subprocess.run(["snap"], check=True, stdout=devnull, stderr=devnull) # check existing installed - cmd = ['snap', 'list', app] - with open(os.devnull, 'w') as devnull: + cmd = ["snap", "list", app] + with open(os.devnull, "w") as devnull: if verbose: print(cmd) ret = subprocess.run(cmd, stdout=devnull, stderr=devnull) if ret.returncode == 0: - cmd = ['snap', 'remove', app] + cmd = ["snap", "remove", app] if verbose: print(cmd) subprocess.run(cmd, check=True) # install the snap - cmd = ['snap', 'ack', os.path.join(directory, 'fwupd.assert')] + cmd = ["snap", "ack", os.path.join(directory, "fwupd.assert")] if verbose: print(cmd) subprocess.run(cmd, check=True) - cmd = ['snap', 'install', '--classic', os.path.join(directory, 'fwupd.snap')] + cmd = ["snap", "install", "--classic", os.path.join(directory, "fwupd.snap")] if verbose: print(cmd) subprocess.run(cmd, check=True) @@ -125,7 +125,7 @@ def install_snap(directory, verbose, allow_reinstall, allow_older, uninstall): # run the snap for cab in cabs: - cmd = ["%s.fwupdmgr" % app, 'install', cab] + cmd = ["%s.fwupdmgr" % app, "install", cab] if allow_reinstall: cmd += ["--allow-reinstall"] if allow_older: @@ -141,61 +141,61 @@ def install_snap(directory, verbose, allow_reinstall, allow_older, uninstall): # cleanup if uninstall: - cmd = ['snap', 'remove', app] + cmd = ["snap", "remove", app] if verbose: print(cmd) subprocess.run(cmd) def install_flatpak(directory, verbose, allow_reinstall, allow_older, uninstall): - app = 'org.freedesktop.fwupd' - common = '%s/.var/app/%s' % (os.getenv('HOME'), app) + app = "org.freedesktop.fwupd" + common = "%s/.var/app/%s" % (os.getenv("HOME"), app) - with open(os.devnull, 'w') as devnull: + with open(os.devnull, "w") as devnull: if not verbose: output = devnull else: output = None # look for dependencies - dep = 'org.gnome.Platform/x86_64/3.30' - repo = 'flathub' - repo_url = 'https://flathub.org/repo/flathub.flatpakrepo' - cmd = ['flatpak', 'info', dep] + dep = "org.gnome.Platform/x86_64/3.30" + repo = "flathub" + repo_url = "https://flathub.org/repo/flathub.flatpakrepo" + cmd = ["flatpak", "info", dep] if verbose: print(cmd) ret = subprocess.run(cmd, stdout=output, stderr=output) # not installed if ret.returncode != 0: # look for remotes - cmd = ['flatpak', 'remote-info', repo, dep] + cmd = ["flatpak", "remote-info", repo, dep] if verbose: print(cmd) ret = subprocess.run(cmd, stdout=output, stderr=output) # not enabled, enable it if ret.returncode != 0: - cmd = ['flatpak', 'remote-add', repo, repo_url] + cmd = ["flatpak", "remote-add", repo, repo_url] if verbose: print(cmd) ret = subprocess.run(cmd, stderr=output) # install dep - cmd = ['flatpak', 'install', repo, dep] + cmd = ["flatpak", "install", repo, dep] if verbose: print(cmd) ret = subprocess.run(cmd) # check existing installed - cmd = ['flatpak', 'info', app] + cmd = ["flatpak", "info", app] if verbose: print(cmd) ret = subprocess.run(cmd, stdout=output, stderr=output) if ret.returncode == 0: - cmd = ['flatpak', 'remove', app] + cmd = ["flatpak", "remove", app] if verbose: print(cmd) subprocess.run(cmd, check=True) # install the flatpak - cmd = ['flatpak', 'install', os.path.join(directory, 'fwupd.flatpak')] + cmd = ["flatpak", "install", os.path.join(directory, "fwupd.flatpak")] if verbose: print(cmd) subprocess.run(cmd, check=True) @@ -205,7 +205,7 @@ def install_flatpak(directory, verbose, allow_reinstall, allow_older, uninstall) # run command for cab in cabs: - cmd = ['flatpak', 'run', app, 'install', cab] + cmd = ["flatpak", "run", app, "install", cab] if allow_reinstall: cmd += ["--allow-reinstall"] if allow_older: @@ -221,7 +221,7 @@ def install_flatpak(directory, verbose, allow_reinstall, allow_older, uninstall) # cleanup if uninstall: - cmd = ['flatpak', 'remove', app] + cmd = ["flatpak", "remove", app] if verbose: print(cmd) subprocess.run(cmd) @@ -261,10 +261,10 @@ def remove_packaged_version(pkg, cache): res = False while True: res = input("Remove now (Y/N)? ") - if res.lower() == 'n': + if res.lower() == "n": res = False break - if res.lower() == 'y': + if res.lower() == "y": res = True break if res: @@ -279,11 +279,11 @@ def install_builtin(directory, verbose, allow_reinstall, allow_older): cabs = [] for root, dirs, files in os.walk(directory): for f in files: - if f.endswith('.cab'): + if f.endswith(".cab"): cabs.append(os.path.join(root, f)) # run command for cab in cabs: - cmd = ['fwupdmgr', 'install', cab] + cmd = ["fwupdmgr", "install", cab] if allow_reinstall: cmd += ["--allow-reinstall"] if allow_older: @@ -310,11 +310,11 @@ def run_installation(directory, verbose, allow_reinstall, allow_older, uninstall return # determine what self extracting binary has - if os.path.exists(os.path.join(directory, 'fwupd.snap')) and os.path.exists( - os.path.join(directory, 'fwupd.assert') + if os.path.exists(os.path.join(directory, "fwupd.snap")) and os.path.exists( + os.path.join(directory, "fwupd.assert") ): try_snap = True - if os.path.exists(os.path.join(directory, 'fwupd.flatpak')): + if os.path.exists(os.path.join(directory, "fwupd.flatpak")): try_flatpak = True if try_snap: @@ -330,9 +330,9 @@ def run_installation(directory, verbose, allow_reinstall, allow_older, uninstall install_flatpak(directory, verbose, allow_reinstall, allow_older, uninstall) -if __name__ == '__main__': +if __name__ == "__main__": args = parse_args() - if 'extract' in args.command: + if "extract" in args.command: if args.allow_reinstall: error( "allow-reinstall argument doesn't make sense with command %s" @@ -358,7 +358,7 @@ if __name__ == '__main__': ) if os.getuid() != 0: error("This tool must be run as root") - with tempfile.TemporaryDirectory(prefix='fwupd') as target: + with tempfile.TemporaryDirectory(prefix="fwupd") as target: unzip(target) run_installation( target, diff --git a/contrib/standalone-installer/make.py b/contrib/standalone-installer/make.py index 4b1c0bfc1..93a01c980 100755 --- a/contrib/standalone-installer/make.py +++ b/contrib/standalone-installer/make.py @@ -28,12 +28,12 @@ def parse_args(): ) parser.add_argument( "--disable-snap-download", - action='store_true', + action="store_true", help="Don't download support for snap", ) parser.add_argument( "--disable-flatpak-download", - action='store_true', + action="store_true", help="Don't download support for flatpak", ) parser.add_argument( @@ -46,7 +46,7 @@ def parse_args(): "cab", help="CAB file or directory containing CAB files to automatically install", ) - parser.add_argument('target', help='target file to create') + parser.add_argument("target", help="target file to create") args = parser.parse_args() return args @@ -74,80 +74,80 @@ def generate_installer(directory, target): source = os.path.join(root, f) archive_fname = source.split(directory)[1] archive.write(source, archive_fname) - if 'DEBUG' in os.environ: + if "DEBUG" in os.environ: print(archive.namelist()) archive.close() - with open(target, 'ab') as bytes_out: + with open(target, "ab") as bytes_out: encoded = b64encode(buffer.getvalue()) for section in bytes_slicer(64, encoded): bytes_out.write(TAG) bytes_out.write(section) - bytes_out.write(b'\n') + bytes_out.write(b"\n") def download_snap(directory, channel): - cmd = ['snap', 'download', 'fwupd'] + cmd = ["snap", "download", "fwupd"] if channel is not None: - cmd += ['--channel', channel] - if 'DEBUG' in os.environ: + cmd += ["--channel", channel] + if "DEBUG" in os.environ: print(cmd) subprocess.run(cmd, cwd=directory, check=True) for f in os.listdir(directory): # the signatures associated with the snap if f.endswith(".assert"): shutil.move( - os.path.join(directory, f), os.path.join(directory, 'fwupd.assert') + os.path.join(directory, f), os.path.join(directory, "fwupd.assert") ) # the snap binary itself elif f.endswith(".snap"): shutil.move( - os.path.join(directory, f), os.path.join(directory, 'fwupd.snap') + os.path.join(directory, f), os.path.join(directory, "fwupd.snap") ) def download_cab_file(directory, uri): - cmd = ['wget', uri] - if 'DEBUG' in os.environ: + cmd = ["wget", uri] + if "DEBUG" in os.environ: print(cmd) subprocess.run(cmd, cwd=directory, check=True) def download_flatpak(directory): - dep = 'org.freedesktop.fwupd' - flatpak_dir = os.path.join(os.getenv('HOME'), '.local', 'share', 'flatpak') - verbose = 'DEBUG' in os.environ + dep = "org.freedesktop.fwupd" + flatpak_dir = os.path.join(os.getenv("HOME"), ".local", "share", "flatpak") + verbose = "DEBUG" in os.environ # check if we have installed locally already or not - if not os.path.exists(os.path.join(flatpak_dir, 'app', dep)): + if not os.path.exists(os.path.join(flatpak_dir, "app", dep)): # install into local user's repo cmd = [ - 'flatpak', - 'install', - '--user', - 'https://www.flathub.org/repo/appstream/org.freedesktop.fwupd.flatpakref', - '--no-deps', - '-y', + "flatpak", + "install", + "--user", + "https://www.flathub.org/repo/appstream/org.freedesktop.fwupd.flatpakref", + "--no-deps", + "-y", ] if verbose: print(cmd) subprocess.run(cmd, cwd=directory, check=True) # generate a bundle - repo = os.path.join(flatpak_dir, 'repo') - cmd = ['flatpak', 'build-bundle', repo, 'fwupd.flatpak', dep, 'stable'] + repo = os.path.join(flatpak_dir, "repo") + cmd = ["flatpak", "build-bundle", repo, "fwupd.flatpak", dep, "stable"] if verbose: print(cmd) subprocess.run(cmd, cwd=directory, check=True) -if __name__ == '__main__': +if __name__ == "__main__": args = parse_args() if not args.cab.startswith("http"): local = args.cab - with tempfile.TemporaryDirectory(prefix='fwupd') as directory: + with tempfile.TemporaryDirectory(prefix="fwupd") as directory: if local: if not os.path.exists(local): error("%s doesn't exist" % local) diff --git a/data/device-tests/hardware.py b/data/device-tests/hardware.py index 7f1e1907a..bf8553fd8 100755 --- a/data/device-tests/hardware.py +++ b/data/device-tests/hardware.py @@ -14,7 +14,7 @@ import glob import json from termcolor import colored -gi.require_version('Fwupd', '2.0') +gi.require_version("Fwupd", "2.0") from gi.repository import Fwupd from gi.repository import Gio @@ -22,16 +22,16 @@ from gi.repository import GLib def _get_cache_file(fn): - cachedir = os.path.expanduser('~/.cache/fwupdmgr') + cachedir = os.path.expanduser("~/.cache/fwupdmgr") if not os.path.exists(cachedir): os.makedirs(cachedir) cachefn = os.path.join(cachedir, fn) if not os.path.exists(cachefn): - url = 'https://fwupd.org/downloads/' + fn + url = "https://fwupd.org/downloads/" + fn print("Downloading", url) r = requests.get(url) r.raise_for_status() - f = open(cachefn, 'wb') + f = open(cachefn, "wb") f.write(r.content) f.close() return cachefn @@ -40,25 +40,25 @@ def _get_cache_file(fn): class DeviceTest: def __init__(self, obj): self.client = Fwupd.Client.new() - self.name = obj.get('name', 'Unknown') - self.guids = obj.get('guids', []) - self.releases = obj.get('releases', []) - self.has_runtime = obj.get('runtime', True) - self.interactive = obj.get('interactive', False) - self.disabled = obj.get('disabled', False) - self.protocol = obj.get('protocol', None) + self.name = obj.get("name", "Unknown") + self.guids = obj.get("guids", []) + self.releases = obj.get("releases", []) + self.has_runtime = obj.get("runtime", True) + self.interactive = obj.get("interactive", False) + self.disabled = obj.get("disabled", False) + self.protocol = obj.get("protocol", None) def _info(self, msg): - print(colored('[INFO]'.ljust(10), 'blue'), msg) + print(colored("[INFO]".ljust(10), "blue"), msg) def _warn(self, msg): - print(colored('[WARN]'.ljust(10), 'yellow'), msg) + print(colored("[WARN]".ljust(10), "yellow"), msg) def _failed(self, msg): - print(colored('[FAILED]'.ljust(10), 'red'), msg) + print(colored("[FAILED]".ljust(10), "red"), msg) def _success(self, msg): - print(colored('[SUCCESS]'.ljust(10), 'green'), msg) + print(colored("[SUCCESS]".ljust(10), "green"), msg) def _get_by_device_guids(self): cancellable = Gio.Cancellable.new() @@ -72,82 +72,82 @@ class DeviceTest: def run(self): - print('Running test on {}'.format(self.name)) + print("Running test on {}".format(self.name)) dev = self._get_by_device_guids() if not dev: - self._warn('no {} attached'.format(self.name)) + self._warn("no {} attached".format(self.name)) return - self._info('Current version {}'.format(dev.get_version())) + self._info("Current version {}".format(dev.get_version())) # apply each file for obj in self.releases: - ver = obj.get('version') - fn = obj.get('file') - repeat = obj.get('repeat', 1) + ver = obj.get("version") + fn = obj.get("file") + repeat = obj.get("repeat", 1) try: fn_cache = _get_cache_file(fn) except requests.exceptions.HTTPError as e: - self._failed('Failed to download: {}'.format(str(e))) + self._failed("Failed to download: {}".format(str(e))) return # some hardware updates more than one partition with the same firmware for cnt in range(0, repeat): if dev.get_version() == ver: flags = Fwupd.InstallFlags.ALLOW_REINSTALL - self._info('Reinstalling version {}'.format(ver)) + self._info("Reinstalling version {}".format(ver)) else: flags = Fwupd.InstallFlags.ALLOW_OLDER - self._info('Installing version {}'.format(ver)) + self._info("Installing version {}".format(ver)) cancellable = Gio.Cancellable.new() try: self.client.install(dev.get_id(), fn_cache, flags, cancellable) except GLib.Error as e: - if str(e).find('no HWIDs matched') != -1: - self._info('Skipping as {}'.format(e)) + if str(e).find("no HWIDs matched") != -1: + self._info("Skipping as {}".format(e)) continue - self._failed('Could not install: {}'.format(e)) + self._failed("Could not install: {}".format(e)) return # verify version if self.has_runtime: dev = self._get_by_device_guids() if not dev: - self._failed('Device did not come back: ' + self.name) + self._failed("Device did not come back: " + self.name) return if not dev.get_version(): - self._failed('No version set after flash for: ' + self.name) + self._failed("No version set after flash for: " + self.name) return if cnt == repeat - 1 and dev.get_version() != ver: - self._failed('Got: ' + dev.get_version() + ', expected: ' + ver) + self._failed("Got: " + dev.get_version() + ", expected: " + ver) return - self._success('Installed {}'.format(dev.get_version())) + self._success("Installed {}".format(dev.get_version())) else: - self._success('Assumed success (no runtime)') + self._success("Assumed success (no runtime)") # wait for device to settle? time.sleep(2) -if __name__ == '__main__': +if __name__ == "__main__": # get manifests to parse device_fns = [] if len(sys.argv) == 1: - device_fns.extend(glob.glob('devices/*.json')) + device_fns.extend(glob.glob("devices/*.json")) else: for fn in sys.argv[1:]: device_fns.append(fn) # run each test for fn in sorted(device_fns): - print('{}:'.format(fn)) - with open(fn, 'r') as f: + print("{}:".format(fn)) + with open(fn, "r") as f: try: obj = json.load(f) except json.decoder.JSONDecodeError as e: - print('Failed to parse {}: {}'.format(fn, e)) + print("Failed to parse {}: {}".format(fn, e)) continue t = DeviceTest(obj) if t.disabled: diff --git a/libfwupdplugin/fu-hash.py b/libfwupdplugin/fu-hash.py index 1010937fd..3de3fadf2 100644 --- a/libfwupdplugin/fu-hash.py +++ b/libfwupdplugin/fu-hash.py @@ -21,15 +21,15 @@ def usage(return_code): sys.exit(return_code) -if __name__ == '__main__': - if {'-?', '--help', '--usage'}.intersection(set(sys.argv)): +if __name__ == "__main__": + if {"-?", "--help", "--usage"}.intersection(set(sys.argv)): usage(0) if len(sys.argv) < 3: usage(1) m = hashlib.sha256() for argv in sys.argv[2:]: - with open(argv, 'rb') as f: + with open(argv, "rb") as f: m.update(f.read()) - with open(sys.argv[1], 'w') as f2: - f2.write('#pragma once\n') + with open(sys.argv[1], "w") as f2: + f2.write("#pragma once\n") f2.write('#define FU_BUILD_HASH "%s"\n' % m.hexdigest()) diff --git a/plugins/dfu/contrib/parse-avrdude-conf.py b/plugins/dfu/contrib/parse-avrdude-conf.py index 06117a3a9..76f112306 100755 --- a/plugins/dfu/contrib/parse-avrdude-conf.py +++ b/plugins/dfu/contrib/parse-avrdude-conf.py @@ -13,26 +13,26 @@ from difflib import SequenceMatcher # finds a part using the ID def _find_part_by_id(parts, part_id): for part in parts: - if 'id' not in part: + if "id" not in part: continue - if part['id'] == part_id: + if part["id"] == part_id: return part return None # finds a memory layout for a part, climbing up the tree to the parent if reqd. def _find_mem_layout(parts, part): - if 'memory-application' in part: - memory_flash = part['memory-application'] + if "memory-application" in part: + memory_flash = part["memory-application"] if memory_flash: return memory_flash # look at the parent - if 'parent' in part: - parent = _find_part_by_id(parts, part['parent']) + if "parent" in part: + parent = _find_part_by_id(parts, part["parent"]) if parent: return _find_mem_layout(parts, parent) - print('no parent ', part['parent'], 'found for', part['id']) + print("no parent ", part["parent"], "found for", part["id"]) return None @@ -47,8 +47,8 @@ def _parse_parts(fn_source): for line in open(fn_source).readlines(): # try to clean up crazy syntax - line = line.replace('\n', '') - if line.endswith(';'): + line = line.replace("\n", "") + if line.endswith(";"): line = line[:-1] # ignore blank lines @@ -59,42 +59,42 @@ def _parse_parts(fn_source): # count how many spaces deep this is lvl = 0 for char in line: - if char != ' ': + if char != " ": break lvl = lvl + 1 # ignore comments line = line.strip() - if line[0] == '#': + if line[0] == "#": continue # level 0 of hell if lvl == 0: - if line.startswith('part'): + if line.startswith("part"): memory_id = None part = {} parts.append(part) - if line.startswith('part parent '): - part['parent'] = line[13:].replace('"', '') + if line.startswith("part parent "): + part["parent"] = line[13:].replace('"', "") continue # level 4 of hell if lvl == 4: - if line.startswith('memory'): - memory_id = 'memory-' + line[7:].replace('"', '') + if line.startswith("memory"): + memory_id = "memory-" + line[7:].replace('"', "") part[memory_id] = {} continue - split = line.split('=') + split = line.split("=") if len(split) != 2: - print('ignoring', line) + print("ignoring", line) continue - part[split[0].strip()] = split[1].strip().replace('"', '') + part[split[0].strip()] = split[1].strip().replace('"', "") continue # level 8 of hell if lvl == 8: if memory_id: - split = line.split('=') + split = line.split("=") if len(split) != 2: continue memory = part[memory_id] @@ -117,52 +117,52 @@ def _write_quirks(parts, fn_destination): for part in parts: # ignore meta parts with deprecated names - if 'desc' not in part: + if "desc" not in part: continue - if 'signature' not in part: + if "signature" not in part: continue # find the layout mem_part = _find_mem_layout(parts, part) if not mem_part: - print("no memory layout for", part['desc']) + print("no memory layout for", part["desc"]) continue - if not 'size' in mem_part: - print("no memory size for", part['desc']) + if not "size" in mem_part: + print("no memory size for", part["desc"]) continue - if mem_part['size'].startswith('0x'): - size = int(mem_part['size'], 16) + if mem_part["size"].startswith("0x"): + size = int(mem_part["size"], 16) else: - size = int(mem_part['size'], 10) + size = int(mem_part["size"], 10) # output the line for the quirk - chip_id = '0x' + part['signature'].replace('0x', '').replace(' ', '') - mem_layout = '@Flash/0x0/1*%.0iKg' % int(size / 1024) + chip_id = "0x" + part["signature"].replace("0x", "").replace(" ", "") + mem_layout = "@Flash/0x0/1*%.0iKg" % int(size / 1024) # merge duplicate quirks if chip_id in results: result = results[chip_id] - result['desc'] = _get_longest_substring(result['desc'], part['desc']) + result["desc"] = _get_longest_substring(result["desc"], part["desc"]) else: result = {} - result['desc'] = part['desc'] - result['size'] = size - result['mem_layout'] = mem_layout + result["desc"] = part["desc"] + result["size"] = size + result["mem_layout"] = mem_layout results[chip_id] = result for chip_id in results: result = results[chip_id] outp.append( - '# ' + result['desc'] + ' [USER] USER=0x%x' % result['size'] + '\n' + "# " + result["desc"] + " [USER] USER=0x%x" % result["size"] + "\n" ) - outp.append(chip_id + '=' + result['mem_layout'] + '\n\n') + outp.append(chip_id + "=" + result["mem_layout"] + "\n\n") # write file print("writing", fn_destination) - open(fn_destination, 'w').writelines(outp) + open(fn_destination, "w").writelines(outp) -if __name__ == '__main__': +if __name__ == "__main__": if len(sys.argv) != 3: print("USAGE: %s avrdude.conf tmp.quirk" % sys.argv[0]) sys.exit(1) diff --git a/plugins/uefi-capsule/efi/generate_binary.py b/plugins/uefi-capsule/efi/generate_binary.py index 449d8abb3..0193e50c0 100755 --- a/plugins/uefi-capsule/efi/generate_binary.py +++ b/plugins/uefi-capsule/efi/generate_binary.py @@ -50,15 +50,7 @@ def _run_objcopy(args): def _run_genpeimg(args): # this is okay if it does not exist - argv = [ - "genpeimg", - "-d", - "+d", - "+n", - "-d", - "+s", - args.outfile, - ] + argv = ["genpeimg", "-d", "+d", "+n", "-d", "+s", args.outfile] try: subprocess.run(argv, check=True) except FileNotFoundError as _: @@ -69,23 +61,11 @@ if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( - "--objcopy", - default="objcopy", - help="Binary file to use for objcopy", - ) - parser.add_argument( - "--arch", - default="x86_64", - help="EFI architecture", - ) - parser.add_argument( - "infile", - help="Input file", - ) - parser.add_argument( - "outfile", - help="Output file", + "--objcopy", default="objcopy", help="Binary file to use for objcopy" ) + parser.add_argument("--arch", default="x86_64", help="EFI architecture") + parser.add_argument("infile", help="Input file") + parser.add_argument("outfile", help="Output file") _args = parser.parse_args() _run_objcopy(_args) _run_genpeimg(_args) diff --git a/plugins/uefi-capsule/efi/generate_sbat.py b/plugins/uefi-capsule/efi/generate_sbat.py index 1f12ca112..e42b365b6 100755 --- a/plugins/uefi-capsule/efi/generate_sbat.py +++ b/plugins/uefi-capsule/efi/generate_sbat.py @@ -81,40 +81,18 @@ if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( - "--cc", - default="gcc", - help="Compiler to use for generating sbat object", + "--cc", default="gcc", help="Compiler to use for generating sbat object" ) parser.add_argument( - "--objcopy", - default="objcopy", - help="Binary file to use for objcopy", + "--objcopy", default="objcopy", help="Binary file to use for objcopy" ) + parser.add_argument("--project-name", help="SBAT project name") + parser.add_argument("--project-version", help="SBAT project version") + parser.add_argument("--sbat-version", default=1, type=int, help="SBAT version") parser.add_argument( - "--project-name", - help="SBAT project name", - ) - parser.add_argument( - "--project-version", - help="SBAT project version", - ) - parser.add_argument( - "--sbat-version", - default=1, - type=int, - help="SBAT version", - ) - parser.add_argument( - "--sbat-generation", - default=1, - type=int, - help="SBAT generation", - ) - parser.add_argument( - "--sbat-distro-id", - default=None, - help="SBAT distribution ID" + "--sbat-generation", default=1, type=int, help="SBAT generation" ) + parser.add_argument("--sbat-distro-id", default=None, help="SBAT distribution ID") parser.add_argument( "--sbat-distro-generation", default=None, @@ -122,29 +100,16 @@ if __name__ == "__main__": help="SBAT distribution generation", ) parser.add_argument( - "--sbat-distro-summary", - default=None, - help="SBAT distribution summary", + "--sbat-distro-summary", default=None, help="SBAT distribution summary" ) parser.add_argument( - "--sbat-distro-pkgname", - default=None, - help="SBAT distribution package name", + "--sbat-distro-pkgname", default=None, help="SBAT distribution package name" ) parser.add_argument( - "--sbat-distro-version", - default=None, - help="SBAT distribution version", - ) - parser.add_argument( - "--sbat-distro-url", - default=None, - help="SBAT distribution URL", - ) - parser.add_argument( - "outfile", - help="Output file", + "--sbat-distro-version", default=None, help="SBAT distribution version" ) + parser.add_argument("--sbat-distro-url", default=None, help="SBAT distribution URL") + parser.add_argument("outfile", help="Output file") _args = parser.parse_args() _generate_sbat(_args)