From b2d9c753942703dc132021d1db7ddabde781e66c Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Sun, 6 Jul 2025 22:54:20 +0300 Subject: [PATCH 1/4] Modify NPM importer to support package-first mode #1936 * Update NPM importer to filter and process advisories relevant to the purl passed in the constructor * Update NPM v2 importer to filter and process advisories relevant to the purl passed in the constructor * Update NPM importer tests to test package-first mode * Update NPM v2 importer tests to test package-first mode Signed-off-by: Michael Ehab Mikhail --- vulnerabilities/pipelines/npm_importer.py | 97 ++++++++++- .../pipelines/v2_importers/npm_importer.py | 94 ++++++++++- .../pipelines/test_npm_importer_pipeline.py | 151 +++++++++++++++++ .../test_npm_importer_pipeline_v2.py | 158 ++++++++++++++++++ 4 files changed, 488 insertions(+), 12 deletions(-) diff --git a/vulnerabilities/pipelines/npm_importer.py b/vulnerabilities/pipelines/npm_importer.py index 7b6d3aba2..b0ea05ea6 100644 --- a/vulnerabilities/pipelines/npm_importer.py +++ b/vulnerabilities/pipelines/npm_importer.py @@ -9,14 +9,19 @@ # Author: Navonil Das (@NavonilDas) +import json +import os +import tempfile from pathlib import Path from typing import Iterable import pytz +import requests from dateutil.parser import parse from fetchcode.vcs import fetch_via_vcs from packageurl import PackageURL from univers.version_range import NpmVersionRange +from univers.versions import SemverVersion from vulnerabilities.importer import AdvisoryData from vulnerabilities.importer import AffectedPackage @@ -39,28 +44,88 @@ class NpmImporterPipeline(VulnerableCodeBaseImporterPipeline): repo_url = "git+https://github.com/nodejs/security-wg" importer_name = "Npm Importer" + is_batch_run = True + + def __init__(self, *args, purl=None, **kwargs): + super().__init__(*args, **kwargs) + self.purl = purl + if self.purl: + NpmImporterPipeline.is_batch_run = False + if self.purl.type != "npm": + print(f"Warning: This importer handles NPM packages. Current PURL: {self.purl!s}") + @classmethod def steps(cls): - return ( + if not cls.is_batch_run: + return [ + cls.fetch_package_advisories, + cls.collect_and_store_advisories, + cls.import_new_advisories, + ] + + return [ cls.clone, cls.collect_and_store_advisories, cls.import_new_advisories, cls.clean_downloads, - ) + ] def clone(self): self.log(f"Cloning `{self.repo_url}`") self.vcs_response = fetch_via_vcs(self.repo_url) + def fetch_package_advisories(self): + if not self.purl or self.purl.type != "npm": + return + + self.log(f"Fetching advisories for package {self.purl.name}") + + package_name = self.purl.name + + self.temp_dir = tempfile.mkdtemp() + self.package_advisories = [] + + api_url = "https://api.github.com/repos/nodejs/security-wg/contents/vuln/npm" + response = requests.get(api_url) + + if response.status_code != 200: + self.log(f"Failed to fetch advisories directory: {response.status_code}") + return + + for item in response.json(): + if item["type"] == "file" and item["name"].endswith(".json"): + file_url = item["download_url"] + try: + file_content = requests.get(file_url).json() + + if file_content.get("module_name") == package_name: + file_path = os.path.join(self.temp_dir, item["name"]) + with open(file_path, "w") as f: + json.dump(file_content, f) + self.package_advisories.append(file_path) + except Exception as e: + self.log(f"Error processing advisory file {item['name']}: {str(e)}") + + self.log(f"Found {len(self.package_advisories)} advisories for package {package_name}") + def advisories_count(self): - vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" - return sum(1 for _ in vuln_directory.glob("*.json")) + if NpmImporterPipeline.is_batch_run: + vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" + return sum(1 for _ in vuln_directory.glob("*.json")) + else: + return len(getattr(self, "package_advisories", [])) def collect_advisories(self) -> Iterable[AdvisoryData]: - vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" + if NpmImporterPipeline.is_batch_run: + vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" + for advisory in vuln_directory.glob("*.json"): + yield from self.to_advisory_data(advisory) + else: + if not hasattr(self, "package_advisories"): + return - for advisory in vuln_directory.glob("*.json"): - yield from self.to_advisory_data(advisory) + for advisory_path in self.package_advisories: + yield from self.to_advisory_data(Path(advisory_path)) def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: data = load_json(file) @@ -112,6 +177,11 @@ def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: affected_packages.append(self.get_affected_package(data, package_name)) advsisory_aliases = data.get("cves") or [] + if self.purl and self.purl.version: + affected_package = affected_packages[0] if affected_packages else None + if affected_package and not self._version_is_affected(affected_package): + return + for alias in advsisory_aliases: yield AdvisoryData( summary=build_description(summary=summary, description=description), @@ -122,6 +192,13 @@ def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: url=f"https://github.com/nodejs/security-wg/blob/main/vuln/npm/{id}.json", ) + def _version_is_affected(self, affected_package): + if not self.purl.version or not affected_package.affected_version_range: + return True + + purl_version = SemverVersion(self.purl.version) + return purl_version in affected_package.affected_version_range + def get_affected_package(self, data, package_name): affected_version_range = None unaffected_version_range = None @@ -164,5 +241,11 @@ def clean_downloads(self): self.log(f"Removing cloned repository") self.vcs_response.delete() + if hasattr(self, "temp_dir") and os.path.exists(self.temp_dir): + import shutil + + self.log(f"Removing temporary directory") + shutil.rmtree(self.temp_dir) + def on_failure(self): self.clean_downloads() diff --git a/vulnerabilities/pipelines/v2_importers/npm_importer.py b/vulnerabilities/pipelines/v2_importers/npm_importer.py index 67e2a4355..aa027d30b 100644 --- a/vulnerabilities/pipelines/v2_importers/npm_importer.py +++ b/vulnerabilities/pipelines/v2_importers/npm_importer.py @@ -9,14 +9,19 @@ # Author: Navonil Das (@NavonilDas) +import json +import os +import tempfile from pathlib import Path from typing import Iterable import pytz +import requests from dateutil.parser import parse from fetchcode.vcs import fetch_via_vcs from packageurl import PackageURL from univers.version_range import NpmVersionRange +from univers.versions import SemverVersion from vulnerabilities.importer import AdvisoryData from vulnerabilities.importer import AffectedPackage @@ -42,8 +47,24 @@ class NpmImporterPipeline(VulnerableCodeBaseImporterPipelineV2): repo_url = "git+https://github.com/nodejs/security-wg" unfurl_version_ranges = True + is_batch_run = True + + def __init__(self, *args, purl=None, **kwargs): + super().__init__(*args, **kwargs) + self.purl = purl + if self.purl: + NpmImporterPipeline.is_batch_run = False + if self.purl.type != "npm": + print(f"Warning: This importer handles NPM packages. Current PURL: {self.purl!s}") + @classmethod def steps(cls): + if not cls.is_batch_run: + return ( + cls.fetch_package_advisories, + cls.collect_and_store_advisories, + cls.clean_downloads, + ) return ( cls.clone, cls.collect_and_store_advisories, @@ -54,15 +75,60 @@ def clone(self): self.log(f"Cloning `{self.repo_url}`") self.vcs_response = fetch_via_vcs(self.repo_url) + def fetch_package_advisories(self): + if not self.purl or self.purl.type != "npm": + return + + self.log(f"Fetching advisories for package {self.purl.name}") + + package_name = self.purl.name + + self.temp_dir = tempfile.mkdtemp() + self.package_advisories = [] + + api_url = "https://api.github.com/repos/nodejs/security-wg/contents/vuln/npm" + response = requests.get(api_url) + + if response.status_code != 200: + self.log(f"Failed to fetch advisories directory: {response.status_code}") + return + + for item in response.json(): + if item["type"] == "file" and item["name"].endswith(".json"): + file_url = item["download_url"] + try: + file_content = requests.get(file_url).json() + + if file_content.get("module_name") == package_name: + file_path = os.path.join(self.temp_dir, item["name"]) + with open(file_path, "w") as f: + json.dump(file_content, f) + self.package_advisories.append(file_path) + except Exception as e: + self.log(f"Error processing advisory file {item['name']}: {str(e)}") + + self.log(f"Found {len(self.package_advisories)} advisories for package {package_name}") + def advisories_count(self): - vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" - return sum(1 for _ in vuln_directory.glob("*.json")) + if NpmImporterPipeline.is_batch_run: + vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" + return sum(1 for _ in vuln_directory.glob("*.json")) + else: + return len(getattr(self, "package_advisories", [])) def collect_advisories(self) -> Iterable[AdvisoryData]: - vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" + if NpmImporterPipeline.is_batch_run: + vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" + for advisory in vuln_directory.glob("*.json"): + yield self.to_advisory_data(advisory) + else: + if not hasattr(self, "package_advisories"): + return - for advisory in vuln_directory.glob("*.json"): - yield self.to_advisory_data(advisory) + for advisory_path in self.package_advisories: + result = self.to_advisory_data(Path(advisory_path)) + if result: + yield result def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: if file.name == "index.json": @@ -121,6 +187,11 @@ def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: affected_packages.append(self.get_affected_package(data, package_name)) advsisory_aliases = data.get("cves") or [] + if self.purl and self.purl.version: + affected_package = affected_packages[0] if affected_packages else None + if affected_package and not self._version_is_affected(affected_package): + return + return AdvisoryData( advisory_id=f"npm-{id}", aliases=advsisory_aliases, @@ -132,6 +203,13 @@ def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: url=f"https://github.com/nodejs/security-wg/blob/main/vuln/npm/{id}.json", ) + def _version_is_affected(self, affected_package): + if not self.purl.version or not affected_package.affected_version_range: + return True + + purl_version = SemverVersion(self.purl.version) + return purl_version in affected_package.affected_version_range + def get_affected_package(self, data, package_name): affected_version_range = None unaffected_version_range = None @@ -174,5 +252,11 @@ def clean_downloads(self): self.log(f"Removing cloned repository") self.vcs_response.delete() + if hasattr(self, "temp_dir") and os.path.exists(self.temp_dir): + import shutil + + self.log(f"Removing temporary directory") + shutil.rmtree(self.temp_dir) + def on_failure(self): self.clean_downloads() diff --git a/vulnerabilities/tests/pipelines/test_npm_importer_pipeline.py b/vulnerabilities/tests/pipelines/test_npm_importer_pipeline.py index bcfb83f62..5e3d661bd 100644 --- a/vulnerabilities/tests/pipelines/test_npm_importer_pipeline.py +++ b/vulnerabilities/tests/pipelines/test_npm_importer_pipeline.py @@ -12,6 +12,7 @@ import json import os from pathlib import Path +from unittest.mock import MagicMock from unittest.mock import patch from packageurl import PackageURL @@ -77,3 +78,153 @@ def test_npm_improver(mock_response): result.extend(inference) expected_file = os.path.join(TEST_DATA, f"npm-improver-expected.json") util_tests.check_results_against_json(result, expected_file) + + +@patch("requests.get") +def test_package_first_mode_valid_npm_package(mock_get): + mock_dir_response = MagicMock() + mock_dir_response.status_code = 200 + mock_dir_response.json.return_value = [ + { + "type": "file", + "name": "152.json", + "download_url": "https://raw.githubusercontent.com/nodejs/security-wg/main/vuln/npm/152.json", + } + ] + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + mock_file_response = MagicMock() + mock_file_response.json.return_value = sample_data + + mock_get.side_effect = [mock_dir_response, mock_file_response] + + purl = PackageURL(type="npm", name="npm", version="1.2.0") + pipeline = NpmImporterPipeline(purl=purl) + + pipeline.fetch_package_advisories() + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 1 + assert advisories[0].aliases == ["CVE-2013-4116"] + assert len(advisories[0].affected_packages) == 1 + assert advisories[0].affected_packages[0].package.name == "npm" + + +@patch("requests.get") +def test_package_first_mode_unaffected_version(mock_get): + mock_dir_response = MagicMock() + mock_dir_response.status_code = 200 + mock_dir_response.json.return_value = [ + { + "type": "file", + "name": "152.json", + "download_url": "https://raw.githubusercontent.com/nodejs/security-wg/main/vuln/npm/152.json", + } + ] + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + mock_file_response = MagicMock() + mock_file_response.json.return_value = sample_data + + mock_get.side_effect = [mock_dir_response, mock_file_response] + + purl = PackageURL(type="npm", name="npm", version="1.4.0") + pipeline = NpmImporterPipeline(purl=purl) + + pipeline.fetch_package_advisories() + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +@patch("requests.get") +def test_package_first_mode_invalid_package_type(mock_get): + purl = PackageURL(type="pypi", name="django", version="3.0.0") + pipeline = NpmImporterPipeline(purl=purl) + + pipeline.fetch_package_advisories() + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + mock_get.assert_not_called() + + +@patch("requests.get") +def test_package_first_mode_package_not_found(mock_get): + mock_dir_response = MagicMock() + mock_dir_response.status_code = 200 + mock_dir_response.json.return_value = [ + { + "type": "file", + "name": "152.json", + "download_url": "https://raw.githubusercontent.com/nodejs/security-wg/main/vuln/npm/152.json", + } + ] + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + sample_data["module_name"] = "some-other-package" + + mock_file_response = MagicMock() + mock_file_response.json.return_value = sample_data + + mock_get.side_effect = [mock_dir_response, mock_file_response] + + purl = PackageURL(type="npm", name="nonexistent-package", version="1.0.0") + pipeline = NpmImporterPipeline(purl=purl) + + pipeline.fetch_package_advisories() + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +@patch("requests.get") +def test_package_first_mode_api_error(mock_get): + mock_error_response = MagicMock() + mock_error_response.status_code = 404 + + mock_get.return_value = mock_error_response + + purl = PackageURL(type="npm", name="npm", version="1.0.0") + pipeline = NpmImporterPipeline(purl=purl) + + pipeline.fetch_package_advisories() + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +def test_version_is_affected(): + purl = PackageURL(type="npm", name="npm", version="1.2.0") + pipeline = NpmImporterPipeline(purl=purl) + + affected_package = AffectedPackage( + package=PackageURL(type="npm", name="npm"), + affected_version_range=NpmVersionRange( + constraints=(VersionConstraint(comparator="<", version=SemverVersion(string="1.3.3")),) + ), + ) + + assert pipeline._version_is_affected(affected_package) == True + + pipeline.purl = PackageURL(type="npm", name="npm", version="1.4.0") + assert pipeline._version_is_affected(affected_package) == False + + pipeline.purl = PackageURL(type="npm", name="npm") + assert pipeline._version_is_affected(affected_package) == True + + affected_package_no_range = AffectedPackage( + package=PackageURL(type="npm", name="npm"), + affected_version_range=None, + fixed_version=SemverVersion(string="1.3.3"), + ) + assert pipeline._version_is_affected(affected_package_no_range) == True diff --git a/vulnerabilities/tests/pipelines/test_npm_importer_pipeline_v2.py b/vulnerabilities/tests/pipelines/test_npm_importer_pipeline_v2.py index 7941c9b69..325d87ea9 100644 --- a/vulnerabilities/tests/pipelines/test_npm_importer_pipeline_v2.py +++ b/vulnerabilities/tests/pipelines/test_npm_importer_pipeline_v2.py @@ -8,18 +8,26 @@ # import json +import os +from pathlib import Path from types import SimpleNamespace +from unittest.mock import MagicMock +from unittest.mock import patch import pytz from packageurl import PackageURL +from univers.version_constraint import VersionConstraint from univers.version_range import NpmVersionRange from univers.versions import SemverVersion from vulnerabilities.importer import AdvisoryData +from vulnerabilities.importer import AffectedPackage from vulnerabilities.pipelines.v2_importers.npm_importer import NpmImporterPipeline from vulnerabilities.severity_systems import CVSSV2 from vulnerabilities.severity_systems import CVSSV3 +TEST_DATA = Path(__file__).parent.parent / "test_data" / "npm" + def test_clone(monkeypatch): import vulnerabilities.pipelines.v2_importers.npm_importer as npm_mod @@ -126,3 +134,153 @@ def test_get_affected_package_special_and_standard(): pkg2 = p.get_affected_package(data2, "pkg2") assert isinstance(pkg2.affected_version_range, NpmVersionRange) assert pkg2.fixed_version == SemverVersion("2.0.1") + + +@patch("requests.get") +def test_package_first_mode_valid_npm_package(mock_get): + mock_dir_response = MagicMock() + mock_dir_response.status_code = 200 + mock_dir_response.json.return_value = [ + { + "type": "file", + "name": "152.json", + "download_url": "https://raw.githubusercontent.com/nodejs/security-wg/main/vuln/npm/152.json", + } + ] + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + mock_file_response = MagicMock() + mock_file_response.json.return_value = sample_data + + mock_get.side_effect = [mock_dir_response, mock_file_response] + + purl = PackageURL(type="npm", name="npm", version="1.2.0") + pipeline = NpmImporterPipeline(purl=purl) + + pipeline.fetch_package_advisories() + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 1 + assert advisories[0].aliases == ["CVE-2013-4116"] + assert len(advisories[0].affected_packages) == 1 + assert advisories[0].affected_packages[0].package.name == "npm" + + +@patch("requests.get") +def test_package_first_mode_unaffected_version(mock_get): + mock_dir_response = MagicMock() + mock_dir_response.status_code = 200 + mock_dir_response.json.return_value = [ + { + "type": "file", + "name": "152.json", + "download_url": "https://raw.githubusercontent.com/nodejs/security-wg/main/vuln/npm/152.json", + } + ] + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + mock_file_response = MagicMock() + mock_file_response.json.return_value = sample_data + + mock_get.side_effect = [mock_dir_response, mock_file_response] + + purl = PackageURL(type="npm", name="npm", version="1.4.0") + pipeline = NpmImporterPipeline(purl=purl) + + pipeline.fetch_package_advisories() + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +@patch("requests.get") +def test_package_first_mode_invalid_package_type(mock_get): + purl = PackageURL(type="pypi", name="django", version="3.0.0") + pipeline = NpmImporterPipeline(purl=purl) + + pipeline.fetch_package_advisories() + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + mock_get.assert_not_called() + + +@patch("requests.get") +def test_package_first_mode_package_not_found(mock_get): + mock_dir_response = MagicMock() + mock_dir_response.status_code = 200 + mock_dir_response.json.return_value = [ + { + "type": "file", + "name": "152.json", + "download_url": "https://raw.githubusercontent.com/nodejs/security-wg/main/vuln/npm/152.json", + } + ] + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + sample_data["module_name"] = "some-other-package" + + mock_file_response = MagicMock() + mock_file_response.json.return_value = sample_data + + mock_get.side_effect = [mock_dir_response, mock_file_response] + + purl = PackageURL(type="npm", name="nonexistent-package", version="1.0.0") + pipeline = NpmImporterPipeline(purl=purl) + + pipeline.fetch_package_advisories() + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +@patch("requests.get") +def test_package_first_mode_api_error(mock_get): + mock_error_response = MagicMock() + mock_error_response.status_code = 404 + + mock_get.return_value = mock_error_response + + purl = PackageURL(type="npm", name="npm", version="1.0.0") + pipeline = NpmImporterPipeline(purl=purl) + + pipeline.fetch_package_advisories() + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +def test_version_is_affected(): + purl = PackageURL(type="npm", name="npm", version="1.2.0") + pipeline = NpmImporterPipeline(purl=purl) + + affected_package = AffectedPackage( + package=PackageURL(type="npm", name="npm"), + affected_version_range=NpmVersionRange( + constraints=(VersionConstraint(comparator="<", version=SemverVersion(string="1.3.3")),) + ), + ) + + assert pipeline._version_is_affected(affected_package) == True + + pipeline.purl = PackageURL(type="npm", name="npm", version="1.4.0") + assert pipeline._version_is_affected(affected_package) == False + + pipeline.purl = PackageURL(type="npm", name="npm") + assert pipeline._version_is_affected(affected_package) == True + + affected_package_no_range = AffectedPackage( + package=PackageURL(type="npm", name="npm"), + affected_version_range=None, + fixed_version=SemverVersion(string="1.3.3"), + ) + assert pipeline._version_is_affected(affected_package_no_range) == True From caf13ba7668779e41c82d911761f9c296ab91bc1 Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Sat, 12 Jul 2025 12:05:52 +0300 Subject: [PATCH 2/4] Modify NPM importer to support package-first mode using SCM approach #1936 Signed-off-by: Michael Ehab Mikhail --- vulnerabilities/pipelines/npm_importer.py | 81 ++++----------- .../pipelines/v2_importers/npm_importer.py | 76 ++++---------- .../pipelines/test_npm_importer_pipeline.py | 99 ++++++++----------- .../test_npm_importer_pipeline_v2.py | 95 ++++++------------ 4 files changed, 111 insertions(+), 240 deletions(-) diff --git a/vulnerabilities/pipelines/npm_importer.py b/vulnerabilities/pipelines/npm_importer.py index b0ea05ea6..d6c577e3d 100644 --- a/vulnerabilities/pipelines/npm_importer.py +++ b/vulnerabilities/pipelines/npm_importer.py @@ -56,13 +56,6 @@ def __init__(self, *args, purl=None, **kwargs): @classmethod def steps(cls): - if not cls.is_batch_run: - return [ - cls.fetch_package_advisories, - cls.collect_and_store_advisories, - cls.import_new_advisories, - ] - return [ cls.clone, cls.collect_and_store_advisories, @@ -74,58 +67,32 @@ def clone(self): self.log(f"Cloning `{self.repo_url}`") self.vcs_response = fetch_via_vcs(self.repo_url) - def fetch_package_advisories(self): - if not self.purl or self.purl.type != "npm": - return - - self.log(f"Fetching advisories for package {self.purl.name}") - - package_name = self.purl.name - - self.temp_dir = tempfile.mkdtemp() - self.package_advisories = [] - - api_url = "https://api.github.com/repos/nodejs/security-wg/contents/vuln/npm" - response = requests.get(api_url) + def advisories_count(self): + vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" + return sum(1 for _ in vuln_directory.glob("*.json")) - if response.status_code != 200: - self.log(f"Failed to fetch advisories directory: {response.status_code}") - return + def collect_advisories(self) -> Iterable[AdvisoryData]: + vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" + advisory_files = list(vuln_directory.glob("*.json")) - for item in response.json(): - if item["type"] == "file" and item["name"].endswith(".json"): - file_url = item["download_url"] + if not self.is_batch_run: + package_name = self.purl.name + filtered_files = [] + for advisory_file in advisory_files: try: - file_content = requests.get(file_url).json() - - if file_content.get("module_name") == package_name: - file_path = os.path.join(self.temp_dir, item["name"]) - with open(file_path, "w") as f: - json.dump(file_content, f) - self.package_advisories.append(file_path) + data = load_json(advisory_file) + if data.get("module_name") == package_name: + affected_package = self.get_affected_package(data, package_name) + if not self.purl.version or self._version_is_affected(affected_package): + filtered_files.append(advisory_file) except Exception as e: - self.log(f"Error processing advisory file {item['name']}: {str(e)}") + self.log(f"Error processing advisory file {advisory_file}: {str(e)}") + advisory_files = filtered_files - self.log(f"Found {len(self.package_advisories)} advisories for package {package_name}") - - def advisories_count(self): - if NpmImporterPipeline.is_batch_run: - vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" - return sum(1 for _ in vuln_directory.glob("*.json")) - else: - return len(getattr(self, "package_advisories", [])) - - def collect_advisories(self) -> Iterable[AdvisoryData]: - if NpmImporterPipeline.is_batch_run: - vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" - for advisory in vuln_directory.glob("*.json"): - yield from self.to_advisory_data(advisory) - else: - if not hasattr(self, "package_advisories"): - return - - for advisory_path in self.package_advisories: - yield from self.to_advisory_data(Path(advisory_path)) + for advisory in list(advisory_files): + for result in self.to_advisory_data(advisory): + if result: + yield result def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: data = load_json(file) @@ -241,11 +208,5 @@ def clean_downloads(self): self.log(f"Removing cloned repository") self.vcs_response.delete() - if hasattr(self, "temp_dir") and os.path.exists(self.temp_dir): - import shutil - - self.log(f"Removing temporary directory") - shutil.rmtree(self.temp_dir) - def on_failure(self): self.clean_downloads() diff --git a/vulnerabilities/pipelines/v2_importers/npm_importer.py b/vulnerabilities/pipelines/v2_importers/npm_importer.py index aa027d30b..3e509e63e 100644 --- a/vulnerabilities/pipelines/v2_importers/npm_importer.py +++ b/vulnerabilities/pipelines/v2_importers/npm_importer.py @@ -59,12 +59,6 @@ def __init__(self, *args, purl=None, **kwargs): @classmethod def steps(cls): - if not cls.is_batch_run: - return ( - cls.fetch_package_advisories, - cls.collect_and_store_advisories, - cls.clean_downloads, - ) return ( cls.clone, cls.collect_and_store_advisories, @@ -75,60 +69,32 @@ def clone(self): self.log(f"Cloning `{self.repo_url}`") self.vcs_response = fetch_via_vcs(self.repo_url) - def fetch_package_advisories(self): - if not self.purl or self.purl.type != "npm": - return - - self.log(f"Fetching advisories for package {self.purl.name}") - - package_name = self.purl.name - - self.temp_dir = tempfile.mkdtemp() - self.package_advisories = [] - - api_url = "https://api.github.com/repos/nodejs/security-wg/contents/vuln/npm" - response = requests.get(api_url) + def advisories_count(self): + vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" + return sum(1 for _ in vuln_directory.glob("*.json")) - if response.status_code != 200: - self.log(f"Failed to fetch advisories directory: {response.status_code}") - return + def collect_advisories(self) -> Iterable[AdvisoryData]: + vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" + advisory_files = list(vuln_directory.glob("*.json")) - for item in response.json(): - if item["type"] == "file" and item["name"].endswith(".json"): - file_url = item["download_url"] + if not self.is_batch_run: + package_name = self.purl.name + filtered_files = [] + for advisory_file in advisory_files: try: - file_content = requests.get(file_url).json() - - if file_content.get("module_name") == package_name: - file_path = os.path.join(self.temp_dir, item["name"]) - with open(file_path, "w") as f: - json.dump(file_content, f) - self.package_advisories.append(file_path) + data = load_json(advisory_file) + if data.get("module_name") == package_name: + affected_package = self.get_affected_package(data, package_name) + if not self.purl.version or self._version_is_affected(affected_package): + filtered_files.append(advisory_file) except Exception as e: - self.log(f"Error processing advisory file {item['name']}: {str(e)}") - - self.log(f"Found {len(self.package_advisories)} advisories for package {package_name}") - - def advisories_count(self): - if NpmImporterPipeline.is_batch_run: - vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" - return sum(1 for _ in vuln_directory.glob("*.json")) - else: - return len(getattr(self, "package_advisories", [])) - - def collect_advisories(self) -> Iterable[AdvisoryData]: - if NpmImporterPipeline.is_batch_run: - vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" - for advisory in vuln_directory.glob("*.json"): - yield self.to_advisory_data(advisory) - else: - if not hasattr(self, "package_advisories"): - return + self.log(f"Error processing advisory file {advisory_file}: {str(e)}") + advisory_files = filtered_files - for advisory_path in self.package_advisories: - result = self.to_advisory_data(Path(advisory_path)) - if result: - yield result + for advisory in list(advisory_files): + result = self.to_advisory_data(advisory) + if result: + yield result def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: if file.name == "index.json": diff --git a/vulnerabilities/tests/pipelines/test_npm_importer_pipeline.py b/vulnerabilities/tests/pipelines/test_npm_importer_pipeline.py index 5e3d661bd..22579bff1 100644 --- a/vulnerabilities/tests/pipelines/test_npm_importer_pipeline.py +++ b/vulnerabilities/tests/pipelines/test_npm_importer_pipeline.py @@ -12,7 +12,7 @@ import json import os from pathlib import Path -from unittest.mock import MagicMock +from types import SimpleNamespace from unittest.mock import patch from packageurl import PackageURL @@ -80,31 +80,23 @@ def test_npm_improver(mock_response): util_tests.check_results_against_json(result, expected_file) -@patch("requests.get") -def test_package_first_mode_valid_npm_package(mock_get): - mock_dir_response = MagicMock() - mock_dir_response.status_code = 200 - mock_dir_response.json.return_value = [ - { - "type": "file", - "name": "152.json", - "download_url": "https://raw.githubusercontent.com/nodejs/security-wg/main/vuln/npm/152.json", - } - ] +def test_package_first_mode_valid_npm_package(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") with open(npm_sample_file) as f: sample_data = json.load(f) - mock_file_response = MagicMock() - mock_file_response.json.return_value = sample_data + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) - mock_get.side_effect = [mock_dir_response, mock_file_response] + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) purl = PackageURL(type="npm", name="npm", version="1.2.0") pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response - pipeline.fetch_package_advisories() advisories = list(pipeline.collect_advisories()) assert len(advisories) == 1 @@ -113,59 +105,46 @@ def test_package_first_mode_valid_npm_package(mock_get): assert advisories[0].affected_packages[0].package.name == "npm" -@patch("requests.get") -def test_package_first_mode_unaffected_version(mock_get): - mock_dir_response = MagicMock() - mock_dir_response.status_code = 200 - mock_dir_response.json.return_value = [ - { - "type": "file", - "name": "152.json", - "download_url": "https://raw.githubusercontent.com/nodejs/security-wg/main/vuln/npm/152.json", - } - ] +def test_package_first_mode_unaffected_version(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") with open(npm_sample_file) as f: sample_data = json.load(f) - mock_file_response = MagicMock() - mock_file_response.json.return_value = sample_data + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) - mock_get.side_effect = [mock_dir_response, mock_file_response] + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) purl = PackageURL(type="npm", name="npm", version="1.4.0") pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response - pipeline.fetch_package_advisories() advisories = list(pipeline.collect_advisories()) assert len(advisories) == 0 -@patch("requests.get") -def test_package_first_mode_invalid_package_type(mock_get): +def test_package_first_mode_invalid_package_type(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + purl = PackageURL(type="pypi", name="django", version="3.0.0") pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response - pipeline.fetch_package_advisories() advisories = list(pipeline.collect_advisories()) assert len(advisories) == 0 - mock_get.assert_not_called() - - -@patch("requests.get") -def test_package_first_mode_package_not_found(mock_get): - mock_dir_response = MagicMock() - mock_dir_response.status_code = 200 - mock_dir_response.json.return_value = [ - { - "type": "file", - "name": "152.json", - "download_url": "https://raw.githubusercontent.com/nodejs/security-wg/main/vuln/npm/152.json", - } - ] + + +def test_package_first_mode_package_not_found(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") with open(npm_sample_file) as f: @@ -173,31 +152,27 @@ def test_package_first_mode_package_not_found(mock_get): sample_data["module_name"] = "some-other-package" - mock_file_response = MagicMock() - mock_file_response.json.return_value = sample_data + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) - mock_get.side_effect = [mock_dir_response, mock_file_response] + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) purl = PackageURL(type="npm", name="nonexistent-package", version="1.0.0") pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response - pipeline.fetch_package_advisories() advisories = list(pipeline.collect_advisories()) assert len(advisories) == 0 -@patch("requests.get") -def test_package_first_mode_api_error(mock_get): - mock_error_response = MagicMock() - mock_error_response.status_code = 404 - - mock_get.return_value = mock_error_response +def test_package_first_mode_missing_vuln_directory(tmp_path): + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) purl = PackageURL(type="npm", name="npm", version="1.0.0") pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response - pipeline.fetch_package_advisories() advisories = list(pipeline.collect_advisories()) assert len(advisories) == 0 @@ -228,3 +203,9 @@ def test_version_is_affected(): fixed_version=SemverVersion(string="1.3.3"), ) assert pipeline._version_is_affected(affected_package_no_range) == True + affected_package_no_range = AffectedPackage( + package=PackageURL(type="npm", name="npm"), + affected_version_range=None, + fixed_version=SemverVersion(string="1.3.3"), + ) + assert pipeline._version_is_affected(affected_package_no_range) == True diff --git a/vulnerabilities/tests/pipelines/test_npm_importer_pipeline_v2.py b/vulnerabilities/tests/pipelines/test_npm_importer_pipeline_v2.py index 325d87ea9..7c0536419 100644 --- a/vulnerabilities/tests/pipelines/test_npm_importer_pipeline_v2.py +++ b/vulnerabilities/tests/pipelines/test_npm_importer_pipeline_v2.py @@ -66,8 +66,8 @@ def test_advisories_count_and_collect(tmp_path): (vuln_dir / "001.json").write_text(json.dumps({"id": "001"})) p = NpmImporterPipeline() p.vcs_response = SimpleNamespace(dest_dir=str(base), delete=lambda: None) - assert p.advisories_count() == 2 advisories = list(p.collect_advisories()) + assert p.advisories_count() == 2 # Should yield None for index.json and one AdvisoryData real = [a for a in advisories if isinstance(a, AdvisoryData)] assert len(real) == 1 @@ -136,31 +136,23 @@ def test_get_affected_package_special_and_standard(): assert pkg2.fixed_version == SemverVersion("2.0.1") -@patch("requests.get") -def test_package_first_mode_valid_npm_package(mock_get): - mock_dir_response = MagicMock() - mock_dir_response.status_code = 200 - mock_dir_response.json.return_value = [ - { - "type": "file", - "name": "152.json", - "download_url": "https://raw.githubusercontent.com/nodejs/security-wg/main/vuln/npm/152.json", - } - ] +def test_package_first_mode_valid_npm_package(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") with open(npm_sample_file) as f: sample_data = json.load(f) - mock_file_response = MagicMock() - mock_file_response.json.return_value = sample_data + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) - mock_get.side_effect = [mock_dir_response, mock_file_response] + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) purl = PackageURL(type="npm", name="npm", version="1.2.0") pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response - pipeline.fetch_package_advisories() advisories = list(pipeline.collect_advisories()) assert len(advisories) == 1 @@ -169,59 +161,46 @@ def test_package_first_mode_valid_npm_package(mock_get): assert advisories[0].affected_packages[0].package.name == "npm" -@patch("requests.get") -def test_package_first_mode_unaffected_version(mock_get): - mock_dir_response = MagicMock() - mock_dir_response.status_code = 200 - mock_dir_response.json.return_value = [ - { - "type": "file", - "name": "152.json", - "download_url": "https://raw.githubusercontent.com/nodejs/security-wg/main/vuln/npm/152.json", - } - ] +def test_package_first_mode_unaffected_version(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") with open(npm_sample_file) as f: sample_data = json.load(f) - mock_file_response = MagicMock() - mock_file_response.json.return_value = sample_data + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) - mock_get.side_effect = [mock_dir_response, mock_file_response] + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) purl = PackageURL(type="npm", name="npm", version="1.4.0") pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response - pipeline.fetch_package_advisories() advisories = list(pipeline.collect_advisories()) assert len(advisories) == 0 -@patch("requests.get") -def test_package_first_mode_invalid_package_type(mock_get): +def test_package_first_mode_invalid_package_type(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + purl = PackageURL(type="pypi", name="django", version="3.0.0") pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response - pipeline.fetch_package_advisories() advisories = list(pipeline.collect_advisories()) assert len(advisories) == 0 - mock_get.assert_not_called() -@patch("requests.get") -def test_package_first_mode_package_not_found(mock_get): - mock_dir_response = MagicMock() - mock_dir_response.status_code = 200 - mock_dir_response.json.return_value = [ - { - "type": "file", - "name": "152.json", - "download_url": "https://raw.githubusercontent.com/nodejs/security-wg/main/vuln/npm/152.json", - } - ] +def test_package_first_mode_package_not_found(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") with open(npm_sample_file) as f: @@ -229,31 +208,15 @@ def test_package_first_mode_package_not_found(mock_get): sample_data["module_name"] = "some-other-package" - mock_file_response = MagicMock() - mock_file_response.json.return_value = sample_data + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) - mock_get.side_effect = [mock_dir_response, mock_file_response] + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) purl = PackageURL(type="npm", name="nonexistent-package", version="1.0.0") pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response - pipeline.fetch_package_advisories() - advisories = list(pipeline.collect_advisories()) - - assert len(advisories) == 0 - - -@patch("requests.get") -def test_package_first_mode_api_error(mock_get): - mock_error_response = MagicMock() - mock_error_response.status_code = 404 - - mock_get.return_value = mock_error_response - - purl = PackageURL(type="npm", name="npm", version="1.0.0") - pipeline = NpmImporterPipeline(purl=purl) - - pipeline.fetch_package_advisories() advisories = list(pipeline.collect_advisories()) assert len(advisories) == 0 From 80e2aaf84ca15113c2a7488eae7c3ee749996b3b Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Tue, 29 Jul 2025 17:45:57 +0300 Subject: [PATCH 3/4] Add NPM live importer #1936 * Add NPM live pipeline importer to filter advisories affecting a single PURL * Add tests for NPM live importer Signed-off-by: Michael Ehab Mikhail --- vulnerabilities/pipelines/npm_importer.py | 56 +------- .../pipelines/v2_importers/npm_importer.py | 61 ++------ .../v2_importers/npm_live_importer.py | 92 ++++++++++++ .../pipelines/test_npm_importer_pipeline.py | 134 +---------------- .../test_npm_live_importer_pipeline_v2.py | 135 ++++++++++++++++++ 5 files changed, 242 insertions(+), 236 deletions(-) create mode 100644 vulnerabilities/pipelines/v2_importers/npm_live_importer.py create mode 100644 vulnerabilities/tests/pipelines/test_npm_live_importer_pipeline_v2.py diff --git a/vulnerabilities/pipelines/npm_importer.py b/vulnerabilities/pipelines/npm_importer.py index d6c577e3d..8bcc52759 100644 --- a/vulnerabilities/pipelines/npm_importer.py +++ b/vulnerabilities/pipelines/npm_importer.py @@ -9,19 +9,14 @@ # Author: Navonil Das (@NavonilDas) -import json -import os -import tempfile from pathlib import Path from typing import Iterable import pytz -import requests from dateutil.parser import parse from fetchcode.vcs import fetch_via_vcs from packageurl import PackageURL from univers.version_range import NpmVersionRange -from univers.versions import SemverVersion from vulnerabilities.importer import AdvisoryData from vulnerabilities.importer import AffectedPackage @@ -44,24 +39,14 @@ class NpmImporterPipeline(VulnerableCodeBaseImporterPipeline): repo_url = "git+https://github.com/nodejs/security-wg" importer_name = "Npm Importer" - is_batch_run = True - - def __init__(self, *args, purl=None, **kwargs): - super().__init__(*args, **kwargs) - self.purl = purl - if self.purl: - NpmImporterPipeline.is_batch_run = False - if self.purl.type != "npm": - print(f"Warning: This importer handles NPM packages. Current PURL: {self.purl!s}") - @classmethod def steps(cls): - return [ + return ( cls.clone, cls.collect_and_store_advisories, cls.import_new_advisories, cls.clean_downloads, - ] + ) def clone(self): self.log(f"Cloning `{self.repo_url}`") @@ -73,26 +58,9 @@ def advisories_count(self): def collect_advisories(self) -> Iterable[AdvisoryData]: vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" - advisory_files = list(vuln_directory.glob("*.json")) - - if not self.is_batch_run: - package_name = self.purl.name - filtered_files = [] - for advisory_file in advisory_files: - try: - data = load_json(advisory_file) - if data.get("module_name") == package_name: - affected_package = self.get_affected_package(data, package_name) - if not self.purl.version or self._version_is_affected(affected_package): - filtered_files.append(advisory_file) - except Exception as e: - self.log(f"Error processing advisory file {advisory_file}: {str(e)}") - advisory_files = filtered_files - - for advisory in list(advisory_files): - for result in self.to_advisory_data(advisory): - if result: - yield result + + for advisory in vuln_directory.glob("*.json"): + yield from self.to_advisory_data(advisory) def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: data = load_json(file) @@ -144,11 +112,6 @@ def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: affected_packages.append(self.get_affected_package(data, package_name)) advsisory_aliases = data.get("cves") or [] - if self.purl and self.purl.version: - affected_package = affected_packages[0] if affected_packages else None - if affected_package and not self._version_is_affected(affected_package): - return - for alias in advsisory_aliases: yield AdvisoryData( summary=build_description(summary=summary, description=description), @@ -159,13 +122,6 @@ def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: url=f"https://github.com/nodejs/security-wg/blob/main/vuln/npm/{id}.json", ) - def _version_is_affected(self, affected_package): - if not self.purl.version or not affected_package.affected_version_range: - return True - - purl_version = SemverVersion(self.purl.version) - return purl_version in affected_package.affected_version_range - def get_affected_package(self, data, package_name): affected_version_range = None unaffected_version_range = None @@ -209,4 +165,4 @@ def clean_downloads(self): self.vcs_response.delete() def on_failure(self): - self.clean_downloads() + self.clean_downloads() \ No newline at end of file diff --git a/vulnerabilities/pipelines/v2_importers/npm_importer.py b/vulnerabilities/pipelines/v2_importers/npm_importer.py index 3e509e63e..231528bbd 100644 --- a/vulnerabilities/pipelines/v2_importers/npm_importer.py +++ b/vulnerabilities/pipelines/v2_importers/npm_importer.py @@ -10,18 +10,14 @@ # Author: Navonil Das (@NavonilDas) import json -import os -import tempfile from pathlib import Path from typing import Iterable import pytz -import requests from dateutil.parser import parse from fetchcode.vcs import fetch_via_vcs from packageurl import PackageURL from univers.version_range import NpmVersionRange -from univers.versions import SemverVersion from vulnerabilities.importer import AdvisoryData from vulnerabilities.importer import AffectedPackage @@ -47,16 +43,6 @@ class NpmImporterPipeline(VulnerableCodeBaseImporterPipelineV2): repo_url = "git+https://github.com/nodejs/security-wg" unfurl_version_ranges = True - is_batch_run = True - - def __init__(self, *args, purl=None, **kwargs): - super().__init__(*args, **kwargs) - self.purl = purl - if self.purl: - NpmImporterPipeline.is_batch_run = False - if self.purl.type != "npm": - print(f"Warning: This importer handles NPM packages. Current PURL: {self.purl!s}") - @classmethod def steps(cls): return ( @@ -75,32 +61,18 @@ def advisories_count(self): def collect_advisories(self) -> Iterable[AdvisoryData]: vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" - advisory_files = list(vuln_directory.glob("*.json")) - - if not self.is_batch_run: - package_name = self.purl.name - filtered_files = [] - for advisory_file in advisory_files: - try: - data = load_json(advisory_file) - if data.get("module_name") == package_name: - affected_package = self.get_affected_package(data, package_name) - if not self.purl.version or self._version_is_affected(affected_package): - filtered_files.append(advisory_file) - except Exception as e: - self.log(f"Error processing advisory file {advisory_file}: {str(e)}") - advisory_files = filtered_files - - for advisory in list(advisory_files): - result = self.to_advisory_data(advisory) - if result: - yield result + + for advisory in vuln_directory.glob("*.json"): + yield self.to_advisory_data(advisory) def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: if file.name == "index.json": self.log(f"Skipping {file.name} file") return data = load_json(file) + advisory_text = None + with open(file) as f: + advisory_text = f.read() id = data.get("id") description = data.get("overview") or "" summary = data.get("title") or "" @@ -153,11 +125,6 @@ def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: affected_packages.append(self.get_affected_package(data, package_name)) advsisory_aliases = data.get("cves") or [] - if self.purl and self.purl.version: - affected_package = affected_packages[0] if affected_packages else None - if affected_package and not self._version_is_affected(affected_package): - return - return AdvisoryData( advisory_id=f"npm-{id}", aliases=advsisory_aliases, @@ -167,15 +134,9 @@ def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: references_v2=references, severities=severities, url=f"https://github.com/nodejs/security-wg/blob/main/vuln/npm/{id}.json", + original_advisory_text=advisory_text or json.dumps(data, indent=2, ensure_ascii=False), ) - def _version_is_affected(self, affected_package): - if not self.purl.version or not affected_package.affected_version_range: - return True - - purl_version = SemverVersion(self.purl.version) - return purl_version in affected_package.affected_version_range - def get_affected_package(self, data, package_name): affected_version_range = None unaffected_version_range = None @@ -218,11 +179,5 @@ def clean_downloads(self): self.log(f"Removing cloned repository") self.vcs_response.delete() - if hasattr(self, "temp_dir") and os.path.exists(self.temp_dir): - import shutil - - self.log(f"Removing temporary directory") - shutil.rmtree(self.temp_dir) - def on_failure(self): - self.clean_downloads() + self.clean_downloads() \ No newline at end of file diff --git a/vulnerabilities/pipelines/v2_importers/npm_live_importer.py b/vulnerabilities/pipelines/v2_importers/npm_live_importer.py new file mode 100644 index 000000000..06d660180 --- /dev/null +++ b/vulnerabilities/pipelines/v2_importers/npm_live_importer.py @@ -0,0 +1,92 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +from pathlib import Path +from typing import Iterable + +from packageurl import PackageURL +from univers.versions import SemverVersion + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.pipelines.v2_importers.npm_importer import NpmImporterPipeline +from vulnerabilities.utils import load_json + + +class NpmLiveImporterPipeline(NpmImporterPipeline): + """ + Node.js Security Working Group importer pipeline + + Import advisories from nodejs security working group including node proper advisories and npm advisories for a single PURL. + """ + + pipeline_id = "nodejs_security_wg_live_importer" + supported_types = ["npm"] + spdx_license_expression = "MIT" + license_url = "https://github.com/nodejs/security-wg/blob/main/LICENSE.md" + repo_url = "git+https://github.com/nodejs/security-wg" + unfurl_version_ranges = True + + @classmethod + def steps(cls): + return ( + cls.get_purl_inputs, + cls.clone, + cls.collect_and_store_advisories, + cls.clean_downloads, + ) + + def get_purl_inputs(self): + purl = self.inputs["purl"] + if not purl: + raise ValueError("PURL is required for NpmLiveImporterPipeline") + + if isinstance(purl, str): + purl = PackageURL.from_string(purl) + + if not isinstance(purl, PackageURL): + raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance") + + if purl.type not in self.supported_types: + raise ValueError( + f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}" + ) + + if not purl.version: + raise ValueError(f"PURL: {purl!s} is expected to have a version") + + self.purl = purl + + def collect_advisories(self) -> Iterable[AdvisoryData]: + vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" + advisory_files = list(vuln_directory.glob("*.json")) + + package_name = self.purl.name + filtered_files = [] + for advisory_file in advisory_files: + try: + data = load_json(advisory_file) + if data.get("module_name") == package_name: + affected_package = self.get_affected_package(data, package_name) + if not self.purl.version or self._version_is_affected(affected_package): + filtered_files.append(advisory_file) + except Exception as e: + self.log(f"Error processing advisory file {advisory_file}: {str(e)}") + advisory_files = filtered_files + + for advisory in list(advisory_files): + result = self.to_advisory_data(advisory) + if result: + yield result + + def _version_is_affected(self, affected_package): + if not self.purl.version or not affected_package.affected_version_range: + return True + + purl_version = SemverVersion(self.purl.version) + return purl_version in affected_package.affected_version_range \ No newline at end of file diff --git a/vulnerabilities/tests/pipelines/test_npm_importer_pipeline.py b/vulnerabilities/tests/pipelines/test_npm_importer_pipeline.py index 22579bff1..9845ff4c6 100644 --- a/vulnerabilities/tests/pipelines/test_npm_importer_pipeline.py +++ b/vulnerabilities/tests/pipelines/test_npm_importer_pipeline.py @@ -12,7 +12,6 @@ import json import os from pathlib import Path -from types import SimpleNamespace from unittest.mock import patch from packageurl import PackageURL @@ -77,135 +76,4 @@ def test_npm_improver(mock_response): inference = [data.to_dict() for data in improver.get_inferences(advisory)] result.extend(inference) expected_file = os.path.join(TEST_DATA, f"npm-improver-expected.json") - util_tests.check_results_against_json(result, expected_file) - - -def test_package_first_mode_valid_npm_package(tmp_path): - vuln_dir = tmp_path / "vuln" / "npm" - vuln_dir.mkdir(parents=True) - - npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") - with open(npm_sample_file) as f: - sample_data = json.load(f) - - advisory_file = vuln_dir / "152.json" - advisory_file.write_text(json.dumps(sample_data)) - - mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) - - purl = PackageURL(type="npm", name="npm", version="1.2.0") - pipeline = NpmImporterPipeline(purl=purl) - pipeline.vcs_response = mock_vcs_response - - advisories = list(pipeline.collect_advisories()) - - assert len(advisories) == 1 - assert advisories[0].aliases == ["CVE-2013-4116"] - assert len(advisories[0].affected_packages) == 1 - assert advisories[0].affected_packages[0].package.name == "npm" - - -def test_package_first_mode_unaffected_version(tmp_path): - vuln_dir = tmp_path / "vuln" / "npm" - vuln_dir.mkdir(parents=True) - - npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") - with open(npm_sample_file) as f: - sample_data = json.load(f) - - advisory_file = vuln_dir / "152.json" - advisory_file.write_text(json.dumps(sample_data)) - - mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) - - purl = PackageURL(type="npm", name="npm", version="1.4.0") - pipeline = NpmImporterPipeline(purl=purl) - pipeline.vcs_response = mock_vcs_response - - advisories = list(pipeline.collect_advisories()) - - assert len(advisories) == 0 - - -def test_package_first_mode_invalid_package_type(tmp_path): - vuln_dir = tmp_path / "vuln" / "npm" - vuln_dir.mkdir(parents=True) - - mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) - - purl = PackageURL(type="pypi", name="django", version="3.0.0") - pipeline = NpmImporterPipeline(purl=purl) - pipeline.vcs_response = mock_vcs_response - - advisories = list(pipeline.collect_advisories()) - - assert len(advisories) == 0 - - -def test_package_first_mode_package_not_found(tmp_path): - vuln_dir = tmp_path / "vuln" / "npm" - vuln_dir.mkdir(parents=True) - - npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") - with open(npm_sample_file) as f: - sample_data = json.load(f) - - sample_data["module_name"] = "some-other-package" - - advisory_file = vuln_dir / "152.json" - advisory_file.write_text(json.dumps(sample_data)) - - mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) - - purl = PackageURL(type="npm", name="nonexistent-package", version="1.0.0") - pipeline = NpmImporterPipeline(purl=purl) - pipeline.vcs_response = mock_vcs_response - - advisories = list(pipeline.collect_advisories()) - - assert len(advisories) == 0 - - -def test_package_first_mode_missing_vuln_directory(tmp_path): - mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) - - purl = PackageURL(type="npm", name="npm", version="1.0.0") - pipeline = NpmImporterPipeline(purl=purl) - pipeline.vcs_response = mock_vcs_response - - advisories = list(pipeline.collect_advisories()) - - assert len(advisories) == 0 - - -def test_version_is_affected(): - purl = PackageURL(type="npm", name="npm", version="1.2.0") - pipeline = NpmImporterPipeline(purl=purl) - - affected_package = AffectedPackage( - package=PackageURL(type="npm", name="npm"), - affected_version_range=NpmVersionRange( - constraints=(VersionConstraint(comparator="<", version=SemverVersion(string="1.3.3")),) - ), - ) - - assert pipeline._version_is_affected(affected_package) == True - - pipeline.purl = PackageURL(type="npm", name="npm", version="1.4.0") - assert pipeline._version_is_affected(affected_package) == False - - pipeline.purl = PackageURL(type="npm", name="npm") - assert pipeline._version_is_affected(affected_package) == True - - affected_package_no_range = AffectedPackage( - package=PackageURL(type="npm", name="npm"), - affected_version_range=None, - fixed_version=SemverVersion(string="1.3.3"), - ) - assert pipeline._version_is_affected(affected_package_no_range) == True - affected_package_no_range = AffectedPackage( - package=PackageURL(type="npm", name="npm"), - affected_version_range=None, - fixed_version=SemverVersion(string="1.3.3"), - ) - assert pipeline._version_is_affected(affected_package_no_range) == True + util_tests.check_results_against_json(result, expected_file) \ No newline at end of file diff --git a/vulnerabilities/tests/pipelines/test_npm_live_importer_pipeline_v2.py b/vulnerabilities/tests/pipelines/test_npm_live_importer_pipeline_v2.py new file mode 100644 index 000000000..350c4db65 --- /dev/null +++ b/vulnerabilities/tests/pipelines/test_npm_live_importer_pipeline_v2.py @@ -0,0 +1,135 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +import json +import os +from pathlib import Path +from types import SimpleNamespace + +from packageurl import PackageURL +from univers.version_constraint import VersionConstraint +from univers.version_range import NpmVersionRange +from univers.versions import SemverVersion + +from vulnerabilities.importer import AffectedPackage +from vulnerabilities.pipelines.v2_importers.npm_live_importer import NpmLiveImporterPipeline + +TEST_DATA = Path(__file__).parent.parent / "test_data" / "npm" + +def test_package_first_mode_valid_npm_package(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + + purl = PackageURL(type="npm", name="npm", version="1.2.0") + pipeline = NpmLiveImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response + + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 1 + assert advisories[0].aliases == ["CVE-2013-4116"] + assert len(advisories[0].affected_packages) == 1 + assert advisories[0].affected_packages[0].package.name == "npm" + + +def test_package_first_mode_unaffected_version(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + + purl = PackageURL(type="npm", name="npm", version="1.4.0") + pipeline = NpmLiveImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response + + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +def test_package_first_mode_invalid_package_type(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + + purl = PackageURL(type="pypi", name="django", version="3.0.0") + pipeline = NpmLiveImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response + + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +def test_package_first_mode_package_not_found(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + sample_data["module_name"] = "some-other-package" + + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + + purl = PackageURL(type="npm", name="nonexistent-package", version="1.0.0") + pipeline = NpmLiveImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response + + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +def test_version_is_affected(): + purl = PackageURL(type="npm", name="npm", version="1.2.0") + pipeline = NpmLiveImporterPipeline(purl=purl) + + affected_package = AffectedPackage( + package=PackageURL(type="npm", name="npm"), + affected_version_range=NpmVersionRange( + constraints=(VersionConstraint(comparator="<", version=SemverVersion(string="1.3.3")),) + ), + ) + + assert pipeline._version_is_affected(affected_package) == True + + pipeline.purl = PackageURL(type="npm", name="npm", version="1.4.0") + assert pipeline._version_is_affected(affected_package) == False + + pipeline.purl = PackageURL(type="npm", name="npm") + assert pipeline._version_is_affected(affected_package) == True + + affected_package_no_range = AffectedPackage( + package=PackageURL(type="npm", name="npm"), + affected_version_range=None, + fixed_version=SemverVersion(string="1.3.3"), + ) + assert pipeline._version_is_affected(affected_package_no_range) == True From 5267b189e60524d7cc900c78b7b9b093c1be0f0f Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Tue, 29 Jul 2025 17:58:15 +0300 Subject: [PATCH 4/4] Update NPM Live Importer tests #1936 Signed-off-by: Michael Ehab Mikhail --- .../pipelines/test_npm_live_importer_pipeline_v2.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/vulnerabilities/tests/pipelines/test_npm_live_importer_pipeline_v2.py b/vulnerabilities/tests/pipelines/test_npm_live_importer_pipeline_v2.py index 350c4db65..650cb7b19 100644 --- a/vulnerabilities/tests/pipelines/test_npm_live_importer_pipeline_v2.py +++ b/vulnerabilities/tests/pipelines/test_npm_live_importer_pipeline_v2.py @@ -9,6 +9,7 @@ import json import os +import pytest from pathlib import Path from types import SimpleNamespace @@ -39,6 +40,7 @@ def test_package_first_mode_valid_npm_package(tmp_path): pipeline = NpmLiveImporterPipeline(purl=purl) pipeline.vcs_response = mock_vcs_response + pipeline.get_purl_inputs() advisories = list(pipeline.collect_advisories()) assert len(advisories) == 1 @@ -64,6 +66,7 @@ def test_package_first_mode_unaffected_version(tmp_path): pipeline = NpmLiveImporterPipeline(purl=purl) pipeline.vcs_response = mock_vcs_response + pipeline.get_purl_inputs() advisories = list(pipeline.collect_advisories()) assert len(advisories) == 0 @@ -79,9 +82,8 @@ def test_package_first_mode_invalid_package_type(tmp_path): pipeline = NpmLiveImporterPipeline(purl=purl) pipeline.vcs_response = mock_vcs_response - advisories = list(pipeline.collect_advisories()) - - assert len(advisories) == 0 + with pytest.raises(ValueError): + pipeline.get_purl_inputs() def test_package_first_mode_package_not_found(tmp_path): @@ -103,6 +105,7 @@ def test_package_first_mode_package_not_found(tmp_path): pipeline = NpmLiveImporterPipeline(purl=purl) pipeline.vcs_response = mock_vcs_response + pipeline.get_purl_inputs() advisories = list(pipeline.collect_advisories()) assert len(advisories) == 0 @@ -111,6 +114,7 @@ def test_package_first_mode_package_not_found(tmp_path): def test_version_is_affected(): purl = PackageURL(type="npm", name="npm", version="1.2.0") pipeline = NpmLiveImporterPipeline(purl=purl) + pipeline.get_purl_inputs() affected_package = AffectedPackage( package=PackageURL(type="npm", name="npm"),