Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vulnerabilities/improvers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
computer_package_version_rank as compute_version_rank_v2,
)
from vulnerabilities.pipelines.v2_improvers import enhance_with_exploitdb as exploitdb_v2
from vulnerabilities.pipelines.v2_improvers import enhance_with_github_poc
from vulnerabilities.pipelines.v2_improvers import enhance_with_kev as enhance_with_kev_v2
from vulnerabilities.pipelines.v2_improvers import (
enhance_with_metasploit as enhance_with_metasploit_v2,
Expand Down Expand Up @@ -70,5 +71,6 @@
compute_advisory_todo_v2.ComputeToDo,
unfurl_version_range_v2.UnfurlVersionRangePipeline,
compute_advisory_todo.ComputeToDo,
enhance_with_github_poc.GithubPocsImproverPipeline,
]
)
94 changes: 94 additions & 0 deletions vulnerabilities/pipelines/v2_improvers/enhance_with_github_poc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import json
from pathlib import Path

import dateparser
from aboutcode.pipeline import LoopProgress
from fetchcode.vcs import fetch_via_vcs

from vulnerabilities.models import AdvisoryAlias
from vulnerabilities.models import AdvisoryExploit
from vulnerabilities.pipelines import VulnerableCodePipeline


class GithubPocsImproverPipeline(VulnerableCodePipeline):
pipeline_id = "enhance_with_github_poc"
repo_url = "https://github.com/nomi-sec/PoC-in-GitHub"

@classmethod
def steps(cls):
return (
cls.clone_repo,
cls.collect_and_store_exploits,
cls.clean_downloads,
)

def clone_repo(self):
self.log(f"Cloning `{self.repo_url}`")
self.vcs_response = fetch_via_vcs(self.repo_url)

def collect_and_store_exploits(self):
"""
Parse PoC JSON files, match them to advisories via aliases,
and create or update related exploit records.
"""

base_directory = Path(self.vcs_response.dest_dir)
json_files = list(base_directory.rglob("**/*.json"))
exploits_count = len(json_files)
self.log(f"Enhancing the vulnerability with {exploits_count:,d} exploit records")
progress = LoopProgress(total_iterations=exploits_count, logger=self.log)
for file_path in progress.iter(json_files):
with open(file_path, "r") as f:
try:
exploits_data = json.load(f)
except json.JSONDecodeError:
self.log(f"Invalid JSON in {file_path}, skipping.")
continue

filename = file_path.stem.strip()
advisories = set()

try:
if alias := AdvisoryAlias.objects.get(alias=filename):
for adv in alias.advisories.all():
advisories.add(adv)
except AdvisoryAlias.DoesNotExist:
self.log(f"Advisory {filename} not found.")
continue

for advisory in advisories:
for exploit_data in exploits_data:
exploit_repo_url = exploit_data.get("html_url")
if not exploit_repo_url:
continue

AdvisoryExploit.objects.update_or_create(
advisory=advisory,
data_source="GitHub-PoC",
source_url=exploit_repo_url,
defaults={
"description": exploit_data.get("description"),
"source_date_published": dateparser.parse(
exploit_data.get("created_at")
),
},
)

self.log(f"Successfully added {exploits_count:,d} exploit advisory")

def clean_downloads(self):
if self.vcs_response:
self.log(f"Removing cloned repository")
self.vcs_response.delete()

def on_failure(self):
self.clean_downloads()
61 changes: 61 additions & 0 deletions vulnerabilities/pipelines/v2_improvers/github_poc_v2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

from github import Github
from vulnerabilities.models import AdvisoryAlias, AdvisoryExploit, AdvisoryV2
from vulnerabilities.pipelines import VulnerableCodePipeline
from vulnerablecode.settings import env

GITHUB_TOKEN = env.str("GITHUB_TOKEN")

class GitHubPocImproverPipeline(VulnerableCodePipeline):
"""
Pipeline to collect GitHub PoCs for vulnerabilities.
"""

pipeline_id = "collect_poc"

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.github = Github(login_or_token=GITHUB_TOKEN)

@classmethod
def steps(cls):
return (
cls.collect_and_store_poc_results,
)

def search_github_pocs(self, cve_id):
"""Search for PoCs on GitHub for each CVE"""
self.log(f"Searching GitHub for PoCs for {cve_id}")

query = f'"{cve_id}" PoC OR exploit OR "proof of concept"'
return self.github.search_repositories(query)

def collect_and_store_poc_results(self):
"""Store PoC results in the database"""
self.log("Storing PoC results in database...")
for advisory_alias in reversed(AdvisoryAlias.objects.filter(alias__startswith="CVE")):
repositories = self.search_github_pocs(advisory_alias.alias)

if not repositories:
continue

