From a8b804136e1c514d45de37e74e62ccb4d410e0c9 Mon Sep 17 00:00:00 2001 From: Quentin Monnet Date: Wed, 17 Dec 2025 16:28:59 +0000 Subject: [PATCH 1/5] feat(flow-filter): Add new flow-filter stage to enforce peering rules The new flow-filter crate introduced in this commit contains a new pipeline stage for validating that a packet matches an existing peering connection, as defined in the configuration provided by the user. All packets that do not have a source IP, port and destination IP, port corresponding to existing, valid connections between the prefixes in exposed lists of peerings, get dropped. This allows us to enforce that traffic matches the peering rule: - Generically, whether or not the appropriate rules are set - For stateless NAT, which would just let the packet go un-NATed if there was no NAT rule to be found, so far - For stateful NAT, allowing us to remove the dirty implementation of similar checks in the stateful NAT code (in a future commit). Unit tests also come in a follow-up commit. Signed-off-by: Quentin Monnet --- Cargo.lock | 17 ++ Cargo.toml | 2 + dataplane/Cargo.toml | 1 + dataplane/src/main.rs | 1 + dataplane/src/packet_processor/mod.rs | 7 + flow-filter/Cargo.toml | 19 ++ flow-filter/src/filter_rw.rs | 80 ++++++++ flow-filter/src/ip_port_prefix_trie.rs | 108 +++++++++++ flow-filter/src/lib.rs | 100 ++++++++++ flow-filter/src/setup.rs | 82 +++++++++ flow-filter/src/tables.rs | 242 +++++++++++++++++++++++++ lpm/src/prefix/range_map.rs | 8 + lpm/src/trie/mod.rs | 10 + mgmt/Cargo.toml | 1 + mgmt/src/processor/proc.rs | 19 ++ mgmt/src/tests/mgmt.rs | 13 +- 16 files changed, 706 insertions(+), 4 deletions(-) create mode 100644 flow-filter/Cargo.toml create mode 100644 flow-filter/src/filter_rw.rs create mode 100644 flow-filter/src/ip_port_prefix_trie.rs create mode 100644 flow-filter/src/lib.rs create mode 100644 flow-filter/src/setup.rs create mode 100644 flow-filter/src/tables.rs diff --git a/Cargo.lock b/Cargo.lock index 5010fe1de..f856f0477 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1147,6 +1147,7 @@ dependencies = [ "dataplane-concurrency", "dataplane-dpdk", "dataplane-dpdk-sysroot-helper", + "dataplane-flow-filter", "dataplane-gwname", "dataplane-id", "dataplane-mgmt", @@ -1298,6 +1299,21 @@ dependencies = [ "thiserror 2.0.17", ] +[[package]] +name = "dataplane-flow-filter" +version = "0.7.0" +dependencies = [ + "dataplane-config", + "dataplane-lpm", + "dataplane-net", + "dataplane-pipeline", + "dataplane-tracectl", + "left-right", + "linkme", + "tracing", + "tracing-test", +] + [[package]] name = "dataplane-flow-info" version = "0.7.0" @@ -1454,6 +1470,7 @@ dependencies = [ "dataplane-args", "dataplane-concurrency", "dataplane-config", + "dataplane-flow-filter", "dataplane-gwname", "dataplane-id", "dataplane-interface-manager", diff --git a/Cargo.toml b/Cargo.toml index 317eeb906..dcf894ac1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "dpdk-sys", "dpdk-sysroot-helper", "errno", + "flow-filter", "flow-info", "gwname", "hardware", @@ -55,6 +56,7 @@ dpdk-sys = { path = "./dpdk-sys", package = "dataplane-dpdk-sys", features = [] dpdk-sysroot-helper = { path = "./dpdk-sysroot-helper", package = "dataplane-dpdk-sysroot-helper", features = [] } dplane-rpc = { git = "https://github.com/githedgehog/dplane-rpc.git", rev = "e8fc33db10e1d00785f2a2b90cbadcad7900f200", features = [] } errno = { path = "./errno", package = "dataplane-errno", features = [] } +flow-filter = { path = "./flow-filter", package = "dataplane-flow-filter", features = [] } flow-info = { path = "./flow-info", package = "dataplane-flow-info", features = [] } gateway_config = { git = "https://github.com/githedgehog/gateway-proto", tag = "v0.20.0", features = [] } gwname = { path = "./gwname", package = "dataplane-gwname", features = [] } diff --git a/dataplane/Cargo.toml b/dataplane/Cargo.toml index fe8d430f2..6245182e9 100644 --- a/dataplane/Cargo.toml +++ b/dataplane/Cargo.toml @@ -15,6 +15,7 @@ concurrency = { workspace = true } ctrlc = { workspace = true, features = ["termination"] } dpdk = { workspace = true } dyn-iter = { workspace = true } +flow-filter = { workspace = true } futures = { workspace = true } gwname = { workspace = true } hyper = { workspace = true } diff --git a/dataplane/src/main.rs b/dataplane/src/main.rs index bc95d53a2..28a9b1d07 100644 --- a/dataplane/src/main.rs +++ b/dataplane/src/main.rs @@ -165,6 +165,7 @@ fn main() { nattablesw: setup.nattablesw, natallocatorw: setup.natallocatorw, vpcdtablesw: setup.vpcdtablesw, + flowfilterw: setup.flowfiltertablesw, vpc_stats_store: setup.vpc_stats_store, }, }) diff --git a/dataplane/src/packet_processor/mod.rs b/dataplane/src/packet_processor/mod.rs index 1ea6d2bb6..6e21bc6b3 100644 --- a/dataplane/src/packet_processor/mod.rs +++ b/dataplane/src/packet_processor/mod.rs @@ -12,6 +12,7 @@ use super::packet_processor::ipforward::IpForwarder; use concurrency::sync::Arc; +use flow_filter::{FlowFilter, FlowFilterTableWriter}; use pkt_meta::dst_vpcd_lookup::{DstVpcdLookup, VpcDiscTablesWriter}; use pkt_meta::flow_table::{ExpirationsNF, FlowTable, LookupNF}; @@ -39,6 +40,7 @@ where pub nattablesw: NatTablesWriter, pub natallocatorw: NatAllocatorWriter, pub vpcdtablesw: VpcDiscTablesWriter, + pub flowfiltertablesw: FlowFilterTableWriter, pub stats: StatsCollector, pub vpc_stats_store: Arc, } @@ -50,6 +52,7 @@ pub(crate) fn start_router( let nattablesw = NatTablesWriter::new(); let natallocatorw = NatAllocatorWriter::new(); let vpcdtablesw = VpcDiscTablesWriter::new(); + let flowfiltertablesw = FlowFilterTableWriter::new(); let router = Router::new(params)?; let vpcmapw = VpcMapWriter::::new(); @@ -66,6 +69,7 @@ pub(crate) fn start_router( let iftr_factory = router.get_iftabler_factory(); let fibtr_factory = router.get_fibtr_factory(); let vpcdtablesr_factory = vpcdtablesw.get_reader_factory(); + let flowfiltertablesr_factory = flowfiltertablesw.get_reader_factory(); let atabler_factory = router.get_atabler_factory(); let nattabler_factory = nattablesw.get_reader_factory(); let natallocator_factory = natallocatorw.get_reader_factory(); @@ -87,6 +91,7 @@ pub(crate) fn start_router( let stats_stage = Stats::new("stats", writer.clone()); let flow_lookup_nf = LookupNF::new("flow-lookup", flow_table.clone()); let flow_expirations_nf = ExpirationsNF::new(flow_table.clone()); + let flow_filter = FlowFilter::new("flow-filter", flowfiltertablesr_factory.handle()); // Build the pipeline for a router. The composition of the pipeline (in stages) is currently // hard-coded. In any pipeline, the Stats and ExpirationsNF stages should go last @@ -95,6 +100,7 @@ pub(crate) fn start_router( .add_stage(iprouter1) .add_stage(dst_vpcd_lookup) .add_stage(flow_lookup_nf) + .add_stage(flow_filter) .add_stage(stateless_nat) .add_stage(stateful_nat) .add_stage(iprouter2) @@ -111,6 +117,7 @@ pub(crate) fn start_router( nattablesw, natallocatorw, vpcdtablesw, + flowfiltertablesw, stats, vpc_stats_store, }) diff --git a/flow-filter/Cargo.toml b/flow-filter/Cargo.toml new file mode 100644 index 000000000..a976dedd4 --- /dev/null +++ b/flow-filter/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "dataplane-flow-filter" +edition.workspace = true +license.workspace = true +publish.workspace = true +version.workspace = true + +[dependencies] +config = { workspace = true } +left-right = { workspace = true } +linkme = { workspace = true } +lpm = { workspace = true } +net = { workspace = true } +pipeline = { workspace = true } +tracectl = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +tracing-test = { workspace = true, features = [] } diff --git a/flow-filter/src/filter_rw.rs b/flow-filter/src/filter_rw.rs new file mode 100644 index 000000000..f6efa7a36 --- /dev/null +++ b/flow-filter/src/filter_rw.rs @@ -0,0 +1,80 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +//! Left-right integration for [`FlowFilterTable`] + +use crate::tables::FlowFilterTable; +use left_right::{Absorb, ReadGuard, ReadHandle, ReadHandleFactory, WriteHandle, new_from_empty}; +use tracing::debug; + +#[derive(Debug)] +pub(crate) enum FlowFilterTableChange { + UpdateFlowFilterTable(FlowFilterTable), +} + +impl Absorb for FlowFilterTable { + fn absorb_first(&mut self, change: &mut FlowFilterTableChange, _: &Self) { + match change { + FlowFilterTableChange::UpdateFlowFilterTable(table) => { + *self = table.clone(); + } + } + } + fn drop_first(self: Box) {} + fn sync_with(&mut self, first: &Self) { + *self = first.clone(); + } +} + +#[derive(Debug)] +pub struct FlowFilterTableReader(ReadHandle); + +impl FlowFilterTableReader { + pub(crate) fn enter(&self) -> Option> { + self.0.enter() + } + + #[must_use] + pub fn factory(&self) -> FlowFilterTableReaderFactory { + FlowFilterTableReaderFactory(self.0.factory()) + } +} + +#[derive(Debug)] +pub struct FlowFilterTableReaderFactory(ReadHandleFactory); + +impl FlowFilterTableReaderFactory { + #[must_use] + pub fn handle(&self) -> FlowFilterTableReader { + FlowFilterTableReader(self.0.handle()) + } +} + +#[derive(Debug)] +pub struct FlowFilterTableWriter(WriteHandle); + +impl FlowFilterTableWriter { + #[must_use] + #[allow(clippy::new_without_default)] + pub fn new() -> FlowFilterTableWriter { + let (w, _r) = + new_from_empty::(FlowFilterTable::new()); + FlowFilterTableWriter(w) + } + + #[must_use] + pub fn get_reader(&self) -> FlowFilterTableReader { + FlowFilterTableReader(self.0.clone()) + } + + pub fn get_reader_factory(&self) -> FlowFilterTableReaderFactory { + self.get_reader().factory() + } + + pub fn update_flow_filter_table(&mut self, table: FlowFilterTable) { + self.0 + .append(FlowFilterTableChange::UpdateFlowFilterTable(table)); + self.0.publish(); + debug!("Updated flow filter table"); + } +} diff --git a/flow-filter/src/ip_port_prefix_trie.rs b/flow-filter/src/ip_port_prefix_trie.rs new file mode 100644 index 000000000..d7976b3a1 --- /dev/null +++ b/flow-filter/src/ip_port_prefix_trie.rs @@ -0,0 +1,108 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +use crate::tables::AssociatedRanges; +use crate::tables::ConnectionTableValue; +use lpm::prefix::{PortRange, Prefix}; +use lpm::trie::IpPrefixTrie; +use std::fmt::Debug; +use std::net::IpAddr; +use std::ops::RangeBounds; + +pub(crate) trait ValueWithAssociatedRanges { + fn covers_all_ports(&self) -> bool; + fn covers_port(&self, port: u16) -> bool; +} + +impl ValueWithAssociatedRanges for AssociatedRanges { + fn covers_all_ports(&self) -> bool { + match self { + AssociatedRanges::AnyPort => true, + AssociatedRanges::Ranges(ranges) => { + ranges.iter().fold(0, |sum, range| sum + range.len()) == PortRange::MAX_LENGTH + } + } + } + + fn covers_port(&self, port: u16) -> bool { + match self { + AssociatedRanges::AnyPort => true, + AssociatedRanges::Ranges(ranges) => ranges.iter().any(|range| range.contains(&port)), + } + } +} + +impl ValueWithAssociatedRanges for ConnectionTableValue { + fn covers_all_ports(&self) -> bool { + match self { + ConnectionTableValue::AnyPort(_) => true, + ConnectionTableValue::Ranges(connection_data) => { + connection_data + .keys() + .fold(0, |sum, range| sum + range.len()) + == PortRange::MAX_LENGTH + } + } + } + + fn covers_port(&self, port: u16) -> bool { + match self { + ConnectionTableValue::AnyPort(_) => true, + ConnectionTableValue::Ranges(ranges) => { + ranges.iter().any(|(range, _)| range.contains(&port)) + } + } + } +} + +#[derive(Debug, Clone)] +pub(crate) struct IpPortPrefixTrie(IpPrefixTrie) +where + V: Debug + Clone + ValueWithAssociatedRanges; + +impl IpPortPrefixTrie +where + V: Debug + Clone + ValueWithAssociatedRanges, +{ + #[must_use] + pub(crate) fn new() -> Self { + Self(IpPrefixTrie::new()) + } + + #[must_use] + pub(crate) fn from(prefix: Prefix, value: V) -> Self { + let mut trie = Self::new(); + trie.0.insert(prefix, value); + trie + } + + pub(crate) fn insert(&mut self, prefix: Prefix, value: V) { + self.0.insert(prefix, value); + } + + pub(crate) fn get_mut(&mut self, prefix: Prefix) -> Option<&mut V> { + self.0.get_mut(prefix) + } + + pub(crate) fn lookup(&self, addr: &IpAddr, port_opt: Option) -> Option<(Prefix, &V)> { + // If the longest matching prefix has no associated port range, we assume it matches any + // port, so the lookup is successful + if let Some((prefix, value)) = self.0.lookup(*addr) + && value.covers_all_ports() + { + return Some((prefix, value)); + } + + // Else, we need to check all matching IP prefixes (not necessarily the longest), and their + // port ranges. We expect the trie to contain only one matching IP prefix matching the + // address and associated to a port range matching the port, so we return the first we find. + let port = port_opt?; + let matching_entries = self.0.matching_entries(*addr); + for (prefix, value) in matching_entries { + if value.covers_port(port) { + return Some((prefix, value)); + } + } + None + } +} diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs new file mode 100644 index 000000000..512154ebe --- /dev/null +++ b/flow-filter/src/lib.rs @@ -0,0 +1,100 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +use net::buffer::PacketBufferMut; +use net::headers::{TryIp, TryTransport}; +use net::packet::{DoneReason, Packet}; +use pipeline::NetworkFunction; +use std::num::NonZero; +use tracing::{debug, error}; + +mod filter_rw; +mod ip_port_prefix_trie; +mod setup; +mod tables; + +pub use filter_rw::{FlowFilterTableReader, FlowFilterTableReaderFactory, FlowFilterTableWriter}; +pub use tables::FlowFilterTable; + +use tracectl::trace_target; +trace_target!("flow-filter", LevelFilter::INFO, &["pipeline"]); + +/// A structure to implement the flow filter pipeline stage. +pub struct FlowFilter { + name: String, + tablesr: FlowFilterTableReader, +} + +impl FlowFilter { + /// Create a new [`FlowFilter`] instance. + pub fn new(name: &str, tablesr: FlowFilterTableReader) -> Self { + Self { + name: name.to_string(), + tablesr, + } + } + + /// Process a packet. + fn process_packet( + &self, + tablesr: &left_right::ReadGuard<'_, FlowFilterTable>, + packet: &mut Packet, + ) { + let nfi = &self.name; + + let Some(net) = packet.try_ip() else { + debug!("{nfi}: Packet has no IP headers: dropping"); + packet.done(DoneReason::NotIp); + return; + }; + + let (Some(src_vpcd), Some(dst_vpcd)) = (packet.meta.src_vpcd, packet.meta.dst_vpcd) else { + debug!("{nfi}: Packet missing VPC discriminants: dropping"); + packet.done(DoneReason::Unroutable); + return; + }; + + let src_ip = net.src_addr(); + let dst_ip = net.dst_addr(); + let ports = packet.try_transport().and_then(|t| { + t.src_port() + .map(NonZero::get) + .zip(t.dst_port().map(NonZero::get)) + }); + + if !tablesr.contains(src_vpcd, &src_ip, &dst_ip, ports) { + debug!( + "{nfi}: Flow not allowed, dropping packet: src_vpcd={src_vpcd}, dst_vpcd={dst_vpcd}, src={src_ip}:{}, dst={dst_ip}:{}", + ports.map_or(String::new(), |p| format!("{}", p.0)), + ports.map_or(String::new(), |p| format!("{}", p.1)), + ); + packet.done(DoneReason::Filtered); + return; + } + + debug!( + "{nfi}: Flow allowed: src_vpcd={src_vpcd}, src={src_ip}:{}, dst={dst_ip}:{}", + ports.map_or(String::new(), |p| format!("{}", p.0)), + ports.map_or(String::new(), |p| format!("{}", p.1)), + ); + } +} + +impl NetworkFunction for FlowFilter { + fn process<'a, Input: Iterator> + 'a>( + &'a mut self, + input: Input, + ) -> impl Iterator> + 'a { + input.filter_map(|mut packet| { + if let Some(tablesr) = &self.tablesr.enter() { + if !packet.is_done() { + self.process_packet(tablesr, &mut packet); + } + } else { + error!("{}: failed to read flow filter table", self.name); + packet.done(DoneReason::InternalFailure); + } + packet.enforce() + }) + } +} diff --git a/flow-filter/src/setup.rs b/flow-filter/src/setup.rs new file mode 100644 index 000000000..82071dab1 --- /dev/null +++ b/flow-filter/src/setup.rs @@ -0,0 +1,82 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +use crate::FlowFilterTable; +use config::ConfigError; +use config::external::overlay::Overlay; +use config::external::overlay::vpc::Peering; +use config::external::overlay::vpcpeering::VpcExpose; +use config::utils::{ConfigUtilError, collapse_prefixes_peering}; +use net::packet::VpcDiscriminant; + +impl FlowFilterTable { + /// Build a [`FlowFilterTable`] from an overlay + pub fn build_from_overlay(overlay: &Overlay) -> Result { + let mut table = FlowFilterTable::new(); + for vpc in overlay.vpc_table.values() { + for peering in &vpc.peerings { + // Get the destination VPC discriminant + let src_vpcd = VpcDiscriminant::VNI(vpc.vni); + let dst_vpcd = Self::get_dst_vpcd_for_peering(overlay, peering)?; + table.add_peering(peering, src_vpcd, dst_vpcd)?; + } + } + Ok(table) + } + + fn get_dst_vpcd_for_peering( + overlay: &Overlay, + peering: &Peering, + ) -> Result { + Ok(VpcDiscriminant::VNI( + overlay + .vpc_table + .get_vpc_by_vpcid(&peering.remote_id) + .ok_or_else(|| { + ConfigError::FailureApply(format!( + "Remote VPC {} not found in VPC table", + peering.remote_id + )) + })? + .vni, + )) + } + + fn add_peering( + &mut self, + peering: &Peering, + src_vpcd: VpcDiscriminant, + dst_vpcd: VpcDiscriminant, + ) -> Result<(), ConfigError> { + // "Collapse" prefixes to get rid of exclusion prefixes + let collapsed_peering = collapse_prefixes_peering(peering).map_err(|e| match e { + ConfigUtilError::SplitPrefixError(prefix) => { + ConfigError::FailureApply(format!("Failed to split prefix: {prefix}")) + } + })?; + + for local_prefix in collapsed_peering + .local + .exposes + .iter() + .flat_map(VpcExpose::public_ips) + { + for remote_prefix in collapsed_peering + .remote + .exposes + .iter() + .flat_map(VpcExpose::public_ips) + { + self.insert( + src_vpcd, + dst_vpcd, + local_prefix.prefix(), + local_prefix.ports().into(), + remote_prefix.prefix(), + remote_prefix.ports().into(), + ); + } + } + Ok(()) + } +} diff --git a/flow-filter/src/tables.rs b/flow-filter/src/tables.rs new file mode 100644 index 000000000..576af0136 --- /dev/null +++ b/flow-filter/src/tables.rs @@ -0,0 +1,242 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +use crate::ip_port_prefix_trie::IpPortPrefixTrie; +use lpm::prefix::range_map::DisjointRangesBTreeMap; +use lpm::prefix::{PortRange, Prefix}; +use net::packet::VpcDiscriminant; +use std::collections::{BTreeSet, HashMap}; +use std::fmt::Debug; +use std::net::IpAddr; + +/// A structure to store information about allowed flows between VPCs. +#[derive(Debug, Clone)] +pub struct FlowFilterTable(HashMap); + +impl FlowFilterTable { + #[allow(clippy::new_without_default)] + pub(crate) fn new() -> Self { + Self(HashMap::new()) + } + + fn insert_table(&mut self, src_vpcd: VpcDiscriminant, table: VpcConnectionsTable) { + self.0.insert(src_vpcd, table); + } + + fn get_table(&self, src_vpcd: VpcDiscriminant) -> Option<&VpcConnectionsTable> { + self.0.get(&src_vpcd) + } + + fn get_table_mut(&mut self, src_vpcd: VpcDiscriminant) -> Option<&mut VpcConnectionsTable> { + self.0.get_mut(&src_vpcd) + } + + // Check whether the destination address and port match valid prefixes and port ranges in one, + // or several, of the remote exposes data retrieved for a given source prefix and ports + fn find_from_remote_exposes_data( + dst_addr: &IpAddr, + dst_port: Option, + remote_exposes_data: &[RemoteExposeData], + ) -> bool { + remote_exposes_data + .iter() + .any(|remote| remote.prefixes.lookup(dst_addr, dst_port).is_some()) + } + + /// Check whether a flow is in the table, in other words, whether it's allowed. + pub(crate) fn contains( + &self, + src_vpcd: VpcDiscriminant, + src_addr: &IpAddr, + dst_addr: &IpAddr, + ports: Option<(u16, u16)>, + ) -> bool { + let Some(table) = self.get_table(src_vpcd) else { + return false; + }; + + let (src_port, dst_port) = ports.unzip(); + let Some((_, connection_data)) = table.lookup(src_addr, src_port) else { + return false; + }; + + match connection_data { + ConnectionTableValue::AnyPort(remote_exposes_data) => { + // Check whether the destination address and port match valid prefixes and port + // ranges in one, or several, of the remote exposes data + Self::find_from_remote_exposes_data(dst_addr, dst_port, remote_exposes_data) + } + ConnectionTableValue::Ranges(ranges) => { + let Some(src_port) = src_port else { + // If we don't have a source port, we can't hope to find a matching port range + return false; + }; + // Look for remote expose data for the port range associated to our source port + let Some((_, remote_exposes_data)) = ranges.lookup(&src_port) else { + return false; + }; + Self::find_from_remote_exposes_data(dst_addr, dst_port, remote_exposes_data) + } + } + } + + pub(crate) fn insert( + &mut self, + src_vpcd: VpcDiscriminant, + dst_vpcd: VpcDiscriminant, + src_prefix: Prefix, + src_port_range: OptionalPortRange, + dst_prefix: Prefix, + dst_port_range: OptionalPortRange, + ) { + if let Some(table) = self.get_table_mut(src_vpcd) { + table.insert( + dst_vpcd, + src_prefix, + src_port_range, + dst_prefix, + dst_port_range, + ); + } else { + let mut table = VpcConnectionsTable::new(); + table.insert( + dst_vpcd, + src_prefix, + src_port_range, + dst_prefix, + dst_port_range, + ); + self.insert_table(src_vpcd, table); + } + } +} + +#[derive(Debug, Clone)] +struct VpcConnectionsTable(IpPortPrefixTrie); + +impl VpcConnectionsTable { + fn new() -> Self { + Self(IpPortPrefixTrie::new()) + } + + fn lookup(&self, addr: &IpAddr, port: Option) -> Option<(Prefix, &ConnectionTableValue)> { + self.0.lookup(addr, port) + } + + fn upsert_remote_data( + remote_exposes_data: &mut Vec, + dst_vpcd: VpcDiscriminant, + dst_prefix: Prefix, + dst_port_range: OptionalPortRange, + ) { + let remote_expose = remote_exposes_data + .iter_mut() + .find(|remote| remote._vpcd == dst_vpcd); + match remote_expose { + Some(expose) => expose.prefixes.insert(dst_prefix, dst_port_range.into()), + None => remote_exposes_data.push(RemoteExposeData { + _vpcd: dst_vpcd, + prefixes: IpPortPrefixTrie::from(dst_prefix, dst_port_range.into()), + }), + } + } + + fn insert( + &mut self, + dst_vpcd: VpcDiscriminant, + src_prefix: Prefix, + src_port_range: OptionalPortRange, + dst_prefix: Prefix, + dst_port_range: OptionalPortRange, + ) { + if let Some(value) = self.0.get_mut(src_prefix) { + match value { + ConnectionTableValue::AnyPort(remote_exposes_data) => { + Self::upsert_remote_data( + remote_exposes_data, + dst_vpcd, + dst_prefix, + dst_port_range, + ); + } + ConnectionTableValue::Ranges(map) => { + let OptionalPortRange::Some(src_port_range) = src_port_range else { + // We already have an entry with port ranges for this src_prefix, and we're + // trying to add a port range that covers all existing ports: this means + // we've got some overlap, this should never happen. + unreachable!() + }; + let remote_exposes_data = map + .get_mut(&src_port_range) + // We found an entry for this port range, we must have the port range in the map + .unwrap_or_else(|| unreachable!()); + Self::upsert_remote_data( + remote_exposes_data, + dst_vpcd, + dst_prefix, + dst_port_range, + ); + } + } + } else { + // No entry yet for this src_prefix, create and insert one + let remote_exposes_data = vec![RemoteExposeData { + _vpcd: dst_vpcd, + prefixes: IpPortPrefixTrie::from(dst_prefix, dst_port_range.into()), + }]; + let value = match src_port_range { + OptionalPortRange::NoPortRangeMeansAllPorts => { + ConnectionTableValue::AnyPort(remote_exposes_data) + } + OptionalPortRange::Some(port_range) => { + let mut map = DisjointRangesBTreeMap::new(); + map.insert(port_range, remote_exposes_data); + ConnectionTableValue::Ranges(map) + } + }; + self.0.insert(src_prefix, value); + } + } +} + +#[derive(Debug, Clone)] +pub(crate) enum ConnectionTableValue { + AnyPort(Vec), + Ranges(DisjointRangesBTreeMap>), +} + +#[derive(Debug, Clone)] +pub(crate) enum AssociatedRanges { + AnyPort, + Ranges(BTreeSet), +} + +#[derive(Debug, Clone)] +pub(crate) struct RemoteExposeData { + _vpcd: VpcDiscriminant, // Unused at the moment; useful for replacing dst_vpcd lookup in the future? + prefixes: IpPortPrefixTrie, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum OptionalPortRange { + NoPortRangeMeansAllPorts, + Some(PortRange), +} + +impl From> for OptionalPortRange { + fn from(opt: Option) -> Self { + match opt { + Some(range) => OptionalPortRange::Some(range), + None => OptionalPortRange::NoPortRangeMeansAllPorts, + } + } +} + +impl From for AssociatedRanges { + fn from(optional_port_range: OptionalPortRange) -> Self { + match optional_port_range { + OptionalPortRange::NoPortRangeMeansAllPorts => AssociatedRanges::AnyPort, + OptionalPortRange::Some(range) => AssociatedRanges::Ranges(BTreeSet::from([range])), + } + } +} diff --git a/lpm/src/prefix/range_map.rs b/lpm/src/prefix/range_map.rs index df5b7a2a4..2e5c6cf47 100644 --- a/lpm/src/prefix/range_map.rs +++ b/lpm/src/prefix/range_map.rs @@ -51,6 +51,10 @@ where self.0.get(range) } + pub fn get_mut(&mut self, range: &R) -> Option<&mut V> { + self.0.get_mut(range) + } + pub fn lookup(&self, key: &K) -> Option<(&R, &V)> where R: UpperBoundFrom + RangeBounds, @@ -81,6 +85,10 @@ where pub fn range_mut(&mut self, range: impl RangeBounds) -> impl Iterator { self.0.range_mut(range) } + + pub fn keys(&self) -> impl Iterator { + self.0.keys() + } } impl Default for DisjointRangesBTreeMap { diff --git a/lpm/src/trie/mod.rs b/lpm/src/trie/mod.rs index eef40f015..afbb71769 100644 --- a/lpm/src/trie/mod.rs +++ b/lpm/src/trie/mod.rs @@ -101,6 +101,16 @@ impl IpPrefixTrie { } } + pub fn get_mut(&mut self, prefix: Q) -> Option<&mut V> + where + Q: Into, + { + match prefix.into() { + Prefix::IPV4(prefix) => self.ipv4.get_mut(prefix), + Prefix::IPV6(prefix) => self.ipv6.get_mut(prefix), + } + } + pub fn matching_entries(&self, addr: Q) -> Box + '_> where Q: Into, diff --git a/mgmt/Cargo.toml b/mgmt/Cargo.toml index 5fb394b73..41b503b51 100644 --- a/mgmt/Cargo.toml +++ b/mgmt/Cargo.toml @@ -20,6 +20,7 @@ bolero = ["dep:bolero", "interface-manager/bolero", "id/bolero", "net/bolero", " args = { workspace = true } config = { workspace = true } concurrency = { workspace = true } +flow-filter = { workspace = true } gateway_config = { workspace = true } id = { workspace = true } interface-manager = { workspace = true } diff --git a/mgmt/src/processor/proc.rs b/mgmt/src/processor/proc.rs index 91a93a9df..9fedded86 100644 --- a/mgmt/src/processor/proc.rs +++ b/mgmt/src/processor/proc.rs @@ -19,6 +19,7 @@ use config::{external::overlay::Overlay, internal::device::tracecfg::TracingConf use crate::processor::confbuild::internal::build_internal_config; use crate::processor::confbuild::router::generate_router_config; +use flow_filter::{FlowFilterTable, FlowFilterTableWriter}; use nat::stateful::NatAllocatorWriter; use nat::stateless::NatTablesWriter; use nat::stateless::setup::build_nat_configuration; @@ -84,6 +85,9 @@ pub struct ConfigProcessorParams { // writer for VPC routing table pub vpcdtablesw: VpcDiscTablesWriter, + // writer for flow filter table + pub flowfilterw: FlowFilterTableWriter, + // store for vpc stats pub vpc_stats_store: Arc, } @@ -169,6 +173,7 @@ impl ConfigProcessor { &mut self.proc_params.nattablesw, &mut self.proc_params.natallocatorw, &mut self.proc_params.vpcdtablesw, + &mut self.proc_params.flowfilterw, ) .await?; @@ -199,6 +204,7 @@ impl ConfigProcessor { &mut self.proc_params.nattablesw, &mut self.proc_params.natallocatorw, &mut self.proc_params.vpcdtablesw, + &mut self.proc_params.flowfilterw, ) .await; } @@ -539,6 +545,15 @@ fn apply_dst_vpcd_lookup_config( Ok(()) } +fn apply_flow_filtering_config( + overlay: &Overlay, + flowfilterw: &mut FlowFilterTableWriter, +) -> ConfigResult { + let flow_filter_table = FlowFilterTable::build_from_overlay(overlay)?; + flowfilterw.update_flow_filter_table(flow_filter_table); + Ok(()) +} + fn apply_tracing_config(tracing: &Option) -> ConfigResult { // Apply tracing config if provided. Otherwise, apply an empty/default config. let default = TracingConfig::default(); @@ -570,6 +585,7 @@ async fn apply_gw_config( nattablesw: &mut NatTablesWriter, natallocatorw: &mut NatAllocatorWriter, vpcdtablesw: &mut VpcDiscTablesWriter, + flowfilterw: &mut FlowFilterTableWriter, ) -> ConfigResult { let genid = config.genid(); @@ -612,6 +628,9 @@ async fn apply_gw_config( /* apply dst_vpcd_lookup config */ apply_dst_vpcd_lookup_config(&config.external.overlay, vpcdtablesw)?; + /* apply flow filtering config */ + apply_flow_filtering_config(&config.external.overlay, flowfilterw)?; + /* update stats mappings and seed names to the stats store */ let pairs = update_stats_vpc_mappings(config, vpcmapw); drop(pairs); // pairs used by caller diff --git a/mgmt/src/tests/mgmt.rs b/mgmt/src/tests/mgmt.rs index 13674e2ea..d50a9dca0 100644 --- a/mgmt/src/tests/mgmt.rs +++ b/mgmt/src/tests/mgmt.rs @@ -11,6 +11,7 @@ pub mod test { use config::external::gwgroup::GwGroupTable; use fixin::wrap; + use flow_filter::FlowFilterTableWriter; use lpm::prefix::Prefix; use nat::stateful::NatAllocatorWriter; use nat::stateless::NatTablesWriter; @@ -422,16 +423,19 @@ pub mod test { /* vpcmappings for vpc name resolution for vpc stats */ let vpcmapw = VpcMapWriter::::new(); - /* crate NatTables for stateless nat */ + /* create NatTables for stateless nat */ let nattablesw = NatTablesWriter::new(); - /* crate NatAllocator for stateful nat */ + /* create NatAllocator for stateful nat */ let natallocatorw = NatAllocatorWriter::new(); - /* crate VniTables for dst_vni_lookup */ + /* create VniTables for dst_vni_lookup */ let vpcdtablesw = VpcDiscTablesWriter::new(); - /* NEW: VPC stats store (Arc) */ + /* create FlowFilterTable for flow filtering */ + let flowfilterw = FlowFilterTableWriter::new(); + + /* create VPC stats store (Arc) */ let vpc_stats_store = VpcStatsStore::new(); /* build configuration of mgmt config processor */ @@ -441,6 +445,7 @@ pub mod test { nattablesw, natallocatorw, vpcdtablesw, + flowfilterw, vpc_stats_store, }; From eff4a4dc2db2044b488920e4a276b8bc362acc78 Mon Sep 17 00:00:00 2001 From: Quentin Monnet Date: Fri, 19 Dec 2025 01:54:43 +0000 Subject: [PATCH 2/5] feat(flow-filter): Split overlapping prefixes for shared IPs support When looking up for the destination VPC for a packet, we need a special treatment when IP/port ranges are shared across peerings. We already have the checks in place to make sure that the lookup stage does not return an answer when there are multiple possible matching destination VPCs (based on source VPC and destination address). However, the restriction is too strong: when we have overlapping, but distinct IP ranges shared across peerings, we can tell the destination VPC for some portion of the ranges. Consider the following case, for example: VPC A <-> VPC B <-> VPC C 1.0.0.0/24 2.0.0.0/24 2.0.0.0/24 1.0.0.0/24 In this case, we just can't tell, for packets coming from VPC B, what the destination VPC is (unless there's some stateful NAT session in place, but that's beyond the destination VPC lookup stage's scope). But if, instead, we have: VPC A <-> VPC B <-> VPC C 1.0.0.0/23 2.0.0.0/24 2.0.0.0/24 1.0.0.0/24 Then for packets going to 1.0.1.0/24, we know the destination is VPC A. Right now the destination VPC lookup does not account for the unambiguous portion of prefixes. This commit does not change that, but it improves the flow-filter stage instead, which is used to determine whether packets belong to allowed connections (as defined by the user's peering configuration). This is not necessary to check that the packet is allowed (if multiple valid connections are present, the packet is definitely allowed), but due to the context present in the flow-filter table, we consider replacing the destination VPC lookup stage by the flow-filter stage only. This would make one less stage in the pipeline, and would address the shared IPs issues described above. Signed-off-by: Quentin Monnet --- flow-filter/src/setup.rs | 123 +++++++++++++++++++++++++++++++++------ 1 file changed, 106 insertions(+), 17 deletions(-) diff --git a/flow-filter/src/setup.rs b/flow-filter/src/setup.rs index 82071dab1..1a8e413df 100644 --- a/flow-filter/src/setup.rs +++ b/flow-filter/src/setup.rs @@ -4,16 +4,20 @@ use crate::FlowFilterTable; use config::ConfigError; use config::external::overlay::Overlay; -use config::external::overlay::vpc::Peering; -use config::external::overlay::vpcpeering::VpcExpose; +use config::external::overlay::vpc::{Peering, Vpc}; +use config::external::overlay::vpcpeering::{VpcExpose, VpcManifest}; +use config::internal::interfaces::interface::InterfaceConfigTable; use config::utils::{ConfigUtilError, collapse_prefixes_peering}; +use lpm::prefix::{IpRangeWithPorts, PrefixWithOptionalPorts}; use net::packet::VpcDiscriminant; impl FlowFilterTable { /// Build a [`FlowFilterTable`] from an overlay pub fn build_from_overlay(overlay: &Overlay) -> Result { + let clean_vpc_table = cleanup_vpc_table(overlay.vpc_table.values().collect())?; let mut table = FlowFilterTable::new(); - for vpc in overlay.vpc_table.values() { + + for vpc in &clean_vpc_table { for peering in &vpc.peerings { // Get the destination VPC discriminant let src_vpcd = VpcDiscriminant::VNI(vpc.vni); @@ -48,20 +52,8 @@ impl FlowFilterTable { src_vpcd: VpcDiscriminant, dst_vpcd: VpcDiscriminant, ) -> Result<(), ConfigError> { - // "Collapse" prefixes to get rid of exclusion prefixes - let collapsed_peering = collapse_prefixes_peering(peering).map_err(|e| match e { - ConfigUtilError::SplitPrefixError(prefix) => { - ConfigError::FailureApply(format!("Failed to split prefix: {prefix}")) - } - })?; - - for local_prefix in collapsed_peering - .local - .exposes - .iter() - .flat_map(VpcExpose::public_ips) - { - for remote_prefix in collapsed_peering + for local_prefix in peering.local.exposes.iter().flat_map(|expose| &expose.ips) { + for remote_prefix in peering .remote .exposes .iter() @@ -80,3 +72,100 @@ impl FlowFilterTable { Ok(()) } } + +fn clone_skipping_peerings(vpc: &Vpc) -> Vpc { + Vpc { + name: vpc.name.clone(), + id: vpc.id.clone(), + vni: vpc.vni, + interfaces: InterfaceConfigTable::default(), + peerings: vec![], + } +} + +fn clone_skipping_local_exposes(peering: &Peering) -> Peering { + Peering { + name: peering.name.clone(), + local: VpcManifest { + name: peering.local.name.clone(), + exposes: vec![], + }, + remote: peering.remote.clone(), + remote_id: peering.remote_id.clone(), + gwgroup: peering.gwgroup.clone(), + adv_communities: peering.adv_communities.clone(), + } +} + +fn cleanup_vpc_table(vpcs: Vec<&Vpc>) -> Result, ConfigError> { + let mut new_set = Vec::new(); + for vpc in vpcs { + let mut tmp_vpc = clone_skipping_peerings(vpc); + + for peering in &vpc.peerings { + // "Collapse" prefixes to get rid of exclusion prefixes + let collapsed_peering = collapse_prefixes_peering(peering).map_err(|e| match e { + ConfigUtilError::SplitPrefixError(prefix) => { + ConfigError::FailureApply(format!("Failed to split prefix: {prefix}")) + } + })?; + tmp_vpc.peerings.push(collapsed_peering); + } + + let new_vpc = split_overlaps_in_src_prefixes(&mut tmp_vpc); + + new_set.push(new_vpc); + } + Ok(new_set) +} + +fn split_overlaps_in_src_prefixes(vpc: &mut Vpc) -> Vpc { + let mut new_vpc = clone_skipping_peerings(vpc); + while let Some(mut peering) = vpc.peerings.pop() { + let mut new_peering = clone_skipping_local_exposes(&peering); + while let Some(mut expose) = peering.local.exposes.pop() { + let mut new_expose = VpcExpose::default(); + 'next_prefix: while let Some(prefix) = expose.ips.pop_first() { + for other_peering in &vpc.peerings { + for other_expose in &other_peering.local.exposes { + for other_prefix in other_expose.ips.iter() { + if prefix.overlaps(other_prefix) && !other_prefix.covers(&prefix) { + expose.ips.extend(split_overlapping(prefix, *other_prefix)); + continue 'next_prefix; + } + } + } + } + for new_peering in &new_vpc.peerings { + for new_expose in &new_peering.local.exposes { + for new_prefix in new_expose.ips.iter() { + if prefix.overlaps(new_prefix) && !new_prefix.covers(&prefix) { + expose.ips.extend(split_overlapping(prefix, *new_prefix)); + continue 'next_prefix; + } + } + } + } + new_expose = new_expose.ip(prefix); + } + new_peering.local.exposes.push(new_expose); + } + new_vpc.peerings.push(new_peering); + } + new_vpc +} + +fn split_overlapping( + prefix_to_split: PrefixWithOptionalPorts, + mask_prefix: PrefixWithOptionalPorts, +) -> Vec { + debug_assert!(prefix_to_split.overlaps(&mask_prefix) && !mask_prefix.covers(&prefix_to_split)); + let mut split_prefixes = prefix_to_split.subtract(&mask_prefix); + split_prefixes.push( + prefix_to_split + .intersection(&mask_prefix) + // Intersection non-empty given that prefixes overlap + .unwrap_or_else(|| unreachable!()), + ); + split_prefixes +} From 8f3cea0480e4a53742ffc4d89b210f33ec8294f8 Mon Sep 17 00:00:00 2001 From: Quentin Monnet Date: Fri, 19 Dec 2025 02:24:47 +0000 Subject: [PATCH 3/5] feat(flow-filter): Set destination VPC for packet from flow-filter In preparation for replacing the destination VPC lookup stage, make the flow-filter set the destination VPC in the packet's metadata. The expectation is that the flow-filter stage should always find the same destination VPC as the dst_vpcd_lookup stage, with additional precision in the case of overlapping but distinct prefixes shared across peerings. The dst_vpcd_lookup stage is not remove yet. Signed-off-by: Quentin Monnet --- flow-filter/src/lib.rs | 49 ++++++++++++++++++++++++++++----------- flow-filter/src/tables.rs | 37 +++++++++++++++++++---------- 2 files changed, 60 insertions(+), 26 deletions(-) diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index 512154ebe..0cb4a7b6e 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -5,6 +5,7 @@ use net::buffer::PacketBufferMut; use net::headers::{TryIp, TryTransport}; use net::packet::{DoneReason, Packet}; use pipeline::NetworkFunction; +use std::net::IpAddr; use std::num::NonZero; use tracing::{debug, error}; @@ -17,6 +18,8 @@ pub use filter_rw::{FlowFilterTableReader, FlowFilterTableReaderFactory, FlowFil pub use tables::FlowFilterTable; use tracectl::trace_target; + +use crate::tables::VpcdLookupResult; trace_target!("flow-filter", LevelFilter::INFO, &["pipeline"]); /// A structure to implement the flow filter pipeline stage. @@ -43,13 +46,13 @@ impl FlowFilter { let nfi = &self.name; let Some(net) = packet.try_ip() else { - debug!("{nfi}: Packet has no IP headers: dropping"); + debug!("{nfi}: No IP headers found, dropping packet"); packet.done(DoneReason::NotIp); return; }; - let (Some(src_vpcd), Some(dst_vpcd)) = (packet.meta.src_vpcd, packet.meta.dst_vpcd) else { - debug!("{nfi}: Packet missing VPC discriminants: dropping"); + let Some(src_vpcd) = packet.meta.src_vpcd else { + debug!("{nfi}: Missing source VPC discriminant, dropping packet"); packet.done(DoneReason::Unroutable); return; }; @@ -61,22 +64,28 @@ impl FlowFilter { .map(NonZero::get) .zip(t.dst_port().map(NonZero::get)) }); + let log_str = format_packet_addrs_ports(&src_ip, &dst_ip, ports); - if !tablesr.contains(src_vpcd, &src_ip, &dst_ip, ports) { - debug!( - "{nfi}: Flow not allowed, dropping packet: src_vpcd={src_vpcd}, dst_vpcd={dst_vpcd}, src={src_ip}:{}, dst={dst_ip}:{}", - ports.map_or(String::new(), |p| format!("{}", p.0)), - ports.map_or(String::new(), |p| format!("{}", p.1)), - ); + let (allowed, dst_vpcd_lookup_res) = tablesr.contains(src_vpcd, &src_ip, &dst_ip, ports); + if !allowed { + debug!("{nfi}: Flow not allowed, dropping packet: {log_str}"); packet.done(DoneReason::Filtered); return; } - debug!( - "{nfi}: Flow allowed: src_vpcd={src_vpcd}, src={src_ip}:{}, dst={dst_ip}:{}", - ports.map_or(String::new(), |p| format!("{}", p.0)), - ports.map_or(String::new(), |p| format!("{}", p.1)), - ); + match dst_vpcd_lookup_res { + VpcdLookupResult::Some(dst_vpcd) => { + debug!("{nfi}: Set packet dst_vpcd to {dst_vpcd}: {log_str}"); + packet.meta.dst_vpcd = Some(dst_vpcd); + } + VpcdLookupResult::MultipleMatches => {} + VpcdLookupResult::None => { + debug!("{nfi}: No matching VPC found, dropping packet: {log_str}"); + packet.done(DoneReason::Unroutable); + } + } + + debug!("{nfi}: Flow allowed: {log_str}"); } } @@ -98,3 +107,15 @@ impl NetworkFunction for FlowFilter { }) } } + +fn format_packet_addrs_ports( + src_addr: &IpAddr, + dst_addr: &IpAddr, + ports: Option<(u16, u16)>, +) -> String { + format!( + "src={src_addr}{}, dst={dst_addr}{}", + ports.map_or(String::new(), |p| format!(":{}", p.0)), + ports.map_or(String::new(), |p| format!(":{}", p.1)) + ) +} diff --git a/flow-filter/src/tables.rs b/flow-filter/src/tables.rs index 576af0136..5627ed190 100644 --- a/flow-filter/src/tables.rs +++ b/flow-filter/src/tables.rs @@ -13,6 +13,13 @@ use std::net::IpAddr; #[derive(Debug, Clone)] pub struct FlowFilterTable(HashMap); +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum VpcdLookupResult { + Some(VpcDiscriminant), + MultipleMatches, + None, +} + impl FlowFilterTable { #[allow(clippy::new_without_default)] pub(crate) fn new() -> Self { @@ -37,10 +44,16 @@ impl FlowFilterTable { dst_addr: &IpAddr, dst_port: Option, remote_exposes_data: &[RemoteExposeData], - ) -> bool { - remote_exposes_data + ) -> (bool, VpcdLookupResult) { + let lookup_result: Vec<_> = remote_exposes_data .iter() - .any(|remote| remote.prefixes.lookup(dst_addr, dst_port).is_some()) + .filter(|remote| remote.prefixes.lookup(dst_addr, dst_port).is_some()) + .collect(); + match (lookup_result.len(), lookup_result.first()) { + (0, _) => (false, VpcdLookupResult::None), + (1, Some(remote)) => (true, VpcdLookupResult::Some(remote.vpcd)), + _ => (true, VpcdLookupResult::MultipleMatches), + } } /// Check whether a flow is in the table, in other words, whether it's allowed. @@ -50,14 +63,14 @@ impl FlowFilterTable { src_addr: &IpAddr, dst_addr: &IpAddr, ports: Option<(u16, u16)>, - ) -> bool { + ) -> (bool, VpcdLookupResult) { let Some(table) = self.get_table(src_vpcd) else { - return false; + return (false, VpcdLookupResult::None); }; let (src_port, dst_port) = ports.unzip(); let Some((_, connection_data)) = table.lookup(src_addr, src_port) else { - return false; + return (false, VpcdLookupResult::None); }; match connection_data { @@ -69,11 +82,11 @@ impl FlowFilterTable { ConnectionTableValue::Ranges(ranges) => { let Some(src_port) = src_port else { // If we don't have a source port, we can't hope to find a matching port range - return false; + return (false, VpcdLookupResult::None); }; // Look for remote expose data for the port range associated to our source port let Some((_, remote_exposes_data)) = ranges.lookup(&src_port) else { - return false; + return (false, VpcdLookupResult::None); }; Self::find_from_remote_exposes_data(dst_addr, dst_port, remote_exposes_data) } @@ -131,11 +144,11 @@ impl VpcConnectionsTable { ) { let remote_expose = remote_exposes_data .iter_mut() - .find(|remote| remote._vpcd == dst_vpcd); + .find(|remote| remote.vpcd == dst_vpcd); match remote_expose { Some(expose) => expose.prefixes.insert(dst_prefix, dst_port_range.into()), None => remote_exposes_data.push(RemoteExposeData { - _vpcd: dst_vpcd, + vpcd: dst_vpcd, prefixes: IpPortPrefixTrie::from(dst_prefix, dst_port_range.into()), }), } @@ -181,7 +194,7 @@ impl VpcConnectionsTable { } else { // No entry yet for this src_prefix, create and insert one let remote_exposes_data = vec![RemoteExposeData { - _vpcd: dst_vpcd, + vpcd: dst_vpcd, prefixes: IpPortPrefixTrie::from(dst_prefix, dst_port_range.into()), }]; let value = match src_port_range { @@ -213,7 +226,7 @@ pub(crate) enum AssociatedRanges { #[derive(Debug, Clone)] pub(crate) struct RemoteExposeData { - _vpcd: VpcDiscriminant, // Unused at the moment; useful for replacing dst_vpcd lookup in the future? + vpcd: VpcDiscriminant, prefixes: IpPortPrefixTrie, } From 8dd12f0204f475e5ad246e78a0394703239c8b97 Mon Sep 17 00:00:00 2001 From: Quentin Monnet Date: Fri, 19 Dec 2025 02:42:23 +0000 Subject: [PATCH 4/5] refactor(nat): Switch to IpPortPrefixTrie for stateless NAT tables Rather than implementing longest-prefix-match lookups for IP prefixes with associated port ranges, reuse the generic struct that we recently introduced in the flow-filter crate. There should be no functional change. Signed-off-by: Quentin Monnet --- Cargo.lock | 1 + flow-filter/src/ip_port_prefix_trie.rs | 23 +++++++--- flow-filter/src/lib.rs | 1 + nat/Cargo.toml | 1 + nat/src/stateless/setup/mod.rs | 4 +- nat/src/stateless/setup/tables.rs | 61 +++++++++++++------------- 6 files changed, 51 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f856f0477..5b8354d81 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1515,6 +1515,7 @@ dependencies = [ "bolero", "dataplane-concurrency", "dataplane-config", + "dataplane-flow-filter", "dataplane-flow-info", "dataplane-lpm", "dataplane-net", diff --git a/flow-filter/src/ip_port_prefix_trie.rs b/flow-filter/src/ip_port_prefix_trie.rs index d7976b3a1..64868d3c2 100644 --- a/flow-filter/src/ip_port_prefix_trie.rs +++ b/flow-filter/src/ip_port_prefix_trie.rs @@ -9,7 +9,7 @@ use std::fmt::Debug; use std::net::IpAddr; use std::ops::RangeBounds; -pub(crate) trait ValueWithAssociatedRanges { +pub trait ValueWithAssociatedRanges { fn covers_all_ports(&self) -> bool; fn covers_port(&self, port: u16) -> bool; } @@ -56,7 +56,7 @@ impl ValueWithAssociatedRanges for ConnectionTableValue { } #[derive(Debug, Clone)] -pub(crate) struct IpPortPrefixTrie(IpPrefixTrie) +pub struct IpPortPrefixTrie(IpPrefixTrie) where V: Debug + Clone + ValueWithAssociatedRanges; @@ -65,26 +65,26 @@ where V: Debug + Clone + ValueWithAssociatedRanges, { #[must_use] - pub(crate) fn new() -> Self { + pub fn new() -> Self { Self(IpPrefixTrie::new()) } #[must_use] - pub(crate) fn from(prefix: Prefix, value: V) -> Self { + pub fn from(prefix: Prefix, value: V) -> Self { let mut trie = Self::new(); trie.0.insert(prefix, value); trie } - pub(crate) fn insert(&mut self, prefix: Prefix, value: V) { + pub fn insert(&mut self, prefix: Prefix, value: V) { self.0.insert(prefix, value); } - pub(crate) fn get_mut(&mut self, prefix: Prefix) -> Option<&mut V> { + pub fn get_mut(&mut self, prefix: Prefix) -> Option<&mut V> { self.0.get_mut(prefix) } - pub(crate) fn lookup(&self, addr: &IpAddr, port_opt: Option) -> Option<(Prefix, &V)> { + pub fn lookup(&self, addr: &IpAddr, port_opt: Option) -> Option<(Prefix, &V)> { // If the longest matching prefix has no associated port range, we assume it matches any // port, so the lookup is successful if let Some((prefix, value)) = self.0.lookup(*addr) @@ -106,3 +106,12 @@ where None } } + +impl Default for IpPortPrefixTrie +where + V: Debug + Clone + ValueWithAssociatedRanges, +{ + fn default() -> Self { + Self::new() + } +} diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index 0cb4a7b6e..0058dfde9 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -15,6 +15,7 @@ mod setup; mod tables; pub use filter_rw::{FlowFilterTableReader, FlowFilterTableReaderFactory, FlowFilterTableWriter}; +pub use ip_port_prefix_trie::{IpPortPrefixTrie, ValueWithAssociatedRanges}; pub use tables::FlowFilterTable; use tracectl::trace_target; diff --git a/nat/Cargo.toml b/nat/Cargo.toml index bd20cc35c..9fac71863 100644 --- a/nat/Cargo.toml +++ b/nat/Cargo.toml @@ -14,6 +14,7 @@ arc-swap = { workspace = true } bnum = { workspace = true } concurrency = { workspace = true, features = [] } config = { workspace = true } +flow-filter = { workspace = true } flow-info = { workspace = true } left-right = { workspace = true } linkme = { workspace = true } diff --git a/nat/src/stateless/setup/mod.rs b/nat/src/stateless/setup/mod.rs index b9b7b39ff..59931038c 100644 --- a/nat/src/stateless/setup/mod.rs +++ b/nat/src/stateless/setup/mod.rs @@ -79,7 +79,7 @@ impl PerVniTable { let (prefix, value) = res?; // It's OK if the prefix already exists in the trie, we may try to insert it // multiple times if we have disjoint port ranges for this prefix. - let _ = peering_table.insert(prefix, value); + peering_table.insert(prefix, value); Ok(()) }) })?; @@ -94,7 +94,7 @@ impl PerVniTable { let (prefix, value) = res?; // It's OK if the prefix already exists in the trie, we may try to insert it // multiple times if we have disjoint port ranges for this prefix. - let _ = self.dst_nat.insert(prefix, value); + self.dst_nat.insert(prefix, value); Ok(()) }) })?; diff --git a/nat/src/stateless/setup/tables.rs b/nat/src/stateless/setup/tables.rs index 203e76aa8..ff8518018 100644 --- a/nat/src/stateless/setup/tables.rs +++ b/nat/src/stateless/setup/tables.rs @@ -3,9 +3,9 @@ use ahash::RandomState; use bnum::cast::CastFrom; +use flow_filter::{IpPortPrefixTrie, ValueWithAssociatedRanges}; use lpm::prefix::range_map::{DisjointRangesBTreeMap, UpperBoundFrom}; use lpm::prefix::{IpPrefix, IpRangeWithPorts, PortRange, Prefix, PrefixSize, PrefixWithPortsSize}; -use lpm::trie::IpPrefixTrie; use net::vxlan::Vni; use std::collections::{BTreeSet, HashMap}; use std::fmt::Debug; @@ -166,22 +166,18 @@ fn addr_offset_in_prefix_with_ports( /// From a current address prefix, find the target address prefix. #[derive(Debug, Default, Clone)] -pub struct NatRuleTable(IpPrefixTrie); +pub struct NatRuleTable(IpPortPrefixTrie); impl NatRuleTable { #[must_use] /// Creates a new empty [`NatRuleTable`] pub fn new() -> Self { - Self(IpPrefixTrie::new()) + Self(IpPortPrefixTrie::new()) } /// Inserts a new entry in the table - /// - /// # Returns - /// - /// Returns the previous value associated with the prefix if it existed, or `None` otherwise. - pub fn insert(&mut self, prefix: Prefix, value: NatTableValue) -> Option { - self.0.insert(prefix, value) + pub fn insert(&mut self, prefix: Prefix, value: NatTableValue) { + self.0.insert(prefix, value); } /// Looks up for the value associated with the given address. @@ -192,28 +188,7 @@ impl NatRuleTable { /// If the address does not match any prefix, it returns `None`. #[must_use] pub fn lookup(&self, addr: &IpAddr, port_opt: Option) -> Option<(Prefix, &NatTableValue)> { - // If we have a matching NatTableValue::Nat for the address, return it - let result = self.0.lookup(*addr); - if matches!(result, Some((_prefix, NatTableValue::Nat(_value)))) { - return result; - } - - // Else, we need to check all matching IP prefixes (not necessarily the longest), and their - // port ranges. We expect the trie to contain only one matching IP prefix matching the - // address and associated to a port range matching the port, so we return the first we find. - let port = port_opt?; - let matching_entries = self.0.matching_entries(*addr); - for (prefix, value) in matching_entries { - if let NatTableValue::Pat(pat_value) = value - && pat_value - .prefix_port_ranges - .iter() - .any(|pr| pr.contains(&port)) - { - return Some((prefix, value)); - } - } - None + self.0.lookup(addr, port_opt) } } @@ -227,6 +202,30 @@ pub enum NatTableValue { Pat(PortAddrTranslationValue), } +impl ValueWithAssociatedRanges for NatTableValue { + fn covers_all_ports(&self) -> bool { + match self { + NatTableValue::Nat(_) => true, + NatTableValue::Pat(value) => { + value + .prefix_port_ranges + .iter() + .fold(0, |sum, range| sum + range.len()) + == PortRange::MAX_LENGTH + } + } + } + + fn covers_port(&self, port: u16) -> bool { + match self { + NatTableValue::Nat(_) => true, + NatTableValue::Pat(value) => { + value.prefix_port_ranges.iter().any(|pr| pr.contains(&port)) + } + } + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct AddrTranslationValue { ranges_tree: DisjointRangesBTreeMap, From 999f16346f48c9f9a6db4c50e0da7b22325e6ec3 Mon Sep 17 00:00:00 2001 From: Quentin Monnet Date: Fri, 19 Dec 2025 16:46:18 +0000 Subject: [PATCH 5/5] test(flow-filter): Add unit tests for flow-filter pipeline stage Add unit tests to test the various modules in the recent flow-filter crate. Most of these were generated by Claude, with manual control and adjustments. Co-Authored-By: Claude Signed-off-by: Quentin Monnet --- flow-filter/Cargo.toml | 1 + flow-filter/src/ip_port_prefix_trie.rs | 225 ++++++++++++++++ flow-filter/src/lib.rs | 335 +++++++++++++++++++++++ flow-filter/src/setup.rs | 357 +++++++++++++++++++++++++ flow-filter/src/tables.rs | 311 +++++++++++++++++++++ 5 files changed, 1229 insertions(+) diff --git a/flow-filter/Cargo.toml b/flow-filter/Cargo.toml index a976dedd4..f20f43c42 100644 --- a/flow-filter/Cargo.toml +++ b/flow-filter/Cargo.toml @@ -16,4 +16,5 @@ tracectl = { workspace = true } tracing = { workspace = true } [dev-dependencies] +lpm = { workspace = true, features = ["testing"] } tracing-test = { workspace = true, features = [] } diff --git a/flow-filter/src/ip_port_prefix_trie.rs b/flow-filter/src/ip_port_prefix_trie.rs index 64868d3c2..844c0c76a 100644 --- a/flow-filter/src/ip_port_prefix_trie.rs +++ b/flow-filter/src/ip_port_prefix_trie.rs @@ -115,3 +115,228 @@ where Self::new() } } + +#[cfg(test)] +mod tests { + use super::*; + use lpm::prefix::{PortRange, Prefix}; + use std::collections::BTreeSet; + + #[derive(Debug, Clone)] + enum TestValue { + AnyPort, + Ranges(BTreeSet), + } + + impl ValueWithAssociatedRanges for TestValue { + fn covers_all_ports(&self) -> bool { + match self { + TestValue::AnyPort => true, + TestValue::Ranges(ranges) => { + ranges.iter().fold(0, |sum, range| sum + range.len()) == PortRange::MAX_LENGTH + } + } + } + + fn covers_port(&self, port: u16) -> bool { + match self { + TestValue::AnyPort => true, + TestValue::Ranges(ranges) => ranges.iter().any(|range| range.contains(&port)), + } + } + } + + #[test] + fn test_new() { + let trie: IpPortPrefixTrie = IpPortPrefixTrie::new(); + assert!(trie.lookup(&"192.168.1.1".parse().unwrap(), None).is_none()); + } + + #[test] + fn test_from() { + let prefix = Prefix::from("192.168.1.0/24"); + let value = TestValue::AnyPort; + let trie = IpPortPrefixTrie::from(prefix, value); + + let result = trie.lookup(&"192.168.1.5".parse().unwrap(), None); + assert!(result.is_some()); + let (matched_prefix, _) = result.unwrap(); + assert_eq!(matched_prefix, prefix); + } + + #[test] + fn test_insert_and_lookup_any_port() { + let mut trie = IpPortPrefixTrie::new(); + let prefix = Prefix::from("10.0.0.0/16"); + let value = TestValue::AnyPort; + + trie.insert(prefix, value); + + // Should match with any port + let result = trie.lookup(&"10.0.1.5".parse().unwrap(), Some(80)); + assert!(result.is_some()); + let (matched_prefix, matched_value) = result.unwrap(); + assert_eq!(matched_prefix, prefix); + assert!(matches!(matched_value, TestValue::AnyPort)); + + // Should match without port + let result = trie.lookup(&"10.0.1.5".parse().unwrap(), None); + assert!(result.is_some()); + } + + #[test] + fn test_insert_and_lookup_with_port_ranges() { + let mut trie = IpPortPrefixTrie::new(); + let prefix = Prefix::from("172.16.0.0/12"); + let ranges = BTreeSet::from([PortRange::new(80, 90).unwrap()]); + let value = TestValue::Ranges(ranges); + + trie.insert(prefix, value); + + // Should match port in range + let result = trie.lookup(&"172.16.5.10".parse().unwrap(), Some(85)); + assert!(result.is_some()); + let (matched_prefix, _) = result.unwrap(); + assert_eq!(matched_prefix, prefix); + + // Should not match port outside range + let result = trie.lookup(&"172.16.5.10".parse().unwrap(), Some(100)); + assert!(result.is_none()); + + // Should not match without port + let result = trie.lookup(&"172.16.5.10".parse().unwrap(), None); + assert!(result.is_none()); + } + + #[test] + fn test_lookup_longest_prefix_match_no_ports() { + let mut trie = IpPortPrefixTrie::new(); + + // Insert prefix with port range + let prefix_with_ports = Prefix::from("192.168.0.0/24"); + let ranges = BTreeSet::from([PortRange::new(80, 90).unwrap()]); + trie.insert(prefix_with_ports, TestValue::Ranges(ranges)); + + // Insert prefix covering all ports + let prefix_alone = Prefix::from("192.168.1.0/24"); + trie.insert(prefix_alone, TestValue::AnyPort); + + // Match wihout port + let result = trie.lookup(&"192.168.1.5".parse().unwrap(), None); + assert!(result.is_some()); + let (matched_prefix, _) = result.unwrap(); + assert_eq!(matched_prefix, prefix_alone); + + // Match with a port + let result = trie.lookup(&"192.168.1.5".parse().unwrap(), Some(443)); + assert!(result.is_some()); + let (matched_prefix, _) = result.unwrap(); + assert_eq!(matched_prefix, prefix_alone); + + // Fail to match prefix_with_ports without a port + let result = trie.lookup(&"192.168.0.5".parse().unwrap(), None); + assert!(result.is_none()); + + // Match with a port + let result = trie.lookup(&"192.168.0.5".parse().unwrap(), Some(80)); + assert!(result.is_some()); + let (matched_prefix, _) = result.unwrap(); + assert_eq!(matched_prefix, prefix_with_ports); + } + + #[test] + fn test_lookup_longest_prefix_match_with_ports() { + let mut trie = IpPortPrefixTrie::new(); + + // Insert broader prefix + let prefix_16 = Prefix::from("192.168.0.0/16"); + let ranges = BTreeSet::from([PortRange::new(80, 90).unwrap()]); + trie.insert(prefix_16, TestValue::Ranges(ranges)); + + // Insert more specific prefix + let prefix_24 = Prefix::from("192.168.1.0/24"); + let ranges = BTreeSet::from([PortRange::new(443, 443).unwrap()]); + trie.insert(prefix_24, TestValue::Ranges(ranges)); + + // Without port, there is not match + let result = trie.lookup(&"192.168.1.5".parse().unwrap(), None); + assert!(result.is_none()); + + // Based on port, we match the more specific prefix + let result = trie.lookup(&"192.168.1.5".parse().unwrap(), Some(443)); + assert!(result.is_some()); + let (matched_prefix, _) = result.unwrap(); + assert_eq!(matched_prefix, prefix_24); + + // Based on port, we match the broader prefix + let result = trie.lookup(&"192.168.1.5".parse().unwrap(), Some(80)); + assert!(result.is_some()); + let (matched_prefix, _) = result.unwrap(); + assert_eq!(matched_prefix, prefix_16); + } + + #[test] + fn test_get_mut() { + let mut trie = IpPortPrefixTrie::new(); + let prefix = Prefix::from("203.0.113.0/24"); + let ranges = BTreeSet::from([PortRange::new(8080, 8090).unwrap()]); + trie.insert(prefix, TestValue::Ranges(ranges)); + + // Modify the value + if let Some(value) = trie.get_mut(prefix) { + *value = TestValue::AnyPort; + } + + // Should now match with any port + let result = trie.lookup(&"203.0.113.5".parse().unwrap(), Some(9999)); + assert!(result.is_some()); + let (_, matched_value) = result.unwrap(); + assert!(matches!(matched_value, TestValue::AnyPort)); + } + + #[test] + fn test_ipv6_lookup() { + let mut trie = IpPortPrefixTrie::new(); + let prefix = Prefix::from("2001:db8::/32"); + trie.insert(prefix, TestValue::AnyPort); + + let result = trie.lookup(&"2001:db8::1".parse().unwrap(), None); + assert!(result.is_some()); + let (matched_prefix, _) = result.unwrap(); + assert_eq!(matched_prefix, prefix); + + let result = trie.lookup(&"2001:db9::1".parse().unwrap(), None); + assert!(result.is_none()); + } + + #[test] + fn test_covers_all_ports() { + let any_port = TestValue::AnyPort; + assert!(any_port.covers_all_ports()); + + let mut ranges = BTreeSet::new(); + ranges.insert(PortRange::new(0, 32767).unwrap()); + ranges.insert(PortRange::new(32768, 65535).unwrap()); + let full_range = TestValue::Ranges(ranges); + assert!(full_range.covers_all_ports()); + + let partial_ranges = BTreeSet::from([PortRange::new(80, 443).unwrap()]); + let partial_range = TestValue::Ranges(partial_ranges); + assert!(!partial_range.covers_all_ports()); + } + + #[test] + fn test_covers_port() { + let any_port = TestValue::AnyPort; + assert!(any_port.covers_port(80)); + assert!(any_port.covers_port(65535)); + + let mut ranges = BTreeSet::new(); + ranges.insert(PortRange::new(80, 80).unwrap()); + ranges.insert(PortRange::new(443, 443).unwrap()); + let specific_ports = TestValue::Ranges(ranges); + assert!(specific_ports.covers_port(80)); + assert!(specific_ports.covers_port(443)); + assert!(!specific_ports.covers_port(8080)); + } +} diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index 0058dfde9..b9fdf3822 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -120,3 +120,338 @@ fn format_packet_addrs_ports( ports.map_or(String::new(), |p| format!(":{}", p.1)) ) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::filter_rw::FlowFilterTableWriter; + use crate::tables::OptionalPortRange; + use lpm::prefix::Prefix; + use net::buffer::TestBuffer; + use net::headers::{Net, TryHeadersMut, TryIpMut}; + use net::ipv4::addr::UnicastIpv4Addr; + use net::ipv6::addr::UnicastIpv6Addr; + use net::packet::test_utils::{build_test_ipv4_packet, build_test_ipv6_packet}; + use net::packet::{DoneReason, Packet, VpcDiscriminant}; + use net::vxlan::Vni; + + fn vpcd(vni: u32) -> VpcDiscriminant { + VpcDiscriminant::VNI(Vni::new_checked(vni).unwrap()) + } + + fn set_src_addr(packet: &mut Packet, addr: IpAddr) { + let net = packet.headers_mut().try_ip_mut().unwrap(); + match net { + Net::Ipv4(ip) => { + ip.set_source(UnicastIpv4Addr::try_from(addr).unwrap()); + } + Net::Ipv6(ip) => { + ip.set_source(UnicastIpv6Addr::try_from(addr).unwrap()); + } + } + } + + fn set_dst_addr(packet: &mut Packet, addr: IpAddr) { + let net = packet.headers_mut().try_ip_mut().unwrap(); + match net { + Net::Ipv4(ip) => { + ip.set_destination(UnicastIpv4Addr::try_from(addr).unwrap().into()); + } + Net::Ipv6(ip) => { + ip.set_destination(UnicastIpv6Addr::try_from(addr).unwrap().into()); + } + } + } + + fn create_test_packet( + src_vpcd: Option, + src_addr: IpAddr, + dst_addr: IpAddr, + ) -> Packet { + let mut packet = match dst_addr { + IpAddr::V4(_) => build_test_ipv4_packet(100).unwrap(), + IpAddr::V6(_) => build_test_ipv6_packet(100).unwrap(), + }; + set_src_addr(&mut packet, src_addr); + set_dst_addr(&mut packet, dst_addr); + packet.meta.src_vpcd = src_vpcd.map(VpcDiscriminant::VNI); + packet + } + + #[test] + fn test_flow_filter_packet_allowed() { + // Setup table + let mut table = FlowFilterTable::new(); + let src_vpcd = vpcd(100); + let dst_vpcd = vpcd(200); + + table.insert( + src_vpcd, + dst_vpcd, + Prefix::from("10.0.0.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + Prefix::from("20.0.0.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + ); + + let mut writer = FlowFilterTableWriter::new(); + writer.update_flow_filter_table(table); + + let mut flow_filter = FlowFilter::new("test-filter", writer.get_reader()); + + // Create test packet + let packet = create_test_packet( + Some(Vni::new_checked(100).unwrap()), + "10.0.0.5".parse().unwrap(), + "20.0.0.10".parse().unwrap(), + ); + + let packets = flow_filter + .process([packet].into_iter()) + .collect::>(); + + assert_eq!(packets.len(), 1); + assert!(!packets[0].is_done()); + assert_eq!(packets[0].meta.dst_vpcd, Some(dst_vpcd)); + } + + #[test] + fn test_flow_filter_packet_filtered() { + // Setup table + let mut table = FlowFilterTable::new(); + let src_vpcd = vpcd(100); + let dst_vpcd = vpcd(200); + + table.insert( + src_vpcd, + dst_vpcd, + Prefix::from("10.0.0.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + Prefix::from("20.0.0.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + ); + + let mut writer = FlowFilterTableWriter::new(); + writer.update_flow_filter_table(table); + + let mut flow_filter = FlowFilter::new("test-filter", writer.get_reader()); + + // Create test packet with non-matching destination + let packet = create_test_packet( + Some(Vni::new_checked(100).unwrap()), + "10.0.0.5".parse().unwrap(), + "30.0.0.10".parse().unwrap(), + ); + + let packets = flow_filter + .process([packet].into_iter()) + .collect::>(); + + assert_eq!(packets.len(), 1); + assert_eq!(packets[0].get_done(), Some(DoneReason::Filtered)); + } + + #[test] + fn test_flow_filter_missing_src_vpcd() { + let table = FlowFilterTable::new(); + let mut writer = FlowFilterTableWriter::new(); + writer.update_flow_filter_table(table); + + let mut flow_filter = FlowFilter::new("test-filter", writer.get_reader()); + + // Create test packet without src_vpcd + let packet = create_test_packet( + None, + "10.0.0.5".parse().unwrap(), + "20.0.0.10".parse().unwrap(), + ); + + let packets = flow_filter + .process([packet].into_iter()) + .collect::>(); + + assert_eq!(packets.len(), 1); + assert_eq!(packets[0].get_done(), Some(DoneReason::Unroutable)); + } + + #[test] + fn test_flow_filter_no_matching_src_prefix() { + // Setup table + let mut table = FlowFilterTable::new(); + let src_vpcd = vpcd(100); + let dst_vpcd = vpcd(200); + + table.insert( + src_vpcd, + dst_vpcd, + Prefix::from("10.0.0.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + Prefix::from("20.0.0.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + ); + + let mut writer = FlowFilterTableWriter::new(); + writer.update_flow_filter_table(table); + + let mut flow_filter = FlowFilter::new("test-filter", writer.get_reader()); + + // Create test packet with non-matching source address + let packet = create_test_packet( + Some(Vni::new_checked(100).unwrap()), + "11.0.0.5".parse().unwrap(), + "20.0.0.10".parse().unwrap(), + ); + + let packets = flow_filter + .process([packet].into_iter()) + .collect::>(); + + assert_eq!(packets.len(), 1); + assert_eq!(packets[0].get_done(), Some(DoneReason::Filtered)); + } + + #[test] + fn test_flow_filter_multiple_matches_no_dst_vpcd() { + // Setup table with overlapping destination prefixes from different VPCs + let mut table = FlowFilterTable::new(); + let src_vpcd = vpcd(100); + + // Manually set up a scenario where dst_vpcd lookup returns MultipleMatches + // This happens when the same destination can be reached from multiple VPCs + table.insert( + src_vpcd, + vpcd(200), + Prefix::from("10.0.0.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + Prefix::from("20.0.0.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + ); + table.insert( + src_vpcd, + vpcd(300), + Prefix::from("10.0.0.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + Prefix::from("20.0.0.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + ); + + let mut writer = FlowFilterTableWriter::new(); + writer.update_flow_filter_table(table); + + let mut flow_filter = FlowFilter::new("test-filter", writer.get_reader()); + + // Create test packet + let packet = create_test_packet( + Some(Vni::new_checked(100).unwrap()), + "10.0.0.5".parse().unwrap(), + "20.0.0.10".parse().unwrap(), + ); + + let packets = flow_filter + .process([packet].into_iter()) + .collect::>(); + + assert_eq!(packets.len(), 1); + assert!(!packets[0].is_done()); + assert!(packets[0].meta.dst_vpcd.is_none()); + } + + #[test] + fn test_flow_filter_ipv6() { + // Setup table + let mut table = FlowFilterTable::new(); + let src_vpcd = vpcd(100); + let dst_vpcd = vpcd(200); + + table.insert( + src_vpcd, + dst_vpcd, + Prefix::from("2001:db8::/32"), + OptionalPortRange::NoPortRangeMeansAllPorts, + Prefix::from("2001:db9::/32"), + OptionalPortRange::NoPortRangeMeansAllPorts, + ); + + let mut writer = FlowFilterTableWriter::new(); + writer.update_flow_filter_table(table); + + let mut flow_filter = FlowFilter::new("test-filter", writer.get_reader()); + + // Create test packet + let packet = create_test_packet( + Some(Vni::new_checked(100).unwrap()), + "2001:db8::1".parse().unwrap(), + "2001:db9::1".parse().unwrap(), + ); + + let packets = flow_filter + .process([packet].into_iter()) + .collect::>(); + + assert_eq!(packets.len(), 1); + assert!(!packets[0].is_done()); + assert_eq!(packets[0].meta.dst_vpcd, Some(dst_vpcd)); + } + + #[test] + fn test_flow_filter_batch_processing() { + // Setup table + let mut table = FlowFilterTable::new(); + let src_vpcd = vpcd(100); + let dst_vpcd = vpcd(200); + + table.insert( + src_vpcd, + dst_vpcd, + Prefix::from("10.0.0.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + Prefix::from("20.0.0.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + ); + + let mut writer = FlowFilterTableWriter::new(); + writer.update_flow_filter_table(table); + + let mut flow_filter = FlowFilter::new("test-filter", writer.get_reader()); + + // Create multiple test packets + let packet1 = create_test_packet( + Some(Vni::new_checked(100).unwrap()), + "10.0.0.5".parse().unwrap(), + "20.0.0.10".parse().unwrap(), + ); + let packet2 = create_test_packet( + Some(Vni::new_checked(100).unwrap()), + "10.0.0.6".parse().unwrap(), + "30.0.0.10".parse().unwrap(), // Should be filtered + ); + let packet3 = create_test_packet( + Some(Vni::new_checked(100).unwrap()), + "10.0.0.7".parse().unwrap(), + "20.0.0.20".parse().unwrap(), + ); + + let packets = flow_filter + .process([packet1, packet2, packet3].into_iter()) + .collect::>(); + + assert_eq!(packets.len(), 3); + assert!(!packets[0].is_done()); + assert_eq!(packets[0].meta.dst_vpcd, Some(dst_vpcd)); + assert_eq!(packets[1].get_done(), Some(DoneReason::Filtered)); + assert!(!packets[2].is_done()); + assert_eq!(packets[2].meta.dst_vpcd, Some(dst_vpcd)); + } + + #[test] + fn test_format_packet_addrs_ports() { + let src_addr = "10.0.0.1".parse().unwrap(); + let dst_addr = "20.0.0.2".parse().unwrap(); + + let result = format_packet_addrs_ports(&src_addr, &dst_addr, Some((8080, 443))); + assert_eq!(result, "src=10.0.0.1:8080, dst=20.0.0.2:443"); + + let result_no_ports = format_packet_addrs_ports(&src_addr, &dst_addr, None); + assert_eq!(result_no_ports, "src=10.0.0.1, dst=20.0.0.2"); + } +} diff --git a/flow-filter/src/setup.rs b/flow-filter/src/setup.rs index 1a8e413df..2143eed24 100644 --- a/flow-filter/src/setup.rs +++ b/flow-filter/src/setup.rs @@ -169,3 +169,360 @@ fn split_overlapping( ); split_prefixes } + +#[cfg(test)] +mod tests { + use super::*; + use crate::VpcdLookupResult; + use config::external::overlay::vpc::{Vpc, VpcTable}; + use config::external::overlay::vpcpeering::{VpcExpose, VpcManifest, VpcPeeringTable}; + use lpm::prefix::{PortRange, Prefix, PrefixWithPortsSize}; + use net::vxlan::Vni; + use std::collections::BTreeSet; + use std::ops::Bound; + + #[test] + fn test_split_overlapping_basic() { + // Test splitting 10.0.0.0/16 with mask 10.0.1.0/24 + let prefix_to_split = PrefixWithOptionalPorts::new(Prefix::from("10.0.0.0/16"), None); + let mask_prefix = PrefixWithOptionalPorts::new(Prefix::from("10.0.1.0/24"), None); + + let result: BTreeSet<_> = split_overlapping(prefix_to_split, mask_prefix) + .into_iter() + .collect(); + + // Should produce the intersection (10.0.1.0/24) and the remainder parts + assert!(!result.is_empty()); + + // Verify that one of the results is the intersection + assert!(result.contains(&mask_prefix)); + + // Verify all results together are the same size as the original prefix + let total_ips = result + .iter() + .fold(PrefixWithPortsSize::from(0u8), |sum, prefix| { + sum + prefix.size() + }); + let original_ips = prefix_to_split.size(); + assert_eq!(total_ips, original_ips); + + // Verify all results are within the original prefix + for prefix in &result { + assert!(prefix_to_split.covers(prefix)); + } + + // Verify results do not overlap + for i in &result.clone() { + for j in result.range((Bound::Excluded(i), Bound::Unbounded)) { + assert!(!i.overlaps(j)); + } + } + + // Just to be on the safe side for this test, check the list manually + let expected = BTreeSet::from([ + PrefixWithOptionalPorts::new(Prefix::from("10.0.128.0/17"), None), + PrefixWithOptionalPorts::new(Prefix::from("10.0.64.0/18"), None), + PrefixWithOptionalPorts::new(Prefix::from("10.0.32.0/19"), None), + PrefixWithOptionalPorts::new(Prefix::from("10.0.16.0/20"), None), + PrefixWithOptionalPorts::new(Prefix::from("10.0.8.0/21"), None), + PrefixWithOptionalPorts::new(Prefix::from("10.0.4.0/22"), None), + PrefixWithOptionalPorts::new(Prefix::from("10.0.3.0/23"), None), + PrefixWithOptionalPorts::new(Prefix::from("10.0.1.0/24"), None), + PrefixWithOptionalPorts::new(Prefix::from("10.0.0.0/24"), None), + ]); + assert_eq!(result, expected); + } + + #[test] + fn test_split_overlapping_with_ports() { + // Test splitting with port ranges + let port_range1 = PortRange::new(80, 443).unwrap(); + let port_range2 = PortRange::new(100, 200).unwrap(); + + let prefix_to_split = + PrefixWithOptionalPorts::new(Prefix::from("192.168.0.0/16"), Some(port_range1)); + let mask_prefix = + PrefixWithOptionalPorts::new(Prefix::from("192.168.1.0/24"), Some(port_range2)); + + let result: BTreeSet<_> = split_overlapping(prefix_to_split, mask_prefix) + .into_iter() + .collect(); + + // Should produce multiple prefixes including the intersection + assert!(!result.is_empty()); + + // The intersection should have the intersection of both IP prefix and port range + let intersection = prefix_to_split.intersection(&mask_prefix).unwrap(); + assert!(result.contains(&intersection)); + + // Check the list manually + let expected = BTreeSet::from([ + PrefixWithOptionalPorts::new( + Prefix::from("192.168.0.0/16"), + Some(PortRange::new(80, 99).unwrap()), + ), + PrefixWithOptionalPorts::new( + Prefix::from("192.168.0.0/16"), + Some(PortRange::new(201, 443).unwrap()), + ), + PrefixWithOptionalPorts::new( + Prefix::from("192.168.128.0/17"), + Some(PortRange::new(100, 200).unwrap()), + ), + PrefixWithOptionalPorts::new( + Prefix::from("192.168.64.0/18"), + Some(PortRange::new(100, 200).unwrap()), + ), + PrefixWithOptionalPorts::new( + Prefix::from("192.168.32.0/19"), + Some(PortRange::new(100, 200).unwrap()), + ), + PrefixWithOptionalPorts::new( + Prefix::from("192.168.16.0/20"), + Some(PortRange::new(100, 200).unwrap()), + ), + PrefixWithOptionalPorts::new( + Prefix::from("192.168.8.0/21"), + Some(PortRange::new(100, 200).unwrap()), + ), + PrefixWithOptionalPorts::new( + Prefix::from("192.168.4.0/22"), + Some(PortRange::new(100, 200).unwrap()), + ), + PrefixWithOptionalPorts::new( + Prefix::from("192.168.3.0/23"), + Some(PortRange::new(100, 200).unwrap()), + ), + PrefixWithOptionalPorts::new( + Prefix::from("192.168.1.0/24"), + Some(PortRange::new(100, 200).unwrap()), // Corresponds to the mask + ), + PrefixWithOptionalPorts::new( + Prefix::from("192.168.0.0/24"), + Some(PortRange::new(100, 200).unwrap()), + ), + ]); + assert_eq!(result, expected, "{result:#?},\n {expected:#?}"); + } + + #[test] + fn test_split_overlaps_in_src_prefixes_no_overlap() { + // Create a VPC with non-overlapping peerings + let mut vpc = Vpc::new("test-vpc", "VPC01", 100).unwrap(); + + let manifest1 = VpcManifest { + name: "remote1".to_string(), + exposes: vec![VpcExpose::empty().ip("10.0.0.0/24".into())], + }; + let manifest2 = VpcManifest { + name: "remote2".to_string(), + exposes: vec![VpcExpose::empty().ip("20.0.0.0/24".into())], + }; + + vpc.peerings.push(Peering { + name: "peering1".to_string(), + local: VpcManifest { + name: "local1".to_string(), + exposes: vec![VpcExpose::empty().ip("192.168.1.0/24".into())], + }, + remote: manifest1, + remote_id: "VPC02".try_into().unwrap(), + gwgroup: None, + adv_communities: vec![], + }); + + vpc.peerings.push(Peering { + name: "peering2".to_string(), + local: VpcManifest { + name: "local2".to_string(), + exposes: vec![VpcExpose::empty().ip("192.168.2.0/24".into())], + }, + remote: manifest2, + remote_id: "VPC02".try_into().unwrap(), + gwgroup: None, + adv_communities: vec![], + }); + + let mut expected = vpc.clone(); + expected.peerings.sort_by_key(|p| p.name.clone()); + + let mut result = split_overlaps_in_src_prefixes(&mut vpc); + result.peerings.sort_by_key(|p| p.name.clone()); + + // No overlaps, so peerings should remain the same + assert_eq!(result, expected); + } + + #[test] + fn test_split_overlaps_in_src_prefixes_with_overlap() { + // Create a VPC with overlapping peerings + let mut vpc = Vpc::new("test-vpc", "VPC01", 100).unwrap(); + + let manifest1 = VpcManifest { + name: "remote1".to_string(), + exposes: vec![VpcExpose::empty().ip("10.0.0.0/24".into())], + }; + let manifest2 = VpcManifest { + name: "remote2".to_string(), + exposes: vec![VpcExpose::empty().ip("20.0.0.0/24".into())], + }; + + // These two local prefixes overlap: 192.168.0.0/16 contains 192.168.1.0/24 + vpc.peerings.push(Peering { + name: "peering1".to_string(), + local: VpcManifest { + name: "local1".to_string(), + exposes: vec![VpcExpose::empty().ip("192.168.0.0/16".into())], + }, + remote: manifest1, + remote_id: "VPC02".try_into().unwrap(), + gwgroup: None, + adv_communities: vec![], + }); + + vpc.peerings.push(Peering { + name: "peering2".to_string(), + local: VpcManifest { + name: "local2".to_string(), + exposes: vec![VpcExpose::empty().ip("192.168.1.0/24".into())], + }, + remote: manifest2, + remote_id: "VPC02".try_into().unwrap(), + gwgroup: None, + adv_communities: vec![], + }); + + let mut result = split_overlaps_in_src_prefixes(&mut vpc); + assert_eq!(result.peerings.len(), 2); + result.peerings.sort_by_key(|p| p.name.clone()); + + // The broader prefix (192.168.0.0/16) should be split into multiple parts + // to avoid overlap with the more specific prefix (192.168.1.0/24) + let peering1_prefixes = &result.peerings[0].local.exposes[0].ips; + + let expected = BTreeSet::from([ + PrefixWithOptionalPorts::new(Prefix::from("192.168.128.0/17"), None), + PrefixWithOptionalPorts::new(Prefix::from("192.168.64.0/18"), None), + PrefixWithOptionalPorts::new(Prefix::from("192.168.32.0/19"), None), + PrefixWithOptionalPorts::new(Prefix::from("192.168.16.0/20"), None), + PrefixWithOptionalPorts::new(Prefix::from("192.168.8.0/21"), None), + PrefixWithOptionalPorts::new(Prefix::from("192.168.4.0/22"), None), + PrefixWithOptionalPorts::new(Prefix::from("192.168.2.0/23"), None), + PrefixWithOptionalPorts::new(Prefix::from("192.168.1.0/24"), None), + PrefixWithOptionalPorts::new(Prefix::from("192.168.0.0/24"), None), + ]); + assert_eq!(peering1_prefixes, &expected); + + let peering2_prefixes = &result.peerings[1].local.exposes[0].ips; + let expected = BTreeSet::from([PrefixWithOptionalPorts::new( + Prefix::from("192.168.1.0/24"), + None, + )]); + assert_eq!(peering2_prefixes, &expected); + } + + #[test] + fn test_clone_skipping_peerings() { + let mut vpc = Vpc::new("test-vpc", "VPC01", 100).unwrap(); + + vpc.peerings.push(Peering { + name: "peering1".to_string(), + local: VpcManifest { + name: "local1".to_string(), + exposes: vec![], + }, + remote: VpcManifest { + name: "remote1".to_string(), + exposes: vec![], + }, + remote_id: "VPC02".try_into().unwrap(), + gwgroup: None, + adv_communities: vec![], + }); + + let cloned = clone_skipping_peerings(&vpc); + + assert_eq!(cloned.name, vpc.name); + assert_eq!(cloned.id, vpc.id); + assert_eq!(cloned.vni, vpc.vni); + assert_eq!(cloned.peerings.len(), 0); + } + + #[test] + fn test_cleanup_vpc_table() { + let mut vpc = Vpc::new("test-vpc", "VPC01", 100).unwrap(); + + // Add a peering with some exposes + vpc.peerings.push(Peering { + name: "peering1".to_string(), + local: VpcManifest { + name: "local1".to_string(), + exposes: vec![VpcExpose::empty().ip("10.0.0.0/24".into())], + }, + remote: VpcManifest { + name: "remote1".to_string(), + exposes: vec![VpcExpose::empty().ip("20.0.0.0/24".into())], + }, + remote_id: "VPC02".try_into().unwrap(), + gwgroup: None, + adv_communities: vec![], + }); + + let vpcs = vec![&vpc]; + let result = cleanup_vpc_table(vpcs); + + assert!(result.is_ok()); + let cleaned_vpcs = result.unwrap(); + assert_eq!(cleaned_vpcs.len(), 1); + assert_eq!(cleaned_vpcs[0].name, vpc.name); + } + + #[test] + fn test_build_from_overlay() { + // Create a simple overlay with two VPCs and a peering + let mut vpc_table = VpcTable::new(); + + let vni1 = Vni::new_checked(100).unwrap(); + let vni2 = Vni::new_checked(200).unwrap(); + + let mut vpc1 = Vpc::new("vpc1", "VPC01", vni1.as_u32()).unwrap(); + let vpc2 = Vpc::new("vpc2", "VPC02", vni2.as_u32()).unwrap(); + + // Add peering from vpc1 to vpc2 + vpc1.peerings.push(Peering { + name: "vpc1-to-vpc2".to_string(), + local: VpcManifest { + name: "vpc1-local".to_string(), + exposes: vec![VpcExpose::empty().ip("10.0.0.0/24".into())], + }, + remote: VpcManifest { + name: "vpc2-remote".to_string(), + exposes: vec![VpcExpose::empty().ip("20.0.0.0/24".into())], + }, + remote_id: "VPC02".try_into().unwrap(), + gwgroup: None, + adv_communities: vec![], + }); + + vpc_table.add(vpc1).unwrap(); + vpc_table.add(vpc2).unwrap(); + + let overlay = Overlay { + vpc_table, + peering_table: VpcPeeringTable::new(), + }; + + let result = FlowFilterTable::build_from_overlay(&overlay); + assert!(result.is_ok()); + + let table = result.unwrap(); + // Should be able to look up flows + let src_vpcd = VpcDiscriminant::VNI(vni1); + let src_addr = "10.0.0.5".parse().unwrap(); + let dst_addr = "20.0.0.5".parse().unwrap(); + + let (allowed, dst_vpcd) = table.contains(src_vpcd, &src_addr, &dst_addr, None); + assert!(allowed); + assert_eq!(dst_vpcd, VpcdLookupResult::Some(VpcDiscriminant::VNI(vni2))); + } +} diff --git a/flow-filter/src/tables.rs b/flow-filter/src/tables.rs index 5627ed190..d99396b5f 100644 --- a/flow-filter/src/tables.rs +++ b/flow-filter/src/tables.rs @@ -253,3 +253,314 @@ impl From for AssociatedRanges { } } } + +#[cfg(test)] +mod tests { + use super::*; + use lpm::prefix::Prefix; + use net::vxlan::Vni; + + fn vpcd(vni: u32) -> VpcDiscriminant { + VpcDiscriminant::VNI(Vni::new_checked(vni).unwrap()) + } + + #[test] + fn test_flow_filter_table_new() { + let table = FlowFilterTable::new(); + let src_vpcd = vpcd(100); + let src_addr = "10.0.0.1".parse().unwrap(); + let dst_addr = "20.0.0.1".parse().unwrap(); + + let (allowed, _) = table.contains(src_vpcd, &src_addr, &dst_addr, None); + assert!(!allowed); + } + + #[test] + fn test_flow_filter_table_insert_and_contains_simple() { + let mut table = FlowFilterTable::new(); + let src_vpcd = vpcd(100); + let dst_vpcd = vpcd(200); + + let src_prefix = Prefix::from("10.0.0.0/24"); + let dst_prefix = Prefix::from("20.0.0.0/24"); + + table.insert( + src_vpcd, + dst_vpcd, + src_prefix, + OptionalPortRange::NoPortRangeMeansAllPorts, + dst_prefix, + OptionalPortRange::NoPortRangeMeansAllPorts, + ); + + // Should allow traffic from src to dst + let src_addr = "10.0.0.5".parse().unwrap(); + let dst_addr = "20.0.0.10".parse().unwrap(); + let (allowed, vpcd_result) = table.contains(src_vpcd, &src_addr, &dst_addr, None); + assert!(allowed); + assert!(matches!(vpcd_result, VpcdLookupResult::Some(d) if d == dst_vpcd)); + + // Should not allow traffic from different src + let wrong_src_addr = "10.1.0.5".parse().unwrap(); + let (allowed, _) = table.contains(src_vpcd, &wrong_src_addr, &dst_addr, None); + assert!(!allowed); + + // Should not allow traffic to different dst + let wrong_dst_addr = "30.0.0.10".parse().unwrap(); + let (allowed, _) = table.contains(src_vpcd, &src_addr, &wrong_dst_addr, None); + assert!(!allowed); + } + + #[test] + fn test_flow_filter_table_with_port_ranges() { + let mut table = FlowFilterTable::new(); + let src_vpcd = vpcd(100); + let dst_vpcd = vpcd(200); + + let src_prefix = Prefix::from("10.0.0.0/24"); + let dst_prefix = Prefix::from("20.0.0.0/24"); + let src_port_range = OptionalPortRange::Some(PortRange::new(1024, 2048).unwrap()); + let dst_port_range = OptionalPortRange::Some(PortRange::new(80, 80).unwrap()); + + table.insert( + src_vpcd, + dst_vpcd, + src_prefix, + src_port_range, + dst_prefix, + dst_port_range, + ); + + let src_addr = "10.0.0.5".parse().unwrap(); + let dst_addr = "20.0.0.10".parse().unwrap(); + + // Should allow with matching ports + let (allowed, vpcd_result) = + table.contains(src_vpcd, &src_addr, &dst_addr, Some((1500, 80))); + assert!(allowed); + assert!(matches!(vpcd_result, VpcdLookupResult::Some(d) if d == dst_vpcd)); + + // Should not allow with non-matching src port + let (allowed, _) = table.contains(src_vpcd, &src_addr, &dst_addr, Some((500, 80))); + assert!(!allowed); + + // Should not allow with non-matching dst port + let (allowed, _) = table.contains(src_vpcd, &src_addr, &dst_addr, Some((1500, 443))); + assert!(!allowed); + + // Should not allow without ports + let (allowed, _) = table.contains(src_vpcd, &src_addr, &dst_addr, None); + assert!(!allowed); + } + + #[test] + fn test_flow_filter_table_multiple_entries() { + let mut table = FlowFilterTable::new(); + let src_vpcd = vpcd(100); + let dst_vpcd1 = vpcd(200); + let dst_vpcd2 = vpcd(300); + + // Add two entries for different destination prefixes + table.insert( + src_vpcd, + dst_vpcd1, + Prefix::from("10.0.0.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + Prefix::from("20.0.0.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + ); + + table.insert( + src_vpcd, + dst_vpcd2, + Prefix::from("10.0.0.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + Prefix::from("30.0.0.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + ); + + let src_addr = "10.0.0.5".parse().unwrap(); + + // Should route to dst_vpcd1 + let (allowed, vpcd_result) = + table.contains(src_vpcd, &src_addr, &"20.0.0.10".parse().unwrap(), None); + assert!(allowed); + assert!(matches!(vpcd_result, VpcdLookupResult::Some(d) if d == dst_vpcd1)); + + // Should route to dst_vpcd2 + let (allowed, vpcd_result) = + table.contains(src_vpcd, &src_addr, &"30.0.0.10".parse().unwrap(), None); + assert!(allowed); + assert!(matches!(vpcd_result, VpcdLookupResult::Some(d) if d == dst_vpcd2)); + } + + #[test] + fn test_vpc_connections_table_lookup() { + let mut table = VpcConnectionsTable::new(); + let dst_vpcd = vpcd(200); + + let src_prefix = Prefix::from("10.0.0.0/24"); + let dst_prefix = Prefix::from("20.0.0.0/24"); + + table.insert( + dst_vpcd, + src_prefix, + OptionalPortRange::NoPortRangeMeansAllPorts, + dst_prefix, + OptionalPortRange::NoPortRangeMeansAllPorts, + ); + + // Lookup should succeed + let result = table.lookup(&"10.0.0.5".parse().unwrap(), None); + assert!(result.is_some()); + let (prefix, _) = result.unwrap(); + assert_eq!(prefix, src_prefix); + + // Lookup for non-matching address should fail + let result = table.lookup(&"11.0.0.5".parse().unwrap(), None); + assert!(result.is_none()); + } + + #[test] + fn test_vpc_connections_table_with_ports() { + let mut table = VpcConnectionsTable::new(); + let dst_vpcd = vpcd(200); + + let src_prefix = Prefix::from("10.0.0.0/24"); + let dst_prefix = Prefix::from("20.0.0.0/24"); + let src_port_range = OptionalPortRange::Some(PortRange::new(8080, 8090).unwrap()); + let dst_port_range = OptionalPortRange::NoPortRangeMeansAllPorts; + + table.insert( + dst_vpcd, + src_prefix, + src_port_range, + dst_prefix, + dst_port_range, + ); + + // Lookup with matching port + let result = table.lookup(&"10.0.0.5".parse().unwrap(), Some(8085)); + assert!(result.is_some()); + + // Lookup with non-matching port + let result = table.lookup(&"10.0.0.5".parse().unwrap(), Some(9000)); + assert!(result.is_none()); + } + + #[test] + fn test_optional_port_range_from() { + let from_some = OptionalPortRange::from(Some(PortRange::new(80, 80).unwrap())); + assert!(matches!(from_some, OptionalPortRange::Some(_))); + + let from_none = OptionalPortRange::from(None); + assert!(matches!( + from_none, + OptionalPortRange::NoPortRangeMeansAllPorts + )); + } + + #[test] + fn test_associated_ranges_from_optional_port_range() { + let any_port = AssociatedRanges::from(OptionalPortRange::NoPortRangeMeansAllPorts); + assert!(matches!(any_port, AssociatedRanges::AnyPort)); + + let with_range = + AssociatedRanges::from(OptionalPortRange::Some(PortRange::new(80, 443).unwrap())); + match with_range { + AssociatedRanges::Ranges(ranges) => { + assert_eq!(ranges.len(), 1); + assert_eq!(*ranges.first().unwrap(), PortRange::new(80, 443).unwrap()); + } + _ => panic!("Expected Ranges variant"), + } + } + + #[test] + fn test_flow_filter_table_ipv6() { + let mut table = FlowFilterTable::new(); + let src_vpcd = vpcd(100); + let dst_vpcd = vpcd(200); + + let src_prefix = Prefix::from("2001:db8::/32"); + let dst_prefix = Prefix::from("2001:db9::/32"); + + table.insert( + src_vpcd, + dst_vpcd, + src_prefix, + OptionalPortRange::NoPortRangeMeansAllPorts, + dst_prefix, + OptionalPortRange::NoPortRangeMeansAllPorts, + ); + + let src_addr = "2001:db8::1".parse().unwrap(); + let dst_addr = "2001:db9::1".parse().unwrap(); + let (allowed, vpcd_result) = table.contains(src_vpcd, &src_addr, &dst_addr, None); + assert!(allowed); + assert!(matches!(vpcd_result, VpcdLookupResult::Some(d) if d == dst_vpcd)); + } + + #[test] + fn test_flow_filter_table_longest_prefix_match() { + let mut table = FlowFilterTable::new(); + let src_vpcd = vpcd(100); + let dst_vpcd1 = vpcd(200); + let dst_vpcd2 = vpcd(300); + + // Insert broader prefix + table.insert( + src_vpcd, + dst_vpcd1, + Prefix::from("10.0.0.0/16"), + OptionalPortRange::NoPortRangeMeansAllPorts, + Prefix::from("20.0.0.0/16"), + OptionalPortRange::NoPortRangeMeansAllPorts, + ); + + // Insert more specific prefix + table.insert( + src_vpcd, + dst_vpcd2, + Prefix::from("10.0.1.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + Prefix::from("20.0.1.0/24"), + OptionalPortRange::NoPortRangeMeansAllPorts, + ); + + // Should match the more specific prefix for source + let (allowed, vpcd_result) = table.contains( + src_vpcd, + &"10.0.1.5".parse().unwrap(), + &"20.0.1.10".parse().unwrap(), + None, + ); + assert!(allowed); + assert!(matches!(vpcd_result, VpcdLookupResult::Some(d) if d == dst_vpcd2)); + + // Should match the broader prefix for source + let (allowed, vpcd_result) = table.contains( + src_vpcd, + &"10.0.2.5".parse().unwrap(), + &"20.0.2.10".parse().unwrap(), + None, + ); + assert!(allowed); + assert!(matches!(vpcd_result, VpcdLookupResult::Some(d) if d == dst_vpcd1)); + } + + #[test] + fn test_flow_filter_table_no_src_vpcd() { + let table = FlowFilterTable::new(); + let src_vpcd = vpcd(999); // Non-existent VPC + + let (allowed, vpcd_result) = table.contains( + src_vpcd, + &"10.0.0.1".parse().unwrap(), + &"20.0.0.1".parse().unwrap(), + None, + ); + assert!(!allowed); + assert!(matches!(vpcd_result, VpcdLookupResult::None)); + } +}