Skip to content

Commit 2855533

Browse files
committed
Merge branch 'release/current'
2 parents ed87aeb + 7d4ed3b commit 2855533

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+931
-108
lines changed

pycti/connector/opencti_connector_helper.py

Lines changed: 190 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,21 @@
3737

3838

3939
def killProgramHook(etype, value, tb):
40+
"""Exception hook to terminate the program.
41+
42+
:param etype: Exception type
43+
:param value: Exception value
44+
:param tb: Traceback object
45+
"""
4046
os.kill(os.getpid(), signal.SIGTERM)
4147

4248

4349
def start_loop(loop):
50+
"""Start an asyncio event loop.
51+
52+
:param loop: The asyncio event loop to start
53+
:type loop: asyncio.AbstractEventLoop
54+
"""
4455
asyncio.set_event_loop(loop)
4556
loop.run_forever()
4657

@@ -93,10 +104,24 @@ def get_config_variable(
93104

94105

95106
def is_memory_certificate(certificate):
107+
"""Check if a certificate is provided as a PEM string in memory.
108+
109+
:param certificate: The certificate data to check
110+
:type certificate: str
111+
:return: True if the certificate is a PEM string, False otherwise
112+
:rtype: bool
113+
"""
96114
return certificate.startswith("-----BEGIN")
97115

98116

99117
def ssl_verify_locations(ssl_context, certdata):
118+
"""Load certificate verification locations into SSL context.
119+
120+
:param ssl_context: The SSL context to configure
121+
:type ssl_context: ssl.SSLContext
122+
:param certdata: Certificate data (file path or PEM string)
123+
:type certdata: str or None
124+
"""
100125
if certdata is None:
101126
return
102127

@@ -106,9 +131,17 @@ def ssl_verify_locations(ssl_context, certdata):
106131
ssl_context.load_verify_locations(cafile=certdata)
107132

108133

109-
# As cert must be written in files to be loaded in ssl context
110-
# Creates a temporary file in the most secure manner possible
111134
def data_to_temp_file(data):
135+
"""Write data to a temporary file securely.
136+
137+
Creates a temporary file in the most secure manner possible.
138+
The file is readable and writable only by the creating user ID.
139+
140+
:param data: The data to write to the temporary file
141+
:type data: str
142+
:return: Path to the created temporary file
143+
:rtype: str
144+
"""
112145
# The file is readable and writable only by the creating user ID.
113146
# If the operating system uses permission bits to indicate whether a
114147
# file is executable, the file is executable by no one. The file
@@ -121,6 +154,17 @@ def data_to_temp_file(data):
121154

122155

123156
def ssl_cert_chain(ssl_context, cert_data, key_data, passphrase):
157+
"""Load certificate chain into SSL context.
158+
159+
:param ssl_context: The SSL context to configure
160+
:type ssl_context: ssl.SSLContext
161+
:param cert_data: Certificate data (file path or PEM string)
162+
:type cert_data: str or None
163+
:param key_data: Private key data (file path or PEM string)
164+
:type key_data: str or None
165+
:param passphrase: Passphrase for the private key
166+
:type passphrase: str or None
167+
"""
124168
if cert_data is None:
125169
return
126170

@@ -147,6 +191,13 @@ def ssl_cert_chain(ssl_context, cert_data, key_data, passphrase):
147191

148192

149193
def create_callback_ssl_context(config) -> ssl.SSLContext:
194+
"""Create SSL context for API callback server.
195+
196+
:param config: Configuration dictionary
197+
:type config: dict
198+
:return: Configured SSL context
199+
:rtype: ssl.SSLContext
200+
"""
150201
listen_protocol_api_ssl_key = get_config_variable(
151202
"LISTEN_PROTOCOL_API_SSL_KEY",
152203
["connector", "listen_protocol_api_ssl_key"],
@@ -176,6 +227,13 @@ def create_callback_ssl_context(config) -> ssl.SSLContext:
176227

177228

178229
def create_mq_ssl_context(config) -> ssl.SSLContext:
230+
"""Create SSL context for message queue connections.
231+
232+
:param config: Configuration dictionary
233+
:type config: dict
234+
:return: Configured SSL context for MQ connections
235+
:rtype: ssl.SSLContext
236+
"""
179237
use_ssl_ca = get_config_variable("MQ_USE_SSL_CA", ["mq", "use_ssl_ca"], config)
180238
use_ssl_cert = get_config_variable(
181239
"MQ_USE_SSL_CERT", ["mq", "use_ssl_cert"], config
@@ -292,6 +350,11 @@ def _process_message(self, channel, method, properties, body) -> None:
292350
)
293351

294352
def _set_draft_id(self, draft_id):
353+
"""Set the draft ID for the helper and API instances.
354+
355+
:param draft_id: The draft ID to set
356+
:type draft_id: str
357+
"""
295358
self.helper.draft_id = draft_id
296359
self.helper.api.set_draft_id(draft_id)
297360
self.helper.api_impersonate.set_draft_id(draft_id)
@@ -546,6 +609,11 @@ def run(self) -> None:
546609
raise ValueError("Unsupported listen protocol type")
547610

548611
def stop(self):
612+
"""Stop the ListenQueue thread and close connections.
613+
614+
This method sets the exit event, closes the RabbitMQ connection,
615+
and waits for the processing thread to complete.
616+
"""
549617
self.helper.connector_logger.info("Preparing ListenQueue for clean shutdown")
550618
self.exit_event.set()
551619
self.pika_connection.close()
@@ -794,6 +862,10 @@ def run(self) -> None: # pylint: disable=too-many-branches
794862
sys.excepthook(*sys.exc_info())
795863

796864
def stop(self):
865+
"""Stop the ListenStream thread.
866+
867+
This method sets the exit event to signal the stream listening thread to stop.
868+
"""
797869
self.helper.connector_logger.info("Preparing ListenStream for clean shutdown")
798870
self.exit_event.set()
799871

@@ -817,6 +889,11 @@ def __init__(
817889

818890
@property
819891
def all_details(self):
892+
"""Get all connector information details as a dictionary.
893+
894+
:return: Dictionary containing all connector status information
895+
:rtype: dict
896+
"""
820897
return {
821898
"run_and_terminate": self._run_and_terminate,
822899
"buffering": self._buffering,
@@ -832,6 +909,11 @@ def run_and_terminate(self) -> bool:
832909

833910
@run_and_terminate.setter
834911
def run_and_terminate(self, value):
912+
"""Set the run_and_terminate flag.
913+
914+
:param value: Whether the connector should run once and terminate
915+
:type value: bool
916+
"""
835917
self._run_and_terminate = value
836918

837919
@property
@@ -840,6 +922,11 @@ def buffering(self) -> bool:
840922

841923
@buffering.setter
842924
def buffering(self, value):
925+
"""Set the buffering status.
926+
927+
:param value: Whether the connector is currently buffering
928+
:type value: bool
929+
"""
843930
self._buffering = value
844931

845932
@property
@@ -848,6 +935,11 @@ def queue_threshold(self) -> float:
848935

849936
@queue_threshold.setter
850937
def queue_threshold(self, value):
938+
"""Set the queue threshold value.
939+
940+
:param value: The queue size threshold in MB
941+
:type value: float
942+
"""
851943
self._queue_threshold = value
852944

853945
@property
@@ -856,6 +948,11 @@ def queue_messages_size(self) -> float:
856948

857949
@queue_messages_size.setter
858950
def queue_messages_size(self, value):
951+
"""Set the current queue messages size.
952+
953+
:param value: The current size of messages in the queue in MB
954+
:type value: float
955+
"""
859956
self._queue_messages_size = value
860957

861958
@property
@@ -864,6 +961,11 @@ def next_run_datetime(self) -> datetime:
864961

865962
@next_run_datetime.setter
866963
def next_run_datetime(self, value):
964+
"""Set the next scheduled run datetime.
965+
966+
:param value: The datetime for the next scheduled run
967+
:type value: datetime
968+
"""
867969
self._next_run_datetime = value
868970

869971
@property
@@ -872,6 +974,11 @@ def last_run_datetime(self) -> datetime:
872974

873975
@last_run_datetime.setter
874976
def last_run_datetime(self, value):
977+
"""Set the last run datetime.
978+
979+
:param value: The datetime of the last run
980+
:type value: datetime
981+
"""
875982
self._last_run_datetime = value
876983

877984

@@ -1062,6 +1169,20 @@ def __init__(self, config: Dict, playbook_compatible=False) -> None:
10621169
config,
10631170
default=False,
10641171
)
1172+
metrics_namespace = get_config_variable(
1173+
"CONNECTOR_METRICS_NAMESPACE",
1174+
["connector", "metrics_namespace"],
1175+
config,
1176+
False,
1177+
"",
1178+
)
1179+
metrics_subsystem = get_config_variable(
1180+
"CONNECTOR_METRICS_SUBSYSTEM",
1181+
["connector", "metrics_subsystem"],
1182+
config,
1183+
False,
1184+
"",
1185+
)
10651186
metrics_port = get_config_variable(
10661187
"CONNECTOR_METRICS_PORT",
10671188
["connector", "metrics_port"],
@@ -1102,7 +1223,11 @@ def __init__(self, config: Dict, playbook_compatible=False) -> None:
11021223
# For retro compatibility
11031224

11041225
self.metric = OpenCTIMetricHandler(
1105-
self.connector_logger, expose_metrics, metrics_port
1226+
self.connector_logger,
1227+
expose_metrics,
1228+
metrics_namespace,
1229+
metrics_subsystem,
1230+
metrics_port,
11061231
)
11071232
# Register the connector in OpenCTI
11081233
self.connector = OpenCTIConnector(
@@ -1226,6 +1351,11 @@ def __init__(self, config: Dict, playbook_compatible=False) -> None:
12261351
self.listen_queue = None
12271352

12281353
def stop(self) -> None:
1354+
"""Stop the connector and clean up resources.
1355+
1356+
This method stops all running threads (listen queue, ping thread) and
1357+
unregisters the connector from OpenCTI.
1358+
"""
12291359
self.connector_logger.info("Preparing connector for clean shutdown")
12301360
if self.listen_queue:
12311361
self.listen_queue.stop()
@@ -1235,9 +1365,20 @@ def stop(self) -> None:
12351365
self.api.connector.unregister(self.connector_id)
12361366

12371367
def get_name(self) -> Optional[Union[bool, int, str]]:
1368+
"""Get the connector name.
1369+
1370+
:return: The name of the connector
1371+
:rtype: Optional[Union[bool, int, str]]
1372+
"""
12381373
return self.connect_name
12391374

12401375
def get_stream_collection(self):
1376+
"""Get the stream collection configuration.
1377+
1378+
:return: Stream collection configuration dictionary
1379+
:rtype: dict
1380+
:raises ValueError: If no stream is connected
1381+
"""
12411382
if self.connect_live_stream_id is not None:
12421383
if self.connect_live_stream_id in ["live", "raw"]:
12431384
return {
@@ -1272,12 +1413,27 @@ def get_stream_collection(self):
12721413
raise ValueError("This connector is not connected to any stream")
12731414

12741415
def get_only_contextual(self) -> Optional[Union[bool, int, str]]:
1416+
"""Get the only_contextual configuration value.
1417+
1418+
:return: Whether the connector processes only contextual data
1419+
:rtype: Optional[Union[bool, int, str]]
1420+
"""
12751421
return self.connect_only_contextual
12761422

12771423
def get_run_and_terminate(self) -> Optional[Union[bool, int, str]]:
1424+
"""Get the run_and_terminate configuration value.
1425+
1426+
:return: Whether the connector should run once and terminate
1427+
:rtype: Optional[Union[bool, int, str]]
1428+
"""
12781429
return self.connect_run_and_terminate
12791430

12801431
def get_validate_before_import(self) -> Optional[Union[bool, int, str]]:
1432+
"""Get the validate_before_import configuration value.
1433+
1434+
:return: Whether to validate data before importing
1435+
:rtype: Optional[Union[bool, int, str]]
1436+
"""
12811437
return self.connect_validate_before_import
12821438

12831439
def set_state(self, state) -> None:
@@ -1308,6 +1464,11 @@ def get_state(self) -> Optional[Dict]:
13081464
return None
13091465

13101466
def force_ping(self):
1467+
"""Force a ping to the OpenCTI API to update connector state.
1468+
1469+
This method manually triggers a ping to synchronize the connector state
1470+
with the OpenCTI platform.
1471+
"""
13111472
try:
13121473
initial_state = self.get_state()
13131474
connector_info = self.connector_info.all_details
@@ -1720,12 +1881,27 @@ def listen_stream(
17201881
return self.listen_stream
17211882

17221883
def get_opencti_url(self) -> Optional[Union[bool, int, str]]:
1884+
"""Get the OpenCTI URL.
1885+
1886+
:return: The URL of the OpenCTI platform
1887+
:rtype: Optional[Union[bool, int, str]]
1888+
"""
17231889
return self.opencti_url
17241890

17251891
def get_opencti_token(self) -> Optional[Union[bool, int, str]]:
1892+
"""Get the OpenCTI API token.
1893+
1894+
:return: The API token for OpenCTI authentication
1895+
:rtype: Optional[Union[bool, int, str]]
1896+
"""
17261897
return self.opencti_token
17271898

17281899
def get_connector(self) -> OpenCTIConnector:
1900+
"""Get the OpenCTIConnector instance.
1901+
1902+
:return: The OpenCTIConnector instance
1903+
:rtype: OpenCTIConnector
1904+
"""
17291905
return self.connector
17301906

17311907
def date_now(self) -> str:
@@ -2275,6 +2451,17 @@ def get_attribute_in_mitre_extension(key, object) -> any:
22752451
return None
22762452

22772453
def get_data_from_enrichment(self, data, standard_id, opencti_entity):
2454+
"""Extract STIX entity and objects from enrichment data.
2455+
2456+
:param data: The enrichment data containing a bundle
2457+
:type data: dict
2458+
:param standard_id: The STIX standard ID of the entity
2459+
:type standard_id: str
2460+
:param opencti_entity: The OpenCTI entity object
2461+
:type opencti_entity: dict
2462+
:return: Dictionary containing stix_entity and stix_objects
2463+
:rtype: dict
2464+
"""
22782465
bundle = data.get("bundle", None)
22792466
# Extract main entity from bundle in case of playbook
22802467
if bundle is None:

0 commit comments

Comments
 (0)