Use black to format python source in a consistent manner

No code changes.
This commit is contained in:
Richard Hughes 2020-04-09 09:19:41 +01:00
parent 39dc902a42
commit fe11927eef
17 changed files with 534 additions and 281 deletions

View File

@ -6,6 +6,7 @@ import sys
import subprocess import subprocess
import os import os
def main(): def main():
parser = argparse.ArgumentParser(description='Run afl-fuzz on all cores') parser = argparse.ArgumentParser(description='Run afl-fuzz on all cores')
parser.add_argument('--input', '-i', help='fuzzing input directory') parser.add_argument('--input', '-i', help='fuzzing input directory')
@ -26,8 +27,17 @@ def main():
# run the main instance # run the main instance
envp = None envp = None
argv = ['afl-fuzz', '-m300', '-i', args.input, '-o', args.output, argv = [
'-M', 'fuzzer00', args.path] 'afl-fuzz',
'-m300',
'-i',
args.input,
'-o',
args.output,
'-M',
'fuzzer00',
args.path,
]
if args.command: if args.command:
argv.append(args.command) argv.append(args.command)
argv.append('@@') argv.append('@@')
@ -37,8 +47,17 @@ def main():
# run the secondary instances # run the secondary instances
cs = [] cs = []
for i in range(1, os.cpu_count()): for i in range(1, os.cpu_count()):
argv = ['afl-fuzz', '-m300', '-i', args.input, '-o', args.output, argv = [
'-S', 'fuzzer%02i' % i, args.path] 'afl-fuzz',
'-m300',
'-i',
args.input,
'-o',
args.output,
'-S',
'fuzzer%02i' % i,
args.path,
]
if args.command: if args.command:
argv.append(args.command) argv.append(args.command)
argv.append('@@') argv.append('@@')
@ -54,5 +73,6 @@ def main():
c.terminate() c.terminate()
return 0 return 0
if __name__ == '__main__': if __name__ == '__main__':
sys.exit(main()) sys.exit(main())

View File

@ -4,68 +4,77 @@ import os
import json import json
import shutil import shutil
def prepare (target):
#clone the flatpak json def prepare(target):
# clone the flatpak json
cmd = ['git', 'submodule', 'update', '--remote', 'contrib/flatpak'] cmd = ['git', 'submodule', 'update', '--remote', 'contrib/flatpak']
subprocess.run (cmd, check=True) subprocess.run(cmd, check=True)
#clone the submodules for that # clone the submodules for that
cmd = ['git', 'submodule', 'update', '--init', '--remote', 'shared-modules/'] cmd = ['git', 'submodule', 'update', '--init', '--remote', 'shared-modules/']
subprocess.run (cmd, cwd='contrib/flatpak', check=True) subprocess.run(cmd, cwd='contrib/flatpak', check=True)
#parse json # parse json
if os.path.isdir ('build'): if os.path.isdir('build'):
shutil.rmtree ('build') shutil.rmtree('build')
data = {} data = {}
with open ('contrib/flatpak/org.freedesktop.fwupd.json', 'r') as rfd: with open('contrib/flatpak/org.freedesktop.fwupd.json', 'r') as rfd:
data = json.load (rfd, strict=False) data = json.load(rfd, strict=False)
platform = 'runtime/%s/x86_64/%s' % (data['runtime'], data['runtime-version']) platform = 'runtime/%s/x86_64/%s' % (data['runtime'], data['runtime-version'])
sdk = 'runtime/%s/x86_64/%s' % (data['sdk'], data['runtime-version']) sdk = 'runtime/%s/x86_64/%s' % (data['sdk'], data['runtime-version'])
num_modules = len (data['modules']) num_modules = len(data['modules'])
#update to build from master # update to build from master
data["branch"] = "master" data["branch"] = "master"
for index in range(0, num_modules): for index in range(0, num_modules):
module = data['modules'][index] module = data['modules'][index]
if type (module) != dict or not 'name' in module: if type(module) != dict or not 'name' in module:
continue continue
name = module['name'] name = module['name']
if not 'fwupd' in name: if not 'fwupd' in name:
continue continue
data['modules'][index]['sources'][0].pop ('url') data['modules'][index]['sources'][0].pop('url')
data['modules'][index]['sources'][0].pop ('sha256') data['modules'][index]['sources'][0].pop('sha256')
data['modules'][index]['sources'][0]['type'] = 'dir' data['modules'][index]['sources'][0]['type'] = 'dir'
data['modules'][index]['sources'][0]['skip'] = [".git"] data['modules'][index]['sources'][0]['skip'] = [".git"]
data['modules'][index]['sources'][0]['path'] = ".." data['modules'][index]['sources'][0]['path'] = ".."
#write json # write json
os.mkdir('build') os.mkdir('build')
with open (target, 'w') as wfd: with open(target, 'w') as wfd:
json.dump(data, wfd, indent=4) json.dump(data, wfd, indent=4)
os.symlink ('../contrib/flatpak/shared-modules','build/shared-modules') os.symlink('../contrib/flatpak/shared-modules', 'build/shared-modules')
# install the runtimes (parsed from json!) # install the runtimes (parsed from json!)
repo = 'flathub' repo = 'flathub'
repo_url = 'https://dl.flathub.org/repo/flathub.flatpakrepo' repo_url = 'https://dl.flathub.org/repo/flathub.flatpakrepo'
print ("Installing dependencies") print("Installing dependencies")
cmd = ['flatpak', 'remote-add', '--if-not-exists', repo, repo_url] cmd = ['flatpak', 'remote-add', '--if-not-exists', repo, repo_url]
subprocess.run (cmd, check=True) subprocess.run(cmd, check=True)
cmd = ['flatpak', 'install', '--assumeyes', repo, sdk] cmd = ['flatpak', 'install', '--assumeyes', repo, sdk]
subprocess.run (cmd, check=True) subprocess.run(cmd, check=True)
cmd = ['flatpak', 'install', '--assumeyes', repo, platform] cmd = ['flatpak', 'install', '--assumeyes', repo, platform]
subprocess.run (cmd, check=True) subprocess.run(cmd, check=True)
def build (target): def build(target):
cmd = ['flatpak-builder', '--repo=repo', '--force-clean', '--disable-rofiles-fuse', 'build-dir', target] cmd = [
subprocess.run (cmd, check=True) 'flatpak-builder',
'--repo=repo',
'--force-clean',
'--disable-rofiles-fuse',
'build-dir',
target,
]
subprocess.run(cmd, check=True)
cmd = ['flatpak', 'build-bundle', 'repo', 'fwupd.flatpak', 'org.freedesktop.fwupd'] cmd = ['flatpak', 'build-bundle', 'repo', 'fwupd.flatpak', 'org.freedesktop.fwupd']
subprocess.run (cmd, check=True) subprocess.run(cmd, check=True)
if __name__ == '__main__': if __name__ == '__main__':
t = os.path.join ('build', 'org.freedesktop.fwupd.json') t = os.path.join('build', 'org.freedesktop.fwupd.json')
prepare (t) prepare(t)
build (t) build(t)
# to run from the builddir: # to run from the builddir:
# sudo flatpak-builder --run build-dir org.freedesktop.fwupd.json /app/libexec/fwupd/fwupdtool get-devices # sudo flatpak-builder --run build-dir org.freedesktop.fwupd.json /app/libexec/fwupd/fwupdtool get-devices

View File

