From feb4a99fc9fda02fd5dde750359fbb83ccd9a21e Mon Sep 17 00:00:00 2001 From: Gareth Jones Date: Wed, 28 May 2025 15:30:38 +1200 Subject: [PATCH] fix: ensure that "empty" object fields on advisories are handled correctly --- ruff.toml | 3 ++- scripts/download_sa_advisories.py | 11 +++++----- scripts/generate_osv_advisories.py | 7 +++++-- scripts/precache_nodes.py | 8 +++++--- scripts/typings/drupal.py | 29 +++++++++++++++++++++++---- scripts/utils.py | 32 ++++++++++++++++++++++++++++++ 6 files changed, 74 insertions(+), 16 deletions(-) create mode 100644 scripts/utils.py diff --git a/ruff.toml b/ruff.toml index 0ca1406a..6777c968 100644 --- a/ruff.toml +++ b/ruff.toml @@ -19,5 +19,6 @@ select = [ known-first-party = [ 'generate_osv_advisories', 'user_agent', - 'typings' + 'typings', + 'utils' ] diff --git a/scripts/download_sa_advisories.py b/scripts/download_sa_advisories.py index 5a54a055..f72e614f 100755 --- a/scripts/download_sa_advisories.py +++ b/scripts/download_sa_advisories.py @@ -9,10 +9,10 @@ import json import os -import typing import requests +import utils from typings import drupal from user_agent import user_agent @@ -26,11 +26,10 @@ def get_most_recent_changed_timestamp() -> int: for file in os.scandir('cache/advisories'): if not file.is_file() or not file.name.endswith('.json'): continue - with open(file.path) as f: - advisory = typing.cast(drupal.Advisory, json.load(f)) - changed = int(advisory['changed']) - if changed > most_recent_changed or most_recent_changed == 0: - most_recent_changed = changed + advisory = utils.load_sa_advisory(file.path) + changed = int(advisory['changed']) + if changed > most_recent_changed or most_recent_changed == 0: + most_recent_changed = changed except FileNotFoundError: pass return most_recent_changed diff --git a/scripts/generate_osv_advisories.py b/scripts/generate_osv_advisories.py index 2a4616e8..39dd079f 100755 --- a/scripts/generate_osv_advisories.py +++ b/scripts/generate_osv_advisories.py @@ -17,6 +17,7 @@ 
import semver from markdownify import markdownify +import utils from typings import drupal, osv from user_agent import user_agent @@ -318,6 +319,9 @@ def get_credits_from_sa( def determine_composer_package_name(sa_advisory: drupal.Advisory) -> str: + if sa_advisory['field_project'] is None: + raise Exception('advisory does not have a project!') + project = typing.cast( drupal.Project, fetch_drupal_node(sa_advisory['field_project']['id']) ) @@ -425,8 +429,7 @@ def generate_osv_advisories() -> None: if not file.is_file() or not file.name.endswith('.json'): continue - with open(file.path) as f: - sa_advisory: drupal.Advisory = json.load(f) + sa_advisory = utils.load_sa_advisory(file.path) print(f'processing {sa_advisory["url"]}') sa_id = file.name.removesuffix('.json') osv_advisory = build_osv_advisory(sa_id, sa_advisory) diff --git a/scripts/precache_nodes.py b/scripts/precache_nodes.py index 06d64697..9a790dc7 100755 --- a/scripts/precache_nodes.py +++ b/scripts/precache_nodes.py @@ -12,6 +12,7 @@ import requests +import utils from typings import drupal from user_agent import user_agent @@ -49,9 +50,10 @@ def fetch_and_cache_drupal_nodes() -> None: if not file.is_file() or not file.name.endswith('.json'): continue - with open(file.path) as f: - sa_advisory: drupal.Advisory = json.load(f) - ids.add(sa_advisory['field_project']['id']) + sa_advisory = utils.load_sa_advisory(file.path) + + if sa_advisory['field_project'] is not None: + ids.add(sa_advisory['field_project']['id']) for i, batch in enumerate(batched(ids, 50, strict=False)): print(f'fetching {len(batch)} nodes ({len(ids) - i * 50 - len(batch)} remaining)') diff --git a/scripts/typings/drupal.py b/scripts/typings/drupal.py index cc48afc9..2c837e14 100644 --- a/scripts/typings/drupal.py +++ b/scripts/typings/drupal.py @@ -17,21 +17,42 @@ class Node(typing.TypedDict): type: str -class Advisory(Node): +class AdvisoryBase(Node): field_is_psa: typing.Literal['0', '1'] field_affected_versions: str | None - 
field_project: EntityReferenceField field_fixed_in: list[EntityReferenceField] - field_sa_reported_by: RichTextField | list[typing.Never] field_sa_criticality: str field_sa_cve: list[str] - field_sa_description: RichTextField created: str changed: str title: str url: str +class Advisory(AdvisoryBase): + """ + Represents an advisory sourced from the Drupal JSON API that has been + transformed to make it easier to work with + """ + + field_project: EntityReferenceField | None + field_sa_reported_by: RichTextField + field_sa_description: RichTextField + + +class AdvisoryRaw(AdvisoryBase): + """ + Represents an advisory provided by the Drupal JSON API without any post-processing. + + This mainly means that object fields which don't have a value in the database + will be represented by an empty list due to how associative arrays in PHP work + """ + + field_project: EntityReferenceField | list[typing.Never] + field_sa_reported_by: RichTextField | list[typing.Never] + field_sa_description: RichTextField | list[typing.Never] + + class Project(Node): # type will be project_module, project_theme, or project_core field_project_machine_name: str diff --git a/scripts/utils.py b/scripts/utils.py new file mode 100644 index 00000000..a7ebea87 --- /dev/null +++ b/scripts/utils.py @@ -0,0 +1,32 @@ +import json + +from typings import drupal + + +def load_sa_advisory(file_path: str) -> drupal.Advisory: + """ + Loads a Drupal advisory from a json file stored on disk, making some adjustments + in the process to make it easier to work with + """ + with open(file_path) as f: + raw_advisory: drupal.AdvisoryRaw = json.load(f) + + # noinspection PyTypeChecker + # https://youtrack.jetbrains.com/issue/PY-58714/False-positive-TypedDict-has-missing-key-when-using-unpacking + sa_advisory: drupal.Advisory = { + **raw_advisory, + 'field_project': None, + 'field_sa_reported_by': {'format': '1', 'value': ''}, + 'field_sa_description': {'format': '1', 'value': ''}, + } + + if 
isinstance(raw_advisory['field_project'], dict): + sa_advisory['field_project'] = raw_advisory['field_project'] + + if isinstance(raw_advisory['field_sa_reported_by'], dict): + sa_advisory['field_sa_reported_by'] = raw_advisory['field_sa_reported_by'] + + if isinstance(raw_advisory['field_sa_description'], dict): + sa_advisory['field_sa_description'] = raw_advisory['field_sa_description'] + + return sa_advisory