for repository in repositories:
for advisory in advisory_alias.advisories.all():
AdvisoryExploit.objects.update_or_create(
advisory=advisory,
data_source="GitHub POC",
defaults={
"description": repository.description,
"notes": str(repository),
"platform": "github",
},
)
print(repository)
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import os
from datetime import datetime
from unittest import mock
from unittest.mock import MagicMock

import pytest

from vulnerabilities.models import AdvisoryAlias
from vulnerabilities.models import AdvisoryExploit
from vulnerabilities.models import AdvisoryV2
from vulnerabilities.pipelines.v2_improvers.enhance_with_github_poc import (
GithubPocsImproverPipeline,
)

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

TEST_REPO_DIR = os.path.join(BASE_DIR, "../../test_data/github_poc")


@pytest.mark.django_db
@mock.patch("vulnerabilities.pipelines.v2_improvers.enhance_with_github_poc.fetch_via_vcs")
def test_github_poc_db_improver(mock_fetch_via_vcs):
mock_vcs = MagicMock()
mock_vcs.dest_dir = TEST_REPO_DIR
mock_vcs.delete = MagicMock()
mock_fetch_via_vcs.return_value = mock_vcs

adv1 = AdvisoryV2.objects.create(
advisory_id="VCIO-123-0001",
datasource_id="ds",
avid="ds/VCIO-123-0001",
unique_content_id="sgsdg45",
url="https://test.com",
date_collected=datetime.now(),
)
adv2 = AdvisoryV2.objects.create(
advisory_id="VCIO-123-1002",
datasource_id="ds",
avid="ds/VCIO-123-1002",
unique_content_id="6hd4d6f",
url="https://test.com",
date_collected=datetime.now(),
)
adv3 = AdvisoryV2.objects.create(
advisory_id="VCIO-123-1003",
datasource_id="ds",
avid="ds/VCIO-123-1003",
unique_content_id="sd6h4sh",
url="https://test.com",
date_collected=datetime.now(),
)

alias1 = AdvisoryAlias.objects.create(alias="CVE-2022-0236")
alias2 = AdvisoryAlias.objects.create(alias="CVE-2025-0108")
alias3 = AdvisoryAlias.objects.create(alias="CVE-2025-0309")
adv1.aliases.add(alias1)
adv2.aliases.add(alias2)
adv3.aliases.add(alias3)

improver = GithubPocsImproverPipeline()
improver.execute()

assert len(AdvisoryExploit.objects.all()) == 10
exploit = AdvisoryExploit.objects.first()
assert exploit.data_source == "GitHub-PoC"
assert exploit.source_url == "https://github.com/iSee857/CVE-2025-0108-PoC"
66 changes: 66 additions & 0 deletions vulnerabilities/tests/test_data/github_poc/2022/CVE-2022-0236.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
[
{
"id": 448514056,
"name": "CVE-2022-0236",
"full_name": "qurbat\/CVE-2022-0236",
"owner": {
"login": "qurbat",
"id": 37518297,
"avatar_url": "https:\/\/avatars.githubusercontent.com\/u\/37518297?v=4",
"html_url": "https:\/\/github.com\/qurbat",
"user_view_type": "public"
},
"html_url": "https:\/\/github.com\/qurbat\/CVE-2022-0236",
"description": "Proof of concept for unauthenticated sensitive data disclosure affecting the wp-import-export WordPress plugin (CVE-2022-0236)",
"fork": false,
"created_at": "2022-01-16T09:52:28Z",
"updated_at": "2023-01-28T03:56:57Z",
"pushed_at": "2022-01-18T17:14:53Z",
"stargazers_count": 3,
"watchers_count": 3,
"has_discussions": false,
"forks_count": 3,
"allow_forking": true,
"is_template": false,
"web_commit_signoff_required": false,
"topics": [
"wordpress-security"
],
"visibility": "public",
"forks": 3,
"watchers": 3,
"score": 0,
"subscribers_count": 1
},
{
"id": 448893968,
"name": "CVE-2022-0236",
"full_name": "xiska62314\/CVE-2022-0236",
"owner": {
"login": "xiska62314",
"id": 97891523,
"avatar_url": "https:\/\/avatars.githubusercontent.com\/u\/97891523?v=4",
"html_url": "https:\/\/github.com\/xiska62314",
"user_view_type": "public"
},
"html_url": "https:\/\/github.com\/xiska62314\/CVE-2022-0236",
"description": "CVE-2022-0236",
"fork": false,
"created_at": "2022-01-17T12:56:19Z",
"updated_at": "2022-01-17T12:56:19Z",
"pushed_at": "2022-01-17T12:56:20Z",
"stargazers_count": 0,
"watchers_count": 0,
"has_discussions": false,
"forks_count": 0,
"allow_forking": true,
"is_template": false,
"web_commit_signoff_required": false,
"topics": [],
"visibility": "public",
"forks": 0,
"watchers": 0,
"score": 0,
"subscribers_count": 1
}
]
Loading
Loading