@ -8,8 +8,9 @@ import os
import sys import sys
import xml.etree.ElementTree as etree import xml.etree.ElementTree as etree
def parse_control_dependencies(requested_type): def parse_control_dependencies(requested_type):
TARGET=os.getenv('OS') TARGET = os.getenv('OS')
deps = [] deps = []
dep = '' dep = ''
@ -25,11 +26,13 @@ def parse_control_dependencies(requested_type):
SUBOS = split[1] SUBOS = split[1]
else: else:
import lsb_release import lsb_release
OS = lsb_release.get_distro_information()['ID'].lower() OS = lsb_release.get_distro_information()['ID'].lower()
import platform import platform
SUBOS = platform.machine() SUBOS = platform.machine()
tree = etree.parse(os.path.join(os.path.dirname (sys.argv[0]), "dependencies.xml")) tree = etree.parse(os.path.join(os.path.dirname(sys.argv[0]), "dependencies.xml"))
root = tree.getroot() root = tree.getroot()
for child in root: for child in root:
if not "type" in child.attrib or not "id" in child.attrib: if not "type" in child.attrib or not "id" in child.attrib:
@ -80,6 +83,7 @@ def parse_control_dependencies(requested_type):
deps.append(dep) deps.append(dep)
return deps return deps
def update_debian_control(target): def update_debian_control(target):
control_in = os.path.join(target, 'control.in') control_in = os.path.join(target, 'control.in')
control_out = os.path.join(target, 'control') control_out = os.path.join(target, 'control')
@ -102,7 +106,8 @@ def update_debian_control(target):
else: else:
wfd.write(line) wfd.write(line)
def update_debian_copyright (directory):
def update_debian_copyright(directory):
copyright_in = os.path.join(directory, 'copyright.in') copyright_in = os.path.join(directory, 'copyright.in')
copyright_out = os.path.join(directory, 'copyright') copyright_out = os.path.join(directory, 'copyright')
@ -114,13 +119,13 @@ def update_debian_copyright (directory):
copyrights = [] copyrights = []
for root, dirs, files in os.walk('.'): for root, dirs, files in os.walk('.'):
for file in files: for file in files:
target = os.path.join (root, file) target = os.path.join(root, file)
#skip translations and license file # skip translations and license file
if target.startswith('./po/') or file == "COPYING": if target.startswith('./po/') or file == "COPYING":
continue continue
try: try:
with open(target, 'r') as rfd: with open(target, 'r') as rfd:
#read about the first few lines of the file only # read about the first few lines of the file only
lines = rfd.readlines(220) lines = rfd.readlines(220)
except UnicodeDecodeError: except UnicodeDecodeError:
continue continue
@ -128,8 +133,10 @@ def update_debian_copyright (directory):
continue continue
for line in lines: for line in lines:
if 'Copyright (C) ' in line: if 'Copyright (C) ' in line:
parts = line.split ('Copyright (C)')[1].strip() #split out the copyright header parts = line.split('Copyright (C)')[
partition = parts.partition(' ')[2] # remove the year string 1
].strip() # split out the copyright header
partition = parts.partition(' ')[2] # remove the year string
copyrights += ["%s" % partition] copyrights += ["%s" % partition]
copyrights = "\n\t ".join(sorted(set(copyrights))) copyrights = "\n\t ".join(sorted(set(copyrights)))
with open(copyright_in, 'r') as rfd: with open(copyright_in, 'r') as rfd:
@ -145,6 +152,7 @@ def update_debian_copyright (directory):
else: else:
wfd.write(line) wfd.write(line)
directory = os.path.join (os.getcwd(), 'debian')
directory = os.path.join(os.getcwd(), 'debian')
update_debian_control(directory) update_debian_control(directory)
update_debian_copyright(directory) update_debian_copyright(directory)

View File

@ -10,6 +10,7 @@ import sys
import argparse import argparse
import xml.etree.ElementTree as etree import xml.etree.ElementTree as etree
def parse_dependencies(OS, SUBOS, requested_type): def parse_dependencies(OS, SUBOS, requested_type):
deps = [] deps = []
dep = '' dep = ''
@ -39,24 +40,24 @@ def parse_dependencies(OS, SUBOS, requested_type):
deps.append(dep) deps.append(dep)
return deps return deps
if __name__ == '__main__': if __name__ == '__main__':
try: try:
import distro import distro
target = distro.linux_distribution()[0] target = distro.linux_distribution()[0]
except ModuleNotFoundError: except ModuleNotFoundError:
target = None target = None
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("-o", "--os", parser.add_argument(
default=target, "-o",
choices=["fedora", "--os",
"centos", default=target,
"flatpak", choices=["fedora", "centos", "flatpak", "debian", "ubuntu", "arch"],
"debian", help="dependencies for OS",
"ubuntu", )
"arch"],
help="dependencies for OS")
args = parser.parse_args() args = parser.parse_args()
target = os.getenv('OS', args.os) target = os.getenv('OS', args.os)

View File

@ -11,6 +11,7 @@ import tempfile
import shutil import shutil
from generate_dependencies import parse_dependencies from generate_dependencies import parse_dependencies
def get_container_cmd(): def get_container_cmd():
'''return docker or podman as container manager''' '''return docker or podman as container manager'''
@ -19,8 +20,9 @@ def get_container_cmd():
if shutil.which('podman'): if shutil.which('podman'):
return 'podman' return 'podman'
directory = os.path.dirname(sys.argv[0]) directory = os.path.dirname(sys.argv[0])
TARGET=os.getenv('OS') TARGET = os.getenv('OS')
if TARGET is None: if TARGET is None:
print("Missing OS environment variable") print("Missing OS environment variable")
@ -58,19 +60,23 @@ with open(out.name, 'w') as wfd:
wfd.write("RUN yum -y install \\\n") wfd.write("RUN yum -y install \\\n")
elif OS == "debian" or OS == "ubuntu": elif OS == "debian" or OS == "ubuntu":
wfd.write("RUN apt update -qq && \\\n") wfd.write("RUN apt update -qq && \\\n")
wfd.write("\tDEBIAN_FRONTEND=noninteractive apt install -yq --no-install-recommends\\\n") wfd.write(
"\tDEBIAN_FRONTEND=noninteractive apt install -yq --no-install-recommends\\\n"
)
elif OS == "arch": elif OS == "arch":
wfd.write("RUN pacman -Syu --noconfirm --needed\\\n") wfd.write("RUN pacman -Syu --noconfirm --needed\\\n")
for i in range(0, len(deps)): for i in range(0, len(deps)):
if i < len(deps)-1: if i < len(deps) - 1:
wfd.write("\t%s \\\n" % deps[i]) wfd.write("\t%s \\\n" % deps[i])
else: else:
wfd.write("\t%s \n" % deps[i]) wfd.write("\t%s \n" % deps[i])
elif line == "%%%ARCH_SPECIFIC_COMMAND%%%\n": elif line == "%%%ARCH_SPECIFIC_COMMAND%%%\n":
if OS == "debian" and SUBOS == "s390x": if OS == "debian" and SUBOS == "s390x":
#add sources # add sources
wfd.write('RUN cat /etc/apt/sources.list | sed "s/deb/deb-src/" >> /etc/apt/sources.list\n') wfd.write(
#add new architecture 'RUN cat /etc/apt/sources.list | sed "s/deb/deb-src/" >> /etc/apt/sources.list\n'
)
# add new architecture
wfd.write('RUN dpkg --add-architecture %s\n' % SUBOS) wfd.write('RUN dpkg --add-architecture %s\n' % SUBOS)
elif line == "%%%OS%%%\n": elif line == "%%%OS%%%\n":
wfd.write("ENV OS %s\n" % TARGET) wfd.write("ENV OS %s\n" % TARGET)
@ -83,5 +89,5 @@ with open(out.name, 'w') as wfd:
args += ['--build-arg=http_proxy=%s' % os.environ['http_proxy']] args += ['--build-arg=http_proxy=%s' % os.environ['http_proxy']]
if 'https_proxy' in os.environ: if 'https_proxy' in os.environ:
args += ['--build-arg=https_proxy=%s' % os.environ['https_proxy']] args += ['--build-arg=https_proxy=%s' % os.environ['https_proxy']]
args += [ "-f", "./%s" % os.path.basename(out.name), "."] args += ["-f", "./%s" % os.path.basename(out.name), "."]
subprocess.check_call(args) subprocess.check_call(args)

View File

@ -13,6 +13,7 @@ CAPSULE_FLAGS_PERSIST_ACROSS_RESET = 0x00010000
CAPSULE_FLAGS_POPULATE_SYSTEM_TABLE = 0x00020000 CAPSULE_FLAGS_POPULATE_SYSTEM_TABLE = 0x00020000
CAPSULE_FLAGS_INITIATE_RESET = 0x00040000 CAPSULE_FLAGS_INITIATE_RESET = 0x00040000
def add_header(infile, outfile, gd, fl=None): def add_header(infile, outfile, gd, fl=None):
# parse GUID from command line # parse GUID from command line
try: try:
@ -21,6 +22,7 @@ def add_header(infile, outfile, gd, fl=None):
print(e) print(e)
return 1 return 1
import struct import struct
try: try:
with open(infile, 'rb') as f: with open(infile, 'rb') as f:
bin_data = f.read() bin_data = f.read()
@ -45,7 +47,11 @@ def add_header(infile, outfile, gd, fl=None):
bin_data = bin_data[hdrsz_old:] bin_data = bin_data[hdrsz_old:]
# set header flags # set header flags
flags = CAPSULE_FLAGS_PERSIST_ACROSS_RESET | CAPSULE_FLAGS_INITIATE_RESET | CAPSULE_FLAGS_POPULATE_SYSTEM_TABLE flags = (
CAPSULE_FLAGS_PERSIST_ACROSS_RESET
| CAPSULE_FLAGS_INITIATE_RESET
| CAPSULE_FLAGS_POPULATE_SYSTEM_TABLE
)
if fl: if fl:
flags = int(fl, 16) flags = int(fl, 16)
@ -64,6 +70,7 @@ def add_header(infile, outfile, gd, fl=None):
print('PayloadSz: 0x%04x' % imgsz) print('PayloadSz: 0x%04x' % imgsz)
return 0 return 0
if __name__ == '__main__': if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Add capsule header on firmware') parser = argparse.ArgumentParser(description='Add capsule header on firmware')
parser.add_argument('--guid', help='GUID of the device', required=True) parser.add_argument('--guid', help='GUID of the device', required=True)

View File

