Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 154 additions & 29 deletions api_app/analyzers_manager/file_analyzers/capa_info.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,165 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.
from typing import Dict

from api_app.analyzers_manager.classes import DockerBasedAnalyzer, FileAnalyzer
import json
import logging
import os
import shutil
import subprocess
from pathlib import Path
from shlex import quote

import requests
from django.conf import settings

class CapaInfo(FileAnalyzer, DockerBasedAnalyzer):
name: str = "Capa"
url: str = "http://malware_tools_analyzers:4002/capa"
# interval between http request polling
poll_distance: int = 10
# http request polling max number of tries
max_tries: int = 60
# here, max_tries * poll_distance = 10 minutes
timeout: int = 60 * 9
# whereas subprocess timeout is kept as 60 * 9 = 9 minutes
from api_app.analyzers_manager.classes import FileAnalyzer
from api_app.analyzers_manager.exceptions import AnalyzerRunException
from api_app.analyzers_manager.models import PythonModule
from api_app.mixins import RulesUtiliyMixin

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Root directory for every capa artifact, located under Django's MEDIA_ROOT.
BASE_LOCATION = f"{settings.MEDIA_ROOT}/capa"
# Directory holding the capa rules; handed to capa through the "-r" option.
RULES_LOCATION = f"{BASE_LOCATION}/capa-rules"
# Directory holding the capa signatures; handed to capa through the "-s" option.
SIGNATURE_LOCATION = f"{BASE_LOCATION}/sigs"
# Path of the downloaded rules archive, which is unzipped after the download.
RULES_FILE = f"{RULES_LOCATION}/capa_rules.zip"
# URL prefix of the capa-rules tag archives; "<tag_name>.zip" is appended to it.
RULES_URL = "https://github.com/mandiant/capa-rules/archive/refs/tags/"


class CapaInfo(FileAnalyzer, RulesUtiliyMixin):
shellcode: bool
arch: str
timeout: float = 15
force_pull_signatures: bool = False

@classmethod
def _download_signatures(cls) -> None:
logger.info(f"Downloading signatures at {SIGNATURE_LOCATION} now")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "signatures" are less important than the rules. They are almost never updated, while the "rules" are updated often. Plus, most of the time, we don't want these signatures to execute either, because it would slow the Capa execution. The rules are always necessary because they are the core part of the tool, while the signatures may not be necessary. Because of that, I would not re-update them once they are present, as you already do. But we need an additional parameter for the user to enable them explicitly; otherwise it would be better if these signatures were disabled by default.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plus, most of the time, we don't want these signatures to execute either, because it would slow the Capa execution.

Regarding your point: I actually tried executing flare-capa without the signatures, but it threw an error when executed with only the rules. So I feel the signatures are necessary for its execution.

That said, I can definitely change the code so that the signatures are only downloaded once or updated on demand by the user.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the changes so that signatures are only downloaded the first time or whenever force_pull_signatures is set to True.


if os.path.exists(SIGNATURE_LOCATION):
logger.info(f"Removing existing signatures at {SIGNATURE_LOCATION}")
shutil.rmtree(SIGNATURE_LOCATION)

os.makedirs(SIGNATURE_LOCATION)
logger.info(f"Created fresh signatures directory at {SIGNATURE_LOCATION}")

signatures_url = "https://api.github.com/repos/mandiant/capa/contents/sigs"
try:
response = requests.get(signatures_url)
signatures_list = response.json()

for signature in signatures_list:

filename = signature["name"]
download_url = signature["download_url"]

signature_file_path = os.path.join(SIGNATURE_LOCATION, filename)

sig_content = requests.get(download_url, stream=True)
with open(signature_file_path, mode="wb") as file:
for chunk in sig_content.iter_content(chunk_size=10 * 1024):
file.write(chunk)

except Exception as e:
logger.error(f"Failed to download signature: {e}")
raise AnalyzerRunException("Failed to update signatures")
logger.info("Successfully updated signatures")

