-
-
Notifications
You must be signed in to change notification settings - Fork 517
Floss Capa Refactor #2933
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Floss Capa Refactor #2933
Changes from all commits
151d4fe
0cc1b9e
8a811d5
8aaf8d0
9e65c97
d47fc8d
6a4ac4a
e3a501d
841d224
535abaf
414fef8
3c6963f
6a1b397
a1e8f69
c1c6033
1913e83
4c832d7
fffdb10
57977c0
e56ab56
42b980d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,40 +1,165 @@ | ||
| # This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl | ||
| # See the file 'LICENSE' for copying permission. | ||
| from typing import Dict | ||
|
|
||
| from api_app.analyzers_manager.classes import DockerBasedAnalyzer, FileAnalyzer | ||
| import json | ||
| import logging | ||
| import os | ||
| import shutil | ||
| import subprocess | ||
| from pathlib import Path | ||
| from shlex import quote | ||
|
|
||
| import requests | ||
| from django.conf import settings | ||
|
|
||
| class CapaInfo(FileAnalyzer, DockerBasedAnalyzer): | ||
| name: str = "Capa" | ||
| url: str = "http://malware_tools_analyzers:4002/capa" | ||
| # interval between http request polling | ||
| poll_distance: int = 10 | ||
| # http request polling max number of tries | ||
| max_tries: int = 60 | ||
| # here, max_tries * poll_distance = 10 minutes | ||
| timeout: int = 60 * 9 | ||
| # whereas subprocess timeout is kept as 60 * 9 = 9 minutes | ||
| from api_app.analyzers_manager.classes import FileAnalyzer | ||
| from api_app.analyzers_manager.exceptions import AnalyzerRunException | ||
| from api_app.analyzers_manager.models import PythonModule | ||
| from api_app.mixins import RulesUtiliyMixin | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| BASE_LOCATION = f"{settings.MEDIA_ROOT}/capa" | ||
| RULES_LOCATION = f"{BASE_LOCATION}/capa-rules" | ||
| SIGNATURE_LOCATION = f"{BASE_LOCATION}/sigs" | ||
| RULES_FILE = f"{RULES_LOCATION}/capa_rules.zip" | ||
| RULES_URL = "https://github.com/mandiant/capa-rules/archive/refs/tags/" | ||
|
|
||
|
|
||
| class CapaInfo(FileAnalyzer, RulesUtiliyMixin): | ||
| shellcode: bool | ||
| arch: str | ||
| timeout: float = 15 | ||
| force_pull_signatures: bool = False | ||
|
|
||
| @classmethod | ||
| def _download_signatures(cls) -> None: | ||
| logger.info(f"Downloading signatures at {SIGNATURE_LOCATION} now") | ||
|
|
||
| if os.path.exists(SIGNATURE_LOCATION): | ||
| logger.info(f"Removing existing signatures at {SIGNATURE_LOCATION}") | ||
| shutil.rmtree(SIGNATURE_LOCATION) | ||
|
|
||
| os.makedirs(SIGNATURE_LOCATION) | ||
| logger.info(f"Created fresh signatures directory at {SIGNATURE_LOCATION}") | ||
|
|
||
| signatures_url = "https://api.github.com/repos/mandiant/capa/contents/sigs" | ||
| try: | ||
| response = requests.get(signatures_url) | ||
| signatures_list = response.json() | ||
|
|
||
| for signature in signatures_list: | ||
|
|
||
| filename = signature["name"] | ||
| download_url = signature["download_url"] | ||
|
|
||
| signature_file_path = os.path.join(SIGNATURE_LOCATION, filename) | ||
|
|
||
| sig_content = requests.get(download_url, stream=True) | ||
| with open(signature_file_path, mode="wb") as file: | ||
| for chunk in sig_content.iter_content(chunk_size=10 * 1024): | ||
| file.write(chunk) | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Failed to download signature: {e}") | ||
| raise AnalyzerRunException("Failed to update signatures") | ||
| logger.info("Successfully updated signatures") | ||
|
|
||
| @classmethod | ||
| def update(cls, anayzer_module: PythonModule) -> bool: | ||
| try: | ||
| logger.info("Updating capa rules") | ||
| response = requests.get( | ||
| "https://api.github.com/repos/mandiant/capa-rules/releases/latest" | ||
| ) | ||
| latest_version = response.json()["tag_name"] | ||
mlodic marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| capa_rules_download_url = RULES_URL + latest_version + ".zip" | ||
|
|
||
| cls._download_rules( | ||
| rule_set_download_url=capa_rules_download_url, | ||
| rule_set_directory=RULES_LOCATION, | ||
| rule_file_path=RULES_FILE, | ||
| latest_version=latest_version, | ||
| analyzer_module=anayzer_module, | ||
| ) | ||
|
|
||
| cls._unzip(Path(RULES_FILE)) | ||
|
|
||
| logger.info("Successfully updated capa rules") | ||
|
|
||
| return True | ||
|
|
||
| def config(self, runtime_configuration: Dict): | ||
| super().config(runtime_configuration) | ||
| self.args = [] | ||
| if self.arch != "64": | ||
| self.arch = "32" | ||
| if self.shellcode: | ||
| self.args.append("-f") | ||
| self.args.append("sc" + self.arch) | ||
| except Exception as e: | ||
| logger.error(f"Failed to update capa rules with error: {e}") | ||
|
|
||
| return False | ||
|
|
||
| def run(self): | ||
| # get binary | ||
| binary = self.read_file_bytes() | ||
| # make request data | ||
| fname = str(self.filename).replace("/", "_").replace(" ", "_") | ||
| args = [f"@{fname}", *self.args] | ||
| req_data = {"args": args, "timeout": self.timeout} | ||
| req_files = {fname: binary} | ||
|
|
||
| return self._docker_run(req_data, req_files) | ||
| try: | ||
|
|
||
| response = requests.get( | ||
| "https://api.github.com/repos/mandiant/capa-rules/releases/latest" | ||
| ) | ||
| latest_version = response.json()["tag_name"] | ||
|
|
||
| capa_analyzer_module = self.python_module | ||
|
|
||
| update_status = ( | ||
| True | ||
| if self._check_if_latest_version(latest_version, capa_analyzer_module) | ||
| else self.update(capa_analyzer_module) | ||
| ) | ||
|
|
||
| if self.force_pull_signatures or not os.path.isdir(SIGNATURE_LOCATION): | ||
| self._download_signatures() | ||
|
|
||
| if not (os.path.isdir(RULES_LOCATION)) and not update_status: | ||
|
|
||
| raise AnalyzerRunException("Couldn't update capa rules") | ||
|
|
||
| command: list[str] = ["/usr/local/bin/capa", "--quiet", "--json"] | ||
fgibertoni marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| shell_code_arch = "sc64" if self.arch == "64" else "sc32" | ||
| if self.shellcode: | ||
| command.append("-f") | ||
| command.append(shell_code_arch) | ||
|
|
||
| # Setting default capa-rules path | ||
| command.append("-r") | ||
| command.append(RULES_LOCATION) | ||
|
|
||
| # Setting default signatures location | ||
| command.append("-s") | ||
| command.append(SIGNATURE_LOCATION) | ||
|
|
||
| command.append(quote(self.filepath)) | ||
|
|
||
| logger.info( | ||
| f"Starting CAPA analysis for {self.filename} with hash: {self.md5} and command: {command}" | ||
| ) | ||
|
|
||
| process: subprocess.CompletedProcess = subprocess.run( | ||
| command, | ||
fgibertoni marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| capture_output=True, | ||
| text=True, | ||
| timeout=self.timeout, | ||
| check=True, | ||
| ) | ||
|
|
||
| result = json.loads(process.stdout) | ||
fgibertoni marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| result["command_executed"] = command | ||
| result["rules_version"] = latest_version | ||
|
|
||
| logger.info( | ||
| f"CAPA analysis successfully completed for file: {self.filename} with hash {self.md5}" | ||
| ) | ||
|
|
||
| except subprocess.CalledProcessError as e: | ||
| stderr = e.stderr | ||
| logger.info( | ||
| f"Capa Info failed to run for {self.filename} with hash: {self.md5} with command {e}" | ||
| ) | ||
| raise AnalyzerRunException( | ||
| f" Analyzer for {self.filename} with hash: {self.md5} failed with error: {stderr}" | ||
| ) | ||
|
|
||
| return result | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| # Generated by Django 4.2.17 on 2025-07-24 14:57 | ||
|
|
||
| from django.db import migrations | ||
|
|
||
|
|
||
| def migrate(apps, schema_editor): | ||
| PythonModule = apps.get_model("api_app", "PythonModule") | ||
| Parameter = apps.get_model("api_app", "Parameter") | ||
| PluginConfig = apps.get_model("api_app", "PluginConfig") | ||
| CrontabSchedule = apps.get_model("django_celery_beat", "CrontabSchedule") | ||
| AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig") | ||
|
|
||
| pm = PythonModule.objects.get( | ||
| module="capa_info.CapaInfo", | ||
| base_path="api_app.analyzers_manager.file_analyzers", | ||
| ) | ||
|
|
||
| new_crontab, created = CrontabSchedule.objects.get_or_create( | ||
| minute="0", | ||
| hour="0", | ||
| day_of_week="*", | ||
| day_of_month="*", | ||
| month_of_year="*", | ||
| timezone="UTC", | ||
| ) | ||
| if created: | ||
| pm.update_schedule = new_crontab | ||
| pm.full_clean() | ||
| pm.save() | ||
|
|
||
| AnalyzerConfig.objects.filter(python_module=pm).update(soft_time_limit=1800) | ||
| AnalyzerConfig.objects.filter(python_module=pm).update(docker_based=False) | ||
|
|
||
| p1 = Parameter( | ||
| name="timeout", | ||
| type="float", | ||
| description="Duration in seconds for which intelowl waits for capa to return results. Default is set to 15 seconds.", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe I am missing something, but have you set the default of the new parameter?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right, I've not set the default for this one. No worries I'll update and add it. :)
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perfect, thank you!
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Has this been updated?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes I've updated this in |
||
| is_secret=False, | ||
| required=False, | ||
| python_module=pm, | ||
| ) | ||
|
|
||
| p2 = Parameter( | ||
| name="force_pull_signatures", | ||
| type="bool", | ||
| description="Force download signatures from flare-capa github repository", | ||
| is_secret=False, | ||
| required=False, | ||
| python_module=pm, | ||
| ) | ||
|
|
||
| p1.full_clean() | ||
| p1.save() | ||
|
|
||
| p2.full_clean() | ||
| p2.save() | ||
|
|
||
| analyzer_configs = AnalyzerConfig.objects.filter(python_module=pm) | ||
|
|
||
| plugin_config_to_create = [ | ||
| PluginConfig(analyzer_config=config, parameter=p1, value=15) | ||
| for config in analyzer_configs | ||
| ] | ||
|
|
||
| PluginConfig.objects.bulk_create(plugin_config_to_create) | ||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
|
|
||
| dependencies = [ | ||
| ("analyzers_manager", "0167_analyzerrulesfileversion"), | ||
| ] | ||
|
|
||
| operations = [migrations.RunPython(migrate, migrations.RunPython.noop)] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| # Generated by Django 4.2.25 on 2025-11-03 10:37 | ||
|
|
||
| from django.db import migrations | ||
|
|
||
|
|
||
| def migrate(apps, schema_editor): | ||
| PythonModule = apps.get_model("api_app", "PythonModule") | ||
| AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig") | ||
|
|
||
| guarddog_file_module = PythonModule.objects.get( | ||
| module="guarddog_file.GuardDogFile", | ||
| base_path="api_app.analyzers_manager.file_analyzers", | ||
| ) | ||
|
|
||
| guarddog_generic_module = PythonModule.objects.get( | ||
| module="guarddog_generic.GuardDogGeneric", | ||
| base_path="api_app.analyzers_manager.observable_analyzers", | ||
| ) | ||
|
|
||
| AnalyzerConfig.objects.filter(python_module=guarddog_file_module).update( | ||
| disabled=True | ||
| ) | ||
| AnalyzerConfig.objects.filter(python_module=guarddog_generic_module).update( | ||
| disabled=True | ||
| ) | ||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
|
|
||
| dependencies = [ | ||
| ("analyzers_manager", "0168_update_capa"), | ||
| ] | ||
|
|
||
| operations = [migrations.RunPython(migrate, migrations.RunPython.noop)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the "signatures" are less important than the rules ones. These are almost never updated while the "rules" are are updated often. Plus, most of the time, we don't want these signatures to execute either cause it would slow the Capa execution. The rules are always necessary because they are the core part of the tool while these one could not be necessary. Because of that, I would not re-update them once they are here, like you already do. But we need another additional parameter for the user to enable them explicitly otherwise it would be better if these signatures would be disabled by default.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reference: https://github.com/mandiant/capa/tree/master/sigs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding your point, Actually I've tried executing the flare-capa without the signatures but it threw an error, when only executed with rules. So, I feel the signatures are necessary for it's execution.
Though, I can definitely make changes in the code that the signatures are only downloaded once or updated on-demand by the user.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
made the changes such that signatures are only downloaded the first or whenever
force_pull_signaturesis set to True.