@ -9,6 +9,7 @@ import struct
import zlib import zlib
import argparse import argparse
def main(bin_fn, dfu_fn, pad, vid, pid, rev): def main(bin_fn, dfu_fn, pad, vid, pid, rev):
# read binary file # read binary file
@ -21,27 +22,32 @@ def main(bin_fn, dfu_fn, pad, vid, pid, rev):
blob += b'\0' blob += b'\0'
# create DFU footer with checksum # create DFU footer with checksum
blob += struct.pack('<HHHH3sB', blob += struct.pack(
int(rev, 16), # version '<HHHH3sB',
int(pid, 16), # PID int(rev, 16), # version
int(vid, 16), # VID int(pid, 16), # PID
0x0100, # DFU version int(vid, 16), # VID
b'UFD', # signature 0x0100, # DFU version
0x10) # hdrlen b'UFD', # signature
crc32 = zlib.crc32(blob) ^ 0xffffffff 0x10,
) # hdrlen
crc32 = zlib.crc32(blob) ^ 0xFFFFFFFF
blob += struct.pack('<L', crc32) blob += struct.pack('<L', crc32)
# write binary file # write binary file
with open(dfu_fn, 'wb') as f: with open(dfu_fn, 'wb') as f:
f.write(blob) f.write(blob)
if __name__ == "__main__": if __name__ == "__main__":
# parse args # parse args
parser = argparse.ArgumentParser(description='Add DFU footer on firmware') parser = argparse.ArgumentParser(description='Add DFU footer on firmware')
parser.add_argument('--bin', help='Path to the .bin file', required=True) parser.add_argument('--bin', help='Path to the .bin file', required=True)
parser.add_argument('--dfu', help='Output DFU file path', required=True) parser.add_argument('--dfu', help='Output DFU file path', required=True)
parser.add_argument('--pad', help='Pad to a specific size, e.g. 0x4000', default=None) parser.add_argument(
'--pad', help='Pad to a specific size, e.g. 0x4000', default=None
)
parser.add_argument('--vid', help='Vendor ID, e.g. 0x273f', required=True) parser.add_argument('--vid', help='Vendor ID, e.g. 0x273f', required=True)
parser.add_argument('--pid', help='Product ID, e.g. 0x1002', required=True) parser.add_argument('--pid', help='Product ID, e.g. 0x1002', required=True)
parser.add_argument('--rev', help='Revision, e.g. 0x1000', required=True) parser.add_argument('--rev', help='Revision, e.g. 0x1000', required=True)

View File