@classmethod
def update(cls, anayzer_module: PythonModule) -> bool:
    """Download and unpack the latest capa rules release.

    Asks the GitHub API for the newest ``mandiant/capa-rules`` release tag,
    downloads the matching archive through ``cls._download_rules`` and
    extracts it with ``cls._unzip``.

    Args:
        anayzer_module: the ``PythonModule`` forwarded to
            ``_download_rules`` as ``analyzer_module``. The misspelled
            parameter name is kept on purpose so that existing keyword
            callers keep working.

    Returns:
        ``True`` when the rules were refreshed, ``False`` when any step
        failed. Failures are logged and never raised.
    """
    try:
        logger.info("Updating capa rules")
        response = requests.get(
            "https://api.github.com/repos/mandiant/capa-rules/releases/latest",
            # Without a timeout an unresponsive GitHub would block forever.
            timeout=30,
        )
        # Surface HTTP errors (e.g. rate limiting) as such instead of as a
        # confusing KeyError on "tag_name" below.
        response.raise_for_status()
        latest_version = response.json()["tag_name"]
        capa_rules_download_url = RULES_URL + latest_version + ".zip"

        cls._download_rules(
            rule_set_download_url=capa_rules_download_url,
            rule_set_directory=RULES_LOCATION,
            rule_file_path=RULES_FILE,
            latest_version=latest_version,
            analyzer_module=anayzer_module,
        )

        cls._unzip(Path(RULES_FILE))

        logger.info("Successfully updated capa rules")

        return True

    except Exception as e:
        # Best effort by design: the caller decides what to do with False.
        logger.error(f"Failed to update capa rules with error: {e}")

    return False

def run(self):
    """Run capa on the analyzed file and return its parsed JSON report.

    The rules are refreshed through ``self.update`` when
    ``self._check_if_latest_version`` reports that they are not at the
    latest released tag. The signatures are downloaded when their
    directory is missing or when ``force_pull_signatures`` is set.

    Returns:
        The dictionary parsed from capa's JSON output, extended with the
        ``command_executed`` and ``rules_version`` keys.

    Raises:
        AnalyzerRunException: when no rules are available, when capa exits
            with a non-zero status, or when capa exceeds ``self.timeout``.
    """
    response = requests.get(
        "https://api.github.com/repos/mandiant/capa-rules/releases/latest",
        # Without a timeout an unresponsive GitHub would block the analysis.
        timeout=30,
    )
    latest_version = response.json()["tag_name"]

    capa_analyzer_module = self.python_module

    if self._check_if_latest_version(latest_version, capa_analyzer_module):
        update_status = True
    else:
        update_status = self.update(capa_analyzer_module)

    if self.force_pull_signatures or not os.path.isdir(SIGNATURE_LOCATION):
        self._download_signatures()

    # A failed update is tolerated as long as a rules directory exists.
    if not os.path.isdir(RULES_LOCATION) and not update_status:
        raise AnalyzerRunException("Couldn't update capa rules")

    command: list[str] = ["/usr/local/bin/capa", "--quiet", "--json"]
    if self.shellcode:
        shell_code_arch = "sc64" if self.arch == "64" else "sc32"
        command.extend(["-f", shell_code_arch])

    # Setting default capa-rules path
    command.extend(["-r", RULES_LOCATION])

    # Setting default signatures location
    command.extend(["-s", SIGNATURE_LOCATION])

    # The command is an argv list executed without a shell, so the path must
    # be passed verbatim: shell quoting would add literal quote characters
    # and make capa look for a file that does not exist.
    command.append(self.filepath)

    logger.info(
        f"Starting CAPA analysis for {self.filename} with hash: {self.md5} and command: {command}"
    )

    try:
        process: subprocess.CompletedProcess = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=self.timeout,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        stderr = e.stderr
        logger.info(
            f"Capa Info failed to run for {self.filename} with hash: {self.md5} with command {e}"
        )
        raise AnalyzerRunException(
            f" Analyzer for {self.filename} with hash: {self.md5} failed with error: {stderr}"
        ) from e
    except subprocess.TimeoutExpired as e:
        # Report an exceeded timeout like any other capa failure.
        logger.info(
            f"Capa Info timed out for {self.filename} with hash: {self.md5} with command {e}"
        )
        raise AnalyzerRunException(
            f" Analyzer for {self.filename} with hash: {self.md5} timed out after {self.timeout} seconds"
        ) from e

    result = json.loads(process.stdout)
    result["command_executed"] = command
    result["rules_version"] = latest_version

    logger.info(
        f"CAPA analysis successfully completed for file: {self.filename} with hash {self.md5}"
    )

    return result
