Skip to content

Commit 7f4923a

Browse files
add unit test for proxy cert validation
1 parent e33b77c commit 7f4923a

File tree

2 files changed

+189
-0
lines changed

2 files changed

+189
-0
lines changed

tests/01-unit/api/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Unit tests for API client functionality
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
import os
2+
import tempfile
3+
from unittest.mock import MagicMock, mock_open, patch
4+
5+
import pytest
6+
7+
from pycti import OpenCTIApiClient
8+
9+
10+
class TestOpenCTIApiClient:
11+
"""Test OpenCTIApiClient certificate handling functionality."""
12+
13+
SAMPLE_CERTIFICATE = """-----BEGIN CERTIFICATE-----
14+
MIIDXTCCAkWgAwIBAgIJAKLdQVPy90WjMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV
15+
BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
16+
aWRnaXRzIFB0eSBMdGQwHhcNMjQwMTAxMDAwMDAwWhcNMjUwMTAxMDAwMDAwWjBF
17+
MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50
18+
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
19+
CgKCAQEA0Z3VS5JJcds3xfn/ygWyF0qJDr9oYRH/9dMfqHCOq45DqMVJLJBJnMzN
20+
-----END CERTIFICATE-----"""
21+
22+
INVALID_CONTENT = "This is not a certificate"
23+
24+
@pytest.fixture
25+
def api_client(self):
26+
"""Create an API client instance without performing health check."""
27+
with patch.object(OpenCTIApiClient, "_setup_proxy_certificates"):
28+
client = OpenCTIApiClient(
29+
url="http://localhost:4000",
30+
token="test-token",
31+
ssl_verify=False,
32+
perform_health_check=False,
33+
)
34+
# Mock the logger
35+
client.app_logger = MagicMock()
36+
return client
37+
38+
def test_get_certificate_content_inline_pem(self, api_client):
39+
"""Test _get_certificate_content with inline PEM certificate."""
40+
result = api_client._get_certificate_content(self.SAMPLE_CERTIFICATE)
41+
42+
assert result == self.SAMPLE_CERTIFICATE
43+
api_client.app_logger.debug.assert_called_with(
44+
"HTTPS_CA_CERTIFICATES contains inline certificate content"
45+
)
46+
47+
def test_get_certificate_content_file_path(self, api_client):
48+
"""Test _get_certificate_content with a file path containing certificate."""
49+
# Create a temporary file with certificate content
50+
with tempfile.NamedTemporaryFile(
51+
mode="w", suffix=".crt", delete=False
52+
) as cert_file:
53+
cert_file.write(self.SAMPLE_CERTIFICATE)
54+
cert_file_path = cert_file.name
55+
56+
try:
57+
result = api_client._get_certificate_content(cert_file_path)
58+
59+
assert result == self.SAMPLE_CERTIFICATE
60+
api_client.app_logger.debug.assert_called_with(
61+
"HTTPS_CA_CERTIFICATES contains valid certificate file path",
62+
{"file_path": cert_file_path},
63+
)
64+
finally:
65+
# Clean up
66+
os.unlink(cert_file_path)
67+
68+
def test_get_certificate_content_invalid_file_content(self, api_client):
69+
"""Test _get_certificate_content with a file containing invalid certificate."""
70+
# Create a temporary file with invalid content
71+
with tempfile.NamedTemporaryFile(
72+
mode="w", suffix=".txt", delete=False
73+
) as invalid_file:
74+
invalid_file.write(self.INVALID_CONTENT)
75+
invalid_file_path = invalid_file.name
76+
77+
try:
78+
result = api_client._get_certificate_content(invalid_file_path)
79+
80+
assert result is None
81+
api_client.app_logger.warning.assert_called_with(
82+
"File at HTTPS_CA_CERTIFICATES path does not contain valid certificate",
83+
{"file_path": invalid_file_path},
84+
)
85+
finally:
86+
# Clean up
87+
os.unlink(invalid_file_path)
88+
89+
def test_get_certificate_content_nonexistent_file(self, api_client):
90+
"""Test _get_certificate_content with a nonexistent file path."""
91+
nonexistent_path = "/tmp/nonexistent_certificate.crt"
92+
93+
result = api_client._get_certificate_content(nonexistent_path)
94+
95+
assert result is None
96+
97+
def test_get_certificate_content_invalid_content(self, api_client):
98+
"""Test _get_certificate_content with invalid content (not PEM, not file)."""
99+
result = api_client._get_certificate_content(self.INVALID_CONTENT)
100+
101+
assert result is None
102+
103+
def test_get_certificate_content_whitespace_handling(self, api_client):
104+
"""Test _get_certificate_content handles whitespace correctly."""
105+
# Test with certificate content with leading/trailing whitespace
106+
cert_with_whitespace = f" \n{self.SAMPLE_CERTIFICATE} \n"
107+
result = api_client._get_certificate_content(cert_with_whitespace)
108+
109+
assert result == cert_with_whitespace # Should return as-is
110+
api_client.app_logger.debug.assert_called_with(
111+
"HTTPS_CA_CERTIFICATES contains inline certificate content"
112+
)
113+
114+
@patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": ""})
115+
def test_setup_proxy_certificates_no_env(self, api_client):
116+
"""Test _setup_proxy_certificates when HTTPS_CA_CERTIFICATES is not set."""
117+
api_client._setup_proxy_certificates()
118+
119+
# Should return early without setting ssl_verify
120+
assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False
121+
122+
@patch.dict(os.environ, {})
123+
def test_setup_proxy_certificates_env_not_present(self, api_client):
124+
"""Test _setup_proxy_certificates when HTTPS_CA_CERTIFICATES env var doesn't exist."""
125+
api_client._setup_proxy_certificates()
126+
127+
# Should return early without setting ssl_verify
128+
assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False
129+
130+
@patch("tempfile.mkdtemp")
131+
@patch("os.path.isfile")
132+
@patch("builtins.open", new_callable=mock_open)
133+
@patch.dict(
134+
os.environ,
135+
{
136+
"HTTPS_CA_CERTIFICATES": "-----BEGIN CERTIFICATE-----\ntest\n-----END CERTIFICATE-----"
137+
},
138+
)
139+
def test_setup_proxy_certificates_with_inline_cert(
140+
self, mock_file, mock_isfile, mock_mkdtemp, api_client
141+
):
142+
"""Test _setup_proxy_certificates with inline certificate content."""
143+
# Setup mocks
144+
mock_mkdtemp.return_value = "/tmp/test_certs"
145+
mock_isfile.side_effect = (
146+
lambda path: path == "/etc/ssl/certs/ca-certificates.crt"
147+
)
148+
149+
# Mock system certificate content
150+
system_cert_content = (
151+
"-----BEGIN CERTIFICATE-----\nsystem\n-----END CERTIFICATE-----"
152+
)
153+
154+
def open_side_effect(path, mode="r"):
155+
if path == "/etc/ssl/certs/ca-certificates.crt" and mode == "r":
156+
return mock_open(read_data=system_cert_content)()
157+
return mock_file()
158+
159+
with patch("builtins.open", side_effect=open_side_effect):
160+
api_client._setup_proxy_certificates()
161+
162+
# Verify proxy certificates were processed
163+
api_client.app_logger.info.assert_called()
164+
165+
@patch("tempfile.mkdtemp")
166+
@patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": "/path/to/cert.crt"})
167+
def test_setup_proxy_certificates_with_invalid_path(self, mock_mkdtemp, api_client):
168+
"""Test _setup_proxy_certificates with invalid certificate file path."""
169+
mock_mkdtemp.return_value = "/tmp/test_certs"
170+
171+
# Mock _get_certificate_content to return None (invalid)
172+
with patch.object(api_client, "_get_certificate_content", return_value=None):
173+
api_client._setup_proxy_certificates()
174+
175+
# Should log warning and return early
176+
api_client.app_logger.warning.assert_called()
177+
assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False
178+
179+
def test_setup_proxy_certificates_exception_handling(self, api_client):
180+
"""Test _setup_proxy_certificates handles exceptions gracefully."""
181+
with patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": self.SAMPLE_CERTIFICATE}):
182+
with patch("tempfile.mkdtemp", side_effect=Exception("Mock error")):
183+
api_client._setup_proxy_certificates()
184+
185+
# Should log warning and continue
186+
api_client.app_logger.warning.assert_called_with(
187+
"Failed to setup proxy certificates", {"error": "Mock error"}
188+
)

0 commit comments

Comments
 (0)