mirror_linux-firmware/contrib/process_linux_firmware.py
Mario Limonciello 84e4027f90 trivial: contrib: wrap the process in try/except to catch server issues
If lore has load problems, don't bail, just wrap them and try again
on the next cycle.

Signed-off-by: Mario Limonciello <mario.limonciello@amd.com>
2025-01-23 10:44:53 -06:00

402 lines
11 KiB
Python
Executable File

#!/usr/bin/python3
import argparse
import email
import email.utils
import logging
import os
import smtplib
import sqlite3
import subprocess
import sys
import time
import urllib.error
import urllib.request
from datetime import date
from enum import Enum

import feedparser
import magic  # https://pypi.python.org/pypi/python-magic
# Atom feed of new messages on the linux-firmware mailing list at lore.kernel.org.
URL = "https://lore.kernel.org/linux-firmware/new.atom"
class ContentType(Enum):
    """Coarse classification of an incoming mailing-list message."""

    REPLY = 1
    PATCH = 2
    PULL_REQUEST = 3
    SPAM = 4
# Marker strings searched for in a decoded message body (see classify_content).
# NOTE: insertion order matters — the first marker found wins, so the
# pull-request marker is checked before the patch markers.
content_types = {
    "are available in the Git repository at": ContentType.PULL_REQUEST,
    "diff --git": ContentType.PATCH,
    "Signed-off-by:": ContentType.PATCH,
}
def classify_content(content):
    """Classify a raw mbox message string into a ContentType.

    The subject line is checked first ("Re:" -> REPLY, "PATCH" -> PATCH);
    otherwise the text/plain body is decoded (encoding sniffed via libmagic)
    and scanned for the marker strings in ``content_types``.  A body that
    cannot be decoded, or matches nothing, yields SPAM.
    """
    # load content into the email library
    msg = email.message_from_string(content)
    decoded = None
    body = None
    # check the subject; guard against a missing Subject header (None)
    subject = msg["Subject"] or ""
    if "Re:" in subject:
        return ContentType.REPLY
    if "PATCH" in subject:
        return ContentType.PATCH
    # pick a text/plain payload to scan (last text/plain part wins)
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_type() == "text/plain":
                body = part.get_payload(decode=True)
    else:
        body = msg.get_payload(decode=True)
    if body:
        # sniff the charset with libmagic; a wrong guess raises UnicodeDecodeError
        m = magic.Magic(mime_encoding=True)
        try:
            decoded = body.decode(m.from_buffer(body))
        except UnicodeDecodeError:
            pass
    if decoded:
        for marker, classification in content_types.items():
            if marker in decoded:
                return classification
    else:
        logging.warning("Failed to decode email: %s, treating as SPAM", body)
    return ContentType.SPAM
def fetch_url(url):
    """Download *url* and return its body decoded with a libmagic-sniffed charset."""
    with urllib.request.urlopen(url) as response:
        blob = response.read()
    detector = magic.Magic(mime_encoding=True)
    return blob.decode(detector.from_buffer(blob))
def quiet_cmd(cmd):
    """Run *cmd* and log its combined output at DEBUG level.

    Raises subprocess.CalledProcessError when the command fails.
    """
    # lazy %-style args: the message is only formatted if DEBUG is enabled
    logging.debug("Running %s", cmd)
    output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
    logging.debug(output)
def reply_email(content, branch):
    """Reply to the original message telling the sender their request was forwarded.

    Reads SMTP_USER / SMTP_PASS / SMTP_SERVER / SMTP_PORT from the environment;
    silently does nothing unless all four are set.  *content* is the raw mbox
    text of the original message, *branch* the robot branch created for it.
    """
    user = os.environ.get("SMTP_USER")
    password = os.environ.get("SMTP_PASS")
    server = os.environ.get("SMTP_SERVER")
    port = os.environ.get("SMTP_PORT")
    if not user or not password or not server or not port:
        logging.debug("Missing SMTP configuration, not sending email")
        return
    reply = email.message.EmailMessage()
    orig = email.message_from_string(content)
    try:
        # reply to everyone on the original From/To/Cc
        recipients = [
            email.utils.formataddr(t)
            for t in email.utils.getaddresses(
                orig.get_all("from", [])
                + orig.get_all("to", [])
                + orig.get_all("cc", [])
            )
        ]
        reply["To"] = ", ".join(recipients)
    except ValueError:
        logging.warning("Failed to parse email addresses, not sending email")
        return
    reply["From"] = "linux-firmware@kernel.org"
    try:
        reply["Subject"] = "Re: {}".format(orig["Subject"])
    except ValueError:
        logging.warning("Failed to parse subject, not sending email")
        return
    # thread the reply with the original message
    reply["In-Reply-To"] = orig["Message-Id"]
    reply["References"] = orig["Message-Id"]
    reply["Thread-Topic"] = orig["Thread-Topic"]
    reply["Thread-Index"] = orig["Thread-Index"]
    content = (
        "Your request has been forwarded by the Linux Firmware Kernel robot.\n"
        "Please follow up at https://gitlab.com/kernel-firmware/linux-firmware/-/merge_requests to ensure it gets merged\n"
        "Your request is '{}'".format(branch)
    )
    reply.set_content(content)
    # BUG FIX: sendmail() treats a comma-joined *string* as one single
    # recipient, so pass the address list instead.  The context manager also
    # guarantees quit() even if login/sendmail raises.
    with smtplib.SMTP(server, int(port)) as mailserver:
        mailserver.ehlo()
        mailserver.starttls()
        mailserver.ehlo()
        mailserver.login(user, password)
        mailserver.sendmail(reply["From"], recipients, reply.as_string())
