Skip to content
This repository was archived by the owner on Nov 29, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions pycti/api/opencti_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import datetime
import io
import json
import os
import tempfile
from typing import Dict, Tuple, Union

import magic
Expand Down Expand Up @@ -166,6 +168,9 @@ def __init__(
self.app_logger = self.logger_class("api")
self.admin_logger = self.logger_class("admin")

# Setup proxy certificates if provided
self._setup_proxy_certificates()

# Define API
self.api_token = token
self.api_url = url + "/graphql"
Expand Down Expand Up @@ -249,6 +254,125 @@ def __init__(
"OpenCTI API is not reachable. Waiting for OpenCTI API to start or check your configuration..."
)

def _setup_proxy_certificates(self):
"""Setup HTTPS proxy certificates from environment variable.

Detects HTTPS_CA_CERTIFICATES environment variable and combines
proxy certificates with system certificates for SSL verification.
Supports both inline certificate content and file paths.
"""
https_ca_certificates = os.getenv("HTTPS_CA_CERTIFICATES")
if not https_ca_certificates:
return

try:
# Create secure temporary directory
cert_dir = tempfile.mkdtemp(prefix="opencti_proxy_certs_")

# Determine if HTTPS_CA_CERTIFICATES contains inline content or file path
cert_content = self._get_certificate_content(https_ca_certificates)
if not cert_content:
self.app_logger.warning(
"Invalid HTTPS_CA_CERTIFICATES: not a valid certificate or file path",
{
"value": (
https_ca_certificates[:50] + "..."
if len(https_ca_certificates) > 50
else https_ca_certificates
)
},
)
return

# Write proxy certificate to temp file
proxy_cert_file = os.path.join(cert_dir, "proxy-ca.crt")
with open(proxy_cert_file, "w") as f:
f.write(cert_content)

# Find system certificates
system_cert_paths = [
"/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu
"/etc/pki/tls/certs/ca-bundle.crt", # RHEL/CentOS
"/etc/ssl/cert.pem", # Alpine/BSD
]

# Create combined certificate bundle
combined_cert_file = os.path.join(cert_dir, "combined-ca-bundle.crt")
with open(combined_cert_file, "w") as combined:
# Add system certificates first
for system_path in system_cert_paths:
if os.path.exists(system_path):
with open(system_path, "r") as sys_certs:
combined.write(sys_certs.read())
combined.write("\n")
break

# Add proxy certificate
combined.write(cert_content)

# Update ssl_verify to use combined certificate bundle
self.ssl_verify = combined_cert_file

# Set environment variables for urllib and other libraries
os.environ["REQUESTS_CA_BUNDLE"] = combined_cert_file
os.environ["SSL_CERT_FILE"] = combined_cert_file

self.app_logger.info(
"Proxy certificates configured",
{"cert_bundle": combined_cert_file},
)

except Exception as e:
self.app_logger.warning(
"Failed to setup proxy certificates", {"error": str(e)}
)

def _get_certificate_content(self, https_ca_certificates):
"""Extract certificate content from environment variable.

Supports both inline certificate content (PEM format) and file paths.

:param https_ca_certificates: Content from HTTPS_CA_CERTIFICATES env var
:type https_ca_certificates: str
:return: Certificate content in PEM format or None if invalid
:rtype: str or None
"""
# Check if it's inline certificate content (starts with PEM header)
if https_ca_certificates.strip().startswith("-----BEGIN CERTIFICATE-----"):
self.app_logger.debug(
"HTTPS_CA_CERTIFICATES contains inline certificate content"
)
return https_ca_certificates

# Check if it's a file path
if os.path.isfile(https_ca_certificates.strip()):
cert_file_path = https_ca_certificates.strip()
try:
with open(cert_file_path, "r") as f:
cert_content = f.read()
# Validate it's actually a certificate
if "-----BEGIN CERTIFICATE-----" in cert_content:
self.app_logger.debug(
"HTTPS_CA_CERTIFICATES contains valid certificate file path",
{"file_path": cert_file_path},
)
return cert_content
else:
self.app_logger.warning(
"File at HTTPS_CA_CERTIFICATES path does not contain valid certificate",
{"file_path": cert_file_path},
)
return None
except Exception as e:
self.app_logger.warning(
"Failed to read certificate file",
{"file_path": cert_file_path, "error": str(e)},
)
return None

# Neither inline content nor valid file path
return None

def set_applicant_id_header(self, applicant_id):
self.request_headers["opencti-applicant-id"] = applicant_id

Expand Down
1 change: 1 addition & 0 deletions tests/01-unit/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Unit tests for API client functionality
188 changes: 188 additions & 0 deletions tests/01-unit/api/test_opencti_api_client-proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
import os
import tempfile
from unittest.mock import MagicMock, mock_open, patch

import pytest

from pycti import OpenCTIApiClient


class TestOpenCTIApiClient:
"""Test OpenCTIApiClient certificate handling functionality."""

SAMPLE_CERTIFICATE = """-----BEGIN CERTIFICATE-----
MIIDXTCCAkWgAwIBAgIJAKLdQVPy90WjMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMjQwMTAxMDAwMDAwWhcNMjUwMTAxMDAwMDAwWjBF
MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
CgKCAQEA0Z3VS5JJcds3xfn/ygWyF0qJDr9oYRH/9dMfqHCOq45DqMVJLJBJnMzN
-----END CERTIFICATE-----"""

INVALID_CONTENT = "This is not a certificate"

@pytest.fixture
def api_client(self):
"""Create an API client instance without performing health check."""
with patch.object(OpenCTIApiClient, "_setup_proxy_certificates"):
client = OpenCTIApiClient(
url="http://localhost:4000",
token="test-token",
ssl_verify=False,
perform_health_check=False,
)
# Mock the logger
client.app_logger = MagicMock()
return client

def test_get_certificate_content_inline_pem(self, api_client):
"""Test _get_certificate_content with inline PEM certificate."""
result = api_client._get_certificate_content(self.SAMPLE_CERTIFICATE)

assert result == self.SAMPLE_CERTIFICATE
api_client.app_logger.debug.assert_called_with(
"HTTPS_CA_CERTIFICATES contains inline certificate content"
)

def test_get_certificate_content_file_path(self, api_client):
"""Test _get_certificate_content with a file path containing certificate."""
# Create a temporary file with certificate content
with tempfile.NamedTemporaryFile(
mode="w", suffix=".crt", delete=False
) as cert_file:
cert_file.write(self.SAMPLE_CERTIFICATE)
cert_file_path = cert_file.name

try:
result = api_client._get_certificate_content(cert_file_path)

assert result == self.SAMPLE_CERTIFICATE
api_client.app_logger.debug.assert_called_with(
"HTTPS_CA_CERTIFICATES contains valid certificate file path",
{"file_path": cert_file_path},
)
finally:
# Clean up
os.unlink(cert_file_path)

def test_get_certificate_content_invalid_file_content(self, api_client):
"""Test _get_certificate_content with a file containing invalid certificate."""
# Create a temporary file with invalid content
with tempfile.NamedTemporaryFile(
mode="w", suffix=".txt", delete=False
) as invalid_file:
invalid_file.write(self.INVALID_CONTENT)
invalid_file_path = invalid_file.name

try:
result = api_client._get_certificate_content(invalid_file_path)

assert result is None
api_client.app_logger.warning.assert_called_with(
"File at HTTPS_CA_CERTIFICATES path does not contain valid certificate",
{"file_path": invalid_file_path},
)
finally:
# Clean up
os.unlink(invalid_file_path)

def test_get_certificate_content_nonexistent_file(self, api_client):
"""Test _get_certificate_content with a nonexistent file path."""
nonexistent_path = "/tmp/nonexistent_certificate.crt"

result = api_client._get_certificate_content(nonexistent_path)

assert result is None

def test_get_certificate_content_invalid_content(self, api_client):
"""Test _get_certificate_content with invalid content (not PEM, not file)."""
result = api_client._get_certificate_content(self.INVALID_CONTENT)

assert result is None

def test_get_certificate_content_whitespace_handling(self, api_client):
"""Test _get_certificate_content handles whitespace correctly."""
# Test with certificate content with leading/trailing whitespace
cert_with_whitespace = f" \n{self.SAMPLE_CERTIFICATE} \n"
result = api_client._get_certificate_content(cert_with_whitespace)

assert result == cert_with_whitespace # Should return as-is
api_client.app_logger.debug.assert_called_with(
"HTTPS_CA_CERTIFICATES contains inline certificate content"
)

@patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": ""})
def test_setup_proxy_certificates_no_env(self, api_client):
"""Test _setup_proxy_certificates when HTTPS_CA_CERTIFICATES is not set."""
api_client._setup_proxy_certificates()

# Should return early without setting ssl_verify
assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False

@patch.dict(os.environ, {})
def test_setup_proxy_certificates_env_not_present(self, api_client):
"""Test _setup_proxy_certificates when HTTPS_CA_CERTIFICATES env var doesn't exist."""
api_client._setup_proxy_certificates()

# Should return early without setting ssl_verify
assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False

@patch("tempfile.mkdtemp")
@patch("os.path.isfile")
@patch("builtins.open", new_callable=mock_open)
@patch.dict(
os.environ,
{
"HTTPS_CA_CERTIFICATES": "-----BEGIN CERTIFICATE-----\ntest\n-----END CERTIFICATE-----"
},
)
def test_setup_proxy_certificates_with_inline_cert(
self, mock_file, mock_isfile, mock_mkdtemp, api_client
):
"""Test _setup_proxy_certificates with inline certificate content."""
# Setup mocks
mock_mkdtemp.return_value = "/tmp/test_certs"
mock_isfile.side_effect = (
lambda path: path == "/etc/ssl/certs/ca-certificates.crt"
)

# Mock system certificate content
system_cert_content = (
"-----BEGIN CERTIFICATE-----\nsystem\n-----END CERTIFICATE-----"
)

def open_side_effect(path, mode="r"):
if path == "/etc/ssl/certs/ca-certificates.crt" and mode == "r":
return mock_open(read_data=system_cert_content)()
return mock_file()

with patch("builtins.open", side_effect=open_side_effect):
api_client._setup_proxy_certificates()

# Verify proxy certificates were processed
api_client.app_logger.info.assert_called()

@patch("tempfile.mkdtemp")
@patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": "/path/to/cert.crt"})
def test_setup_proxy_certificates_with_invalid_path(self, mock_mkdtemp, api_client):
"""Test _setup_proxy_certificates with invalid certificate file path."""
mock_mkdtemp.return_value = "/tmp/test_certs"

# Mock _get_certificate_content to return None (invalid)
with patch.object(api_client, "_get_certificate_content", return_value=None):
api_client._setup_proxy_certificates()

# Should log warning and return early
api_client.app_logger.warning.assert_called()
assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False

def test_setup_proxy_certificates_exception_handling(self, api_client):
"""Test _setup_proxy_certificates handles exceptions gracefully."""
with patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": self.SAMPLE_CERTIFICATE}):
with patch("tempfile.mkdtemp", side_effect=Exception("Mock error")):
api_client._setup_proxy_certificates()

# Should log warning and continue
api_client.app_logger.warning.assert_called_with(
"Failed to setup proxy certificates", {"error": "Mock error"}
)