Skip to content
This repository was archived by the owner on Nov 29, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 178 additions & 0 deletions pycti/api/opencti_api_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
# coding: utf-8
import atexit
import base64
import datetime
import io
import json
import os
import shutil
import signal
import tempfile
from typing import Dict, Tuple, Union

import magic
Expand Down Expand Up @@ -166,6 +171,18 @@ def __init__(
self.app_logger = self.logger_class("api")
self.admin_logger = self.logger_class("admin")

# Initialize temp certificate directory tracker
self.temp_cert_dir = None

# Setup proxy certificates if provided
self._setup_proxy_certificates()

# Register cleanup handlers for temp certificates
if self.temp_cert_dir:
atexit.register(self._cleanup_temp_certificates)
signal.signal(signal.SIGTERM, self._signal_handler)
signal.signal(signal.SIGINT, self._signal_handler)

# Define API
self.api_token = token
self.api_url = url + "/graphql"
Expand Down Expand Up @@ -249,6 +266,167 @@ def __init__(
"OpenCTI API is not reachable. Waiting for OpenCTI API to start or check your configuration..."
)

def _setup_proxy_certificates(self):
"""Setup HTTPS proxy certificates from environment variable.

Detects HTTPS_CA_CERTIFICATES environment variable and combines
proxy certificates with system certificates for SSL verification.
Supports both inline certificate content and file paths.
"""
https_ca_certificates = os.getenv("HTTPS_CA_CERTIFICATES")
if not https_ca_certificates:
return

try:
# Create secure temporary directory
cert_dir = tempfile.mkdtemp(prefix="opencti_proxy_certs_")
self.temp_cert_dir = cert_dir

# Determine if HTTPS_CA_CERTIFICATES contains inline content or file path
cert_content = self._get_certificate_content(https_ca_certificates)
if not cert_content:
self.app_logger.warning(
"Invalid HTTPS_CA_CERTIFICATES: not a valid certificate or file path",
{
"value": (
https_ca_certificates[:50] + "..."
if len(https_ca_certificates) > 50
else https_ca_certificates
)
},
)
return

# Write proxy certificate to temp file
proxy_cert_file = os.path.join(cert_dir, "proxy-ca.crt")
with open(proxy_cert_file, "w") as f:
f.write(cert_content)

# Find system certificates
system_cert_paths = [
"/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu
"/etc/pki/tls/certs/ca-bundle.crt", # RHEL/CentOS
"/etc/ssl/cert.pem", # Alpine/BSD
]

# Create combined certificate bundle
combined_cert_file = os.path.join(cert_dir, "combined-ca-bundle.crt")
with open(combined_cert_file, "w") as combined:
# Add system certificates first
for system_path in system_cert_paths:
if os.path.exists(system_path):
with open(system_path, "r") as sys_certs:
combined.write(sys_certs.read())
combined.write("\n")
break

# Add proxy certificate
combined.write(cert_content)

# Update ssl_verify to use combined certificate bundle
self.ssl_verify = combined_cert_file

# Set environment variables for urllib and other libraries
os.environ["REQUESTS_CA_BUNDLE"] = combined_cert_file
os.environ["SSL_CERT_FILE"] = combined_cert_file

self.app_logger.info(
"Proxy certificates configured",
{"cert_bundle": combined_cert_file},
)

except Exception as e:
self.app_logger.error(
"Failed to setup proxy certificates", {"error": str(e)}
)
raise

def _get_certificate_content(self, https_ca_certificates):
"""Extract certificate content from environment variable.

Supports both inline certificate content (PEM format) and file paths.

:param https_ca_certificates: Content from HTTPS_CA_CERTIFICATES env var
:type https_ca_certificates: str
:return: Certificate content in PEM format or None if invalid
:rtype: str or None
"""
# Strip whitespace once at the beginning
stripped_https_ca_certificates = https_ca_certificates.strip()

# Check if it's inline certificate content (starts with PEM header)
if stripped_https_ca_certificates.startswith("-----BEGIN CERTIFICATE-----"):
self.app_logger.debug(
"HTTPS_CA_CERTIFICATES contains inline certificate content"
)
return https_ca_certificates

# Check if it's a file path
if os.path.isfile(stripped_https_ca_certificates):
cert_file_path = stripped_https_ca_certificates
try:
with open(cert_file_path, "r") as f:
cert_content = f.read()
# Validate it's actually a certificate
if "-----BEGIN CERTIFICATE-----" in cert_content:
self.app_logger.debug(
"HTTPS_CA_CERTIFICATES contains valid certificate file path",
{"file_path": cert_file_path},
)
return cert_content
else:
self.app_logger.warning(
"File at HTTPS_CA_CERTIFICATES path does not contain valid certificate",
{"file_path": cert_file_path},
)
return None
except Exception as e:
self.app_logger.warning(
"Failed to read certificate file",
{"file_path": cert_file_path, "error": str(e)},
)
return None

# Neither inline content nor valid file path
return None

def _cleanup_temp_certificates(self):
"""Clean up temporary certificate directory.

This method is called on normal program exit via atexit
or when receiving termination signals (SIGTERM/SIGINT).
"""
if self.temp_cert_dir and os.path.exists(self.temp_cert_dir):
try:
shutil.rmtree(self.temp_cert_dir)
self.app_logger.debug(
"Cleaned up temporary certificates",
{"cert_dir": self.temp_cert_dir}
)
except Exception as e:
self.app_logger.warning(
"Failed to cleanup temporary certificates",
{"cert_dir": self.temp_cert_dir, "error": str(e)}
)
finally:
self.temp_cert_dir = None

def _signal_handler(self, signum, frame):
"""Handle termination signals (SIGTERM/SIGINT).

Performs cleanup and then raises SystemExit to allow
normal shutdown procedures to complete.

:param signum: Signal number
:param frame: Current stack frame
"""
self.app_logger.info(
"Received termination signal, cleaning up",
{"signal": signum}
)
self._cleanup_temp_certificates()
raise SystemExit(0)

def set_applicant_id_header(self, applicant_id):
self.request_headers["opencti-applicant-id"] = applicant_id

Expand Down
1 change: 1 addition & 0 deletions tests/01-unit/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Unit tests for API client functionality
Loading