diff --git a/scapy/contrib/dtn/bpv7.py b/scapy/contrib/dtn/bpv7.py new file mode 100644 index 00000000000..b70fe7664e4 --- /dev/null +++ b/scapy/contrib/dtn/bpv7.py @@ -0,0 +1,940 @@ +# SPDX-License-Identifier: GPL-2.0-only +# This file is part of Scapy +# See https://scapy.net/ for more information + +# scapy.contrib.description = Bundle Protocol version 7 (BPv7) +# scapy.contrib.status = loads + +""" +Bundle Protocol version 7 (BPv7) layer +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +:authors: Timothy Recker, timothy.recker@nasa.gov + Tad Kollar, tad.kollar@nasa.gov +""" + +from scapy.packet import Packet +from scapy.fields import ( + PacketField, + MultipleTypeField, + BitEnumField, + PacketListField, + ConditionalField, + FieldListField, + BitField, + BitFieldLenField, +) +from scapy.all import raw +from scapy.contrib.dtn.cbor import ( + CBORInteger, + CBORByteString, + CBORIntOrText, + CBORArray, + CBORNull, + CBORStopCode, + CBORAny, + CBORPacketField, + CBORPacketFieldWithRemain, +) +import scapy.contrib.dtn.common as Common + +import time +import crcmod.predefined +from enum import IntFlag +from typing import Tuple, List +import re + + +class InvalidCRCType(Exception): + """ + Exception raised when an invalid CRC type code is + encountered. + + Attributes: + type_code: the invalid type code + """ + + def __init__(self, type_code): + super().__init__( + f"Tried to compute a CRC using an invalid type code: {type_code}" + ) + + +def compute_crc(crc_type: int, pkt: bytes, ignore_existing: bool = True): + # prepare parameters + if crc_type == CrcTypes.CRC32C: + size = 4 + crcfun = crcmod.predefined.mkCrcFun("crc-32c") + elif crc_type == CrcTypes.CRC16: + size = 2 + crcfun = crcmod.predefined.mkCrcFun("x-25") + else: + raise InvalidCRCType(crc_type) + + crc_index = len(pkt) - size + + # Wipe anything in existing crc field + if ignore_existing: + pkt = pkt[:crc_index] + b"\x00" * size + + return crcfun(pkt).to_bytes(size, "big"), crc_index + + +class PacketFieldWithRemain(PacketField): + """ + The regular Packet.getfield() never returns the remaining bytes, so the CRC or + other following fields get lost. This getfield does return the remaining bytes. + """ + + def getfield(self, pkt: Packet, s: bytes) -> Tuple[bytes, Packet]: + i = self.m2i(pkt, s) + remain_size = len(s) - len(raw(i)) + remain = s[-remain_size:] + return remain, i + + +class IPN(CBORArray): + fields_desc = CBORArray.fields_desc + [ + CBORInteger("node_id", 1), + CBORInteger("service_number", 1), + ] + + ipn_re = re.compile(r"ipn:(.*)\.(.*)") + + # pylint: disable=W0201 + # field (instance variable) initialization is handled via "fields_desc" + def from_string(self, ipn_str: str): + result = IPN.ipn_re.search(ipn_str) + self.node_id = int(result.group(1)) + self.service_number = int(result.group(2)) + + return self + + def __str__(self): + return f"ipn:{self.node_id}.{self.service_number}" # noqa: E231 + + def __eq__(self, other): + if isinstance(other, IPN): + return (self.node_id == other.node_id) and ( + self.service_number == other.service_number + ) + return False + + +class DTN(Packet): + fields_desc = [ + CBORIntOrText("uri", 0) # can be 0 (type Int) or a string (type String) + ] + + def extract_padding(self, s): + return "", s + + def __eq__(self, other): + if isinstance(other, DTN): + return self.uri == other.uri + return False + + +class EndpointID(CBORArray): + fields_desc = CBORArray.fields_desc + [ + CBORInteger("scheme_code", 2), + MultipleTypeField( + [ + ( + PacketFieldWithRemain("ssp", DTN(), DTN), + lambda pkt: pkt.scheme_code == 1, + ), + ( + PacketFieldWithRemain("ssp", IPN(), IPN), + lambda pkt: pkt.scheme_code == 2, + ), + ], + PacketFieldWithRemain("ssp", IPN(), IPN), + ), + ] + + def dissect(self, s: bytes): + """This dissect doesn't process the payload, because there is none.""" + s = self.pre_dissect(s) + s = self.do_dissect(s) + s = self.post_dissect(s) + + def __eq__(self, other): + if isinstance(other, EndpointID): + return (self.scheme_code == other.scheme_code) and (self.ssp == other.ssp) + return False + + +# pylint: disable=R0903 +# Packet types are not intended to have many(any) public functions +class Timestamp(CBORArray): + fields_desc = CBORArray.fields_desc + [CBORInteger("t", 0), CBORInteger("seq", 0)] + + def __ge__(self, other): + if isinstance(other, Timestamp): + return (self.t >= other.t) and (self.seq >= other.seq) + return False + + def __gt__(self, other): + if isinstance(other, Timestamp): + return (self.t > other.t) or (self.t == other.t and (self.seq > other.seq)) + return False + + +class BlockTypes: + """ + Bundle block type codes + """ + + PRIMARY = 0 + PAYLOAD = 1 + AUTHENTICATION = 2 + INTEGRITY = 3 + CONFIDENTIALITY = 4 + PREV_HOP = 5 + PREV_NODE = 6 + AGE = 7 + HOP_COUNT = 10 + BLOCK_INTEGRITY = 11 + BLOCK_CONFIDENTIALITY = 12 + + +class CrcTypes: + """ + Bundle CRC type codes + """ + + NONE = 0 + CRC16 = 1 + CRC32C = 2 + + +# pylint: disable=R0903 +# Packet types are not intended to have many(any) public functions +class SecurityTargets(CBORArray): + fields_desc = [ + CBORArray._major_type, + BitFieldLenField("add", 0, 5, count_of="targets"), + FieldListField( + "targets", [0], CBORInteger("tgt", 0), count_from=lambda pkt: pkt.add + ), + ] + + def count_additional_fields(self): + return len(self.getfieldval("targets")) + + +class CBORTuple(CBORArray): + """ + A pair of CBOR integers consisting of [id, value]. + """ + + fields_desc = CBORArray.fields_desc + [ + CBORInteger("id", 0), + CBORAny("value", b"\x00"), + ] + + +class CBORTupleArray(CBORArray): + fields_desc = [ + CBORArray._major_type, + BitFieldLenField("add", None, 5, count_of="tuples"), + PacketListField( + "tuples", [CBORTuple()], CBORTuple, count_from=lambda pkt: pkt.add + ), + ] + + def find_value_with_id(self, target_id: int): + """Find the tuple with the specified id and return the value.""" + try: + tup = next(x for x in self.tuples if x.id == target_id) + except StopIteration: + return None + + return tup.value + + def count_additional_fields(self) -> int: + return len(self.getfieldval("tuples")) + + +class SecurityResults(CBORArray): + """A CBOR array of CBORTupleArrays.""" + + fields_desc = [ + CBORArray._major_type, + BitFieldLenField("add", None, 5, count_of="results"), + PacketListField( + "results", + [CBORTupleArray()], + CBORTupleArray, + count_from=lambda pkt: pkt.add, + ), + ] + + def count_additional_fields(self): + return len(self.getfieldval("results")) + + +class AbstractSecurityBlock(Packet): + """ + The structure of the security-specific parts of the BIB and BCB are identical + and are defined here. This structure will reside in the block-specific data + field of a BPv7 canonical block. + """ + + fields_desc = [ + PacketField("security_targets", SecurityTargets(), SecurityTargets), + CBORInteger("security_context_id", 0), + CBORInteger("security_context_flags", 1), + PacketField("security_source", EndpointID(), EndpointID), + ConditionalField( + PacketField( + "security_context_parameters", CBORTupleArray(), CBORTupleArray + ), + lambda p: (p.security_context_flags & 1), + ), + PacketField("security_results", SecurityResults(), SecurityResults), + ] + + def dissect(self, s: bytes): + """This dissect doesn't process the payload, because there is none.""" + s = self.pre_dissect(s) + s = self.do_dissect(s) + s = self.post_dissect(s) + + +class CanonicalBlock(CBORArray): + class CtrlFlags(IntFlag): + """ + Block Processing Control Flags + """ + + BLOCK_MUST_BE_REPLICATED = 0x01 + REPORT_IF_UNPROCESSABLE = 0x02 + DELETE_BUNDLE_IF_UNPROCESSED = 0x04 + DISCARD_IF_NOT_PROCESSED = 0x010 + + TypeCodes = { + BlockTypes.PAYLOAD: "payload", + BlockTypes.AUTHENTICATION: "authentication", + BlockTypes.INTEGRITY: "integrity", + BlockTypes.CONFIDENTIALITY: "confidentiality", + BlockTypes.PREV_HOP: "prev_hop", + BlockTypes.PREV_NODE: "prev_node", + BlockTypes.AGE: "age", + BlockTypes.HOP_COUNT: "hop_count", + BlockTypes.BLOCK_INTEGRITY: "block_integrity", + BlockTypes.BLOCK_CONFIDENTIALITY: "block_confidentiality", + } + + fields_template: Common.FieldsTemplate = { + "type_code": BitEnumField("type_code", BlockTypes.PAYLOAD, 8, TypeCodes), + "block_number": CBORInteger("block_number", 1), + "flags": CBORInteger("flags", 0), + "crc_type": CBORInteger("crc_type", CrcTypes.CRC32C), + "data": CBORByteString("data", b"\xde\xad\xbe\xef"), + "crc": ConditionalField( + MultipleTypeField( + [ + ( + CBORByteString("crc", b"\x00\x00"), + lambda pkt: pkt.crc_type == CrcTypes.CRC16, + ), + ( + CBORByteString("crc", b"\x00\x00\x00\x00"), + lambda pkt: pkt.crc_type == CrcTypes.CRC32C, + ), + ], + CBORNull("crc", None), + ), + lambda pkt: pkt.crc_type != CrcTypes.NONE, + ), + } + + fields_desc = CBORArray.fields_desc + Common.make_fields_desc(fields_template) + encrypted = False + encrypted_by = [] + + def get_header(self) -> bytes: + return raw(self)[1:4] + + def post_dissect(self, s): + """ + Because some block elements--such as CRCs and CBOR array headers--are added to + the raw representation via overriding the post_build method (and correspondingly + removed during pre_dissect), the raw packet cache must be cleared. Otherwise, + some important methods will be broken for Blocks built from sniffed packets; + for example, `raw(Bundle(raw_bytes_received_from_socket))` will not produce + valid bundle bytes. See the comment linked below and the subsequent comment + with a solution copied here. + + https://github.com/secdev/scapy/issues/1021#issuecomment-704472941 + """ + self.raw_packet_cache = None # Reset packet to allow post_build + return s + + def post_build(self, pkt, pay): + pkt = self.set_additional_fields(pkt) + + if self.crc_type != CrcTypes.NONE: + crc, index = compute_crc(self.crc_type, pkt) + pkt = pkt[:index] + crc + + return pkt + pay + + def get_block_bytes(self) -> bytes: + return raw(self) + + def count_additional_fields(self): + return 5 if self.crc_type == CrcTypes.NONE else 6 + + +class PayloadBlock(CanonicalBlock): + """ + Contains the bundle payload. + """ + + fields_template = Common.template_replace( + CanonicalBlock.fields_template, + { + "type_code": BitEnumField( + "type_code", BlockTypes.PAYLOAD, 8, CanonicalBlock.TypeCodes + ) + }, + ) + + fields_desc = CBORArray.fields_desc + Common.make_fields_desc(fields_template) + + +# pylint: disable=R0901 +class EncryptedPayloadBlock(PayloadBlock): + """ + Contains the bundle payload. The data field is encrypted. + """ + + encrypted = True + + +class PreviousNodeBlock(CanonicalBlock): + """ + Contains the ID of the node that forwarded this bundle. + """ + + fields_template = Common.template_replace( + CanonicalBlock.fields_template, + { + "type_code": BitEnumField( + "type_code", BlockTypes.PREV_NODE, 8, CanonicalBlock.TypeCodes + ), + "data": CBORPacketField("data", EndpointID(), EndpointID), + }, + ) + + fields_desc = CBORArray.fields_desc + Common.make_fields_desc(fields_template) + + +# pylint: disable=R0901 +class EncryptedPreviousNodeBlock(PreviousNodeBlock): + """ + Contains the ID of the node that forwarded this bundle. The data field is encrypted. + """ + + fields_template = Common.template_replace( + PreviousNodeBlock.fields_template, + { + # The data field definition from the parent class cannot be used here. + # That data is now encrypted and cannot be decrypted to its original bytes + # (within the scope of this module), so the Packet that it represents + # cannot be dissected. + # Instead, the data field definition from CanonicalBlock is used. + "data": CBORByteString("data", b"\xde\xad\xbe\xef") + }, + ) + + fields_desc = CBORArray.fields_desc + Common.make_fields_desc(fields_template) + encrypted = True + + +class BundleAge(Packet): + fields_desc = [CBORInteger("age", 0)] + + def __eq__(self, other): + if isinstance(other, BundleAge): + return self.age == other.age + return False + + def dissect(self, s: bytes): + """This dissect doesn't process the payload, because there is none.""" + s = self.pre_dissect(s) + s = self.do_dissect(s) + s = self.post_dissect(s) + + +class BundleAgeBlock(CanonicalBlock): + """ + Contains the number of milliseconds that have elapsed between the time the + bundle was created and the time at which it was most recently forwarded. + """ + + fields_template = Common.template_replace( + CanonicalBlock.fields_template, + { + "type_code": BitEnumField( + "type_code", BlockTypes.AGE, 8, CanonicalBlock.TypeCodes + ), + "data": CBORPacketFieldWithRemain("data", BundleAge(), BundleAge), + }, + ) + + fields_desc = CBORArray.fields_desc + Common.make_fields_desc(fields_template) + + +# pylint: disable=R0901 +class EncryptedBundleAgeBlock(BundleAgeBlock): + """ + Contains the number of milliseconds that have elapsed between the time the + bundle was created and the time at which it was most recently forwarded. + The data field is encrypted. + """ + + fields_template = Common.template_replace( + BundleAgeBlock.fields_template, + {"data": CBORByteString("data", b"\xde\xad\xbe\xef")}, + ) + + fields_desc = CBORArray.fields_desc + Common.make_fields_desc(fields_template) + encrypted = True + + +class HopCount(CBORArray): + fields_desc = CBORArray.fields_desc + [ + CBORInteger("limit", 0), + CBORInteger("count", 0), + ] + + def __eq__(self, other): + if isinstance(other, HopCount): + return (self.limit == other.limit) and (self.count == other.count) + return False + + +class HopCountBlock(CanonicalBlock): + """ + Contains information on the Bundle's allowed number of hops and the hops that + have already happened. + """ + + fields_template = Common.template_replace( + CanonicalBlock.fields_template, + { + "type_code": BitEnumField( + "type_code", BlockTypes.HOP_COUNT, 8, CanonicalBlock.TypeCodes + ), + "data": CBORPacketField("data", HopCount(), HopCount), + }, + ) + + fields_desc = CBORArray.fields_desc + Common.make_fields_desc(fields_template) + + +# pylint: disable=R0901 +class EncryptedHopCountBlock(HopCountBlock): + """ + Contains information on the Bundle's allowed number of hops and the hops that + have already happened. The data field is encrypted. + """ + + fields_template = Common.template_replace( + HopCountBlock.fields_template, + {"data": CBORByteString("data", b"\xde\xad\xbe\xef")}, + ) + + fields_desc = CBORArray.fields_desc + Common.make_fields_desc(fields_template) + encrypted = True + + +class BlockIntegrityBlock(CanonicalBlock): + """ + This defines a CanonicalBlock with its type code as 11 and an + AbstractSecurityBlock as its data field. + """ + + fields_template = Common.template_replace( + CanonicalBlock.fields_template, + { + "type_code": BitEnumField( + "type_code", BlockTypes.BLOCK_INTEGRITY, 8, CanonicalBlock.TypeCodes + ), + "data": CBORPacketFieldWithRemain( + "data", AbstractSecurityBlock(), AbstractSecurityBlock + ), + }, + ) + + fields_desc = CBORArray.fields_desc + Common.make_fields_desc(fields_template) + + +# pylint: disable=R0901 +class EncryptedBlockIntegrityBlock(BlockIntegrityBlock): + """ + This defines a CanonicalBlock with its type code as 11 and an encrypted + AbstractSecurityBlock as its data field. + """ + + fields_template = Common.template_replace( + BlockIntegrityBlock.fields_template, + {"data": CBORByteString("data", b"\xde\xad\xbe\xef")}, + ) + + fields_desc = CBORArray.fields_desc + Common.make_fields_desc(fields_template) + encrypted = True + + +class BlockConfidentialityBlock(CanonicalBlock): + """ + This defines a CanonicalBlock with its type code as 12 and an + AbstractSecurityBlock as its data field. + """ + + fields_template = Common.template_replace( + CanonicalBlock.fields_template, + { + "type_code": BitEnumField( + "type_code", + BlockTypes.BLOCK_CONFIDENTIALITY, + 8, + CanonicalBlock.TypeCodes, + ), + "data": CBORPacketFieldWithRemain( + "data", AbstractSecurityBlock(), AbstractSecurityBlock + ), + }, + ) + + fields_desc = CBORArray.fields_desc + Common.make_fields_desc(fields_template) + + +class UnassignedExtensionBlock(CanonicalBlock): + """An extension block with an unassigned type code < 192.""" + + +class EncryptedUnassignedExtensionBlock(CanonicalBlock): + """An extension block with an unassigned type code < 192. + The data field is encrypted.""" + + encrypted = True + + +class ReservedExtensionBlock(CanonicalBlock): + """An extension block with a type code 192-255.""" + + +class EncryptedReservedExtensionBlock(CanonicalBlock): + """An extension block with a type code 192-255. The data field is encrypted.""" + + encrypted = True + + +class PrimaryBlock(CBORArray): + class CtrlFlags(IntFlag): + """ + Bundle Processing Control Flags + """ + + BUNDLE_IS_FRAGMENT = 0x01 + ADMIN_RECORD = 0x02 + MUST_NOT_BE_FRAGMENTED = 0x04 + ACKNOWLEDGEMENT_REQUESTED = 0x20 + STATUS_TIME_REQUESTED = 0x40 + + REQUEST_REPORTING_OF_BUNDLE_RECEPTION = 0x4000 + REQUEST_REPORTING_OF_BUNDLE_FORWARDING = 0x10000 + REQUEST_REPORTING_OF_BUNDLE_DELIVERY = 0x20000 + REQUEST_REPORTING_OF_BUNDLE_DELETION = 0x40000 + + fields_desc = CBORArray.fields_desc + [ + CBORInteger("version", 7), + CBORInteger("flags", 0), + CBORInteger("crc_type", CrcTypes.CRC32C), + PacketField("dest", EndpointID(), EndpointID), + PacketField("src", EndpointID(), EndpointID), + PacketField("report", EndpointID(scheme_code=1), EndpointID), + PacketField("creation_timestamp", Timestamp(t=int(time.time())), Timestamp), + CBORInteger("lifetime", 0), + ConditionalField( + CBORInteger("fragment_offset", 0), + lambda pkt: pkt.flags & PrimaryBlock.CtrlFlags.BUNDLE_IS_FRAGMENT, + ), + ConditionalField( + CBORInteger("total_adu_length", 0), + lambda pkt: pkt.flags & PrimaryBlock.CtrlFlags.BUNDLE_IS_FRAGMENT, + ), + ConditionalField( + MultipleTypeField( + [ + ( + CBORByteString("crc", b"\x00\x00"), + lambda pkt: pkt.crc_type == CrcTypes.CRC16, + ), + ( + CBORByteString("crc", b"\x00\x00\x00\x00"), + lambda pkt: pkt.crc_type == CrcTypes.CRC32C, + ), + ], + CBORNull("crc", None), + ), + lambda pkt: pkt.crc_type != CrcTypes.NONE, + ), + ] + + def dissect(self, s: bytes): + """This dissect doesn't process the payload, because there is none.""" + s = self.pre_dissect(s) + s = self.do_dissect(s) + s = self.post_dissect(s) + + def post_dissect(self, s): + # see docstring for equivalent Canonical Block method + self.raw_packet_cache = None # Reset packet to allow post_build + return s + + def post_build(self, pkt, pay): + pkt = self.set_additional_fields(pkt) + + if self.crc_type != CrcTypes.NONE: + # insert crc + crc, index = compute_crc(self.crc_type, pkt) + pkt = pkt[:index] + crc + + return pkt + pay + + def count_additional_fields(self): + count = 8 + if self.crc_type != CrcTypes.NONE: + count += 1 + if self.flags & PrimaryBlock.CtrlFlags.BUNDLE_IS_FRAGMENT: + count += 2 + return count + + +TYPE_CODE_TO_BLOCK_TYPE_MAP = { + # (type_code, is_encrypted): block_type + (BlockTypes.PRIMARY, False): PrimaryBlock, + (BlockTypes.PRIMARY, True): None, # should not happen + (BlockTypes.PAYLOAD, False): PayloadBlock, + (BlockTypes.PAYLOAD, True): EncryptedPayloadBlock, + (BlockTypes.PREV_NODE, False): PreviousNodeBlock, + (BlockTypes.PREV_NODE, True): EncryptedPreviousNodeBlock, + (BlockTypes.AGE, False): BundleAgeBlock, + (BlockTypes.AGE, True): EncryptedBundleAgeBlock, + (BlockTypes.HOP_COUNT, False): HopCountBlock, + (BlockTypes.HOP_COUNT, True): EncryptedHopCountBlock, + (BlockTypes.BLOCK_INTEGRITY, False): BlockIntegrityBlock, + (BlockTypes.BLOCK_INTEGRITY, True): EncryptedBlockIntegrityBlock, + (BlockTypes.BLOCK_CONFIDENTIALITY, False): BlockConfidentialityBlock, + (BlockTypes.BLOCK_CONFIDENTIALITY, True): None, # should not happen +} + +UNENCRYPTED_TO_ENCRYPTED_TYPE_MAP = { + PayloadBlock: EncryptedPayloadBlock, + PreviousNodeBlock: EncryptedPreviousNodeBlock, + BundleAgeBlock: EncryptedBundleAgeBlock, + HopCountBlock: EncryptedHopCountBlock, + BlockIntegrityBlock: EncryptedBlockIntegrityBlock, + UnassignedExtensionBlock: EncryptedUnassignedExtensionBlock, + ReservedExtensionBlock: EncryptedReservedExtensionBlock, +} + +ENCRYPTED_TO_UNENCRYPTED_TYPE_MAP = { + EncryptedPayloadBlock: PayloadBlock, + EncryptedPreviousNodeBlock: PreviousNodeBlock, + EncryptedBundleAgeBlock: BundleAgeBlock, + EncryptedHopCountBlock: HopCountBlock, + EncryptedBlockIntegrityBlock: BlockIntegrityBlock, + EncryptedUnassignedExtensionBlock: UnassignedExtensionBlock, + EncryptedReservedExtensionBlock: ReservedExtensionBlock, +} + + +def next_block_type(pkt, lst, cur, remain): + del pkt, lst, cur # Not used + if remain is None or remain == b"\xff": + return None + return Bundle.identify_block(remain) + + +def guess_block_class(block_bytes, pkt): + del pkt # Not used + if block_bytes is None or block_bytes == b"\xff": + return None + return Bundle.identify_block(block_bytes) + + +class Bundle(CBORArray): + def count_additional_fields(self) -> int: + return 31 + + fields_desc = [ + CBORArray._major_type, + BitField("add", 31, 5), + PacketFieldWithRemain("primary_block", PrimaryBlock(), PrimaryBlock), + PacketListField( + "canonical_blocks", [CanonicalBlock()], next_cls_cb=next_block_type + ), + CBORStopCode("stop_code", 31), + ] + + @staticmethod + def type_code_to_block_type(type_code: int, encrypted: bool = False): + map_key = (type_code, encrypted) + if map_key not in TYPE_CODE_TO_BLOCK_TYPE_MAP: + if type_code < 192: + if encrypted: + return EncryptedUnassignedExtensionBlock + return UnassignedExtensionBlock + if type_code >= 192: + if encrypted: + return EncryptedReservedExtensionBlock + return ReservedExtensionBlock + return CanonicalBlock + + return TYPE_CODE_TO_BLOCK_TYPE_MAP[map_key] + + def find_block_by_type( + self, block_type, excluded_block_nums: List[int] = None + ) -> CanonicalBlock: + """ + Find the first canonical block matching the specified type, + with a block number not in the excluded list. + """ + if excluded_block_nums is None: + excluded_block_nums = [] + try: + block = next( + x + for x in self.canonical_blocks + if isinstance(x, block_type) + and x.block_number not in excluded_block_nums + ) + except StopIteration: + block = None + + return block + + def find_block_by_type_code( + self, type_code: int, excluded_block_nums: List[int] = None + ) -> CanonicalBlock: + """ + Find the first block matching the specified type code, with a block number not + in the excluded list. + """ + if excluded_block_nums is None: + excluded_block_nums = [] + try: + block = next( + x + for x in self.canonical_blocks + if (x.type_code == type_code) + and (x.block_number not in excluded_block_nums) + ) + except StopIteration: + block = None + + return block + + def find_block_by_number(self, block_num: int) -> CanonicalBlock: + """Find the canonical block with the specified block number.""" + try: + block = next( + x for x in self.canonical_blocks if x.block_number == block_num + ) + except StopIteration: + block = None + + return block + + def get_new_block_number(self) -> int: + """Return a new canonical block number one higher than the highest in use.""" + new_num = 2 + + for block in self.canonical_blocks: + if block.block_number >= new_num: + new_num = block.block_number + 1 + + return new_num + + def add_block( + self, + block: CanonicalBlock, + block_num_to_insert_above=1, + select_block_number=False, + ) -> CanonicalBlock: + """Insert an extension block before the block with specified block number.""" + if select_block_number: + block.block_number = self.get_new_block_number() + + insert_pos = -1 + for idx, test_block in enumerate(self.canonical_blocks): + if test_block.block_number == block_num_to_insert_above: + insert_pos = idx + + if insert_pos == -1: + raise ValueError( + "Could not find block number to insert above", block_num_to_insert_above + ) + + self.canonical_blocks.insert(insert_pos, block) + + return block + + def replace_block_by_block_num(self, block_num: int, new_block: CanonicalBlock): + for idx, block in enumerate(self.canonical_blocks): + if block.block_number == block_num: + self.canonical_blocks[idx] = new_block + return + + @staticmethod + def identify_block(block_bytes: bytes): + """Determine the type of the canonical block.""" + type_code = block_bytes[1] + + block_type = Bundle.type_code_to_block_type(type_code) + encrypted_block_type = Bundle.type_code_to_block_type(type_code, True) + + if encrypted_block_type is not block_type: + # Try to construct the block as the unencrypted type. If it + # doesn't work, specify the encrypted version. If it works due to + # chance arrangement of bytes but is actually encrypted, it will + # be corrected by post_dissect() + try: + _ = block_type(block_bytes) + # pylint: disable=W0702, W0718 + # Scapy just raises a generic Exception if it fails + except Exception: + block_type = encrypted_block_type + + return block_type + + def post_dissect(self, s): + """ + Find the BCBs and check their security targets to definitively determine + which blocks are encrypted. + """ + for bcb_block in self.canonical_blocks: + if isinstance(bcb_block, BlockConfidentialityBlock): + for idx, block in enumerate(self.canonical_blocks): + if block.block_number in bcb_block.data.security_targets.targets: + block_type = type(block) + new_block = block + + # Found a block originally detected as unencrypted, but the + # BCB specifies is encrypted. Replace with an encrypted type. + if not block_type.encrypted: + encrypted_type = UNENCRYPTED_TO_ENCRYPTED_TYPE_MAP[ + block_type + ] + new_block = encrypted_type(raw(block)) + self.canonical_blocks[idx] = new_block + + new_block.encrypted_by.append(bcb_block.block_number) + return s diff --git a/scapy/contrib/dtn/cbor.py b/scapy/contrib/dtn/cbor.py new file mode 100644 index 00000000000..7167c1a2afd --- /dev/null +++ b/scapy/contrib/dtn/cbor.py @@ -0,0 +1,293 @@ +# SPDX-License-Identifier: GPL-2.0-only +# This file is part of Scapy +# See https://scapy.net/ for more information + +# scapy.contrib.description = Concise Binary Object Representation (CBOR) +# scapy.contrib.status = library + +""" +Concise Binary Object Representation (CBOR) utility +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +:authors: Timothy Recker, timothy.recker@nasa.gov + Tad Kollar, tad.kollar@nasa.gov +""" + +from scapy.fields import Field, BitField, BitEnumField, PacketField +from scapy.packet import Packet +from scapy.all import raw +import flynn +from typing import Tuple, List, Union + +MajorTypes = { + 0: "unsigned", + 1: "negative", + 2: "byte string", + 3: "text string", + 4: "array", + 5: "map", + 6: "tag", + 7: "simple/float", +} + + +class MajorTypeException(Exception): + """This exception indicates that a CBOR object has an unexpected + value for its Major Type. + + Attributes: + actual -- the integer value of the actual Major Type + expected -- an integer or list indicating the acceptable Major Type values + """ + + def __init__(self, actual: int, expected: Union[int, List[int]]): + + message = f"[Error] Major type {actual} does not refer to a(n)" + if isinstance(expected, int): + typ = MajorTypes[expected] + message += f" {typ}." + else: + typ = MajorTypes[expected[0]] + message += f" {typ}" + for val in expected[1:]: + typ = MajorTypes[val] + message += f" or {typ}" + message += "." + super().__init__(message) + + +class StopCodeException(Exception): + def __init__(self, value): + super().__init__(f"[Error] Major type {value} does not refer to a stop code.") + + +class AdditionalInfoException(Exception): + """This exception indicates that a CBOR object has an unexpected value for + its Additional Info.""" + + def __init__(self): + super().__init__("[Error] Invalid additional info.") + + +class UnhandledTypeException(Exception): + def __init__(self, value, cls): + super().__init__(f"[Error] Major type {value} is not handled by {cls}.") + + +# CBOR definitions +class CBORNull(Field): + """This class exists so that it can be used in a MultipleTypeField containing CBOR + values. Every option given to a MultipleTypeField must be a field with at least a + name. Thus, if one of the MultipleType options should be that no field is present, + you need a field that produces no bytes when added to the packet. + CBORNull can serve this purpose.""" + + +class CBORBase(Field): + @staticmethod + def static_get_head_info(b): + if len(b) == 0: + return None, None + + head = b[0] + major_type = head >> 5 + add_info = head & 0b00011111 + + return major_type, add_info + + def get_head_info(self, b): + return CBORBase.static_get_head_info(b) + + def addfield(self, pkt, s, val): + return s + flynn.dumps(val) + + +class CBORInteger(CBORBase): + @staticmethod + def get_value(add_info, b): + if add_info < 24: + val_length = 1 # 1 byte head, argument=add_info + val = add_info + elif add_info == 24: + val_length = 2 # 1 byte head + 1 byte argument + val = b[1] + elif add_info == 25: + val_length = 3 # 1 byte head + 2 byte argument + val = int.from_bytes(b[1:3], byteorder="big") + elif add_info == 26: + val_length = 5 # 4 byte argument + val = int.from_bytes(b[1:5], byteorder="big") + elif add_info == 27: + val_length = 9 # 8 byte argument + val = int.from_bytes(b[1:9], byteorder="big") + else: + raise AdditionalInfoException() + + return b[val_length:], val + + def getfield(self, pkt, s): + major_type, add_info = self.get_head_info(s) + + if major_type != 0: + raise MajorTypeException(major_type, 0) + + return CBORInteger.get_value(add_info, s) + + +class CBORStringBase(CBORBase): + @staticmethod + def get_value(add_info, b): + if add_info < 24: + arg_size = 0 + data_size = add_info # argument = data size = additional info + else: + if add_info == 24: + arg_size = 1 + elif add_info == 25: + arg_size = 2 + elif add_info == 26: + arg_size = 4 + elif add_info == 27: + arg_size = 8 + else: + raise AdditionalInfoException() + + # size of argument is known now, so + # get value of the argument, which contains the size of the data + data_size = int.from_bytes(b[1 : 1 + arg_size], byteorder="big") + val_length = 1 + arg_size + data_size + val = b[1 + arg_size : val_length] + + return b[val_length:], val + + +class CBORByteString(CBORStringBase): + def getfield(self, pkt, s): + major_type, add_info = self.get_head_info(s) + + if major_type != 2: + raise MajorTypeException(major_type, 2) + + return CBORStringBase.get_value(add_info, s) + + +class CBORTextString(CBORStringBase): + def getfield(self, pkt, s): + major_type, add_info = self.get_head_info(s) + + if major_type != 3: + raise MajorTypeException(major_type, 3) + + return CBORStringBase.get_value(add_info, s) + + +class CBORIntOrText(CBORBase): + def getfield(self, pkt, s): + major_type, add_info = self.get_head_info(s) + + if major_type == 0: + return CBORInteger.get_value(add_info, s) + if major_type == 3: + return CBORStringBase.get_value(add_info, s) + + raise MajorTypeException(major_type, [0, 3]) + + +class CBORStopCode(CBORBase): + def addfield(self, pkt, s, val): + return s + b"\xff" + + @staticmethod + def get_value(add_info, b): + return b[1:], add_info + + def getfield(self, pkt, s): + major_type, add_info = self.get_head_info(s) + if major_type != 7: + raise StopCodeException(major_type) + + return CBORStopCode.get_value(add_info, s) + + +class CBORAny(CBORBase): + def getfield(self, pkt, s): + major_type, add_info = self.get_head_info(s) + + if major_type == 0: + return CBORInteger.get_value(add_info, s) + if major_type in [2, 3]: + return CBORStringBase.get_value(add_info, s) + if major_type == 7: + return CBORStopCode.get_value(add_info, s) + + raise UnhandledTypeException(major_type, CBORAny) + + +class CBORArray(Packet): + _major_type = BitEnumField("major_type", 4, 3, MajorTypes) + _add = BitField("add", 0, 5) # additional information = length + + # head fields + fields_desc = [_major_type, _add] + + def count_additional_fields(self) -> int: + """ + Return the number of fields other than the two head fields. This method does + not work correctly with ConditionalFields and should be overridden when that + field type is in use. + """ + head_field_count = 2 + return len(self.default_fields) - head_field_count + + def set_additional_fields(self, pkt: Packet) -> bytes: + """For an array, the add field is set to the number of elements minus + the two head fields.""" + # pylint: disable=W0201 + # field (instance variable) initialization is handled via "fields_desc" + self.add = self.count_additional_fields() + head = (self.major_type << 5) | self.add + + return head.to_bytes(1, "big") + pkt[1:] + + def post_build(self, pkt: bytes, pay: bytes) -> bytes: + return self.set_additional_fields(pkt) + pay + + def extract_padding(self, s): + return "", s + + +class CBORPacketField(PacketField): + def i2m(self, pkt: Packet, i) -> bytes: + if i is None: + return b"" + + return flynn.dumps(raw(i)) + + def m2i(self, pkt: Packet, m): + _, add_info = CBORBase.static_get_head_info(m) + remain, decoded_m = CBORStringBase.get_value(add_info, m) + try: + # we want to set parent wherever possible + return self.cls(decoded_m + remain, _parent=pkt) # type: ignore + except TypeError: + return self.cls(decoded_m + remain) + + +class CBORPacketFieldWithRemain(CBORPacketField): + """ + The regular Packet.getfield() never returns the remaining bytes, so the CRC or + other following fields get lost. This getfield does return the remaining bytes. + """ + + def m2i(self, pkt: Packet, m): + _, add_info = CBORBase.static_get_head_info(m) + remain, decoded_m = CBORStringBase.get_value(add_info, m) + try: + # we want to set parent wherever possible + return remain, self.cls(decoded_m + remain, _parent=pkt) # type: ignore + except TypeError: + return remain, self.cls(decoded_m + remain) + + def getfield(self, pkt: Packet, s: bytes) -> Tuple[bytes, Packet]: + remain, i = self.m2i(pkt, s) + return remain, i diff --git a/scapy/contrib/dtn/common.py b/scapy/contrib/dtn/common.py new file mode 100644 index 00000000000..84a94517f8c --- /dev/null +++ b/scapy/contrib/dtn/common.py @@ -0,0 +1,49 @@ +# SPDX-License-Identifier: GPL-2.0-only +# This file is part of Scapy +# See https://scapy.net/ for more information + +# scapy.contrib.description = utility functions and classes for DTN module +# scapy.contrib.status = library + +from scapy.packet import Packet, Raw +from scapy.fields import Field +from typing import Dict, List + + +class NoPayloadPacket(Packet): + """A packet with no payload layer to bind.""" + + def extract_padding(self, s): + return "", s + + def post_dissect(self, s): + try: + if self[Raw].load is not None: + raise ValueError(f"found payload in {Packet} when none was expected") + except IndexError: # No Raw layer found, i.e. no unparsed payload is present + pass + return s + + +class ControlPacket(NoPayloadPacket): + """A packet containing control data, rather than user data.""" + + +class FieldPacket(NoPayloadPacket): + """A packet intended for use as a field (i.e. PacketField or PacketListField) + in another Packet, rather than one sent or received on the wire. Useful when you + need heterogeneous, compound data similar to a record/struct within + another Packet.""" + + +FieldsTemplate = Dict[str, Field] + + +def template_replace( + template: FieldsTemplate, new_values: FieldsTemplate +) -> FieldsTemplate: + return {**template, **new_values} + + +def make_fields_desc(template: FieldsTemplate) -> List[Field]: + return list(template.values()) diff --git a/scapy/contrib/dtn/tcpcl.py b/scapy/contrib/dtn/tcpcl.py new file mode 100644 index 00000000000..4ed0539605f --- /dev/null +++ b/scapy/contrib/dtn/tcpcl.py @@ -0,0 +1,301 @@ +# SPDX-License-Identifier: GPL-2.0-only +# This file is part of Scapy +# See https://scapy.net/ for more information + +# scapy.contrib.description = TCP Convergence Layer version 4 (TCPCLv4) +# scapy.contrib.status = loads + +""" +TCP Convergence Layer version 4 (TCPCLv4) layer +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +:author: Timothy Recker, timothy.recker@nasa.gov +""" + +from scapy.packet import Packet, Raw, bind_layers +from scapy.fields import ( + XByteEnumField, + PacketField, + BitField, + BitFieldLenField, + ConditionalField, + PacketListField, + StrLenField, + FieldLenField, +) + +import struct +from enum import IntEnum + +from scapy.contrib.dtn.common import ControlPacket, FieldPacket +import scapy.contrib.dtn.bpv7 as BPv7 + + +class MagicValueError(Exception): + """ + Exception raised when a ContactHeader is dissected and the magic value is incorrect. + """ + + def __init__(self, value): + super().__init__( + f"Tried to decode ContactHeader with invalid magic value of {value}" + ) + + +class ContactHeader(ControlPacket): + MAGIC_VALUE = 0x64746E21 + + class Flag(IntEnum): + CAN_TLS = 0x01 + + fields_desc = [ + BitField("magic", MAGIC_VALUE, 32), + BitField("version", 4, 8), + XByteEnumField("flags", 0, {Flag.CAN_TLS: "can tls"}), + ] + + def post_dissect(self, s): + if self.magic != self.MAGIC_VALUE: + raise MagicValueError(self.magic) + return super().post_dissect(s) + + +class MsgHeader(Packet): + class MsgType(IntEnum): + SESS_INIT = 0x07 + SESS_TERM = 0x05 + XFER_SEGMENT = 0x01 + XFER_ACK = 0x02 + XFER_REFUSE = 0x03 + KEEPALIVE = 0x04 + MSG_REJECT = 0x06 + + fields_desc = [ + XByteEnumField( + "type", + MsgType.XFER_SEGMENT, + { + MsgType.SESS_INIT: "sess_init", + MsgType.SESS_TERM: "sess_term", + MsgType.XFER_SEGMENT: "xfer_segment", + MsgType.XFER_ACK: "xfer_ack", + MsgType.XFER_REFUSE: "xfer_refuse", + MsgType.KEEPALIVE: "keepalive", + MsgType.MSG_REJECT: "msg_reject", + }, + ) + ] + + +class Ext(FieldPacket): + """ + Definition for an Extension Item in the format of a Type-Length-Value container. + """ + + class Flag(IntEnum): + CRITICAL = 0x01 + + class Type(IntEnum): + LENGTH = 0x0001 + + fields_desc = [ + XByteEnumField("flags", 0, {Flag.CRITICAL: "critical"}), + BitField("type", 0, 16), + BitFieldLenField("length", default=0, size=16, length_of="data"), + StrLenField("data", 0, length_from=lambda pkt: pkt.length), + ] + + +class SessInit(ControlPacket): + fields_desc = [ + BitField("keepalive", 0, 16), + BitField("segment_mru", 0, 64), + BitField("transfer_mru", 0, 64), + FieldLenField( + "id_length", None, length_of="id", fmt="H" + ), # Node ID Length (U16) + StrLenField( + "id", b"", length_from=lambda pkt: pkt.id_length + ), # Node ID Data (variable) + BitFieldLenField("ext_length", 0, 32, length_of="ext_items"), + ConditionalField( + PacketListField( + "ext_items", [], Ext, length_from=lambda pkt: pkt.ext_length + ), + lambda pkt: pkt.ext_length > 0, + ), + ] + + +class Keepalive(ControlPacket): + """ + A keepalive message consists only of a MsgHeader with the type code KEEPALIVE. + """ + + +class MsgReject(ControlPacket): + class ReasonCode(IntEnum): + UNKNOWN = 0x01 + UNSUPPORTED = 0x02 + UNEXPECTED = 0x03 + + fields_desc = [ + XByteEnumField( + "reason", + ReasonCode.UNSUPPORTED, + { + ReasonCode.UNKNOWN: "message type unknown", + ReasonCode.UNSUPPORTED: "message unsupported", + ReasonCode.UNEXPECTED: "message unexpected", + }, + ), + PacketField("header", MsgHeader(), MsgHeader), + ] + + +class Xfer(Packet): + """ + Abstract class containing fields and flags common to Xfer messages + """ + + class Flag(IntEnum): + END = 0x01 + START = 0x02 + + fields_desc = [ + XByteEnumField( + "flags", + 0, + {Flag.END: "END", Flag.START: "START", Flag.START | Flag.END: "START|END"}, + ), + BitField("id", 0, 64), + ] + + +class InvalidPayloadError(Exception): + """ + This error indicates that an XferSegment contains raw bytes instead of + a properly formatted Bundle as its payload. + """ + + def __init__(self, payload_bytes): + super().__init__( + f"Failed to fully parse Bundle from Xfer payload: bundle={payload_bytes}" + ) + + +class XferSegment(Xfer): + """ + Packet for transferring a data segment + """ + + fields_desc = Xfer.fields_desc + [ + ConditionalField( + BitFieldLenField("ext_length", default=0, size=32, length_of="ext_items"), + lambda pkt: pkt.flags & Xfer.Flag.START, + ), + ConditionalField( + PacketListField( + "ext_items", + [Ext(type=Ext.Type.LENGTH)], + Ext, + length_from=lambda pkt: pkt.ext_length, + ), + lambda pkt: (pkt.flags & Xfer.Flag.START) and (pkt.ext_length > 0), + ), + BitField("length", default=0, size=64), + ] + + def post_build(self, pkt, pay): + # calculate the length field + if not self.length: + index = len(pkt) - 8 # size of length is 8 bytes, thus position=len(pkt)-8 + length = len(pay) + pkt = pkt[:index] + struct.pack("!Q", length) + return pkt + pay + + def post_dissect(self, s): + """An XferSegment message should have a Bundle as payload. + If it has raw bytes instead, raise an error.""" + try: + if self[Raw].load is not None: + raise InvalidPayloadError(self[Raw].load) + except IndexError: # Raw layer or load field does not exist + pass # no action required + + return s + + +class XferAck(ControlPacket): + fields_desc = Xfer.fields_desc + [BitField("length", default=0, size=64)] + + +class XferRefuse(ControlPacket): + class ReasonCode(IntEnum): + UNKNOWN = 0x00 + COMPLETED = 0x01 + NO_RESOURCES = 0x02 + RETRANSMIT = 0x03 + NOT_ACCEPTABLE = 0x04 + EXT_FAIL = 0x05 + SESS_TERM = 0x06 + + fields_desc = [ + XByteEnumField( + "reason", + ReasonCode.UNKNOWN, + { + ReasonCode.UNKNOWN: "unknown", + ReasonCode.COMPLETED: "complete bundle received", + ReasonCode.NO_RESOURCES: "resources exhausted", + ReasonCode.RETRANSMIT: "retransmit bundle", + ReasonCode.NOT_ACCEPTABLE: "bundle not acceptable", + ReasonCode.EXT_FAIL: "failed to process extensions", + ReasonCode.SESS_TERM: "session is terminating", + }, + ), + BitField("id", 0, 64), + ] + + +class SessTerm(ControlPacket): + class Flag(IntEnum): + REPLY = 0x01 + + class ReasonCode(IntEnum): + UNKNOWN = 0x00 + TIMEOUT = 0x01 + MISMATCH = 0x02 + BUSY = 0x03 + CONTACT_FAIL = 0x04 + NO_RESOURCES = 0x05 + + fields_desc = [ + XByteEnumField("flags", 0, {Flag.REPLY: "reply"}), + XByteEnumField( + "reason", + ReasonCode.UNKNOWN, + { + ReasonCode.UNKNOWN: "unknown", + ReasonCode.TIMEOUT: "idle timeout", + ReasonCode.MISMATCH: "version mismatch", + ReasonCode.BUSY: "entity busy", + ReasonCode.CONTACT_FAIL: "failed to process contact header or sess init", # noqa: E501 + ReasonCode.NO_RESOURCES: "entity resource exhaustion", + }, + ), + ] + + +# Bind all TCPCL message headers to TCPCL messages. +# This way, if `some_bytes` consists of a raw TCPCL message, you can evaluate +# e.g. `x=MsgHeader(some_bytes)` and `x` will be a Packet consisting of a TCPCL +# MsgHeader with the correct type code plus a payload of the correct TCPCL message type. +bind_layers(MsgHeader, SessInit, type=MsgHeader.MsgType.SESS_INIT) +bind_layers(MsgHeader, Keepalive, type=MsgHeader.MsgType.KEEPALIVE) +bind_layers(MsgHeader, MsgReject, type=MsgHeader.MsgType.MSG_REJECT) +bind_layers(MsgHeader, XferSegment, type=MsgHeader.MsgType.XFER_SEGMENT) +bind_layers(MsgHeader, XferAck, type=MsgHeader.MsgType.XFER_ACK) +bind_layers(MsgHeader, XferRefuse, type=MsgHeader.MsgType.XFER_REFUSE) +bind_layers(MsgHeader, SessTerm, type=MsgHeader.MsgType.SESS_TERM) +bind_layers(XferSegment, BPv7.Bundle) diff --git a/scapy/contrib/dtn/tcpcl_session.py b/scapy/contrib/dtn/tcpcl_session.py new file mode 100644 index 00000000000..6fc194ef01e --- /dev/null +++ b/scapy/contrib/dtn/tcpcl_session.py @@ -0,0 +1,133 @@ +# SPDX-License-Identifier: GPL-2.0-only +# This file is part of Scapy +# See https://scapy.net/ for more information + +# scapy.contrib.description = TCP Convergence Layer version 4 (TCPCLv4) +# scapy.contrib.status = loads + +# These classes support unit testing of the TCPCL scapy layer +# (scapy.contrib.dtn.tcpcl) and illustrate how the protocol messages may +# be used to emulate a TCPCL session. + +from scapy.all import Raw, raw, TCP, Packet, bind_layers, split_layers +import scapy.contrib.dtn.tcpcl as TCPCL +from typing import List + + +class Session: + """ + TCPCL messages are conventionally, but not necessarily, sent on port 4556. + Since this cannot be relied upon, especially on a localhost session, the best + way to bind TCP packets to TCPCL message is to track the state of a TCPCL session. + Once Contact Headers are successfully exchanged, TCP packets can be assumed to + carry payloads of TCPCL messages until the session ends. + """ + + def __init__(self): + self.contact_init = False + self.contact_ack = False + self.is_active = False + self.term_begun = False + self.sport = 0 + self.dport = 0 + + @staticmethod + def bind_messages(sport, dport): + bind_layers(TCP, TCPCL.MsgHeader, sport=sport, dport=dport) + bind_layers(TCP, TCPCL.MsgHeader, sport=dport, dport=sport) + + @staticmethod + def split_messages(sport, dport): + split_layers(TCP, TCPCL.MsgHeader, sport=sport, dport=dport) + split_layers(TCP, TCPCL.MsgHeader, sport=dport, dport=sport) + + def activate(self): + if not (self.contact_init and self.contact_ack): + raise Exception( + "tried to activate a session before initialization and acknowledgement" + ) + + self.is_active = self.contact_init and self.contact_ack + + Session.bind_messages(self.sport, self.dport) + + def terminate(self): + if not (self.contact_init and self.contact_ack): + raise Exception("tried to terminate a session while none was active") + + self.is_active = self.contact_ack = self.contact_init = False + + Session.split_messages(self.sport, self.dport) + + def init_contact(self, sport, dport): + self.contact_init = True + self.sport = sport + self.dport = dport + + def init_timeout(self): + self.contact_init = False + + def proc_ack(self): + self.contact_ack = True + self.activate() + + def proc_term(self): + self.term_begun = True + + def proc_term_ack(self): + self.terminate() + + +class TestTcpcl: + + @staticmethod + def check_pkt(pkt: Packet, options: List[Packet]): + """Asserts that pkt is equal to one of the packets in options + (according to the raw representation)""" + for opt in options: + assert raw(pkt) in list( + map(raw, options) + ), "Failed to build a properly formatted TCPCL message" + + @staticmethod + def make_prn(): + """Define a function for processing packets that closes over a new Session. + Return it for use in Scapy.sniff.""" + + sess = Session() + + def process(pkt): + # Manage session initialization + if not sess.is_active: + try: # try to find a Contact Header + pay = pkt[Raw].load + contact = TCPCL.ContactHeader( + pay + ) # should raise unhandled error if + # the TCP payload does not fit ContactHeader + # replace pkt's raw payload with a ContactHeader formatted payload + pkt[TCP].remove_payload() + pkt = pkt / contact + + # process ContactHeader + if ( + sess.contact_init + ): # session already initialized, Header is an ack + sess.proc_ack() + print("BEGIN TCPCL SESSION") + else: + sess.init_contact(pkt[TCP].sport, pkt[TCP].dport) + except IndexError: # no TCP payload to process + pass + else: # currently in an active session + if TCPCL.SessTerm in pkt: + # process SessTerm + if sess.term_begun: + sess.proc_term_ack() + print("END TCPCL SESSION") + else: + sess.proc_term() + + return pkt # end of process + + return process # end of make_prn diff --git a/test/contrib/bpv7.uts b/test/contrib/bpv7.uts new file mode 100644 index 00000000000..4e517c992a1 --- /dev/null +++ b/test/contrib/bpv7.uts @@ -0,0 +1,223 @@ +% Bundle Protocol version 7 tests + ++ Simple BPv7 Tests + += Test decode +~ dtn bpv7 +* dissecting valid bundle from string and checking all fields for accuracy + +import scapy.contrib.dtn.bpv7 as BPv7 + +bs = '9f8907040282028202018202820101820100821b000000b4e6fc6dae001a000f4240440512dd21860a021002448218640044db675d49860101000258640000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000044dd3243fcff' + +bundle = BPv7.Bundle(bytearray.fromhex(bs)) +bundle + +assert bundle.primary_block.version == 7, "Wrong Bundle Protocol version" +assert bundle.primary_block.flags == BPv7.PrimaryBlock.CtrlFlags.MUST_NOT_BE_FRAGMENTED, "Wrong flags in primary block" +assert bundle.primary_block.crc_type == 2, "Wrong crc type in primary block" +assert bundle.primary_block.dest.scheme_code == 2, "Wrong destination EID type" +assert bundle.primary_block.dest.ssp.node_id == 2, "Wrong destination id" +assert bundle.primary_block.dest.ssp.service_number == 1, "Wrong destination service number" +assert bundle.primary_block.src.scheme_code == 2, "Wrong source EID type" +assert bundle.primary_block.src.ssp.node_id == 1, "Wrong source id" +assert bundle.primary_block.src.ssp.service_number == 1, "Wrong source service number" +assert bundle.primary_block.report.scheme_code == 1, "Wrong report-to EID type" +assert bundle.primary_block.report.ssp.uri == 0, "Wrong report-to id" +assert bundle.primary_block.creation_timestamp.t == 776969416110, "Wrong timestamp" +assert bundle.primary_block.creation_timestamp.seq == 0, "Wrong seq" +assert bundle.primary_block.lifetime == 1000000, "Wrong lifetime" +assert bundle.primary_block.crc == b'\x05\x12\xdd\x21', "Wrong crc in primary block" + +assert len(bundle.canonical_blocks) == 2, "Wrong number of canonical blocks" +block1 = bundle.canonical_blocks[0] +assert block1.type_code == 10, "Expected type hop_count in first block" +assert block1.block_number == 2, "Wrong block number in first canonical block" +assert block1.flags == BPv7.CanonicalBlock.CtrlFlags.DISCARD_IF_NOT_PROCESSED, "Wrong flags in first canonical block" +assert block1.crc_type == 2, "Wrong crc type in first canonical block" +assert block1.data == BPv7.HopCount(limit=100, count=0), "Expected cbor [0x18, 0x00] as hop count data" +assert block1.crc == b'\xdb\x67\x5d\x49', "Wrong crc in first canonical block" + +block2 = bundle.canonical_blocks[1] +assert block2.type_code == 1, "Expected type payload in second block" +assert block2.block_number == 1, "Wrong block number in second canonical block" +assert block2.flags == 0, "Wrong flags in second canonical block" +assert block2.crc_type == 2, "Wrong crc type in second canonical block" +assert block2.data == b'\x00' * 100, "Expected 100 bytes of zero as bundle payload" +assert block2.crc == b'\xdd\x32\x43\xfc', "Wrong crc in second canonical block" + += Test decode invalid bundle +~ dtn bpv7 +* attempting to decode a bundle with two primary block elements (should fail to dissect) + +bs = '9f8907040282028202018202820403820105821b000000b700dfc451001a000f424044b3cf0f1d8907040282028202018202820403820105821b000000b700dfc451001a000f424044b3cf0f1dff' + +try: + BPv7.Bundle(bytearray.fromhex(bs)) + assert False +except: + assert True + += Test encode +~ dtn bpv7 +* building a bundle + +block1 = BPv7.HopCountBlock( + block_number=2, + flags=BPv7.CanonicalBlock.CtrlFlags.DISCARD_IF_NOT_PROCESSED, + crc_type=2, + data=BPv7.HopCount(limit=100, count=0) + # crc=b'\xdb\x67\x5d\x49' +) +block2 = BPv7.PayloadBlock( + block_number=1, + flags=0, + crc_type=2, + data=b'\x00'*100 + # crc=b'\xdd\x32\x43\xfc' +) +canonical_blocks=[block1, block2] +destination=BPv7.IPN(node_id=2, service_number=1) +source=BPv7.IPN(node_id=1, service_number=1) +report_to=BPv7.DTN(uri=0) +creation_time=BPv7.Timestamp(t=776969416110, seq=0) +primary_block = BPv7.PrimaryBlock( + version=7, + flags=BPv7.PrimaryBlock.CtrlFlags.MUST_NOT_BE_FRAGMENTED, + crc_type=2, + dest=BPv7.EndpointID(scheme_code=2, ssp=destination), + src=BPv7.EndpointID(scheme_code=2, ssp=source), + report=BPv7.EndpointID(scheme_code=1, ssp=report_to), + creation_timestamp=creation_time, + lifetime=1000000, + # crc=b'\x05\x12\xdd\x21', +) +bundle = BPv7.Bundle( + primary_block = primary_block, + canonical_blocks=canonical_blocks +) + +bundle + +bs = '9f8907040282028202018202820101820100821b000000b4e6fc6dae001a000f4240440512dd21860a021002448218640044db675d49860101000258640000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000044dd3243fcff' + +captured_bytes = bytearray.fromhex(bs) +bundle_bytes = bytearray(raw(bundle)) +captured_bytes == bundle_bytes + += Test blocks +~ dtn bpv7 +* testing a bundle with default block instances can be built without error + +bundle=BPv7.Bundle(canonical_blocks=[ + BPv7.PreviousNodeBlock(), + BPv7.HopCountBlock(), + BPv7.BundleAgeBlock(), + BPv7.BlockIntegrityBlock(), + BPv7.BlockConfidentialityBlock(), + BPv7.EncryptedHopCountBlock(), + BPv7.EncryptedPreviousNodeBlock(), + BPv7.EncryptedBundleAgeBlock(), + BPv7.EncryptedBlockIntegrityBlock(), + BPv7.PayloadBlock(), +]) +b = bundle.build() +b +b is not None + ++ BPv7 with BPSec tests + += Test bpsec bundle integrity block +~ dtn bpv7 bpsec +* testing that a non-default BIB can be created + +b = BPv7.BlockIntegrityBlock( + block_number=3, + flags=BPv7.CanonicalBlock.CtrlFlags.BLOCK_MUST_BE_REPLICATED, + crc_type=BPv7.CrcTypes.NONE, + data=BPv7.AbstractSecurityBlock( + security_targets=BPv7.SecurityTargets(targets=[1,2,3]), + security_context_id=2, + security_context_flags=1, + security_source=BPv7.EndpointID(scheme_code=2, ssp=BPv7.IPN(node_id=1, service_number=1)), + security_context_parameters=BPv7.CBORTupleArray(tuples=[ + BPv7.CBORTuple(id=1,value=bytes.fromhex('136B229B84CA0200243B0000')), + BPv7.CBORTuple(id=2,value=3), + BPv7.CBORTuple(id=4,value=7) + ]), + security_results=BPv7.SecurityResults(results=BPv7.CBORTupleArray(tuples=[ + BPv7.CBORTuple(id=1,value=bytes.fromhex('CA492BCE6F1B4C7AF3995A985432409F')) + ])) + ) +).build() +b +b is not None + += Test bpsec bundle confidentiality blocks +~ dtn bpv7 bpsec +* testing confidentiality block + +f = open(scapy_path("test/pcaps/bpv7_bundle_with_con.pcap"), "rb") +content = f.read() + +raw_bundle = content[22:] + +my_bundle = BPv7.Bundle( + primary_block=BPv7.PrimaryBlock( + version=7, + flags=BPv7.PrimaryBlock.CtrlFlags.MUST_NOT_BE_FRAGMENTED, + crc_type=BPv7.CrcTypes.CRC32C, + dest=BPv7.EndpointID(scheme_code=2, ssp=BPv7.IPN(node_id=2, service_number=1)), + src=BPv7.EndpointID(scheme_code=2, ssp=BPv7.IPN(node_id=1, service_number=1)), + report=BPv7.EndpointID(scheme_code=1, ssp=BPv7.DTN(uri=0)), + creation_timestamp=BPv7.Timestamp(t=785620852727, seq=0), + lifetime=1000000, + # crc=b'\xEFA\xC0e' + ), + canonical_blocks=[ + BPv7.PreviousNodeBlock( + block_number=4, + flags=16, + crc_type=BPv7.CrcTypes.CRC32C, + data=BPv7.EndpointID(scheme_code=2, ssp=BPv7.IPN(node_id=10, service_number=0)), + # crc=b'\xED6\x9A\xB2' + ), + BPv7.BlockConfidentialityBlock( + block_number=3, + flags=BPv7.CanonicalBlock.CtrlFlags.BLOCK_MUST_BE_REPLICATED, + crc_type=BPv7.CrcTypes.NONE, + data=BPv7.AbstractSecurityBlock( + security_targets=BPv7.SecurityTargets(targets=[1]), + security_context_id=2, + security_context_flags=1, + security_source=BPv7.EndpointID(scheme_code=2, ssp=BPv7.IPN(node_id=1, service_number=1)), + security_context_parameters=BPv7.CBORTupleArray(tuples=[ + BPv7.CBORTuple(id=1,value=bytes.fromhex('136B229B84CA0200243B0000')), + BPv7.CBORTuple(id=2,value=3), + BPv7.CBORTuple(id=4,value=7) + ]), + security_results=BPv7.SecurityResults(results=BPv7.CBORTupleArray(tuples=[ + BPv7.CBORTuple(id=1,value=bytes.fromhex('CA492BCE6F1B4C7AF3995A985432409F')) + ])) + ) + ), + BPv7.HopCountBlock( + block_number=2, + flags=BPv7.CanonicalBlock.CtrlFlags.DISCARD_IF_NOT_PROCESSED, + crc_type=BPv7.CrcTypes.CRC32C, + data=BPv7.HopCount(limit=100, count=1), + # crc=b'\x34\x57\x36\x50' + ), + BPv7.PayloadBlock( + block_number=1, + flags=0, + crc_type=BPv7.CrcTypes.CRC32C, + data=bytes.fromhex("7E4B954DCCEA632B68C0732AE92B067895CAA6676D9556D0F1B28BBDA03DB2B9FB3F4C85EECBB3C00B8104968511F80EEC12FB993ADA63D79AFE368D0780A53713AC50E889303D7B8739CA306DD62AD8533DDCDCFD73BBE1D49CDA182CAB3CB17058DDD0"), + # crc=b'\x5\x84\xB5\xF4B' + ) + + ] +) + +my_bundle +raw(my_bundle) == raw_bundle \ No newline at end of file diff --git a/test/contrib/tcpcl.uts b/test/contrib/tcpcl.uts new file mode 100644 index 00000000000..8dcd291d517 --- /dev/null +++ b/test/contrib/tcpcl.uts @@ -0,0 +1,63 @@ +% TCP Convergence Layer tests + ++ Test full TCPCL session + += Test dissect and build +~ dtn tcpcl +* testing packet dissection and build for full TCPCL session from pcap + +import scapy.contrib.dtn.tcpcl as TCPCL +from scapy.contrib.dtn.tcpcl_session import TestTcpcl + +# Test dissection from pcap +pkts=sniff(offline=scapy_path("test/pcaps/tcpcl.pcap"), + prn=TestTcpcl.make_prn()) +assert len(pkts) == 26, "Failed to dissect some packets" + +# Define expected messages +init1 = TCPCL.SessInit( + keepalive=17, + segment_mru=200000, + transfer_mru=10000000, + id=b"ipn:1.0" +) +init2 = TCPCL.SessInit( + keepalive=15, + segment_mru=200000, + transfer_mru=10000000, + id=b"ipn:10.0" +) +bundle0 = bytearray.fromhex('9f8907040282028202018202820101820100821b000000b4e6fc6dae001a000f4240440512dd21860a021002448218640044db675d49860101000258640000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000044dd3243fcff') +bundle1 = bytearray.fromhex('9f8907040282028202018202820101820100821b000000b4e6fc6db8001a000f424044bb1ffd92860a021002448218640044db675d4986010100025864010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000004419e13bfbff') +bundle2 = bytearray.fromhex('9f8907040282028202018202820101820100821b000000b4e6fc6dc2001a000f42404418438afa860a021002448218640044db675d498601010002586402000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000445178c503ff') +bundle3 = bytearray.fromhex('9f8907040282028202018202820101820100821b000000b4e6fcfb66001a000f4240449fdacc81860a021002448218640044db675d49860101000258642c0e000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000044b1439f4cff') +flags = TCPCL.Xfer.Flag.START | TCPCL.Xfer.Flag.END +xfer0 = TCPCL.XferSegment(flags=flags, id=0) / bundle0 +xfer1 = TCPCL.XferSegment(flags=flags, id=1) / bundle1 +xfer2 = TCPCL.XferSegment(flags=flags, id=2) / bundle2 +xfer3 = TCPCL.XferSegment(flags=flags, id=3628) / bundle3 +ack0 = TCPCL.XferAck(flags=flags, id=0, length=167) +ack1 = TCPCL.XferAck(flags=flags, id=1, length=167) +ack2 = TCPCL.XferAck(flags=flags, id=2, length=167) +ack3 = TCPCL.XferAck(flags=flags, id=3628, length=167) +term0 = TCPCL.SessTerm() +term1 = TCPCL.SessTerm(flags=TCPCL.SessTerm.Flag.REPLY) + +# Test that built TCPCL messages have the expected value +# (including auto-computed fields, such as length fields). +# They should be identical to the packets from the pcap. +for pkt in pkts: + try: + msg = pkt[TCPCL.MsgHeader].payload + mtype = type(msg) + if mtype == TCPCL.SessInit: + TestTcpcl.check_pkt(msg, [init1, init2]) + elif mtype == TCPCL.XferSegment: + TestTcpcl.check_pkt(msg, [xfer0, xfer1, xfer2, xfer3]) + elif mtype == TCPCL.XferAck: + TestTcpcl.check_pkt(msg, [ack0, ack1, ack2, ack3]) + elif mtype == TCPCL.SessTerm: + TestTcpcl.check_pkt(msg, [term0, term1]) + except IndexError: # pkt contains no TCPCL msg + continue + diff --git a/test/pcaps/bpv7_bundle_with_con.pcap b/test/pcaps/bpv7_bundle_with_con.pcap new file mode 100644 index 00000000000..f29eba08415 Binary files /dev/null and b/test/pcaps/bpv7_bundle_with_con.pcap differ diff --git a/test/pcaps/tcpcl.pcap b/test/pcaps/tcpcl.pcap new file mode 100644 index 00000000000..bac0bec11cd Binary files /dev/null and b/test/pcaps/tcpcl.pcap differ diff --git a/tox.ini b/tox.ini index d47bddf8b9d..57de8fb57e0 100644 --- a/tox.ini +++ b/tox.ini @@ -33,6 +33,8 @@ deps = cryptography coverage[toml] python-can + flynn + crcmod # disabled on windows because they require c++ dependencies # brotli 1.1.0 broken https://github.com/google/brotli/issues/1072 brotli < 1.1.0 ; sys_platform != 'win32'