def create_pr(remote, branch):
    """Push *branch* to *remote* with GitLab push options that open a merge request."""
    push_options = [
        "merge_request.create",
        "merge_request.remove_source_branch",
        "merge_request.target=main",
        "merge_request.title={}".format(branch),
    ]
    cmd = ["git", "push", "-u", remote, branch]
    for option in push_options:
        cmd += ["-o", option]
    quiet_cmd(cmd)
def refresh_branch():
    """Switch back to an up-to-date main branch."""
    for cmd in (["git", "checkout", "main"], ["git", "pull"]):
        quiet_cmd(cmd)
def delete_branch(branch):
    """Return to main and force-delete the local *branch*."""
    for cmd in (["git", "checkout", "main"], ["git", "branch", "-D", branch]):
        quiet_cmd(cmd)
def process_pr(mbox, num, remote):
    """Feed a pull-request email to ``b4 pr`` on a new robot branch.

    Returns the branch name when b4 managed to create it (opening a merge
    request first when *remote* is non-empty), otherwise None.
    """
    branch = "robot/pr-{}-{}".format(num, int(time.time()))
    # manual fixup for PRs from drm firmware repo: b4 can't fetch the ssh
    # remote, rewrite it to https (str.replace is a no-op when absent)
    mbox = mbox.replace(
        "git@gitlab.freedesktop.org:drm/firmware.git",
        "https://gitlab.freedesktop.org/drm/firmware.git",
    )
    cmd = ["b4", "--debug", "pr", "-b", branch, "-"]
    logging.debug("Running %s", cmd)
    result = subprocess.run(
        cmd, input=mbox.encode("utf-8"), capture_output=True, check=False
    )
    for line in result.stdout.splitlines():
        logging.debug(line.decode("utf-8"))
    for line in result.stderr.splitlines():
        logging.debug(line.decode("utf-8"))
    # determine if it worked (we can't tell unfortunately by return code);
    # b4 only creates the branch on success
    cmd = ["git", "branch", "--list", branch]
    logging.debug("Running %s", cmd)
    listing = subprocess.check_output(cmd)
    if listing:
        for line in listing.splitlines():
            logging.debug(line.decode("utf-8"))
        logging.info("Forwarding PR for %s", branch)
        if remote:
            create_pr(remote, branch)
            delete_branch(branch)
        return branch
    return None
def process_patch(mbox, num, remote):
    """Apply a patch email with ``git am`` on a new robot branch.

    Returns the branch name when the patch applied cleanly (opening a merge
    request first when *remote* is non-empty), otherwise None.
    """
    # create a new branch for the patch
    branch = "robot/patch-{}-{}".format(num, int(time.time()))
    quiet_cmd(["git", "checkout", "-b", branch])
    # apply the patch
    cmd = ["git", "am"]
    logging.debug("Running %s", cmd)
    result = subprocess.run(
        cmd, input=mbox.encode("utf-8"), capture_output=True, check=False
    )
    for line in result.stdout.splitlines():
        logging.debug(line.decode("utf-8"))
    for line in result.stderr.splitlines():
        logging.debug(line.decode("utf-8"))
    if result.returncode != 0:
        # abort so the working tree is left clean
        quiet_cmd(["git", "am", "--abort"])
        return None
    logging.info("Opening PR for %s", branch)
    if remote:
        create_pr(remote, branch)
        delete_branch(branch)
    return branch
def update_database(conn, url):
    """Merge new feed entries from *url* (local path or remote URL) into the db.

    Creates the ``firmware`` table on first use and inserts any entry links
    not already present, oldest first.
    """
    cursor = conn.cursor()
    cursor.execute(
        """CREATE TABLE IF NOT EXISTS firmware (url text, processed integer default 0, spam integer default 0)"""
    )
    if os.path.exists(url):
        # local file
        with open(url, "r") as f:
            atom = f.read()
    else:
        # remote file
        logging.info("Fetching {}".format(url))
        atom = fetch_url(url)
    # Parse the atom and extract the URLs, inserting the oldest first so
    # they get processed in arrival order
    feed = feedparser.parse(atom)
    for entry in reversed(feed["entries"]):
        cursor.execute("SELECT url FROM firmware WHERE url = ?", (entry.link,))
        if cursor.fetchone():
            continue
        cursor.execute("INSERT INTO firmware VALUES (?, ?, ?)", (entry.link, 0, 0))
    # Commit the changes and close the connection
    conn.commit()
