From 425e127bc0578071c801b2c3732c7703898de7b0 Mon Sep 17 00:00:00 2001 From: Mael Van den Plas Date: Wed, 29 Oct 2025 11:59:39 +0100 Subject: [PATCH 1/3] [client] Add inline proxy certificate support for HTTPS connections --- pycti/api/opencti_api_client.py | 62 +++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/pycti/api/opencti_api_client.py b/pycti/api/opencti_api_client.py index 5729f6d0..f5afaef2 100644 --- a/pycti/api/opencti_api_client.py +++ b/pycti/api/opencti_api_client.py @@ -3,6 +3,8 @@ import datetime import io import json +import os +import tempfile from typing import Dict, Tuple, Union import magic @@ -166,6 +168,9 @@ def __init__( self.app_logger = self.logger_class("api") self.admin_logger = self.logger_class("admin") + # Setup proxy certificates if provided + self._setup_proxy_certificates() + # Define API self.api_token = token self.api_url = url + "/graphql" @@ -249,6 +254,63 @@ def __init__( "OpenCTI API is not reachable. Waiting for OpenCTI API to start or check your configuration..." ) + def _setup_proxy_certificates(self): + """Setup HTTPS proxy certificates from environment variable. + + Detects HTTPS_CA_CERTIFICATES environment variable and combines + proxy certificates with system certificates for SSL verification. + """ + https_ca_certificates = os.getenv("HTTPS_CA_CERTIFICATES") + if not https_ca_certificates: + return + + try: + # Create secure temporary directory + cert_dir = tempfile.mkdtemp(prefix="opencti_proxy_certs_") + + # Write proxy certificate to temp file + proxy_cert_file = os.path.join(cert_dir, "proxy-ca.crt") + with open(proxy_cert_file, "w") as f: + f.write(https_ca_certificates) + + # Find system certificates + system_cert_paths = [ + "/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu + "/etc/pki/tls/certs/ca-bundle.crt", # RHEL/CentOS + "/etc/ssl/cert.pem", # Alpine/BSD + ] + + # Create combined certificate bundle + combined_cert_file = os.path.join(cert_dir, "combined-ca-bundle.crt") + with open(combined_cert_file, "w") as combined: + # Add system certificates first + for system_path in system_cert_paths: + if os.path.exists(system_path): + with open(system_path, "r") as sys_certs: + combined.write(sys_certs.read()) + combined.write("\n") + break + + # Add proxy certificate + combined.write(https_ca_certificates) + + # Update ssl_verify to use combined certificate bundle + self.ssl_verify = combined_cert_file + + # Set environment variables for urllib and other libraries + os.environ["REQUESTS_CA_BUNDLE"] = combined_cert_file + os.environ["SSL_CERT_FILE"] = combined_cert_file + + self.app_logger.info( + "Proxy certificates configured", + {"cert_bundle": combined_cert_file}, + ) + + except Exception as e: + self.app_logger.warning( + "Failed to setup proxy certificates", {"error": str(e)} + ) + def set_applicant_id_header(self, applicant_id): self.request_headers["opencti-applicant-id"] = applicant_id From e33b77c2f790d48bd1bd1167b5910fa5fbca29bd Mon Sep 17 00:00:00 2001 From: Mael Van den Plas Date: Thu, 30 Oct 2025 09:23:01 +0100 Subject: [PATCH 2/3] handle case when HTTPS_CA_CERTIFICATES is a filepath --- pycti/api/opencti_api_client.py | 66 ++++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/pycti/api/opencti_api_client.py b/pycti/api/opencti_api_client.py index f5afaef2..4250ead0 100644 --- a/pycti/api/opencti_api_client.py +++ b/pycti/api/opencti_api_client.py @@ -259,6 +259,7 @@ def _setup_proxy_certificates(self): Detects HTTPS_CA_CERTIFICATES environment variable and combines proxy certificates with system certificates for SSL verification. + Supports both inline certificate content and file paths. """ https_ca_certificates = os.getenv("HTTPS_CA_CERTIFICATES") if not https_ca_certificates: @@ -268,10 +269,25 @@ def _setup_proxy_certificates(self): # Create secure temporary directory cert_dir = tempfile.mkdtemp(prefix="opencti_proxy_certs_") + # Determine if HTTPS_CA_CERTIFICATES contains inline content or file path + cert_content = self._get_certificate_content(https_ca_certificates) + if not cert_content: + self.app_logger.warning( + "Invalid HTTPS_CA_CERTIFICATES: not a valid certificate or file path", + { + "value": ( + https_ca_certificates[:50] + "..." + if len(https_ca_certificates) > 50 + else https_ca_certificates + ) + }, + ) + return + # Write proxy certificate to temp file proxy_cert_file = os.path.join(cert_dir, "proxy-ca.crt") with open(proxy_cert_file, "w") as f: - f.write(https_ca_certificates) + f.write(cert_content) # Find system certificates system_cert_paths = [ @@ -292,7 +308,7 @@ def _setup_proxy_certificates(self): break # Add proxy certificate - combined.write(https_ca_certificates) + combined.write(cert_content) # Update ssl_verify to use combined certificate bundle self.ssl_verify = combined_cert_file @@ -311,6 +327,52 @@ def _setup_proxy_certificates(self): "Failed to setup proxy certificates", {"error": str(e)} ) + def _get_certificate_content(self, https_ca_certificates): + """Extract certificate content from environment variable. + + Supports both inline certificate content (PEM format) and file paths. + + :param https_ca_certificates: Content from HTTPS_CA_CERTIFICATES env var + :type https_ca_certificates: str + :return: Certificate content in PEM format or None if invalid + :rtype: str or None + """ + # Check if it's inline certificate content (starts with PEM header) + if https_ca_certificates.strip().startswith("-----BEGIN CERTIFICATE-----"): + self.app_logger.debug( + "HTTPS_CA_CERTIFICATES contains inline certificate content" + ) + return https_ca_certificates + + # Check if it's a file path + if os.path.isfile(https_ca_certificates.strip()): + cert_file_path = https_ca_certificates.strip() + try: + with open(cert_file_path, "r") as f: + cert_content = f.read() + # Validate it's actually a certificate + if "-----BEGIN CERTIFICATE-----" in cert_content: + self.app_logger.debug( + "HTTPS_CA_CERTIFICATES contains valid certificate file path", + {"file_path": cert_file_path}, + ) + return cert_content + else: + self.app_logger.warning( + "File at HTTPS_CA_CERTIFICATES path does not contain valid certificate", + {"file_path": cert_file_path}, + ) + return None + except Exception as e: + self.app_logger.warning( + "Failed to read certificate file", + {"file_path": cert_file_path, "error": str(e)}, + ) + return None + + # Neither inline content nor valid file path + return None + def set_applicant_id_header(self, applicant_id): self.request_headers["opencti-applicant-id"] = applicant_id From 7f4923a14536a81fa6c6e6546bf7787a9226ebc2 Mon Sep 17 00:00:00 2001 From: Mael Van den Plas Date: Fri, 31 Oct 2025 16:32:11 +0100 Subject: [PATCH 3/3] add unit test for proxy cert validation --- tests/01-unit/api/__init__.py | 1 + .../api/test_opencti_api_client-proxy.py | 188 ++++++++++++++++++ 2 files changed, 189 insertions(+) create mode 100644 tests/01-unit/api/__init__.py create mode 100644 tests/01-unit/api/test_opencti_api_client-proxy.py diff --git a/tests/01-unit/api/__init__.py b/tests/01-unit/api/__init__.py new file mode 100644 index 00000000..78c08441 --- /dev/null +++ b/tests/01-unit/api/__init__.py @@ -0,0 +1 @@ +# Unit tests for API client functionality diff --git a/tests/01-unit/api/test_opencti_api_client-proxy.py b/tests/01-unit/api/test_opencti_api_client-proxy.py new file mode 100644 index 00000000..76a38453 --- /dev/null +++ b/tests/01-unit/api/test_opencti_api_client-proxy.py @@ -0,0 +1,188 @@ +import os +import tempfile +from unittest.mock import MagicMock, mock_open, patch + +import pytest + +from pycti import OpenCTIApiClient + + +class TestOpenCTIApiClient: + """Test OpenCTIApiClient certificate handling functionality.""" + + SAMPLE_CERTIFICATE = """-----BEGIN CERTIFICATE----- +MIIDXTCCAkWgAwIBAgIJAKLdQVPy90WjMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQwHhcNMjQwMTAxMDAwMDAwWhcNMjUwMTAxMDAwMDAwWjBF +MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50 +ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB +CgKCAQEA0Z3VS5JJcds3xfn/ygWyF0qJDr9oYRH/9dMfqHCOq45DqMVJLJBJnMzN +-----END CERTIFICATE-----""" + + INVALID_CONTENT = "This is not a certificate" + + @pytest.fixture + def api_client(self): + """Create an API client instance without performing health check.""" + with patch.object(OpenCTIApiClient, "_setup_proxy_certificates"): + client = OpenCTIApiClient( + url="http://localhost:4000", + token="test-token", + ssl_verify=False, + perform_health_check=False, + ) + # Mock the logger + client.app_logger = MagicMock() + return client + + def test_get_certificate_content_inline_pem(self, api_client): + """Test _get_certificate_content with inline PEM certificate.""" + result = api_client._get_certificate_content(self.SAMPLE_CERTIFICATE) + + assert result == self.SAMPLE_CERTIFICATE + api_client.app_logger.debug.assert_called_with( + "HTTPS_CA_CERTIFICATES contains inline certificate content" + ) + + def test_get_certificate_content_file_path(self, api_client): + """Test _get_certificate_content with a file path containing certificate.""" + # Create a temporary file with certificate content + with tempfile.NamedTemporaryFile( + mode="w", suffix=".crt", delete=False + ) as cert_file: + cert_file.write(self.SAMPLE_CERTIFICATE) + cert_file_path = cert_file.name + + try: + result = api_client._get_certificate_content(cert_file_path) + + assert result == self.SAMPLE_CERTIFICATE + api_client.app_logger.debug.assert_called_with( + "HTTPS_CA_CERTIFICATES contains valid certificate file path", + {"file_path": cert_file_path}, + ) + finally: + # Clean up + os.unlink(cert_file_path) + + def test_get_certificate_content_invalid_file_content(self, api_client): + """Test _get_certificate_content with a file containing invalid certificate.""" + # Create a temporary file with invalid content + with tempfile.NamedTemporaryFile( + mode="w", suffix=".txt", delete=False + ) as invalid_file: + invalid_file.write(self.INVALID_CONTENT) + invalid_file_path = invalid_file.name + + try: + result = api_client._get_certificate_content(invalid_file_path) + + assert result is None + api_client.app_logger.warning.assert_called_with( + "File at HTTPS_CA_CERTIFICATES path does not contain valid certificate", + {"file_path": invalid_file_path}, + ) + finally: + # Clean up + os.unlink(invalid_file_path) + + def test_get_certificate_content_nonexistent_file(self, api_client): + """Test _get_certificate_content with a nonexistent file path.""" + nonexistent_path = "/tmp/nonexistent_certificate.crt" + + result = api_client._get_certificate_content(nonexistent_path) + + assert result is None + + def test_get_certificate_content_invalid_content(self, api_client): + """Test _get_certificate_content with invalid content (not PEM, not file).""" + result = api_client._get_certificate_content(self.INVALID_CONTENT) + + assert result is None + + def test_get_certificate_content_whitespace_handling(self, api_client): + """Test _get_certificate_content handles whitespace correctly.""" + # Test with certificate content with leading/trailing whitespace + cert_with_whitespace = f" \n{self.SAMPLE_CERTIFICATE} \n" + result = api_client._get_certificate_content(cert_with_whitespace) + + assert result == cert_with_whitespace # Should return as-is + api_client.app_logger.debug.assert_called_with( + "HTTPS_CA_CERTIFICATES contains inline certificate content" + ) + + @patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": ""}) + def test_setup_proxy_certificates_no_env(self, api_client): + """Test _setup_proxy_certificates when HTTPS_CA_CERTIFICATES is not set.""" + api_client._setup_proxy_certificates() + + # Should return early without setting ssl_verify + assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False + + @patch.dict(os.environ, {}) + def test_setup_proxy_certificates_env_not_present(self, api_client): + """Test _setup_proxy_certificates when HTTPS_CA_CERTIFICATES env var doesn't exist.""" + api_client._setup_proxy_certificates() + + # Should return early without setting ssl_verify + assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False + + @patch("tempfile.mkdtemp") + @patch("os.path.isfile") + @patch("builtins.open", new_callable=mock_open) + @patch.dict( + os.environ, + { + "HTTPS_CA_CERTIFICATES": "-----BEGIN CERTIFICATE-----\ntest\n-----END CERTIFICATE-----" + }, + ) + def test_setup_proxy_certificates_with_inline_cert( + self, mock_file, mock_isfile, mock_mkdtemp, api_client + ): + """Test _setup_proxy_certificates with inline certificate content.""" + # Setup mocks + mock_mkdtemp.return_value = "/tmp/test_certs" + mock_isfile.side_effect = ( + lambda path: path == "/etc/ssl/certs/ca-certificates.crt" + ) + + # Mock system certificate content + system_cert_content = ( + "-----BEGIN CERTIFICATE-----\nsystem\n-----END CERTIFICATE-----" + ) + + def open_side_effect(path, mode="r"): + if path == "/etc/ssl/certs/ca-certificates.crt" and mode == "r": + return mock_open(read_data=system_cert_content)() + return mock_file() + + with patch("builtins.open", side_effect=open_side_effect): + api_client._setup_proxy_certificates() + + # Verify proxy certificates were processed + api_client.app_logger.info.assert_called() + + @patch("tempfile.mkdtemp") + @patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": "/path/to/cert.crt"}) + def test_setup_proxy_certificates_with_invalid_path(self, mock_mkdtemp, api_client): + """Test _setup_proxy_certificates with invalid certificate file path.""" + mock_mkdtemp.return_value = "/tmp/test_certs" + + # Mock _get_certificate_content to return None (invalid) + with patch.object(api_client, "_get_certificate_content", return_value=None): + api_client._setup_proxy_certificates() + + # Should log warning and return early + api_client.app_logger.warning.assert_called() + assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False + + def test_setup_proxy_certificates_exception_handling(self, api_client): + """Test _setup_proxy_certificates handles exceptions gracefully.""" + with patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": self.SAMPLE_CERTIFICATE}): + with patch("tempfile.mkdtemp", side_effect=Exception("Mock error")): + api_client._setup_proxy_certificates() + + # Should log warning and continue + api_client.app_logger.warning.assert_called_with( + "Failed to setup proxy certificates", {"error": "Mock error"} + )