Skip to content
This repository was archived by the owner on Nov 29, 2025. It is now read-only.

Commit e1c3cc9

Browse files
author
Samuel Hassine
committed
[connectors] Add methods to split STIX2 bundles
1 parent a201f28 commit e1c3cc9

File tree

2 files changed

+139
-11
lines changed

2 files changed

+139
-11
lines changed

pycti/opencti_connector_helper.py

Lines changed: 138 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55
import json
66
import base64
7+
import uuid
78
from stix2validator import validate_string
89

910
EXCHANGE_NAME = 'amqp.opencti'
@@ -29,6 +30,7 @@ def __init__(self, identifier, connector_config, rabbitmq_config, log_level='inf
2930

3031
# Initialize configuration
3132
self.connection = None
33+
self.channel = None
3234
self.identifier = identifier
3335
self.config = connector_config
3436
self.rabbitmq_hostname = rabbitmq_config['hostname']
@@ -38,18 +40,15 @@ def __init__(self, identifier, connector_config, rabbitmq_config, log_level='inf
3840
self.queue_name = 'import-connectors-' + self.identifier
3941
self.routing_key = 'import.connectors.' + self.identifier
4042

41-
# Encode the configuration
42-
config_encoded = base64.b64encode(json.dumps(self.config).encode('utf-8')).decode('utf-8')
43-
4443
# Connect to RabbitMQ
4544
self.connection = self._connect()
4645
self.channel = self._create_channel()
46+
self._create_queue()
4747
logging.info('Successfully connected to RabbitMQ')
4848

49-
# Declare the queue for the connector
50-
self.channel.queue_delete(self.queue_name)
51-
self.channel.queue_declare(self.queue_name, durable=True, arguments={'config': config_encoded})
52-
self.channel.queue_bind(queue=self.queue_name, exchange=EXCHANGE_NAME, routing_key=self.routing_key)
49+
# Initialize caching
50+
self.cache_index = {}
51+
self.cache_added = []
5352

5453
def _connect(self):
5554
try:
@@ -67,11 +66,24 @@ def _create_channel(self):
6766
except:
6867
logging.error('Unable to open channel to RabbitMQ with the given parameters')
6968

69+
def _create_queue(self):
70+
if self.channel is not None:
71+
config_encoded = base64.b64encode(json.dumps(self.config).encode('utf-8')).decode('utf-8')
72+
self.channel.queue_delete(self.queue_name)
73+
self.channel.queue_declare(self.queue_name, durable=True, arguments={'config': config_encoded})
74+
self.channel.queue_bind(queue=self.queue_name, exchange=EXCHANGE_NAME, routing_key=self.routing_key)
75+
7076
def _reconnect(self):
7177
self.connection = self._connect()
7278
self.channel = self._create_channel()
79+
self._create_queue()
7380

7481
def send_stix2_bundle(self, bundle, entities_types=None):
    """
    Split a STIX2 bundle and send every resulting bundle to RabbitMQ.

    :param bundle: the STIX2 bundle to split and send
    :param entities_types: optional list of entity types forwarded with each
        message; an empty list is used when omitted
    """
    # Build a fresh list per call rather than sharing one mutable default
    # object between all calls.
    if entities_types is None:
        entities_types = []

    # A distinct loop name keeps the ``bundle`` parameter intact.
    for split_bundle in self.split_stix2_bundle(bundle):
        self._send_bundle('stix2-bundle', split_bundle, entities_types)
85+
86+
def _send_bundle(self, type, bundle, entities_types=[]):
7587
"""
7688
This method send a STIX2 bundle to RabbitMQ to be consumed by workers
7789
:param bundle: A valid STIX2 bundle
@@ -87,11 +99,127 @@ def send_stix2_bundle(self, bundle, entities_types=[]):
8799

88100
# Prepare the message
89101
message = {
90-
'type': 'stix2-bundle',
102+
'type': type,
91103
'entities_types': entities_types,
92104
'content': base64.b64encode(bundle.encode('utf-8')).decode('utf-8')
93105
}
94106

95107
# Send the message
96-
self.channel.basic_publish(EXCHANGE_NAME, self.routing_key, json.dumps(message))
97-
logging.info('STIX2 bundle has been sent')
108+
try:
109+
self.channel.basic_publish(EXCHANGE_NAME, self.routing_key, json.dumps(message))
110+
logging.info('Bundle has been sent')
111+
except:
112+
logging.error('Unable to send bundle, reconnecting and resending...')
113+
self._reconnect()
114+
self.channel.basic_publish(EXCHANGE_NAME, self.routing_key, json.dumps(message))
115+
116+
def split_stix2_bundle(self, bundle):
    """
    Split one STIX2 bundle into several smaller bundles.

    Three passes are made over the objects of the bundle: one bundle per
    report, then one bundle per relationship not already sent, then one
    bundle per remaining non-relationship object. ``self.cache_index`` and
    ``self.cache_added`` are reset and repopulated on every call.

    :param bundle: a STIX2 bundle serialised as a JSON string
    :return: list of bundles, each as returned by ``stix2_create_bundle``
    """
    self.cache_index = {}
    self.cache_added = []
    bundle_data = json.loads(bundle)
    # A bundle carrying no 'objects' key is handled as an empty bundle
    # instead of failing with KeyError.
    objects = bundle_data.get('objects', [])

    # Index all objects by id
    for item in objects:
        self.cache_index[item['id']] = item

    bundles = []
    # Set mirror of self.cache_added: constant-time "already sent?" checks
    # instead of scanning the list for every object.
    added_ids = set()

    def pack(related_items):
        # Deduplicate, remember what is being sent, emit one bundle.
        items_to_send = self.stix2_deduplicate_objects(related_items)
        for item_to_send in items_to_send:
            self.cache_added.append(item_to_send['id'])
            added_ids.add(item_to_send['id'])
        bundles.append(self.stix2_create_bundle(items_to_send))

    # Reports must be handled first because of object_refs
    for item in objects:
        if item['type'] == 'report':
            pack(self.stix2_get_report_objects(item))

    # Relationships not added in previous reports
    for item in objects:
        if item['type'] == 'relationship' and item['id'] not in added_ids:
            pack(self.stix2_get_relationship_objects(item))

    # Entities not added in previous reports and relationships
    for item in objects:
        if item['type'] != 'relationship' and item['id'] not in added_ids:
            pack(self.stix2_get_entity_objects(item))

    return bundles
151+
152+
def stix2_create_bundle(self, items):
    """
    Wrap the given objects in a new STIX 2.0 bundle.

    :param items: the objects to place under the bundle's ``objects`` key
    :return: the bundle serialised as a JSON string, carrying a freshly
        generated ``bundle--<uuid4>`` identifier
    """
    bundle_id = 'bundle--' + str(uuid.uuid4())
    return json.dumps({
        'type': 'bundle',
        'id': bundle_id,
        'spec_version': '2.0',
        'objects': items,
    })
160+
161+
def stix2_get_embedded_objects(self, item):
    """
    Resolve the objects an item embeds by reference.

    References are looked up in ``self.cache_index``. A reference whose
    target is not in the index (for example a marking definition that was
    not shipped inside the bundle) is skipped instead of raising KeyError.

    :param item: a STIX2 object (dict)
    :return: dict with ``object_marking_refs`` (list of resolved marking
        objects, possibly empty) and ``created_by_ref`` (the resolved author
        object, or None)
    """
    index = self.cache_index

    # Marking definitions
    object_marking_refs = [
        index[marking_ref]
        for marking_ref in item.get('object_marking_refs', [])
        if marking_ref in index
    ]

    # Created by ref
    created_by_ref = None
    if 'created_by_ref' in item:
        created_by_ref = index.get(item['created_by_ref'])

    return {'object_marking_refs': object_marking_refs, 'created_by_ref': created_by_ref}
173+
174+
def stix2_get_entity_objects(self, entity):
    """
    List an entity together with the objects it embeds by reference.

    :param entity: a STIX2 object (dict)
    :return: list starting with the entity itself, followed by its author
        (when resolved) and then its marking definitions
    """
    embedded = self.stix2_get_embedded_objects(entity)
    author = embedded['created_by_ref']
    markings = embedded['object_marking_refs']

    items = [entity]
    # Author first, markings afterwards
    if author is not None:
        items.append(author)
    if len(markings) > 0:
        items = items + markings
    return items
186+
187+
def stix2_get_relationship_objects(self, relationship):
    """
    List a relationship together with every object it depends on.

    The source and target objects are looked up in ``self.cache_index``; an
    endpoint that is not in the index is skipped instead of raising
    KeyError.

    :param relationship: a STIX2 relationship object (dict)
    :return: list starting with the relationship, followed by its resolved
        source, its resolved target, its author (when resolved) and its
        marking definitions
    """
    items = [relationship]

    # Source first, then target; unresolved endpoints are left out
    for ref_key in ('source_ref', 'target_ref'):
        endpoint = self.cache_index.get(relationship.get(ref_key))
        if endpoint is not None:
            items.append(endpoint)

    # Get embedded objects
    embedded_objects = self.stix2_get_embedded_objects(relationship)
    # Add created by ref
    if embedded_objects['created_by_ref'] is not None:
        items.append(embedded_objects['created_by_ref'])
    # Add marking definitions
    items.extend(embedded_objects['object_marking_refs'])

    return items
205+
206+
def stix2_get_report_objects(self, report):
    """
    List a report together with everything needed to send it on its own.

    Every id in the report's ``object_refs`` is resolved through
    ``self.cache_index``; ids that are not in the index are skipped instead
    of raising KeyError. The report and each resolved object are then
    expanded with their own dependencies. The result may contain
    duplicates.

    :param report: a STIX2 report object (dict)
    :return: list with the report and its referenced objects first,
        followed by the expansion of each of them in the same order
    """
    items = [report]
    # Add all object refs that can be resolved
    for object_ref in report.get('object_refs', []):
        referenced = self.cache_index.get(object_ref)
        if referenced is not None:
            items.append(referenced)

    # Expansions are collected separately so the list being iterated is
    # never rebound or grown while the loop runs.
    related = []
    for item in items:
        if item['type'] == 'relationship':
            related.extend(self.stix2_get_relationship_objects(item))
        else:
            related.extend(self.stix2_get_entity_objects(item))
    return items + related
217+
218+
def stix2_deduplicate_objects(self, items):
    """
    Drop objects whose ``id`` has already been seen, keeping first occurrences.

    :param items: iterable of STIX2 objects (dicts with an ``id`` key)
    :return: new list with one object per id, in order of first appearance
    """
    # A set gives constant-time membership tests; the former list of ids
    # made the whole pass quadratic.
    seen_ids = set()
    final_items = []
    for item in items:
        item_id = item['id']
        if item_id not in seen_ids:
            seen_ids.add(item_id)
            final_items.append(item)
    return final_items

pycti/opencti_stix2.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1161,7 +1161,7 @@ def import_bundle(self, stix_bundle, update=False, types=[]):
11611161

11621162
start_time = time.time()
11631163
for item in stix_bundle['objects']:
1164-
if item['type'] == 'identity' and (len(types) == 0 or 'identity' in types or item['x_opencti_identity_type'] in types):
1164+
if item['type'] == 'identity' and (len(types) == 0 or 'identity' in types or ('x_opencti_identity_type' in item and item['x_opencti_identity_type'] in types)):
11651165
self.import_object(item, update)
11661166
end_time = time.time()
11671167
logging.info("Identities imported in: %ssecs" % round(end_time - start_time))

0 commit comments

Comments
 (0)