47 changes: 30 additions & 17 deletions api_app/analyzers_manager/file_analyzers/floss.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.

from json import dumps as json_dumps
import logging
import subprocess
from json import dumps, loads
from shlex import quote

from api_app.analyzers_manager.classes import DockerBasedAnalyzer, FileAnalyzer
from api_app.analyzers_manager.exceptions import AnalyzerRunException

logger = logging.getLogger(__name__)


class Floss(FileAnalyzer, DockerBasedAnalyzer):
name: str = "Floss"
url: str = "http://malware_tools_analyzers:4002/floss"
ranking_url: str = "http://malware_tools_analyzers:4002/stringsifter"
url: str = "http://malware_tools_analyzers:4002/stringsifter"
# interval between http request polling
poll_distance: int = 10
# http request polling max number of tries
Expand All @@ -29,30 +33,39 @@ def update(cls) -> bool:
pass

def run(self):
# get binary
binary = self.read_file_bytes()
# make request data
fname = str(self.filename).replace("/", "_").replace(" ", "_")
# From floss v3 there is prompt that can be overcome
# by using the flag --no static.
# We can lose static strings considering that we can easily
# retrieve them with more simple tools
args = [f"@{fname}", "--json", "--no", "static"]
req_data = {"args": args, "timeout": self.timeout}
req_files = {fname: binary}
result = self._docker_run(req_data, req_files)
if not isinstance(result, dict):
try:
process: subprocess.CompletedProcess = subprocess.run(
[
"/usr/local/bin/floss",
"--json",
"--no",
"static",
"--",
quote(self.filepath),
],
capture_output=True,
text=True,
check=True,
)

result = loads(process.stdout)

except subprocess.CalledProcessError as e:
stderr = e.stderr
logger.info(f"Floss failed to run for {self.filename} with command {e}")
raise AnalyzerRunException(
f"result from floss tool is not a dict but is {type(result)}."
f" Full dump: {result}"
f" Analyzer for {self.filename} failed with error: {stderr}"
)

result["exceeded_max_number_of_strings"] = {}
# we are changing the endpoint of _docker_run to stringsifter
self.url = self.ranking_url