@ -21,6 +21,7 @@ def cd(path):
yield yield
os.chdir(prev_cwd) os.chdir(prev_cwd)
firmware_metainfo_template = """ firmware_metainfo_template = """
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<component type="firmware"> <component type="firmware">
@ -52,7 +53,9 @@ firmware_metainfo_template = """
def make_firmware_metainfo(firmware_info, dst): def make_firmware_metainfo(firmware_info, dst):
local_info = vars(firmware_info) local_info = vars(firmware_info)
local_info["firmware_id"] = local_info["device_guid"][0:8] local_info["firmware_id"] = local_info["device_guid"][0:8]
firmware_metainfo = firmware_metainfo_template.format(**local_info, timestamp=time.time()) firmware_metainfo = firmware_metainfo_template.format(
**local_info, timestamp=time.time()
)
with open(os.path.join(dst, 'firmware.metainfo.xml'), 'w') as f: with open(os.path.join(dst, 'firmware.metainfo.xml'), 'w') as f:
f.write(firmware_metainfo) f.write(firmware_metainfo)
@ -71,16 +74,22 @@ def get_firmware_bin(root, bin_path, dst):
def create_firmware_cab(exe, folder): def create_firmware_cab(exe, folder):
with cd(folder): with cd(folder):
if os.name == "nt": if os.name == "nt":
directive = os.path.join (folder, "directive") directive = os.path.join(folder, "directive")
with open (directive, 'w') as wfd: with open(directive, 'w') as wfd:
wfd.write('.OPTION EXPLICIT\r\n') wfd.write('.OPTION EXPLICIT\r\n')
wfd.write('.Set CabinetNameTemplate=firmware.cab\r\n') wfd.write('.Set CabinetNameTemplate=firmware.cab\r\n')
wfd.write('.Set DiskDirectory1=.\r\n') wfd.write('.Set DiskDirectory1=.\r\n')
wfd.write('firmware.bin\r\n') wfd.write('firmware.bin\r\n')
wfd.write('firmware.metainfo.xml\r\n') wfd.write('firmware.metainfo.xml\r\n')
command = ['makecab.exe', '/f', directive] command = ['makecab.exe', '/f', directive]
else: else:
command = ['gcab', '--create', 'firmware.cab', 'firmware.bin', 'firmware.metainfo.xml'] command = [
'gcab',
'--create',
'firmware.cab',
'firmware.bin',
'firmware.metainfo.xml',
]
subprocess.check_call(command) subprocess.check_call(command)
@ -104,19 +113,50 @@ def main(args):
print('Done') print('Done')
shutil.copy(os.path.join(dir, 'firmware.cab'), args.out) shutil.copy(os.path.join(dir, 'firmware.cab'), args.out)
if __name__ == '__main__': if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Create fwupd packaged from windows executables') parser = argparse.ArgumentParser(
parser.add_argument('--firmware-name', help='Name of the firmware package can be customized (e.g. DellTBT)', required=True) description='Create fwupd packaged from windows executables'
parser.add_argument('--firmware-summary', help='One line description of the firmware package') )
parser.add_argument('--firmware-description', help='Longer description of the firmware package') parser.add_argument(
parser.add_argument('--device-guid', help='GUID of the device this firmware will run on, this *must* match the output of one of the GUIDs in `fwupdmgr get-devices`', required=True) '--firmware-name',
help='Name of the firmware package can be customized (e.g. DellTBT)',
required=True,
)
parser.add_argument(
'--firmware-summary', help='One line description of the firmware package'
)
parser.add_argument(
'--firmware-description', help='Longer description of the firmware package'
)
parser.add_argument(
'--device-guid',
help='GUID of the device this firmware will run on, this *must* match the output of one of the GUIDs in `fwupdmgr get-devices`',
required=True,
)
parser.add_argument('--firmware-homepage', help='Website for the firmware provider') parser.add_argument('--firmware-homepage', help='Website for the firmware provider')
parser.add_argument('--contact-info', help='Email address of the firmware developer') parser.add_argument(
parser.add_argument('--developer-name', help='Name of the firmware developer', required=True) '--contact-info', help='Email address of the firmware developer'
parser.add_argument('--release-version', help='Version number of the firmware package', required=True) )
parser.add_argument('--release-description', help='Description of the firmware release') parser.add_argument(
parser.add_argument('--exe', help='(optional) Executable file to extract firmware from') '--developer-name', help='Name of the firmware developer', required=True
parser.add_argument('--bin', help='Path to the .bin file (Relative if inside the executable; Absolute if outside) to use as the firmware image', required=True) )
parser.add_argument(
'--release-version',
help='Version number of the firmware package',
required=True,
)
parser.add_argument(
'--release-description', help='Description of the firmware release'
)
parser.add_argument(
'--exe', help='(optional) Executable file to extract firmware from'
)
parser.add_argument(
'--bin',
help='Path to the .bin file (Relative if inside the executable; Absolute if outside) to use as the firmware image',
required=True,
)
parser.add_argument('--out', help='Output cab file path', required=True) parser.add_argument('--out', help='Output cab file path', required=True)
args = parser.parse_args() args = parser.parse_args()

View File

@ -9,12 +9,14 @@ import os.path
import sys import sys
import tempfile import tempfile
import gi import gi
gi.require_version('Fwupd', '2.0') gi.require_version('Fwupd', '2.0')
from gi.repository import Fwupd #pylint: disable=wrong-import-position from gi.repository import Fwupd # pylint: disable=wrong-import-position
from simple_client import install, check_exists from simple_client import install, check_exists
from add_capsule_header import add_header from add_capsule_header import add_header
from firmware_packager import make_firmware_metainfo, create_firmware_cab from firmware_packager import make_firmware_metainfo, create_firmware_cab
class Variables: class Variables:
def __init__(self, device_guid, version): def __init__(self, device_guid, version):
self.device_guid = device_guid self.device_guid = device_guid
@ -27,16 +29,18 @@ class Variables:
self.release_version = version self.release_version = version
self.release_description = "Unknown" self.release_description = "Unknown"
def parse_args(): def parse_args():
"""Parse arguments for this client""" """Parse arguments for this client"""
import argparse import argparse
parser = argparse.ArgumentParser(description="Interact with fwupd daemon") parser = argparse.ArgumentParser(description="Interact with fwupd daemon")
parser.add_argument('exe', nargs='?', help='exe file') parser.add_argument('exe', nargs='?', help='exe file')
parser.add_argument('deviceid', nargs='?', parser.add_argument('deviceid', nargs='?', help='DeviceID to operate on(optional)')
help='DeviceID to operate on(optional)')
args = parser.parse_args() args = parser.parse_args()
return args return args
def generate_cab(infile, directory, guid, version): def generate_cab(infile, directory, guid, version):
output = os.path.join(directory, "firmware.bin") output = os.path.join(directory, "firmware.bin")
ret = add_header(infile, output, guid) ret = add_header(infile, output, guid)
@ -49,10 +53,11 @@ def generate_cab(infile, directory, guid, version):
print("Generated CAB file %s" % cab) print("Generated CAB file %s" % cab)
return cab return cab
def find_uefi_device(client, deviceid): def find_uefi_device(client, deviceid):
devices = client.get_devices() devices = client.get_devices()
for item in devices: for item in devices:
#match the device we were given # match the device we were given
if deviceid: if deviceid:
if item.get_id() != deviceid: if item.get_id() != deviceid:
continue continue
@ -65,10 +70,11 @@ def find_uefi_device(client, deviceid):
# return the first hit for UEFI plugin # return the first hit for UEFI plugin
if item.get_plugin() == 'uefi': if item.get_plugin() == 'uefi':
print("Installing to %s" % item.get_name()) print("Installing to %s" % item.get_name())
return item.get_guid_default(),item.get_id(),item.get_version() return item.get_guid_default(), item.get_id(), item.get_version()
print("Couldn't find any UEFI devices") print("Couldn't find any UEFI devices")
sys.exit(1) sys.exit(1)
def prompt_reboot(): def prompt_reboot():
print("An update requires a reboot to complete") print("An update requires a reboot to complete")
while True: while True:
@ -78,18 +84,20 @@ def prompt_reboot():
break break
if res.lower() != 'y': if res.lower() != 'y':
continue continue
#reboot using logind # reboot using logind
obj = dbus.SystemBus().get_object('org.freedesktop.login1', obj = dbus.SystemBus().get_object(
'/org/freedesktop/login1') 'org.freedesktop.login1', '/org/freedesktop/login1'
)
obj.Reboot(True, dbus_interface='org.freedesktop.login1.Manager') obj.Reboot(True, dbus_interface='org.freedesktop.login1.Manager')
if __name__ == '__main__': if __name__ == '__main__':
ARGS = parse_args() ARGS = parse_args()
CLIENT = Fwupd.Client() CLIENT = Fwupd.Client()
CLIENT.connect() CLIENT.connect()
check_exists(ARGS.exe) check_exists(ARGS.exe)
directory = tempfile.mkdtemp() directory = tempfile.mkdtemp()
guid, deviceid, version=find_uefi_device(CLIENT, ARGS.deviceid) guid, deviceid, version = find_uefi_device(CLIENT, ARGS.deviceid)
cab = generate_cab(ARGS.exe, directory, guid, version) cab = generate_cab(ARGS.exe, directory, guid, version)
install(CLIENT, cab, deviceid, True, True) install(CLIENT, cab, deviceid, True, True)
prompt_reboot() prompt_reboot()

View File

@ -5,11 +5,14 @@ import sys
import os import os
import gi import gi
from gi.repository import GLib from gi.repository import GLib
gi.require_version('Fwupd', '2.0')
from gi.repository import Fwupd #pylint: disable=wrong-import-position
class Progress(): gi.require_version('Fwupd', '2.0')
from gi.repository import Fwupd # pylint: disable=wrong-import-position
class Progress:
"""Class to track the signal changes of progress events""" """Class to track the signal changes of progress events"""
def __init__(self): def __init__(self):
self.device = None self.device = None
self.status = None self.status = None
@ -31,55 +34,68 @@ class Progress():
self.percent = percent self.percent = percent
status_str = "[" status_str = "["
for i in range(0, 50): for i in range(0, 50):
if i < percent/2: if i < percent / 2:
status_str += '*' status_str += '*'
else: else:
status_str += ' ' status_str += ' '
status_str += "] %d%% %s" %(percent, status) status_str += "] %d%% %s" % (percent, status)
self.erase = len(status_str) self.erase = len(status_str)
sys.stdout.write(status_str) sys.stdout.write(status_str)
sys.stdout.flush() sys.stdout.flush()
if 'idle' in status: if 'idle' in status:
sys.stdout.write("\n") sys.stdout.write("\n")
def parse_args(): def parse_args():
"""Parse arguments for this client""" """Parse arguments for this client"""
import argparse import argparse
parser = argparse.ArgumentParser(description="Interact with fwupd daemon") parser = argparse.ArgumentParser(description="Interact with fwupd daemon")
parser.add_argument("--allow-older", action="store_true", parser.add_argument(
help="Install older payloads(default False)") "--allow-older",
parser.add_argument("--allow-reinstall", action="store_true", action="store_true",
help="Reinstall payloads(default False)") help="Install older payloads(default False)",
parser.add_argument("command", choices=["get-devices", )
"get-details", parser.add_argument(
"install"], help="What to do") "--allow-reinstall",
action="store_true",
help="Reinstall payloads(default False)",
)
parser.add_argument(
"command", choices=["get-devices", "get-details", "install"], help="What to do"
)
parser.add_argument('cab', nargs='?', help='CAB file') parser.add_argument('cab', nargs='?', help='CAB file')
parser.add_argument('deviceid', nargs='?', parser.add_argument('deviceid', nargs='?', help='DeviceID to operate on(optional)')
help='DeviceID to operate on(optional)')
args = parser.parse_args() args = parser.parse_args()
return args return args
def get_devices(client): def get_devices(client):
"""Use fwupd client to fetch devices""" """Use fwupd client to fetch devices"""
devices = client.get_devices() devices = client.get_devices()
for item in devices: for item in devices:
print(item.to_string()) print(item.to_string())
def get_details(client, cab): def get_details(client, cab):
"""Use fwupd client to fetch details for a CAB file""" """Use fwupd client to fetch details for a CAB file"""
devices = client.get_details(cab, None) devices = client.get_details(cab, None)
for device in devices: for device in devices:
print(device.to_string()) print(device.to_string())
def status_changed(client, spec, progress): #pylint: disable=unused-argument
"""Signal emitted by fwupd daemon indicating status changed"""
progress.status_changed(client.get_percentage(),
Fwupd.status_to_string(client.get_status()))
def device_changed(client, device, progress): #pylint: disable=unused-argument def status_changed(client, spec, progress): # pylint: disable=unused-argument
"""Signal emitted by fwupd daemon indicating status changed"""
progress.status_changed(
client.get_percentage(), Fwupd.status_to_string(client.get_status())
)
def device_changed(client, device, progress): # pylint: disable=unused-argument
"""Signal emitted by fwupd daemon indicating active device changed""" """Signal emitted by fwupd daemon indicating active device changed"""
progress.device_changed(device.get_name()) progress.device_changed(device.get_name())
def install(client, cab, target, older, reinstall): def install(client, cab, target, older, reinstall):
"""Use fwupd client to install CAB file to applicable devices""" """Use fwupd client to install CAB file to applicable devices"""
# FWUPD_DEVICE_ID_ANY # FWUPD_DEVICE_ID_ANY
@ -97,12 +113,13 @@ def install(client, cab, target, older, reinstall):
parent.connect('notify::status', status_changed, progress) parent.connect('notify::status', status_changed, progress)
try: try:
client.install(target, cab, flags, None) client.install(target, cab, flags, None)
except GLib.Error as glib_err: #pylint: disable=catching-non-exception except GLib.Error as glib_err: # pylint: disable=catching-non-exception
progress.status_changed(0, 'idle') progress.status_changed(0, 'idle')
print("%s" % glib_err) print("%s" % glib_err)
sys.exit(1) sys.exit(1)
print("\n") print("\n")
def check_exists(cab): def check_exists(cab):
"""Check that CAB file exists""" """Check that CAB file exists"""
if not cab: if not cab:
@ -112,6 +129,7 @@ def check_exists(cab):
print("%s doesn't exist or isn't a file" % cab) print("%s doesn't exist or isn't a file" % cab)
sys.exit(1) sys.exit(1)
if __name__ == '__main__': if __name__ == '__main__':
ARGS = parse_args() ARGS = parse_args()
CLIENT = Fwupd.Client() CLIENT = Fwupd.Client()

View File

@ -5,18 +5,22 @@ import sys
import os import os
import subprocess import subprocess
def _do_msgattrib(fn): def _do_msgattrib(fn):
argv = ['msgattrib', argv = [
'--no-location', 'msgattrib',
'--translated', '--no-location',
'--no-wrap', '--translated',
'--sort-output', '--no-wrap',
fn, '--sort-output',
'--output-file=' + fn] fn,
'--output-file=' + fn,
]
ret = subprocess.run(argv) ret = subprocess.run(argv)
if ret.returncode != 0: if ret.returncode != 0:
return return
def _do_nukeheader(fn): def _do_nukeheader(fn):
clean_lines = [] clean_lines = []
with open(fn) as f: with open(fn) as f:
@ -32,10 +36,12 @@ def _do_nukeheader(fn):
with open(fn, 'w') as f: with open(fn, 'w') as f:
f.writelines(clean_lines) f.writelines(clean_lines)
def _process_file(fn): def _process_file(fn):
_do_msgattrib(fn) _do_msgattrib(fn)
_do_nukeheader(fn) _do_nukeheader(fn)
if __name__ == '__main__': if __name__ == '__main__':
if len(sys.argv) == 1: if len(sys.argv) == 1:
print('path required') print('path required')

View File

@ -13,6 +13,7 @@ from pkg_resources import parse_version
XMLNS = '{http://www.gtk.org/introspection/core/1.0}' XMLNS = '{http://www.gtk.org/introspection/core/1.0}'
XMLNS_C = '{http://www.gtk.org/introspection/c/1.0}' XMLNS_C = '{http://www.gtk.org/introspection/c/1.0}'
def usage(return_code): def usage(return_code):
""" print usage and exit with the supplied return code """ """ print usage and exit with the supplied return code """
if return_code == 0: if return_code == 0:
@ -22,6 +23,7 @@ def usage(return_code):
out.write("usage: %s <NAME> <INPUT> <OUTPUT>\n" % sys.argv[0]) out.write("usage: %s <NAME> <INPUT> <OUTPUT>\n" % sys.argv[0])
sys.exit(return_code) sys.exit(return_code)
class LdVersionScript: class LdVersionScript:
""" Rasterize some text """ """ Rasterize some text """
@ -58,14 +60,18 @@ class LdVersionScript:
for node in cls.findall(XMLNS + 'method'): for node in cls.findall(XMLNS + 'method'):
version_tmp = self._add_node(node) version_tmp = self._add_node(node)
if version_tmp: if version_tmp:
if not version_lowest or parse_version(version_tmp) < parse_version(version_lowest): if not version_lowest or parse_version(version_tmp) < parse_version(
version_lowest
):
version_lowest = version_tmp version_lowest = version_tmp
# add the constructor # add the constructor
for node in cls.findall(XMLNS + 'constructor'): for node in cls.findall(XMLNS + 'constructor'):
version_tmp = self._add_node(node) version_tmp = self._add_node(node)
if version_tmp: if version_tmp:
if not version_lowest or parse_version(version_tmp) < parse_version(version_lowest): if not version_lowest or parse_version(version_tmp) < parse_version(
version_lowest
):
version_lowest = version_tmp version_lowest = version_tmp
# finally add the get_type symbol # finally add the get_type symbol
@ -107,6 +113,7 @@ class LdVersionScript:
oldversion = version oldversion = version
return verout return verout
if __name__ == '__main__': if __name__ == '__main__':
if {'-?', '--help', '--usage'}.intersection(set(sys.argv)): if {'-?', '--help', '--usage'}.intersection(set(sys.argv)):
usage(0) usage(0)

View File

@ -8,11 +8,13 @@ import struct
import glob import glob
from collections import namedtuple from collections import namedtuple
class Record(object): class Record(object):
def __init__(self, filename, cns): def __init__(self, filename, cns):
self.filename = filename self.filename = filename
self.cns = cns self.cns = cns
def load_pci_ids(): def load_pci_ids():
pci_vendors = {} pci_vendors = {}
pci_vendors[0x1987] = 'Freescale' pci_vendors[0x1987] = 'Freescale'
@ -29,9 +31,11 @@ def load_pci_ids():
break break
return pci_vendors return pci_vendors
def _data_to_utf8(s): def _data_to_utf8(s):
return s.decode('utf-8', 'replace').replace('\0', ' ') return s.decode('utf-8', 'replace').replace('\0', ' ')
def main(): def main():
# open files # open files
@ -41,47 +45,67 @@ def main():
if len(blob) != 4096: if len(blob) != 4096:
print('WARNING: ignoring %s of size %i' % (fn, len(blob))) print('WARNING: ignoring %s of size %i' % (fn, len(blob)))
continue continue
Cns = namedtuple('Cns', Cns = namedtuple(
'vid ssvid sn mn fr rab ieee cmic mdts cntlid ver ' \ 'Cns',
'rtd3r rtd3e oaes ctratt rrls rsvd102 oacs acl aerl ' \ 'vid ssvid sn mn fr rab ieee cmic mdts cntlid ver '
'frmw lpa elpe npss avscc apsta wctemp cctemp mtfa ' \ 'rtd3r rtd3e oaes ctratt rrls rsvd102 oacs acl aerl '
'hmpre hmmin tnvmcap unvmcap rpmbs edstt dsto fwug ' \ 'frmw lpa elpe npss avscc apsta wctemp cctemp mtfa '
'kas hctma mntmt mxtmt sanicap hmminds hmmaxd ' \ 'hmpre hmmin tnvmcap unvmcap rpmbs edstt dsto fwug '
'nsetidmax rsvd340 anatt anacap anagrpmax nanagrpid ' \ 'kas hctma mntmt mxtmt sanicap hmminds hmmaxd '
'rsvd352 sqes cqes maxcmd nn oncs fuses fna vwc awun ' \ 'nsetidmax rsvd340 anatt anacap anagrpmax nanagrpid '
'awupf nvscc nwpc acwu rsvd534 sgls mnan rsvd544 ' \ 'rsvd352 sqes cqes maxcmd nn oncs fuses fna vwc awun '
'subnqn rsvd1024 ioccsz iorcsz icdoff ctrattr msdbd ' \ 'awupf nvscc nwpc acwu rsvd534 sgls mnan rsvd544 '
'rsvd1804 psd vs') 'subnqn rsvd1024 ioccsz iorcsz icdoff ctrattr msdbd '
'rsvd1804 psd vs',
)
try: try:
cns = Cns._make(struct.unpack('<HH20s40s8sB3pBBHIIIIIH154pHBBBBBBBBHHHII16p' \ cns = Cns._make(
'16pIHBBHHHHIIHH2pBBII160pBBHIHHBBHHBBH2pII' \ struct.unpack(
'224p256p768pIIHBB244p1024p1024p', blob)) '<HH20s40s8sB3pBBHIIIIIH154pHBBBBBBBBHHHII16p'
'16pIHBBHHHHIIHH2pBBII160pBBHIHHBBHHBBH2pII'
'224p256p768pIIHBB244p1024p1024p',
blob,
)
)
except struct.error as e: except struct.error as e:
print('WARNING: ignoring %s of size %i' % (fn, len(blob))) print('WARNING: ignoring %s of size %i' % (fn, len(blob)))
continue continue
records.append(Record(fn, cns)) records.append(Record(fn, cns))
# try to sort in sane way # try to sort in sane way
records = sorted(records, records = sorted(
key=lambda k: str(k.cns.vid) + k.cns.mn.decode('utf-8', 'replace') + k.cns.sn.decode('utf-8', 'replace'), records,
reverse=True) key=lambda k: str(k.cns.vid)
+ k.cns.mn.decode('utf-8', 'replace')
+ k.cns.sn.decode('utf-8', 'replace'),
reverse=True,
)
# export csv # export csv
with open('all.csv', 'w', newline='') as csvfile: with open('all.csv', 'w', newline='') as csvfile:
exp = csv.writer(csvfile) exp = csv.writer(csvfile)
exp.writerow(['id', 'vid', 'sn', 'mn', 'fr', exp.writerow(
'rrls', 'frmw', 'fwug', 'subnqn', 'vs']) ['id', 'vid', 'sn', 'mn', 'fr', 'rrls', 'frmw', 'fwug', 'subnqn', 'vs']
)
for r in records: for r in records:
cns = r.cns cns = r.cns
sn = cns.sn.decode('utf-8', 'replace').replace('\0', ' ') sn = cns.sn.decode('utf-8', 'replace').replace('\0', ' ')
mn = cns.mn.decode('utf-8', 'replace').replace('\0', ' ') mn = cns.mn.decode('utf-8', 'replace').replace('\0', ' ')
fr = cns.fr.decode('utf-8', 'replace').replace('\0', ' ') fr = cns.fr.decode('utf-8', 'replace').replace('\0', ' ')
exp.writerow([os.path.basename(r.filename)[:6], exp.writerow(
'%04x' % cns.vid, [
sn, mn, fr, cns.rrls, os.path.basename(r.filename)[:6],
'%02x' % cns.frmw, '%04x' % cns.vid,
cns.fwug, cns.subnqn, sn,
binascii.hexlify(cns.vs)]) mn,
fr,
cns.rrls,
'%02x' % cns.frmw,
cns.fwug,
cns.subnqn,
binascii.hexlify(cns.vs),
]
)
# frmw stats # frmw stats
s1ro_cnt = 0 s1ro_cnt = 0
@ -92,7 +116,7 @@ def main():
s1ro_cnt += 1 s1ro_cnt += 1
if (r.cns.frmw & 0x10) >> 4: if (r.cns.frmw & 0x10) >> 4:
fawr_cnt += 1 fawr_cnt += 1
nfws = (r.cns.frmw & 0x0e) >> 1 nfws = (r.cns.frmw & 0x0E) >> 1
if nfws in nfws_map: if nfws in nfws_map:
nfws_map[nfws] += 1 nfws_map[nfws] += 1
continue continue
@ -125,4 +149,5 @@ def main():
vs_records.append(r) vs_records.append(r)
print('nr_vs=%i' % len(vs_records)) print('nr_vs=%i' % len(vs_records))
main() main()

View File

@ -12,24 +12,43 @@ import sys
import shutil import shutil
import tempfile import tempfile
import zipfile import zipfile
TAG = b'#\x00' TAG = b'#\x00'
def parse_args(): def parse_args():
import argparse import argparse
parser = argparse.ArgumentParser(description="Self extracting firmware updater") parser = argparse.ArgumentParser(description="Self extracting firmware updater")
parser.add_argument("--directory", help="Directory to extract to") parser.add_argument("--directory", help="Directory to extract to")
parser.add_argument("--cleanup", action='store_true', help="Remove tools when done with installation") parser.add_argument(
parser.add_argument("--verbose", action='store_true', help="Run the tool in verbose mode") "--cleanup",
parser.add_argument("--allow-reinstall", action='store_true', help="Allow re-installing existing firmware versions") action='store_true',
parser.add_argument("--allow-older", action='store_true', help="Allow downgrading firmware versions") help="Remove tools when done with installation",
parser.add_argument("command", choices=["install", "extract"], help="Command to run") )
parser.add_argument(
"--verbose", action='store_true', help="Run the tool in verbose mode"
)
parser.add_argument(
"--allow-reinstall",
action='store_true',
help="Allow re-installing existing firmware versions",
)
parser.add_argument(
"--allow-older", action='store_true', help="Allow downgrading firmware versions"
)
parser.add_argument(
"command", choices=["install", "extract"], help="Command to run"
)
args = parser.parse_args() args = parser.parse_args()
return args return args
def error (msg):
def error(msg):
print(msg) print(msg)
sys.exit(1) sys.exit(1)
def bytes_slicer(length, source): def bytes_slicer(length, source):
start = 0 start = 0
stop = length stop = length
@ -38,68 +57,71 @@ def bytes_slicer(length, source):
start = stop start = stop
stop += length stop += length
def get_zip(): def get_zip():
script = os.path.realpath (__file__) script = os.path.realpath(__file__)
bytes_out = io.BytesIO() bytes_out = io.BytesIO()
with open(script, 'rb') as source: with open(script, 'rb') as source:
for line in source: for line in source:
if not line.startswith(TAG): if not line.startswith(TAG):
continue continue
bytes_out.write(b64decode(line[len(TAG):-1])) bytes_out.write(b64decode(line[len(TAG) : -1]))
return bytes_out return bytes_out
def unzip (destination):
zipf = get_zip () def unzip(destination):
source = zipfile.ZipFile (zipf, 'r') zipf = get_zip()
source = zipfile.ZipFile(zipf, 'r')
for item in source.namelist(): for item in source.namelist():
# extract handles the sanitization # extract handles the sanitization
source.extract (item, destination) source.extract(item, destination)
def copy_cabs (source, target):
if not os.path.exists (target): def copy_cabs(source, target):
os.makedirs (target) if not os.path.exists(target):
os.makedirs(target)
cabs = [] cabs = []
for root, dirs, files in os.walk (source): for root, dirs, files in os.walk(source):
for f in files: for f in files:
if (f.endswith ('.cab')): if f.endswith('.cab'):
origf = os.path.join(root, f) origf = os.path.join(root, f)
shutil.copy (origf, target) shutil.copy(origf, target)
cabs.append (os.path.join (target, f)) cabs.append(os.path.join(target, f))
return cabs return cabs
def install_snap (directory, verbose, allow_reinstall, allow_older, uninstall): def install_snap(directory, verbose, allow_reinstall, allow_older, uninstall):
app = 'fwupd' app = 'fwupd'
common = '/root/snap/%s/common' % app common = '/root/snap/%s/common' % app
#check if snap is installed # check if snap is installed
with open(os.devnull, 'w') as devnull: with open(os.devnull, 'w') as devnull:
subprocess.run (['snap'], check=True, stdout=devnull, stderr=devnull) subprocess.run(['snap'], check=True, stdout=devnull, stderr=devnull)
#check existing installed # check existing installed
cmd = ['snap', 'list', app] cmd = ['snap', 'list', app]
with open(os.devnull, 'w') as devnull: with open(os.devnull, 'w') as devnull:
if verbose: if verbose:
print(cmd) print(cmd)
ret = subprocess.run (cmd, stdout=devnull, stderr=devnull) ret = subprocess.run(cmd, stdout=devnull, stderr=devnull)
if ret.returncode == 0: if ret.returncode == 0:
cmd = ['snap', 'remove', app] cmd = ['snap', 'remove', app]
if verbose: if verbose:
print(cmd) print(cmd)
subprocess.run (cmd, check=True) subprocess.run(cmd, check=True)
# install the snap # install the snap
cmd = ['snap', 'ack', os.path.join (directory, 'fwupd.assert')] cmd = ['snap', 'ack', os.path.join(directory, 'fwupd.assert')]
if verbose: if verbose:
print(cmd) print(cmd)
subprocess.run (cmd, check=True) subprocess.run(cmd, check=True)
cmd = ['snap', 'install', '--classic', os.path.join (directory, 'fwupd.snap')] cmd = ['snap', 'install', '--classic', os.path.join(directory, 'fwupd.snap')]
if verbose: if verbose:
print(cmd) print(cmd)
subprocess.run (cmd, check=True) subprocess.run(cmd, check=True)
# copy the CAB files # copy the CAB files
cabs = copy_cabs (directory, common) cabs = copy_cabs(directory, common)
# run the snap # run the snap
for cab in cabs: for cab in cabs:
@ -111,76 +133,77 @@ def install_snap (directory, verbose, allow_reinstall, allow_older, uninstall):
if verbose: if verbose:
cmd += ["--verbose"] cmd += ["--verbose"]
print(cmd) print(cmd)
subprocess.run (cmd) subprocess.run(cmd)
#remove copied cabs # remove copied cabs
for f in cabs: for f in cabs:
os.remove(f) os.remove(f)
#cleanup # cleanup
if uninstall: if uninstall:
cmd = ['snap', 'remove', app] cmd = ['snap', 'remove', app]
if verbose: if verbose:
print(cmd) print(cmd)
subprocess.run (cmd) subprocess.run(cmd)
def install_flatpak (directory, verbose, allow_reinstall, allow_older, uninstall):
def install_flatpak(directory, verbose, allow_reinstall, allow_older, uninstall):
app = 'org.freedesktop.fwupd' app = 'org.freedesktop.fwupd'
common = '%s/.var/app/%s' % (os.getenv ('HOME'), app) common = '%s/.var/app/%s' % (os.getenv('HOME'), app)
with open(os.devnull, 'w') as devnull: with open(os.devnull, 'w') as devnull:
if not verbose: if not verbose:
output = devnull output = devnull
else: else:
output = None output = None
#look for dependencies # look for dependencies
dep = 'org.gnome.Platform/x86_64/3.30' dep = 'org.gnome.Platform/x86_64/3.30'
repo = 'flathub' repo = 'flathub'
repo_url = 'https://flathub.org/repo/flathub.flatpakrepo' repo_url = 'https://flathub.org/repo/flathub.flatpakrepo'
cmd = ['flatpak', 'info', dep] cmd = ['flatpak', 'info', dep]
if verbose: if verbose:
print(cmd) print(cmd)
ret = subprocess.run (cmd, stdout=output, stderr=output) ret = subprocess.run(cmd, stdout=output, stderr=output)
#not installed # not installed
if ret.returncode != 0: if ret.returncode != 0:
#look for remotes # look for remotes
cmd = ['flatpak', 'remote-info', repo, dep] cmd = ['flatpak', 'remote-info', repo, dep]
if verbose: if verbose:
print(cmd) print(cmd)
ret = subprocess.run (cmd, stdout=output, stderr=output) ret = subprocess.run(cmd, stdout=output, stderr=output)
#not enabled, enable it # not enabled, enable it
if ret.returncode != 0: if ret.returncode != 0:
cmd = ['flatpak', 'remote-add', repo, repo_url] cmd = ['flatpak', 'remote-add', repo, repo_url]
if verbose: if verbose:
print(cmd) print(cmd)
ret = subprocess.run (cmd, stderr=output) ret = subprocess.run(cmd, stderr=output)
# install dep # install dep
cmd = ['flatpak', 'install', repo, dep] cmd = ['flatpak', 'install', repo, dep]
if verbose: if verbose:
print(cmd) print(cmd)
ret = subprocess.run (cmd) ret = subprocess.run(cmd)
#check existing installed # check existing installed
cmd = ['flatpak', 'info', app] cmd = ['flatpak', 'info', app]
if verbose: if verbose:
print(cmd) print(cmd)
ret = subprocess.run (cmd, stdout=output, stderr=output) ret = subprocess.run(cmd, stdout=output, stderr=output)
if ret.returncode == 0: if ret.returncode == 0:
cmd = ['flatpak', 'remove', app] cmd = ['flatpak', 'remove', app]
if verbose: if verbose:
print(cmd) print(cmd)
subprocess.run (cmd, check=True) subprocess.run(cmd, check=True)
#install the flatpak # install the flatpak
cmd = ['flatpak', 'install', os.path.join (directory, 'fwupd.flatpak')] cmd = ['flatpak', 'install', os.path.join(directory, 'fwupd.flatpak')]
if verbose: if verbose:
print(cmd) print(cmd)
subprocess.run (cmd, check=True) subprocess.run(cmd, check=True)
# copy the CAB files # copy the CAB files
cabs = copy_cabs (directory, common) cabs = copy_cabs(directory, common)
#run command # run command
for cab in cabs: for cab in cabs:
cmd = ['flatpak', 'run', app, 'install', cab] cmd = ['flatpak', 'run', app, 'install', cab]
if allow_reinstall: if allow_reinstall:
@ -190,18 +213,19 @@ def install_flatpak (directory, verbose, allow_reinstall, allow_older, uninstall
if verbose: if verbose:
cmd += ["--verbose"] cmd += ["--verbose"]
print(cmd) print(cmd)
subprocess.run (cmd) subprocess.run(cmd)
#remove copied cabs # remove copied cabs
for f in cabs: for f in cabs:
os.remove(f) os.remove(f)
#cleanup # cleanup
if uninstall: if uninstall:
cmd = ['flatpak', 'remove', app] cmd = ['flatpak', 'remove', app]
if verbose: if verbose:
print(cmd) print(cmd)
subprocess.run (cmd) subprocess.run(cmd)
# Check which package to use # Check which package to use
# - return False to use packaged version # - return False to use packaged version
@ -218,15 +242,21 @@ def use_included_version(minimum_version):
return True return True
if minimum_version: if minimum_version:
if minimum_version > version: if minimum_version > version:
print("fwupd %s is already installed but this package requires %s" % print(
(version.version, minimum_version)) "fwupd %s is already installed but this package requires %s"
% (version.version, minimum_version)
)
else: else:
print("Using existing fwupd version %s already installed on system." % version.version) print(
"Using existing fwupd version %s already installed on system."
% version.version
)
return False return False
else: else:
print("fwupd %s is installed and must be removed" % version.version) print("fwupd %s is installed and must be removed" % version.version)
return remove_packaged_version(pkg, cache) return remove_packaged_version(pkg, cache)
def remove_packaged_version(pkg, cache): def remove_packaged_version(pkg, cache):
res = False res = False
while True: while True:
@ -244,13 +274,14 @@ def remove_packaged_version(pkg, cache):
raise Exception("Need to remove packaged version") raise Exception("Need to remove packaged version")
return True return True
def install_builtin(directory, verbose, allow_reinstall, allow_older): def install_builtin(directory, verbose, allow_reinstall, allow_older):
cabs = [] cabs = []
for root, dirs, files in os.walk (directory): for root, dirs, files in os.walk(directory):
for f in files: for f in files:
if f.endswith('.cab'): if f.endswith('.cab'):
cabs.append(os.path.join(root, f)) cabs.append(os.path.join(root, f))
#run command # run command
for cab in cabs: for cab in cabs:
cmd = ['fwupdmgr', 'install', cab] cmd = ['fwupdmgr', 'install', cab]
if allow_reinstall: if allow_reinstall:
@ -262,11 +293,12 @@ def install_builtin(directory, verbose, allow_reinstall, allow_older):
print(cmd) print(cmd)
subprocess.run(cmd) subprocess.run(cmd)
def run_installation (directory, verbose, allow_reinstall, allow_older, uninstall):
def run_installation(directory, verbose, allow_reinstall, allow_older, uninstall):
try_snap = False try_snap = False
try_flatpak = False try_flatpak = False
#determine if a minimum version was specified # determine if a minimum version was specified
minimum_path = os.path.join(directory, "minimum") minimum_path = os.path.join(directory, "minimum")
minimum = None minimum = None
if os.path.exists(minimum_path): if os.path.exists(minimum_path):
@ -278,44 +310,60 @@ def run_installation (directory, verbose, allow_reinstall, allow_older, uninstal
return return
# determine what self extracting binary has # determine what self extracting binary has
if os.path.exists (os.path.join (directory, 'fwupd.snap')) and \ if os.path.exists(os.path.join(directory, 'fwupd.snap')) and os.path.exists(
os.path.exists (os.path.join (directory, 'fwupd.assert')): os.path.join(directory, 'fwupd.assert')
):
try_snap = True try_snap = True
if os.path.exists (os.path.join (directory, 'fwupd.flatpak')): if os.path.exists(os.path.join(directory, 'fwupd.flatpak')):
try_flatpak = True try_flatpak = True
if try_snap: if try_snap:
try: try:
install_snap (directory, verbose, allow_reinstall, allow_older, uninstall) install_snap(directory, verbose, allow_reinstall, allow_older, uninstall)
return True return True
except Exception as _: except Exception as _:
if verbose: if verbose:
print ("Snap installation failed") print("Snap installation failed")
if not try_flatpak: if not try_flatpak:
error ("Snap installation failed") error("Snap installation failed")
if try_flatpak: if try_flatpak:
install_flatpak (directory, verbose, allow_reinstall, allow_older, uninstall) install_flatpak(directory, verbose, allow_reinstall, allow_older, uninstall)
if __name__ == '__main__': if __name__ == '__main__':
args = parse_args() args = parse_args()
if 'extract' in args.command: if 'extract' in args.command:
if args.allow_reinstall: if args.allow_reinstall:
error ("allow-reinstall argument doesn't make sense with command %s" % args.command) error(
"allow-reinstall argument doesn't make sense with command %s"
% args.command
)
if args.allow_older: if args.allow_older:
error ("allow-older argument doesn't make sense with command %s" % args.command) error(
"allow-older argument doesn't make sense with command %s" % args.command
)
if args.cleanup: if args.cleanup:
error ("Cleanup argument doesn't make sense with command %s" % args.command) error("Cleanup argument doesn't make sense with command %s" % args.command)
if args.directory is None: if args.directory is None:
error ("No directory specified") error("No directory specified")
if not os.path.exists (args.directory): if not os.path.exists(args.directory):
print ("Creating %s" % args.directory) print("Creating %s" % args.directory)
os.makedirs (args.directory) os.makedirs(args.directory)
unzip (args.directory) unzip(args.directory)
else: else:
if args.directory: if args.directory:
error ("Directory argument %s doesn't make sense with command %s" % (args.directory, args.command)) error(
"Directory argument %s doesn't make sense with command %s"
% (args.directory, args.command)
)
if os.getuid() != 0: if os.getuid() != 0:
error ("This tool must be run as root") error("This tool must be run as root")
with tempfile.TemporaryDirectory (prefix='fwupd') as target: with tempfile.TemporaryDirectory(prefix='fwupd') as target:
unzip (target) unzip(target)
run_installation (target, args.verbose, args.allow_reinstall, args.allow_older, args.cleanup) run_installation(
target,
args.verbose,
args.allow_reinstall,
args.allow_older,
args.cleanup,
)

View File

@ -14,22 +14,43 @@ import tempfile
import zipfile import zipfile
from assets.header import TAG from assets.header import TAG
def error (msg):
def error(msg):
print(msg) print(msg)
sys.exit(1) sys.exit(1)
def parse_args(): def parse_args():
import argparse import argparse
parser = argparse.ArgumentParser(description="Generate a standalone firmware updater")
parser.add_argument("--disable-snap-download", action='store_true', help="Don't download support for snap") parser = argparse.ArgumentParser(
parser.add_argument("--disable-flatpak-download", action='store_true', help="Don't download support for flatpak") description="Generate a standalone firmware updater"
parser.add_argument("--snap-channel", help="Channel to download snap from (optional)") )
parser.add_argument("--minimum", help="Use already installed fwupd version if at least this version") parser.add_argument(
parser.add_argument("cab", help="CAB file or directory containing CAB files to automatically install") "--disable-snap-download",
action='store_true',
help="Don't download support for snap",
)
parser.add_argument(
"--disable-flatpak-download",
action='store_true',
help="Don't download support for flatpak",
)
parser.add_argument(
"--snap-channel", help="Channel to download snap from (optional)"
)
parser.add_argument(
"--minimum", help="Use already installed fwupd version if at least this version"
)
parser.add_argument(
"cab",
help="CAB file or directory containing CAB files to automatically install",
)
parser.add_argument('target', help='target file to create') parser.add_argument('target', help='target file to create')
args = parser.parse_args() args = parser.parse_args()
return args return args
def bytes_slicer(length, source): def bytes_slicer(length, source):
start = 0 start = 0
stop = length stop = length
@ -38,73 +59,87 @@ def bytes_slicer(length, source):
start = stop start = stop
stop += length stop += length
def generate_installer (directory, target):
asset_base = os.path.join (os.path.dirname(os.path.realpath(__file__)),
"assets")
#header def generate_installer(directory, target):
shutil.copy (os.path.join (asset_base, "header.py"), target) asset_base = os.path.join(os.path.dirname(os.path.realpath(__file__)), "assets")
#zip file # header
shutil.copy(os.path.join(asset_base, "header.py"), target)
# zip file
buffer = io.BytesIO() buffer = io.BytesIO()
archive = zipfile.ZipFile(buffer, "a") archive = zipfile.ZipFile(buffer, "a")
for root, dirs, files in os.walk (directory): for root, dirs, files in os.walk(directory):
for f in files: for f in files:
source = os.path.join(root, f) source = os.path.join(root, f)
archive_fname = source.split (directory) [1] archive_fname = source.split(directory)[1]
archive.write(source, archive_fname) archive.write(source, archive_fname)
if 'DEBUG' in os.environ: if 'DEBUG' in os.environ:
print (archive.namelist()) print(archive.namelist())
archive.close() archive.close()
with open (target, 'ab') as bytes_out: with open(target, 'ab') as bytes_out:
encoded = b64encode(buffer.getvalue()) encoded = b64encode(buffer.getvalue())
for section in bytes_slicer(64, encoded): for section in bytes_slicer(64, encoded):
bytes_out.write(TAG) bytes_out.write(TAG)
bytes_out.write(section) bytes_out.write(section)
bytes_out.write(b'\n') bytes_out.write(b'\n')
def download_snap (directory, channel):
def download_snap(directory, channel):
cmd = ['snap', 'download', 'fwupd'] cmd = ['snap', 'download', 'fwupd']
if channel is not None: if channel is not None:
cmd += ['--channel', channel] cmd += ['--channel', channel]
if 'DEBUG' in os.environ: if 'DEBUG' in os.environ:
print(cmd) print(cmd)
subprocess.run (cmd, cwd=directory, check=True) subprocess.run(cmd, cwd=directory, check=True)
for f in os.listdir (directory): for f in os.listdir(directory):
# the signatures associated with the snap # the signatures associated with the snap
if f.endswith(".assert"): if f.endswith(".assert"):
shutil.move (os.path.join(directory, f), os.path.join(directory, 'fwupd.assert')) shutil.move(
os.path.join(directory, f), os.path.join(directory, 'fwupd.assert')
)
# the snap binary itself # the snap binary itself
elif f.endswith(".snap"): elif f.endswith(".snap"):
shutil.move (os.path.join(directory, f), os.path.join(directory, 'fwupd.snap')) shutil.move(
os.path.join(directory, f), os.path.join(directory, 'fwupd.snap')
)
def download_cab_file (directory, uri):
def download_cab_file(directory, uri):
cmd = ['wget', uri] cmd = ['wget', uri]
if 'DEBUG' in os.environ: if 'DEBUG' in os.environ:
print(cmd) print(cmd)
subprocess.run (cmd, cwd=directory, check=True) subprocess.run(cmd, cwd=directory, check=True)
def download_flatpak (directory):
def download_flatpak(directory):
dep = 'org.freedesktop.fwupd' dep = 'org.freedesktop.fwupd'
flatpak_dir = os.path.join(os.getenv('HOME'),'.local', 'share', 'flatpak') flatpak_dir = os.path.join(os.getenv('HOME'), '.local', 'share', 'flatpak')
verbose = 'DEBUG' in os.environ verbose = 'DEBUG' in os.environ
#check if we have installed locally already or not # check if we have installed locally already or not
if not os.path.exists (os.path.join (flatpak_dir, 'app', dep)): if not os.path.exists(os.path.join(flatpak_dir, 'app', dep)):
# install into local user's repo # install into local user's repo
cmd = ['flatpak', 'install', '--user', cmd = [
'https://www.flathub.org/repo/appstream/org.freedesktop.fwupd.flatpakref', '--no-deps', '-y'] 'flatpak',
'install',
'--user',
'https://www.flathub.org/repo/appstream/org.freedesktop.fwupd.flatpakref',
'--no-deps',
'-y',
]
if verbose: if verbose:
print(cmd) print(cmd)
subprocess.run (cmd, cwd=directory, check=True) subprocess.run(cmd, cwd=directory, check=True)
# generate a bundle # generate a bundle
repo = os.path.join(flatpak_dir, 'repo') repo = os.path.join(flatpak_dir, 'repo')
cmd = ['flatpak', 'build-bundle', repo, 'fwupd.flatpak', dep, 'stable'] cmd = ['flatpak', 'build-bundle', repo, 'fwupd.flatpak', dep, 'stable']
if verbose: if verbose:
print(cmd) print(cmd)
subprocess.run (cmd, cwd=directory, check=True) subprocess.run(cmd, cwd=directory, check=True)
if __name__ == '__main__': if __name__ == '__main__':
args = parse_args() args = parse_args()
@ -112,27 +147,27 @@ if __name__ == '__main__':
if not args.cab.startswith("http"): if not args.cab.startswith("http"):
local = args.cab local = args.cab
with tempfile.TemporaryDirectory (prefix='fwupd') as directory: with tempfile.TemporaryDirectory(prefix='fwupd') as directory:
if local: if local:
if not os.path.exists (local): if not os.path.exists(local):
error ("%s doesn't exist" % local) error("%s doesn't exist" % local)
if not os.path.isdir(local): if not os.path.isdir(local):
shutil.copy (local, directory) shutil.copy(local, directory)
else: else:
for root, dirs, files in os.walk(local): for root, dirs, files in os.walk(local):
for f in files: for f in files:
shutil.copy (os.path.join(root, f), directory) shutil.copy(os.path.join(root, f), directory)
else: else:
download_cab_file (directory, args.cab) download_cab_file(directory, args.cab)
if not args.disable_snap_download: if not args.disable_snap_download:
download_snap (directory, args.snap_channel) download_snap(directory, args.snap_channel)
if not args.disable_flatpak_download: if not args.disable_flatpak_download:
download_flatpak (directory) download_flatpak(directory)
if args.minimum: if args.minimum:
with open(os.path.join(directory, "minimum"), "w") as wfd: with open(os.path.join(directory, "minimum"), "w") as wfd:
wfd.write(args.minimum) wfd.write(args.minimum)
generate_installer (directory, args.target) generate_installer(directory, args.target)

View File

@ -10,6 +10,7 @@ SPDX-License-Identifier: LGPL-2.1+
import sys import sys
import hashlib import hashlib
def usage(return_code): def usage(return_code):
""" print usage and exit with the supplied return code """ """ print usage and exit with the supplied return code """
if return_code == 0: if return_code == 0:
@ -19,6 +20,7 @@ def usage(return_code):
out.write("usage: fu-hash.py <HEADER> <SRC1> <SRC2>...") out.write("usage: fu-hash.py <HEADER> <SRC1> <SRC2>...")
sys.exit(return_code) sys.exit(return_code)
if __name__ == '__main__': if __name__ == '__main__':
if {'-?', '--help', '--usage'}.intersection(set(sys.argv)): if {'-?', '--help', '--usage'}.intersection(set(sys.argv)):
usage(0) usage(0)

View File

@ -19,6 +19,7 @@ def _find_part_by_id(parts, part_id):
return part return part
return None return None
# finds a memory layout for a part, climbing up the tree to the parent if reqd. # finds a memory layout for a part, climbing up the tree to the parent if reqd.
def _find_mem_layout(parts, part): def _find_mem_layout(parts, part):
if 'memory-application' in part: if 'memory-application' in part:
@ -26,7 +27,7 @@ def _find_mem_layout(parts, part):
if memory_flash: if memory_flash:
return memory_flash return memory_flash
#look at the parent # look at the parent
if 'parent' in part: if 'parent' in part:
parent = _find_part_by_id(parts, part['parent']) parent = _find_part_by_id(parts, part['parent'])
if parent: if parent:
@ -34,6 +35,7 @@ def _find_mem_layout(parts, part):
print('no parent ', part['parent'], 'found for', part['id']) print('no parent ', part['parent'], 'found for', part['id'])
return None return None
# parses the weird syntax of avrdude.conf and makes lots of nested dictionaries # parses the weird syntax of avrdude.conf and makes lots of nested dictionaries
def _parse_parts(fn_source): def _parse_parts(fn_source):
print("reading", fn_source) print("reading", fn_source)
@ -100,9 +102,11 @@ def _parse_parts(fn_source):
continue continue
return parts return parts
def _get_longest_substring(s1, s2): def _get_longest_substring(s1, s2):
match = SequenceMatcher(None, s1, s2).find_longest_match(0, len(s1), 0, len(s2)) match = SequenceMatcher(None, s1, s2).find_longest_match(0, len(s1), 0, len(s2))
return s2[match.b: match.b + match.size] return s2[match.b : match.b + match.size]
# writes important data to the quirks file # writes important data to the quirks file
def _write_quirks(parts, fn_destination): def _write_quirks(parts, fn_destination):
@ -148,13 +152,16 @@ def _write_quirks(parts, fn_destination):
for chip_id in results: for chip_id in results:
result = results[chip_id] result = results[chip_id]
outp.append('# ' + result['desc'] + ' [USER] USER=0x%x' % result['size'] + '\n') outp.append(
'# ' + result['desc'] + ' [USER] USER=0x%x' % result['size'] + '\n'
)
outp.append(chip_id + '=' + result['mem_layout'] + '\n\n') outp.append(chip_id + '=' + result['mem_layout'] + '\n\n')
# write file # write file
print("writing", fn_destination) print("writing", fn_destination)
open(fn_destination, 'w').writelines(outp) open(fn_destination, 'w').writelines(outp)
if __name__ == '__main__': if __name__ == '__main__':
if len(sys.argv) != 3: if len(sys.argv) != 3:
print("USAGE: %s avrdude.conf tmp.quirk" % sys.argv[0]) print("USAGE: %s avrdude.conf tmp.quirk" % sys.argv[0])