|
8 | 8 | import pytz |
9 | 9 |
|
10 | 10 | datefinder.ValueError = ValueError, OverflowError |
| 11 | +import stix2 |
11 | 12 | from stix2 import ObjectPath, ObservationExpression, EqualityComparisonExpression, HashConstant |
12 | 13 |
|
13 | 14 | utc = pytz.UTC |
14 | 15 |
|
| 16 | +# TODO: update this mapping with all the known OpenCTI types |
| 17 | +# the ones below were taken from the misp connector |
| 18 | +STIX2OPENCTI = { |
| 19 | + 'file:hashes.md5': 'File-MD5', |
| 20 | + 'file:hashes.sha1': 'File-SHA1', |
| 21 | + 'file:hashes.sha256': 'File-SHA256', |
| 22 | + 'ipv4-addr:value': 'IPv4-Addr', |
| 23 | + 'domain:value': 'Domain', |
| 24 | + 'url:value': 'URL', |
| 25 | +} |
| 26 | + |
15 | 27 |
|
16 | 28 | class OpenCTIStix2: |
17 | 29 | """ |
@@ -881,17 +893,48 @@ def export_stix_observable(self, entity): |
881 | 893 | return self.prepare_export(entity, final_stix_observable) |
882 | 894 |
|
883 | 895 | def create_indicator(self, stix_object, update=False): |
| 896 | + indicator_type = None |
| 897 | + indicator_value = None |
| 898 | + |
| 899 | + # check the custom stix2 fields |
884 | 900 | if 'x_opencti_observable_type' in stix_object and 'x_opencti_observable_value' in stix_object: |
| 901 | + indicator_type = stix_object['x_opencti_observable_type'] |
| 902 | + indicator_value = stix_object['x_opencti_observable_value'] |
| 903 | + else: |
| 904 | + # check if the indicator is a 'simple' type (i.e it only has exactly one "Comparison Expression") |
| 905 | + # there is no good way of checking this, so this is this is done by using the stix pattern parser, and |
| 906 | + # checking that the pattern's operator is '=' |
| 907 | + # The following pattern will be used for reference: |
| 908 | + # [file:hashes.md5 = 'd41d8cd98f00b204e9800998ecf8427e'] |
| 909 | + pattern = stix2.pattern_visitor.create_pattern_object(stix_object['pattern']) |
| 910 | + if pattern.operand.operator == '=': |
| 911 | + |
| 912 | + # get the object type (here 'file') and check that it is a standard observable type |
| 913 | + object_type = pattern.operand.lhs.object_type_name |
| 914 | + if object_type in stix2.OBJ_MAP_OBSERVABLE: |
| 915 | + |
| 916 | + # get the left hand side as string and use it for looking up the correct OpenCTI name |
| 917 | + lhs = str(pattern.operand.lhs) # this is "file:hashes.md5" from the reference pattern |
| 918 | + if lhs in STIX2OPENCTI: |
| 919 | + # the type and value can now be set |
| 920 | + indicator_type = STIX2OPENCTI[lhs] |
| 921 | + indicator_value = pattern.operand.rhs.value |
| 922 | + |
| 923 | + # check that the indicator type and value have been set before creating the indicator |
| 924 | + if indicator_type and indicator_value: |
885 | 925 | return self.opencti.create_stix_observable_if_not_exists( |
886 | | - stix_object['x_opencti_observable_type'], |
887 | | - stix_object['x_opencti_observable_value'], |
| 926 | + indicator_type, |
| 927 | + indicator_value, |
888 | 928 | self.convert_markdown(stix_object['description']) if 'description' in stix_object else '', |
889 | 929 | stix_object['x_opencti_id'] if 'x_opencti_id' in stix_object else None, |
890 | 930 | stix_object['id'] if 'id' in stix_object else None, |
891 | 931 | stix_object['created'] if 'created' in stix_object else None, |
892 | 932 | stix_object['modified'] if 'modified' in stix_object else None, |
893 | 933 | ) |
894 | | - # TODO: Implement extraction of observables from STIX2 patterns |
| 934 | + else: |
| 935 | + # log that the indicator could not be parsed |
| 936 | + logging.info(" Cannot handle indicator: {id}".format(id=stix_object['stix_id'])) |
| 937 | + |
895 | 938 | return None |
896 | 939 |
|
897 | 940 | def export_stix_relation(self, entity): |
|
0 commit comments