Skip to content

Commit 808593c

Browse files
add unit test for proxy cert validation
1 parent dde9417 commit 808593c

File tree

2 files changed

+196
-0
lines changed

2 files changed

+196
-0
lines changed

tests/01-unit/api/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Unit tests for API client functionality
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
import os
2+
import tempfile
3+
from unittest.mock import patch, MagicMock, mock_open
4+
import pytest
5+
from pycti import OpenCTIApiClient
6+
7+
8+
class TestOpenCTIApiClient:
9+
"""Test OpenCTIApiClient certificate handling functionality."""
10+
11+
SAMPLE_CERTIFICATE = """-----BEGIN CERTIFICATE-----
12+
MIIDXTCCAkWgAwIBAgIJAKLdQVPy90WjMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV
13+
BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
14+
aWRnaXRzIFB0eSBMdGQwHhcNMjQwMTAxMDAwMDAwWhcNMjUwMTAxMDAwMDAwWjBF
15+
MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50
16+
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
17+
CgKCAQEA0Z3VS5JJcds3xfn/ygWyF0qJDr9oYRH/9dMfqHCOq45DqMVJLJBJnMzN
18+
-----END CERTIFICATE-----"""
19+
20+
INVALID_CONTENT = "This is not a certificate"
21+
22+
@pytest.fixture
23+
def api_client(self):
24+
"""Create an API client instance without performing health check."""
25+
with patch.object(OpenCTIApiClient, '_setup_proxy_certificates'):
26+
client = OpenCTIApiClient(
27+
url="http://localhost:4000",
28+
token="test-token",
29+
ssl_verify=False,
30+
perform_health_check=False
31+
)
32+
# Mock the logger
33+
client.app_logger = MagicMock()
34+
return client
35+
36+
def test_get_certificate_content_inline_pem(self, api_client):
37+
"""Test _get_certificate_content with inline PEM certificate."""
38+
result = api_client._get_certificate_content(self.SAMPLE_CERTIFICATE)
39+
40+
assert result == self.SAMPLE_CERTIFICATE
41+
api_client.app_logger.debug.assert_called_with(
42+
"HTTPS_CA_CERTIFICATES contains inline certificate content"
43+
)
44+
45+
def test_get_certificate_content_file_path(self, api_client):
46+
"""Test _get_certificate_content with a file path containing certificate."""
47+
# Create a temporary file with certificate content
48+
with tempfile.NamedTemporaryFile(mode='w', suffix='.crt', delete=False) as cert_file:
49+
cert_file.write(self.SAMPLE_CERTIFICATE)
50+
cert_file_path = cert_file.name
51+
52+
try:
53+
result = api_client._get_certificate_content(cert_file_path)
54+
55+
assert result == self.SAMPLE_CERTIFICATE
56+
api_client.app_logger.debug.assert_called_with(
57+
"HTTPS_CA_CERTIFICATES contains valid certificate file path",
58+
{"file_path": cert_file_path}
59+
)
60+
finally:
61+
# Clean up
62+
os.unlink(cert_file_path)
63+
64+
def test_get_certificate_content_invalid_file_content(self, api_client):
65+
"""Test _get_certificate_content with a file containing invalid certificate."""
66+
# Create a temporary file with invalid content
67+
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as invalid_file:
68+
invalid_file.write(self.INVALID_CONTENT)
69+
invalid_file_path = invalid_file.name
70+
71+
try:
72+
result = api_client._get_certificate_content(invalid_file_path)
73+
74+
assert result is None
75+
api_client.app_logger.warning.assert_called_with(
76+
"File at HTTPS_CA_CERTIFICATES path does not contain valid certificate",
77+
{"file_path": invalid_file_path}
78+
)
79+
finally:
80+
# Clean up
81+
os.unlink(invalid_file_path)
82+
83+
def test_get_certificate_content_nonexistent_file(self, api_client):
84+
"""Test _get_certificate_content with a nonexistent file path."""
85+
nonexistent_path = "/tmp/nonexistent_certificate.crt"
86+
87+
result = api_client._get_certificate_content(nonexistent_path)
88+
89+
assert result is None
90+
91+
def test_get_certificate_content_invalid_content(self, api_client):
92+
"""Test _get_certificate_content with invalid content (not PEM, not file)."""
93+
result = api_client._get_certificate_content(self.INVALID_CONTENT)
94+
95+
assert result is None
96+
97+
def test_get_certificate_content_whitespace_handling(self, api_client):
98+
"""Test _get_certificate_content handles whitespace correctly."""
99+
# Test with certificate content with leading/trailing whitespace
100+
cert_with_whitespace = f" \n{self.SAMPLE_CERTIFICATE} \n"
101+
result = api_client._get_certificate_content(cert_with_whitespace)
102+
103+
assert result == cert_with_whitespace # Should return as-is
104+
api_client.app_logger.debug.assert_called_with(
105+
"HTTPS_CA_CERTIFICATES contains inline certificate content"
106+
)
107+
108+
def test_get_certificate_content_file_read_permission_error(self, api_client):
109+
"""Test _get_certificate_content when file exists but can't be read."""
110+
# Create a temporary file
111+
with tempfile.NamedTemporaryFile(mode='w', suffix='.crt', delete=False) as cert_file:
112+
cert_file.write(self.SAMPLE_CERTIFICATE)
113+
cert_file_path = cert_file.name
114+
115+
try:
116+
# Make file unreadable (on Unix systems)
117+
os.chmod(cert_file_path, 0o000)
118+
119+
result = api_client._get_certificate_content(cert_file_path)
120+
121+
assert result is None
122+
# Check that warning was logged about failed read
123+
assert any(
124+
call[0][0] == "Failed to read certificate file"
125+
for call in api_client.app_logger.warning.call_args_list
126+
)
127+
finally:
128+
# Restore permissions and clean up
129+
os.chmod(cert_file_path, 0o644)
130+
os.unlink(cert_file_path)
131+
132+
@patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": ""})
133+
def test_setup_proxy_certificates_no_env(self, api_client):
134+
"""Test _setup_proxy_certificates when HTTPS_CA_CERTIFICATES is not set."""
135+
api_client._setup_proxy_certificates()
136+
137+
# Should return early without setting ssl_verify
138+
assert not hasattr(api_client, 'ssl_verify') or api_client.ssl_verify is False
139+
140+
@patch.dict(os.environ, {})
141+
def test_setup_proxy_certificates_env_not_present(self, api_client):
142+
"""Test _setup_proxy_certificates when HTTPS_CA_CERTIFICATES env var doesn't exist."""
143+
api_client._setup_proxy_certificates()
144+
145+
# Should return early without setting ssl_verify
146+
assert not hasattr(api_client, 'ssl_verify') or api_client.ssl_verify is False
147+
148+
@patch('tempfile.mkdtemp')
149+
@patch('os.path.isfile')
150+
@patch('builtins.open', new_callable=mock_open)
151+
@patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": "-----BEGIN CERTIFICATE-----\ntest\n-----END CERTIFICATE-----"})
152+
def test_setup_proxy_certificates_with_inline_cert(self, mock_file, mock_isfile, mock_mkdtemp, api_client):
153+
"""Test _setup_proxy_certificates with inline certificate content."""
154+
# Setup mocks
155+
mock_mkdtemp.return_value = "/tmp/test_certs"
156+
mock_isfile.side_effect = lambda path: path == "/etc/ssl/certs/ca-certificates.crt"
157+
158+
# Mock system certificate content
159+
system_cert_content = "-----BEGIN CERTIFICATE-----\nsystem\n-----END CERTIFICATE-----"
160+
161+
def open_side_effect(path, mode='r'):
162+
if path == "/etc/ssl/certs/ca-certificates.crt" and mode == 'r':
163+
return mock_open(read_data=system_cert_content)()
164+
return mock_file()
165+
166+
with patch('builtins.open', side_effect=open_side_effect):
167+
api_client._setup_proxy_certificates()
168+
169+
# Verify proxy certificates were processed
170+
api_client.app_logger.info.assert_called()
171+
172+
@patch('tempfile.mkdtemp')
173+
@patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": "/path/to/cert.crt"})
174+
def test_setup_proxy_certificates_with_invalid_path(self, mock_mkdtemp, api_client):
175+
"""Test _setup_proxy_certificates with invalid certificate file path."""
176+
mock_mkdtemp.return_value = "/tmp/test_certs"
177+
178+
# Mock _get_certificate_content to return None (invalid)
179+
with patch.object(api_client, '_get_certificate_content', return_value=None):
180+
api_client._setup_proxy_certificates()
181+
182+
# Should log warning and return early
183+
api_client.app_logger.warning.assert_called()
184+
assert not hasattr(api_client, 'ssl_verify') or api_client.ssl_verify is False
185+
186+
def test_setup_proxy_certificates_exception_handling(self, api_client):
187+
"""Test _setup_proxy_certificates handles exceptions gracefully."""
188+
with patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": self.SAMPLE_CERTIFICATE}):
189+
with patch('tempfile.mkdtemp', side_effect=Exception("Mock error")):
190+
api_client._setup_proxy_certificates()
191+
192+
# Should log warning and continue
193+
api_client.app_logger.warning.assert_called_with(
194+
"Failed to setup proxy certificates", {"error": "Mock error"}
195+
)

0 commit comments

Comments
 (0)