def process_database(conn, remote):
    """Fetch, classify and dispatch every unprocessed non-spam entry.

    Each URL is fetched from lore as a raw mbox, classified, and handed to
    the patch/PR handlers; spam and replies are only flagged.  A
    notification email is sent for every branch successfully created.
    """
    c = conn.cursor()
    # get all unprocessed urls that aren't spam
    c.execute("SELECT url FROM firmware WHERE processed = 0 AND spam = 0")
    num = 0
    msg = ""
    rows = c.fetchall()
    if not rows:
        logging.info("No new entries")
        return
    refresh_branch()
    # loop over all unprocessed urls
    for row in rows:
        branch = None
        # console progress indicator, overwritten in place via \r
        msg = "Processing ({}%)".format(round(num / len(rows) * 100))
        print(msg, end="\r", flush=True)
        # lore serves the raw mbox at <entry-url>raw
        url = "{}raw".format(row[0])
        logging.debug("Processing {}".format(url))
        mbox = fetch_url(url)
        classification = classify_content(mbox)
        if classification == ContentType.PATCH:
            logging.debug("Processing patch ({})".format(row[0]))
            branch = process_patch(mbox, num, remote)
        if classification == ContentType.PULL_REQUEST:
            logging.debug("Processing PR ({})".format(row[0]))
            branch = process_pr(mbox, num, remote)
        if classification == ContentType.SPAM:
            logging.debug("Marking spam ({})".format(row[0]))
            c.execute("UPDATE firmware SET spam = 1 WHERE url = ?", (row[0],))
        if classification == ContentType.REPLY:
            logging.debug("Ignoring reply ({})".format(row[0]))
        # every entry is marked processed regardless of its classification
        c.execute("UPDATE firmware SET processed = 1 WHERE url = ?", (row[0],))
        num += 1
        # blank out the progress line
        print(" " * len(msg), end="\r", flush=True)
        # commit changes
        conn.commit()
        # send any emails
        if branch:
            reply_email(mbox, branch)
    logging.info("Finished processing {} new entries".format(len(rows)))
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Process linux-firmware mailing list")
    parser.add_argument("--url", default=URL, help="URL to get ATOM feed from")
    parser.add_argument(
        "--database",
        default=os.path.join("contrib", "linux_firmware.db"),
        help="sqlite database to store entries in",
    )
    parser.add_argument("--dry", action="store_true", help="Don't open pull requests")
    parser.add_argument(
        "--debug", action="store_true", help="Enable debug logging to console"
    )
    parser.add_argument("--remote", default="origin", help="Remote to push to")
    parser.add_argument(
        "--refresh-cycle",
        default=0,
        # BUG FIX: without type=int the CLI value is a string, so
        # "--refresh-cycle 0" is truthy and loops forever with no sleep
        type=int,
        help="How frequently to run (in minutes)",
    )
    args = parser.parse_args()

    # must run from the repo root so relative paths (WHENCE, contrib/) resolve
    if not os.path.exists("WHENCE"):
        logging.critical(
            "Please run this script from the root of the linux-firmware repository"
        )
        sys.exit(1)

    # full DEBUG log always goes to a dated file under contrib/
    log = os.path.join(
        "contrib",
        "{prefix}-{date}.{suffix}".format(
            prefix="linux_firmware", suffix="txt", date=date.today()
        ),
    )
    logging.basicConfig(
        format="%(asctime)s %(levelname)s:\t%(message)s",
        filename=log,
        filemode="w",
        level=logging.DEBUG,
    )
    # set a format which is simpler for console use
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG if args.debug else logging.INFO)
    formatter = logging.Formatter("%(asctime)s : %(levelname)s : %(message)s")
    console.setFormatter(formatter)
    logging.getLogger("").addHandler(console)

    while True:
        remote = "" if args.dry else args.remote
        # BUG FIX: conn must exist before try, otherwise a failing
        # sqlite3.connect raises NameError in the finally block
        conn = None
        try:
            conn = sqlite3.connect(args.database)
            # update the database
            update_database(conn, args.url)
            # process the database
            process_database(conn, remote)
        # URLError is HTTPError's base class, so this also survives
        # connection-level failures when lore has load problems
        except urllib.error.URLError as e:
            logging.error("Failed to fetch URL: {}".format(e))
        finally:
            if conn is not None:
                conn.close()
        if args.refresh_cycle:
            logging.info("Sleeping for {} minutes".format(args.refresh_cycle))
            time.sleep(args.refresh_cycle * 60)
        else:
            break