for key in self.max_no_of_strings:
if self.rank_strings[key]:
strings = json_dumps(result["strings"][key])
strings = dumps(result["strings"][key])
# 4 is the number of arguments that we are already passing
analyzable_strings = strings[: self.OS_MAX_ARGS - 5]
args = [
Expand Down
74 changes: 74 additions & 0 deletions api_app/analyzers_manager/migrations/0168_update_capa.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Generated by Django 4.2.17 on 2025-07-24 14:57

from django.db import migrations


def migrate(apps, schema_editor):
PythonModule = apps.get_model("api_app", "PythonModule")
Parameter = apps.get_model("api_app", "Parameter")
PluginConfig = apps.get_model("api_app", "PluginConfig")
CrontabSchedule = apps.get_model("django_celery_beat", "CrontabSchedule")
AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig")

pm = PythonModule.objects.get(
module="capa_info.CapaInfo",
base_path="api_app.analyzers_manager.file_analyzers",
)

new_crontab, created = CrontabSchedule.objects.get_or_create(
minute="0",
hour="0",
day_of_week="*",
day_of_month="*",
month_of_year="*",
timezone="UTC",
)
if created:
pm.update_schedule = new_crontab
pm.full_clean()
pm.save()

AnalyzerConfig.objects.filter(python_module=pm).update(soft_time_limit=1800)
AnalyzerConfig.objects.filter(python_module=pm).update(docker_based=False)

p1 = Parameter(
name="timeout",
type="float",
description="Duration in seconds for which intelowl waits for capa to return results. Default is set to 15 seconds.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I am missing something, but have you set the default of the new parameter?

Copy link
Member Author

@spoiicy spoiicy Sep 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I've not set the default for this one. No worries I'll update and add it. :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfect, thank you!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has this been updated?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I've updated this in the update_capa.py migration. Below is the snippet.

    plugin_config_to_create = [
        PluginConfig(analyzer_config=config, parameter=p1, value=15)
        for config in analyzer_configs
    ]

    PluginConfig.objects.bulk_create(plugin_config_to_create)

is_secret=False,
required=False,
python_module=pm,
)

p2 = Parameter(
name="force_pull_signatures",
type="bool",
description="Force download signatures from flare-capa github repository",
is_secret=False,
required=False,
python_module=pm,
)

p1.full_clean()
p1.save()

p2.full_clean()
p2.save()

analyzer_configs = AnalyzerConfig.objects.filter(python_module=pm)

plugin_config_to_create = [
PluginConfig(analyzer_config=config, parameter=p1, value=15)
for config in analyzer_configs
]

PluginConfig.objects.bulk_create(plugin_config_to_create)


class Migration(migrations.Migration):
    """Data migration applied after ``0167_analyzerrulesfileversion``.

    Applying it runs ``migrate``; reverting it is a no-op, so the data
    changes made by ``migrate`` are left in place on rollback.
    """

    dependencies = [
        ("analyzers_manager", "0167_analyzerrulesfileversion"),
    ]

    operations = [
        migrations.RunPython(
            code=migrate,
            reverse_code=migrations.RunPython.noop,
        ),
    ]
34 changes: 34 additions & 0 deletions api_app/analyzers_manager/migrations/0169_update_guarddog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Generated by Django 4.2.25 on 2025-11-03 10:37

from django.db import migrations


def migrate(apps, schema_editor):
    """Disable every analyzer configuration backed by a GuardDog module.

    Args:
        apps: historical app registry used to look up the models.
        schema_editor: unused here; part of the ``RunPython`` callable
            signature.
    """
    python_module_model = apps.get_model("api_app", "PythonModule")
    analyzer_config_model = apps.get_model("analyzers_manager", "AnalyzerConfig")

    # (module, base_path) pairs identifying the two GuardDog python modules.
    guarddog_targets = (
        (
            "guarddog_file.GuardDogFile",
            "api_app.analyzers_manager.file_analyzers",
        ),
        (
            "guarddog_generic.GuardDogGeneric",
            "api_app.analyzers_manager.observable_analyzers",
        ),
    )

    # Look up both modules before updating anything, so a missing module
    # aborts the migration before any configuration has been changed.
    guarddog_modules = [
        python_module_model.objects.get(module=module, base_path=base_path)
        for module, base_path in guarddog_targets
    ]

    for guarddog_module in guarddog_modules:
        matching_configs = analyzer_config_model.objects.filter(
            python_module=guarddog_module
        )
        matching_configs.update(disabled=True)


class Migration(migrations.Migration):
    """Data migration applied after ``0168_update_capa``.

    Applying it runs ``migrate``; reverting it is a no-op, so the data
    changes made by ``migrate`` are left in place on rollback.
    """

    dependencies = [
        ("analyzers_manager", "0168_update_capa"),
    ]

    operations = [
        migrations.RunPython(
            code=migrate,
            reverse_code=migrations.RunPython.noop,
        ),
    ]
Loading
Loading