Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pycti/api/opencti_api_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def list(self) -> Dict:
push
push_exchange
push_routing
dead_letter_routing
}
}
}
Expand Down
40 changes: 30 additions & 10 deletions pycti/utils/opencti_stix2.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import time
import traceback
import uuid
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Tuple, Union

import datefinder
import dateutil.parser
Expand All @@ -32,6 +32,7 @@
STIX_CORE_OBJECTS,
STIX_CYBER_OBSERVABLE_MAPPING,
STIX_META_OBJECTS,
OpenCTIStix2Utils,
)

datefinder.ValueError = ValueError, OverflowError
Expand Down Expand Up @@ -196,7 +197,7 @@ def import_bundle_from_file(
file_path: str,
update: bool = False,
types: List = None,
) -> Optional[List]:
) -> Optional[Tuple[list, list]]:
"""import a stix2 bundle from a file

:param file_path: valid path to the file
Expand All @@ -221,7 +222,8 @@ def import_bundle_from_json(
update: bool = False,
types: List = None,
work_id: str = None,
) -> List:
objects_max_refs: int = 0,
) -> Tuple[list, list]:
"""import a stix2 bundle from JSON data

:param json_data: JSON data
Expand All @@ -231,11 +233,13 @@ def import_bundle_from_json(
:param types: list of stix2 types, defaults to None
:type types: list, optional
:param work_id work_id: str, optional
:return: list of imported stix2 objects
:rtype: List
:param objects_max_refs: max deps amount of objects, reject object import if larger than configured amount
:type objects_max_refs: int, optional
:return: list of imported stix2 objects and a list of stix2 objects with too many deps
:rtype: Tuple[List,List]
"""
data = json.loads(json_data)
return self.import_bundle(data, update, types, work_id)
return self.import_bundle(data, update, types, work_id, objects_max_refs)

def resolve_author(self, title: str) -> Optional[Identity]:
if "fireeye" in title.lower() or "mandiant" in title.lower():
Expand Down Expand Up @@ -3060,7 +3064,8 @@ def import_bundle(
update: bool = False,
types: List = None,
work_id: str = None,
) -> List:
objects_max_refs: int = 0,
) -> Tuple[list, list]:
# Check if the bundle is correctly formatted
if "type" not in stix_bundle or stix_bundle["type"] != "bundle":
raise ValueError("JSON data type is not a STIX2 bundle")
Expand Down Expand Up @@ -3094,12 +3099,27 @@ def import_bundle(

# Import every element in a specific order
imported_elements = []
too_large_elements_bundles = []
for bundle in bundles:
for item in bundle["objects"]:
self.import_item(item, update, types, 0, work_id)
imported_elements.append({"id": item["id"], "type": item["type"]})
# If item is considered too large, meaning that it has a number of refs higher than inputted objects_max_refs, do not import it
nb_refs = OpenCTIStix2Utils.compute_object_refs_number(item)
if 0 < objects_max_refs <= nb_refs:
self.opencti.work.report_expectation(
work_id,
{
"error": "Too large element in bundle",
"source": "Element "
+ item["id"]
+ " is too large and couldn't be processed",
},
)
too_large_elements_bundles.append(item)
else:
self.import_item(item, update, types, 0, work_id)
imported_elements.append({"id": item["id"], "type": item["type"]})

return imported_elements
return imported_elements, too_large_elements_bundles

@staticmethod
def put_attribute_in_extension(
Expand Down
7 changes: 6 additions & 1 deletion pycti/utils/opencti_stix2_splitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ def enlist_element(
)
else:
is_compatible = is_id_supported(item_id)

if is_compatible:
self.elements.append(item)
else:
Expand Down Expand Up @@ -262,7 +263,11 @@ def by_dep_size(elem):
)
)

return number_expectations, self.incompatible_items, bundles
return (
number_expectations,
self.incompatible_items,
bundles,
)

@deprecated("Use split_bundle_with_expectations instead")
def split_bundle(self, bundle, use_json=True, event_version=None) -> list:
Expand Down
14 changes: 14 additions & 0 deletions pycti/utils/opencti_stix2_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,17 @@ def retrieveClassForMethod(
if hasattr(attribute, method):
return attribute
return None

@staticmethod
def compute_object_refs_number(entity: Dict):
refs_number = 0
for key in list(entity.keys()):
if key.endswith("_refs") and entity[key] is not None:
refs_number += len(entity[key])
elif key.endswith("_ref"):
refs_number += 1
elif key == "external_references" and entity[key] is not None:
refs_number += len(entity[key])
elif key == "kill_chain_phases" and entity[key] is not None:
refs_number += len(entity[key])
return refs_number
2 changes: 1 addition & 1 deletion tests/02-integration/entities/test_malware.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ def test_malware_import_with_sample_refs(api_client):
with open("tests/data/basicMalwareWithSample.json", "r") as content_file:
content = content_file.read()

imported_malware_bundle = api_client.stix2.import_bundle_from_json(
imported_malware_bundle, _ = api_client.stix2.import_bundle_from_json(
json_data=content
)
assert imported_malware_bundle is not None
Expand Down
2 changes: 1 addition & 1 deletion tests/02-integration/utils/test_stix_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def test_entity_create(entity_class, api_stix, opencti_splitter):
stix_object = stix_class(**class_data)
bundle = Bundle(objects=[stix_object]).serialize()
split_bundle = opencti_splitter.split_bundle(bundle, True, None)[0]
bundles_sent = api_stix.import_bundle_from_json(split_bundle, False, None, None)
bundles_sent, _ = api_stix.import_bundle_from_json(split_bundle, False, None, None)

assert len(bundles_sent) == 1
assert bundles_sent[0]["id"] == stix_object["id"]
Expand Down