diff --git a/Cargo.toml b/Cargo.toml index 3feea5f..73bab41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ serde={version="1.0", features=["derive"], optional = true} itertools = {version = "0.11.0", optional = true} ipnet = {version="2.7", optional = true} bitflags = {version="2.3.3", features = ["serde"], optional = true} +thiserror = {version = "1.0.44", optional = true} ####################### # Parser dependencies # @@ -54,6 +55,7 @@ models = [ "ipnet", "itertools", "bitflags", + "thiserror", ] parser = [ "bytes", diff --git a/examples/cache_reading.rs b/examples/cache_reading.rs index 85c7023..a9bfe5e 100644 --- a/examples/cache_reading.rs +++ b/examples/cache_reading.rs @@ -15,7 +15,10 @@ fn main() { let parser = BgpkitParser::new_cached(item.url.as_str(), "/tmp/bgpkit-cache-example/").unwrap(); // iterating through the parser. the iterator returns `BgpElem` one at a time. - let elems = parser.into_elem_iter().collect::>(); + let elems = parser + .into_elem_iter() + .filter_map(Result::ok) + .collect::>(); log::info!("{} {} {}", item.collector_id, item.url, elems.len()); } } diff --git a/examples/deprecated_attributes.rs b/examples/deprecated_attributes.rs index c1d0e6d..04fbae1 100644 --- a/examples/deprecated_attributes.rs +++ b/examples/deprecated_attributes.rs @@ -9,6 +9,7 @@ fn main() { ) .unwrap() { + let elem = elem.unwrap(); if elem.deprecated.is_some() { println!( "{}", diff --git a/examples/display_elems.rs b/examples/display_elems.rs index 67106d1..7b9ee47 100644 --- a/examples/display_elems.rs +++ b/examples/display_elems.rs @@ -4,6 +4,7 @@ fn main() { let url = "http://archive.routeviews.org/bgpdata/\ 2021.10/UPDATES/updates.20211001.0000.bz2"; for elem in BgpkitParser::new(url).unwrap() { + let elem = elem.unwrap(); println!( "{:?}|{:?}|{:?}|{:?}|{:?}", elem.elem_type, elem.timestamp, elem.prefix, elem.as_path, elem.next_hop, diff --git a/examples/extended_communities.rs b/examples/extended_communities.rs index 11f0c79..d249264 100644 --- a/examples/extended_communities.rs +++ b/examples/extended_communities.rs @@ -16,6 +16,7 @@ fn main() { log::info!("parsing updates file"); // iterating through the parser. the iterator returns `BgpElem` one at a time. for elem in parser { + let elem = elem.unwrap(); if let Some(cs) = &elem.communities { for c in cs { match c { diff --git a/examples/filters.rs b/examples/filters.rs index 7c13623..eee3a72 100644 --- a/examples/filters.rs +++ b/examples/filters.rs @@ -1,3 +1,4 @@ +use bgpkit_parser::filter::Filter; use bgpkit_parser::BgpkitParser; /// This example shows how to parse a MRT file and filter by prefix. @@ -27,13 +28,12 @@ fn main() { "http://archive.routeviews.org/bgpdata/2021.10/UPDATES/updates.20211001.0000.bz2", ) .unwrap() - .add_filter("prefix", "211.98.251.0/24") - .unwrap(); + .add_filter(Filter::prefix("211.98.251.0/24").unwrap()); log::info!("parsing updates file"); // iterating through the parser. the iterator returns `BgpElem` one at a time. for elem in parser { - log::info!("{}", &elem); + log::info!("{}", elem.unwrap()); } log::info!("done"); } diff --git a/examples/find_as_set_messages.rs b/examples/find_as_set_messages.rs index 79cc732..611a808 100644 --- a/examples/find_as_set_messages.rs +++ b/examples/find_as_set_messages.rs @@ -19,6 +19,7 @@ fn main() { let collector = item.collector_id.clone(); let mut origins: HashSet = HashSet::new(); for elem in parser { + let elem = elem.unwrap(); if !elem.elem_type.is_announce() { continue; } diff --git a/examples/only-to-customer.rs b/examples/only-to-customer.rs index a2e1090..d11e0dd 100644 --- a/examples/only-to-customer.rs +++ b/examples/only-to-customer.rs @@ -6,6 +6,7 @@ fn main() { ) .unwrap() { + let elem = elem.unwrap(); if let Some(otc) = elem.only_to_customer { println!( "OTC found: {} for path {}\n{}\n", diff --git a/examples/parse-files-from-broker.rs b/examples/parse-files-from-broker.rs index d339851..297f3f5 100644 --- a/examples/parse-files-from-broker.rs +++ b/examples/parse-files-from-broker.rs @@ -19,6 +19,7 @@ fn main() { // iterating through the parser. the iterator returns `BgpElem` one at a time. let elems = parser .into_elem_iter() + .map(Result::unwrap) .filter_map(|elem| { if let Some(origins) = &elem.origin_asns { if origins.contains(&13335.into()) { diff --git a/examples/parse-single-file.rs b/examples/parse-single-file.rs index e4b6994..db7acc1 100644 --- a/examples/parse-single-file.rs +++ b/examples/parse-single-file.rs @@ -15,7 +15,7 @@ fn main() { log::info!("parsing updates file"); // iterating through the parser. the iterator returns `BgpElem` one at a time. for elem in parser { - log::info!("{}", &elem); + log::info!("{}", elem.unwrap()); } log::info!("done"); } diff --git a/examples/peer_index_table.rs b/examples/peer_index_table.rs index e6ae29d..a97ef3a 100644 --- a/examples/peer_index_table.rs +++ b/examples/peer_index_table.rs @@ -8,6 +8,6 @@ fn main() { let url = "https://data.ris.ripe.net/rrc03/2021.11/bview.20211128.1600.gz"; let parser = bgpkit_parser::BgpkitParser::new(url).unwrap(); for record in parser.into_record_iter().take(1) { - println!("{}", to_string_pretty(&json!(record)).unwrap()); + println!("{}", to_string_pretty(&json!(record.unwrap())).unwrap()); } } diff --git a/examples/records_iter.rs b/examples/records_iter.rs index e5f9322..efd1152 100644 --- a/examples/records_iter.rs +++ b/examples/records_iter.rs @@ -6,7 +6,7 @@ fn main() { let url = "http://archive.routeviews.org/route-views.amsix/bgpdata/2023.02/UPDATES/updates.20230222.0430.bz2"; let parser = BgpkitParser::new(url).unwrap(); for record in parser.into_record_iter() { - match record.message { + match record.unwrap().message { MrtMessage::TableDumpMessage(_) => {} MrtMessage::TableDumpV2Message(_) => {} MrtMessage::Bgp4Mp(msg) => match msg { diff --git a/src/bin/main.rs b/src/bin/main.rs index 4b416fd..3839d0f 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -1,10 +1,12 @@ -use itertools::Itertools; -use serde_json::json; -use std::io::Write; +use std::fmt::Display; +use std::io; +use std::io::{stdout, BufWriter, Write}; use std::net::IpAddr; use std::path::PathBuf; -use bgpkit_parser::{BgpkitParser, Elementor}; +use bgpkit_parser::filter::Filter; +use bgpkit_parser::models::ElemType; +use bgpkit_parser::{BgpkitParser, Elementor, PrefixMatchType}; use clap::Parser; use ipnet::IpNet; @@ -104,43 +106,39 @@ fn main() { }; if let Some(v) = opts.filters.as_path { - parser = parser.add_filter("as_path", v.as_str()).unwrap(); + parser = parser.add_filter(Filter::as_path(v.as_str()).unwrap()); } if let Some(v) = opts.filters.origin_asn { - parser = parser - .add_filter("origin_asn", v.to_string().as_str()) - .unwrap(); + parser = parser.add_filter(Filter::OriginAsn(v)); } if let Some(v) = opts.filters.prefix { let filter_type = match (opts.filters.include_super, opts.filters.include_sub) { - (false, false) => "prefix", - (true, false) => "prefix_super", - (false, true) => "prefix_sub", - (true, true) => "prefix_super_sub", + (false, false) => PrefixMatchType::Exact, + (true, false) => PrefixMatchType::IncludeSuper, + (false, true) => PrefixMatchType::IncludeSub, + (true, true) => PrefixMatchType::IncludeSuperSub, }; - parser = parser - .add_filter(filter_type, v.to_string().as_str()) - .unwrap(); + parser = parser.add_filter(Filter::Prefix(v, filter_type)); } if !opts.filters.peer_ip.is_empty() { - let v = opts.filters.peer_ip.iter().map(|p| p.to_string()).join(","); - parser = parser.add_filter("peer_ips", v.as_str()).unwrap(); + parser = parser.add_filter(Filter::PeerIps(opts.filters.peer_ip.to_owned())); } if let Some(v) = opts.filters.peer_asn { - parser = parser - .add_filter("peer_asn", v.to_string().as_str()) - .unwrap(); + parser = parser.add_filter(Filter::PeerAsn(v)); } if let Some(v) = opts.filters.elem_type { - parser = parser.add_filter("type", v.as_str()).unwrap(); + let filter_type = match v.as_str() { + "w" | "withdraw" | "withdrawal" => ElemType::WITHDRAW, + "a" | "announce" | "announcement" => ElemType::ANNOUNCE, + x => panic!("cannot parse elem type from {}", x), + }; + parser = parser.add_filter(Filter::Type(filter_type)); } if let Some(v) = opts.filters.start_ts { - parser = parser - .add_filter("start_ts", v.to_string().as_str()) - .unwrap(); + parser = parser.add_filter(Filter::TsStart(v)); } if let Some(v) = opts.filters.end_ts { - parser = parser.add_filter("end_ts", v.to_string().as_str()).unwrap(); + parser = parser.add_filter(Filter::TsEnd(v)); } match (opts.elems_count, opts.records_count) { @@ -148,8 +146,13 @@ fn main() { let mut elementor = Elementor::new(); let (mut records_count, mut elems_count) = (0, 0); for record in parser.into_record_iter() { - records_count += 1; - elems_count += elementor.record_to_elems(record).len(); + match record { + Ok(record) => { + records_count += 1; + elems_count += elementor.record_to_elems(record).len(); + } + Err(err) => handle_non_fatal_error(&mut stdout(), err), + } } println!("total records: {}", records_count); println!("total elems: {}", elems_count); @@ -158,28 +161,71 @@ fn main() { println!("total records: {}", parser.into_record_iter().count()); } (true, false) => { - println!("total records: {}", parser.into_elem_iter().count()); + println!("total elems: {}", parser.into_elem_iter().count()); } (false, false) => { - let mut stdout = std::io::stdout(); + let mut stdout = BufWriter::new(stdout().lock()); + for elem in parser { - let output_str = if opts.json { - let val = json!(elem); - if opts.pretty { - serde_json::to_string_pretty(&val).unwrap() - } else { - val.to_string() + match elem { + Ok(elem) => { + if opts.json { + let res = if opts.pretty { + serde_json::to_writer_pretty(&mut stdout, &elem) + } else { + serde_json::to_writer(&mut stdout, &elem) + }; + + handle_serde_json_result(&mut stdout, res); + } else { + let res = writeln!(stdout, "{}", elem); + handle_io_result(&mut stdout, res); + } } - } else { - elem.to_string() - }; - if let Err(e) = writeln!(stdout, "{}", &output_str) { - if e.kind() != std::io::ErrorKind::BrokenPipe { - eprintln!("{}", e); + Err(err) => { + let res = stdout.flush(); + handle_io_result(&mut stdout, res); + eprintln!("{}", err); } - std::process::exit(1); } } } } } + +fn handle_serde_json_result(stdout: &mut W, res: serde_json::Result<()>) { + if let Err(err) = res { + if err.is_io() { + // If it was an IO error, we likely wont be able to flush stdout + eprintln!("{}", err); + std::process::exit(1); + } + + handle_non_fatal_error(stdout, err); + } +} + +fn handle_non_fatal_error(stdout: &mut W, err: E) { + // Attempt to flush stdout before printing the error to avoid mangling combined CLI output + if let Err(flush_err) = stdout.flush() { + eprintln!("{}", err); + eprintln!("{}", flush_err); + std::process::exit(1); + } + + // Write the error to stderr then flush stderr to avoid mangling combined CLI output + eprintln!("{}", err); + if io::stderr().flush().is_err() { + // If this fails, then we are out of options for logging errors + std::process::exit(1); + } +} + +fn handle_io_result(stdout: &mut W, res: io::Result<()>) { + if let Err(err) = res { + // We can try flushing stdout, but it will almost certainly fail + let _ = stdout.flush(); + eprintln!("{}", err); + std::process::exit(1); + } +} diff --git a/src/error.rs b/src/error.rs index 6e6eacb..c80c33d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,122 +1,74 @@ /*! error module defines the error types used in bgpkit-parser. */ -use crate::models::{Afi, Bgp4MpType, BgpState, EntryType, Safi, TableDumpV2Type}; -use num_enum::TryFromPrimitiveError; -use oneio::OneIoError; -use std::fmt::{Display, Formatter}; -use std::io::ErrorKind; -use std::{error::Error, fmt, io}; +use crate::models::{AttrType, EntryType}; +use num_enum::{TryFromPrimitive, TryFromPrimitiveError}; +use std::io; +use thiserror::Error; -#[derive(Debug)] +#[derive(Debug, Error)] pub enum ParserError { - IoError(io::Error), - IoNotEnoughBytes(), - EofError(io::Error), - OneIoError(OneIoError), - EofExpected, - ParseError(String), - TruncatedMsg(String), - Unsupported(String), - FilterError(String), + /// This error represents a [num_enum::TryFromPrimitiveError] error for any of a number of + /// different types. + /// + /// ## Occurs during: + /// - Parsing of an MRT message body + #[error("unrecognized value {value} for {type_name}")] + UnrecognizedEnumVariant { type_name: &'static str, value: u64 }, + /// Indicates that the MRT message header type could not be determined while parsing a MRT + /// header. + /// + /// ## Occurs during: + /// - Parsing of an MRT message header + #[error("unrecognized type {0} in MRT header")] + UnrecognizedMrtType(u16), + /// This error represents a [ipnet::PrefixLenError] error. It occurs if an address mask is + /// larger than the length of the address it is being applied to. + /// + /// ## Occurs during: + /// - Reading network prefixes (parsing of an MRT message body) + #[error("invalid network prefix mask")] + InvalidPrefixLength(#[from] ipnet::PrefixLenError), + /// A general IO error triggered by the internal reader. + /// + /// ## Occurs during: + /// - Reading of an MRT record header + /// - Buffering of an MRT record body before parsing + #[error(transparent)] + IoError(#[from] io::Error), + #[error("unable to parse unsupported MRT type {mrt_type:?} subtype {subtype}")] + UnsupportedMrtType { mrt_type: EntryType, subtype: u16 }, + #[error("unable to parse unsupported attribute type {0:?}")] + UnsupportedAttributeType(AttrType), + /// Indicates internal length inconsistencies within an MRT message. This includes fixed-length + /// and length-prefixed data requiring more or less space than is available within the enclosing + /// container. + #[error( + "encountered truncated value during {name}; expected {expected} bytes, but found {found}" + )] + InconsistentFieldLength { + name: &'static str, + expected: usize, + found: usize, + }, + #[error("invalid BGP message length {0} (expected 19 <= length <= 4096)")] + InvalidBgpMessageLength(u16), + #[error("invalid length {0} for MP_NEXT_HOP")] + InvalidNextHopLength(usize), + #[error("invalid length {0} for AGGREGATOR attribute (should be 6 or 8)")] + InvalidAggregatorAttrLength(usize), } -impl Error for ParserError {} - -#[derive(Debug)] -pub struct ParserErrorWithBytes { - pub error: ParserError, - pub bytes: Option>, -} - -impl Display for ParserErrorWithBytes { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.error) - } -} - -impl Error for ParserErrorWithBytes {} - -/// implement Display trait for Error which satistifies the std::error::Error -/// trait's requirement (must implement Display and Debug traits, Debug already derived) -impl Display for ParserError { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - ParserError::IoError(e) => write!(f, "Error: {}", e), - ParserError::EofError(e) => write!(f, "Error: {}", e), - ParserError::ParseError(s) => write!(f, "Error: {}", s), - ParserError::TruncatedMsg(s) => write!(f, "Error: {}", s), - ParserError::Unsupported(s) => write!(f, "Error: {}", s), - ParserError::EofExpected => write!(f, "Error: reach end of file"), - ParserError::OneIoError(e) => write!(f, "Error: {}", e), - ParserError::FilterError(e) => write!(f, "Error: {}", e), - ParserError::IoNotEnoughBytes() => write!(f, "Error: Not enough bytes to read"), - } - } -} - -impl From for ParserErrorWithBytes { - fn from(error: OneIoError) -> Self { - ParserErrorWithBytes { - error: ParserError::OneIoError(error), - bytes: None, +impl From> for ParserError +where + T: TryFromPrimitive, + T::Primitive: Into, +{ + #[inline] + fn from(value: TryFromPrimitiveError) -> Self { + ParserError::UnrecognizedEnumVariant { + type_name: T::NAME, + value: value.number.into(), } } } - -impl From for ParserError { - fn from(error: OneIoError) -> Self { - ParserError::OneIoError(error) - } -} - -impl From for ParserErrorWithBytes { - fn from(error: ParserError) -> Self { - ParserErrorWithBytes { error, bytes: None } - } -} - -impl From for ParserError { - fn from(io_error: io::Error) -> Self { - match io_error.kind() { - ErrorKind::UnexpectedEof => ParserError::EofError(io_error), - _ => ParserError::IoError(io_error), - } - } -} - -impl From> for ParserError { - fn from(value: TryFromPrimitiveError) -> Self { - ParserError::ParseError(format!("cannot parse bgp4mp subtype: {}", value.number)) - } -} - -impl From> for ParserError { - fn from(value: TryFromPrimitiveError) -> Self { - ParserError::ParseError(format!("cannot parse bgp4mp state: {}", value.number)) - } -} - -impl From> for ParserError { - fn from(value: TryFromPrimitiveError) -> Self { - ParserError::ParseError(format!("cannot parse table dump v2 type: {}", value.number)) - } -} - -impl From> for ParserError { - fn from(value: TryFromPrimitiveError) -> Self { - ParserError::ParseError(format!("cannot parse entry type: {}", value.number)) - } -} - -impl From> for ParserError { - fn from(value: TryFromPrimitiveError) -> Self { - ParserError::ParseError(format!("Unknown AFI type: {}", value.number)) - } -} - -impl From> for ParserError { - fn from(value: TryFromPrimitiveError) -> Self { - ParserError::ParseError(format!("Unknown SAFI type: {}", value.number)) - } -} diff --git a/src/lib.rs b/src/lib.rs index 6d10ee6..856f3fd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,7 +20,7 @@ Here is an example that does so. use bgpkit_parser::BgpkitParser; let parser = BgpkitParser::new("http://archive.routeviews.org/bgpdata/2021.10/UPDATES/updates.20211001.0000.bz2").unwrap(); for elem in parser { - println!("{}", elem) + println!("{}", elem.unwrap()) } ``` @@ -72,6 +72,7 @@ for item in broker.into_iter().take(2) { // iterating through the parser. the iterator returns `BgpElem` one at a time. let elems = parser .into_elem_iter() + .filter_map(Result::ok) .filter_map(|elem| { if let Some(origins) = &elem.origin_asns { if origins.contains(&13335.into()) { @@ -100,6 +101,7 @@ For all types of filters, check out the [Filter][filter] enum documentation. ```no_run use bgpkit_parser::BgpkitParser; +use bgpkit_parser::filter::Filter; /// This example shows how to parse a MRT file and filter by prefix. env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); @@ -108,12 +110,12 @@ log::info!("downloading updates file"); // create a parser that takes the buffered reader let parser = BgpkitParser::new("http://archive.routeviews.org/bgpdata/2021.10/UPDATES/updates.20211001.0000.bz2").unwrap() - .add_filter("prefix", "211.98.251.0/24").unwrap(); + .add_filter(Filter::prefix("211.98.251.0/24").unwrap()); log::info!("parsing updates file"); // iterating through the parser. the iterator returns `BgpElem` one at a time. for elem in parser { - log::info!("{}", &elem); + log::info!("{}", elem.unwrap()); } log::info!("done"); ``` diff --git a/src/models/err.rs b/src/models/err.rs deleted file mode 100644 index 2a4bb49..0000000 --- a/src/models/err.rs +++ /dev/null @@ -1,26 +0,0 @@ -use ipnet::AddrParseError; -use std::error::Error; -use std::fmt::{Display, Formatter}; - -#[derive(Debug)] -pub enum BgpModelsError { - PrefixParsingError(String), -} - -impl Display for BgpModelsError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - BgpModelsError::PrefixParsingError(msg) => { - write!(f, "cannot convert str to IP prefix: {}", msg) - } - } - } -} - -impl Error for BgpModelsError {} - -impl From for BgpModelsError { - fn from(err: AddrParseError) -> Self { - BgpModelsError::PrefixParsingError(err.to_string()) - } -} diff --git a/src/models/mod.rs b/src/models/mod.rs index 628fc99..9159e99 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -52,11 +52,9 @@ RFCs. Here is a list of them: */ mod bgp; -mod err; mod mrt; mod network; pub use bgp::*; -pub use err::BgpModelsError; pub use mrt::*; pub use network::*; diff --git a/src/models/network/prefix.rs b/src/models/network/prefix.rs index 4e44ee3..d14182c 100644 --- a/src/models/network/prefix.rs +++ b/src/models/network/prefix.rs @@ -1,5 +1,4 @@ -use crate::models::BgpModelsError; -use ipnet::IpNet; +use ipnet::{AddrParseError, IpNet}; use std::fmt::{Debug, Display, Formatter}; use std::ops::Deref; use std::str::FromStr; @@ -31,7 +30,7 @@ impl Debug for NetworkPrefix { } impl FromStr for NetworkPrefix { - type Err = BgpModelsError; + type Err = AddrParseError; fn from_str(s: &str) -> Result { let prefix = IpNet::from_str(s)?; diff --git a/src/parser/bgp/attributes/attr_01_origin.rs b/src/parser/bgp/attributes/attr_01_origin.rs index 3afbc79..9066aff 100644 --- a/src/parser/bgp/attributes/attr_01_origin.rs +++ b/src/parser/bgp/attributes/attr_01_origin.rs @@ -5,12 +5,8 @@ use bytes::Bytes; use std::convert::TryFrom; pub fn parse_origin(mut input: Bytes) -> Result { - match Origin::try_from(input.read_u8()?) { - Ok(v) => Ok(AttributeValue::Origin(v)), - Err(_) => Err(ParserError::ParseError( - "Failed to parse attribute type: origin".to_string(), - )), - } + input.expect_remaining_eq(1, "ORIGIN")?; + Ok(AttributeValue::Origin(Origin::try_from(input.read_u8()?)?)) } #[cfg(test)] @@ -52,7 +48,7 @@ mod tests { ); assert!(matches!( parse_origin(Bytes::from_static(&[3u8])).unwrap_err(), - ParserError::ParseError(_) + ParserError::UnrecognizedEnumVariant { .. } )); } } diff --git a/src/parser/bgp/attributes/attr_02_17_as_path.rs b/src/parser/bgp/attributes/attr_02_17_as_path.rs index 3df804e..0f31413 100644 --- a/src/parser/bgp/attributes/attr_02_17_as_path.rs +++ b/src/parser/bgp/attributes/attr_02_17_as_path.rs @@ -1,19 +1,25 @@ use crate::models::*; use crate::parser::ReadUtils; use crate::ParserError; -use bytes::{Buf, Bytes}; +use bytes::Bytes; +use num_enum::TryFromPrimitive; -const AS_PATH_AS_SET: u8 = 1; -const AS_PATH_AS_SEQUENCE: u8 = 2; -// https://datatracker.ietf.org/doc/html/rfc5065 -const AS_PATH_CONFED_SEQUENCE: u8 = 3; -const AS_PATH_CONFED_SET: u8 = 4; +#[allow(non_camel_case_types)] +#[derive(Debug, TryFromPrimitive)] +#[repr(u8)] +enum AsSegmentType { + AS_PATH_AS_SET = 1, + AS_PATH_AS_SEQUENCE = 2, + // https://datatracker.ietf.org/doc/html/rfc5065 + AS_PATH_CONFED_SEQUENCE = 3, + AS_PATH_CONFED_SET = 4, +} pub fn parse_as_path(mut input: Bytes, asn_len: &AsnLength) -> Result { let mut output = AsPath { segments: Vec::with_capacity(5), }; - while input.remaining() > 0 { + while !input.is_empty() { let segment = parse_as_path_segment(&mut input, asn_len)?; output.append_segment(segment); } @@ -25,18 +31,14 @@ fn parse_as_path_segment( input: &mut Bytes, asn_len: &AsnLength, ) -> Result { - let segment_type = input.read_u8()?; + let segment_type = AsSegmentType::try_from(input.read_u8()?)?; let count = input.read_u8()? as usize; let path = input.read_asns(asn_len, count)?; match segment_type { - AS_PATH_AS_SET => Ok(AsPathSegment::AsSet(path)), - AS_PATH_AS_SEQUENCE => Ok(AsPathSegment::AsSequence(path)), - AS_PATH_CONFED_SEQUENCE => Ok(AsPathSegment::ConfedSequence(path)), - AS_PATH_CONFED_SET => Ok(AsPathSegment::ConfedSet(path)), - _ => Err(ParserError::ParseError(format!( - "Invalid AS path segment type: {}", - segment_type - ))), + AsSegmentType::AS_PATH_AS_SET => Ok(AsPathSegment::AsSet(path)), + AsSegmentType::AS_PATH_AS_SEQUENCE => Ok(AsPathSegment::AsSequence(path)), + AsSegmentType::AS_PATH_CONFED_SEQUENCE => Ok(AsPathSegment::ConfedSequence(path)), + AsSegmentType::AS_PATH_CONFED_SET => Ok(AsPathSegment::ConfedSet(path)), } } @@ -145,6 +147,6 @@ mod tests { 0, 1, ]); let res = parse_as_path_segment(&mut data, &AsnLength::Bits16).unwrap_err(); - assert!(matches!(res, ParserError::ParseError(_))); + assert!(matches!(res, ParserError::UnrecognizedEnumVariant { .. })); } } diff --git a/src/parser/bgp/attributes/attr_03_next_hop.rs b/src/parser/bgp/attributes/attr_03_next_hop.rs index eaf385b..55eddd2 100644 --- a/src/parser/bgp/attributes/attr_03_next_hop.rs +++ b/src/parser/bgp/attributes/attr_03_next_hop.rs @@ -2,34 +2,38 @@ use crate::models::*; use crate::parser::ReadUtils; use crate::ParserError; use bytes::Bytes; +use std::net::IpAddr; pub fn parse_next_hop(mut input: Bytes, afi: &Option) -> Result { - if let Some(afi) = afi { - Ok(input.read_address(afi).map(AttributeValue::NextHop)?) - } else { - Ok(input - .read_address(&Afi::Ipv4) - .map(AttributeValue::NextHop)?) + match afi.unwrap_or(Afi::Ipv4) { + Afi::Ipv4 => { + input.expect_remaining_eq(4, "NEXT_HOP")?; + Ok(input + .read_ipv4_address() + .map(IpAddr::V4) + .map(AttributeValue::NextHop)?) + } + Afi::Ipv6 => { + input.expect_remaining_eq(16, "NEXT_HOP")?; + Ok(input + .read_ipv6_address() + .map(IpAddr::V6) + .map(AttributeValue::NextHop)?) + } } } pub fn parse_mp_next_hop(mut input: Bytes) -> Result, ParserError> { - let output = match input.len() { - 0 => None, - 4 => Some(input.read_ipv4_address().map(NextHopAddress::Ipv4)?), - 16 => Some(input.read_ipv6_address().map(NextHopAddress::Ipv6)?), - 32 => Some(NextHopAddress::Ipv6LinkLocal( + match input.len() { + 0 => Ok(None), + 4 => Ok(Some(input.read_ipv4_address().map(NextHopAddress::Ipv4)?)), + 16 => Ok(Some(input.read_ipv6_address().map(NextHopAddress::Ipv6)?)), + 32 => Ok(Some(NextHopAddress::Ipv6LinkLocal( input.read_ipv6_address()?, input.read_ipv6_address()?, - )), - v => { - return Err(ParserError::ParseError(format!( - "Invalid next hop length found: {}", - v - ))); - } - }; - Ok(output) + ))), + v => Err(ParserError::InvalidNextHopLength(v)), + } } #[cfg(test)] diff --git a/src/parser/bgp/attributes/attr_04_med.rs b/src/parser/bgp/attributes/attr_04_med.rs index 2562031..cacf8bb 100644 --- a/src/parser/bgp/attributes/attr_04_med.rs +++ b/src/parser/bgp/attributes/attr_04_med.rs @@ -4,6 +4,7 @@ use crate::ParserError; use bytes::Bytes; pub fn parse_med(mut input: Bytes) -> Result { + input.expect_remaining_eq(4, "MULTI_EXIT_DISCRIMINATOR")?; Ok(AttributeValue::MultiExitDiscriminator(input.read_u32()?)) } diff --git a/src/parser/bgp/attributes/attr_05_local_pref.rs b/src/parser/bgp/attributes/attr_05_local_pref.rs index 9facc13..ef8634c 100644 --- a/src/parser/bgp/attributes/attr_05_local_pref.rs +++ b/src/parser/bgp/attributes/attr_05_local_pref.rs @@ -4,6 +4,7 @@ use crate::ParserError; use bytes::Bytes; pub fn parse_local_pref(mut input: Bytes) -> Result { + input.expect_remaining_eq(4, "LOCAL_PREFERENCE")?; Ok(AttributeValue::LocalPreference(input.read_u32()?)) } diff --git a/src/parser/bgp/attributes/attr_07_18_aggregator.rs b/src/parser/bgp/attributes/attr_07_18_aggregator.rs index 40b39d4..4a4578d 100644 --- a/src/parser/bgp/attributes/attr_07_18_aggregator.rs +++ b/src/parser/bgp/attributes/attr_07_18_aggregator.rs @@ -22,12 +22,7 @@ pub fn parse_aggregator( let asn_len_found = match input.remaining() { 8 => AsnLength::Bits32, 6 => AsnLength::Bits16, - _ => { - return Err(ParserError::ParseError(format!( - "Aggregator attribute length is invalid: found {}, should 6 or 8", - input.remaining() - ))) - } + x => return Err(ParserError::InvalidAggregatorAttrLength(x)), }; if asn_len_found != *asn_len { warn!( diff --git a/src/parser/bgp/attributes/attr_09_originator.rs b/src/parser/bgp/attributes/attr_09_originator.rs index b4a22ce..8093a7d 100644 --- a/src/parser/bgp/attributes/attr_09_originator.rs +++ b/src/parser/bgp/attributes/attr_09_originator.rs @@ -1,14 +1,10 @@ use crate::models::*; use crate::parser::ReadUtils; use crate::ParserError; -use bytes::{Buf, Bytes}; +use bytes::Bytes; pub fn parse_originator_id(mut input: Bytes) -> Result { - if input.remaining() != 4 { - return Err(ParserError::ParseError( - "ORIGINATOR_ID attribute must be 4 bytes".to_string(), - )); - } + input.expect_remaining_eq(4, "ORIGINATOR_ID")?; Ok(AttributeValue::OriginatorId(input.read_ipv4_address()?)) } diff --git a/src/parser/bgp/attributes/attr_14_15_nlri.rs b/src/parser/bgp/attributes/attr_14_15_nlri.rs index d709cce..7ba2db6 100644 --- a/src/parser/bgp/attributes/attr_14_15_nlri.rs +++ b/src/parser/bgp/attributes/attr_14_15_nlri.rs @@ -56,7 +56,7 @@ pub fn parse_nlri( let mut next_hop = None; if reachable { let next_hop_length = input.read_u8()? as usize; - input.has_n_remaining(next_hop_length)?; + input.require_n_remaining(next_hop_length, "mp next hop")?; let next_hop_bytes = input.split_to(next_hop_length); next_hop = match parse_mp_next_hop(next_hop_bytes) { Ok(x) => x, diff --git a/src/parser/bgp/attributes/attr_32_large_communities.rs b/src/parser/bgp/attributes/attr_32_large_communities.rs index 68cbfed..3142f20 100644 --- a/src/parser/bgp/attributes/attr_32_large_communities.rs +++ b/src/parser/bgp/attributes/attr_32_large_communities.rs @@ -6,9 +6,9 @@ use bytes::{Buf, Bytes}; pub fn parse_large_communities(mut input: Bytes) -> Result { let mut communities = Vec::new(); while input.remaining() > 0 { - input.has_n_remaining(12)?; // 12 bytes for large community (3x 32 bits integers) - let global_administrator = input.get_u32(); - let local_data = [input.get_u32(), input.get_u32()]; + input.require_n_remaining(12, "large community")?; // 12 bytes for large community (3x 32 bits integers) + let global_administrator = input.read_u32()?; + let local_data = [input.read_u32()?, input.read_u32()?]; communities.push(LargeCommunity::new(global_administrator, local_data)); } Ok(AttributeValue::LargeCommunities(communities)) diff --git a/src/parser/bgp/attributes/attr_35_otc.rs b/src/parser/bgp/attributes/attr_35_otc.rs index 2dfc89c..3d10b0d 100644 --- a/src/parser/bgp/attributes/attr_35_otc.rs +++ b/src/parser/bgp/attributes/attr_35_otc.rs @@ -20,6 +20,7 @@ use bytes::Bytes; /// 2. If a route already contains the OTC Attribute, it MUST NOT be propagated to Providers, Peers, or RSes. /// ``` pub fn parse_only_to_customer(mut input: Bytes) -> Result { + input.expect_remaining_eq(4, "ONLY_TO_CUSTOMER")?; let remote_asn = input.read_u32()?; Ok(AttributeValue::OnlyToCustomer(Asn::new_32bit(remote_asn))) } diff --git a/src/parser/bgp/attributes/mod.rs b/src/parser/bgp/attributes/mod.rs index e4e35d6..407ab42 100644 --- a/src/parser/bgp/attributes/mod.rs +++ b/src/parser/bgp/attributes/mod.rs @@ -65,11 +65,11 @@ impl AttributeParser { // thus the while loop condition is set to be at least 3 bytes to read. // has content to read - let flag = AttrFlags::from_bits_retain(data.get_u8()); - let attr_type = data.get_u8(); + let flag = AttrFlags::from_bits_retain(data.read_u8()?); + let attr_type = data.read_u8()?; let attr_length = match flag.contains(AttrFlags::EXTENDED) { - false => data.get_u8() as usize, - true => data.get_u16() as usize, + false => data.read_u8()? as usize, + true => data.read_u16()? as usize, }; let mut partial = false; @@ -116,18 +116,8 @@ impl AttributeParser { t => t, }; - let bytes_left = data.remaining(); - - if data.remaining() < attr_length { - warn!( - "not enough bytes: input bytes left - {}, want to read - {}; skipping", - bytes_left, attr_length - ); - // break and return already parsed attributes - break; - } - // we know data has enough bytes to read, so we can split the bytes into a new Bytes object + data.require_n_remaining(attr_length, "Attribute")?; let mut attr_data = data.split_to(attr_length); let attr = match attr_type { @@ -189,15 +179,13 @@ impl AttributeParser { AttrType::DEVELOPMENT => { let mut value = vec![]; for _i in 0..attr_length { - value.push(attr_data.get_u8()); + value.push(attr_data.read_u8()?); } Ok(AttributeValue::Development(value)) } AttrType::ONLY_TO_CUSTOMER => parse_only_to_customer(attr_data), - _ => Err(ParserError::Unsupported(format!( - "unsupported attribute type: {:?}", - attr_type - ))), + // TODO: Should it be treated as a raw attribute instead? + _ => Err(ParserError::UnsupportedAttributeType(attr_type)), }; match attr { @@ -205,14 +193,14 @@ impl AttributeParser { assert_eq!(attr_type, value.attr_type()); attributes.push(Attribute { value, flag }); } + Err(e) if partial => { + // TODO: Is this correct? If we don't have enough bytes, split_to would panic. + // it's ok to have errors when reading partial bytes + warn!("PARTIAL: {}", e); + } Err(e) => { - if partial { - // it's ok to have errors when reading partial bytes - warn!("PARTIAL: {}", e.to_string()); - } else { - warn!("{}", e.to_string()); - } - continue; + warn!("{}", e); + return Err(e); } }; } diff --git a/src/parser/bgp/messages.rs b/src/parser/bgp/messages.rs index 3ff46f6..2951f63 100644 --- a/src/parser/bgp/messages.rs +++ b/src/parser/bgp/messages.rs @@ -32,7 +32,7 @@ pub fn parse_bgp_message( asn_len: &AsnLength, ) -> Result { let total_size = data.len(); - data.has_n_remaining(19)?; + data.require_n_remaining(19, "BGP message marker")?; data.advance(16); /* This 2-octet unsigned integer indicates the total length of the @@ -45,30 +45,22 @@ pub fn parse_bgp_message( have the smallest value required, given the rest of the message. */ - let length = data.get_u16(); + let length = data.read_u16()?; if !(19..=4096).contains(&length) { - return Err(ParserError::ParseError(format!( - "invalid BGP message length {}", - length - ))); + return Err(ParserError::InvalidBgpMessageLength(length)); } + // TODO: Why do we sometimes change our length estimate? let bgp_msg_length = if (length as usize) > total_size { total_size - 19 } else { length as usize - 19 }; - let msg_type: BgpMessageType = match BgpMessageType::try_from(data.get_u8()) { - Ok(t) => t, - Err(_) => { - return Err(ParserError::ParseError( - "Unknown BGP Message Type".to_string(), - )) - } - }; + let msg_type: BgpMessageType = BgpMessageType::try_from(data.read_u8()?)?; if data.remaining() != bgp_msg_length { + // TODO: Why is this not a hard error? warn!( "BGP message length {} does not match the actual length {}", bgp_msg_length, @@ -111,16 +103,17 @@ pub fn parse_bgp_notification_message( /// /// The parsing of BGP OPEN messages also includes decoding the BGP capabilities. pub fn parse_bgp_open_message(input: &mut Bytes) -> Result { - input.has_n_remaining(10)?; - let version = input.get_u8(); - let asn = Asn::new_16bit(input.get_u16()); - let hold_time = input.get_u16(); + input.require_n_remaining(10, "BGP open message header")?; + let version = input.read_u8()?; + let asn = Asn::new_16bit(input.read_u16()?); + let hold_time = input.read_u16()?; let sender_ip = input.read_ipv4_address()?; - let opt_params_len = input.get_u8(); + let opt_params_len = input.read_u8()?; // let pos_end = input.position() + opt_params_len as u64; if input.remaining() != opt_params_len as usize { + // TODO: This seems like it should become a hard error warn!( "BGP open message length {} does not match the actual length {}", opt_params_len, @@ -133,7 +126,7 @@ pub fn parse_bgp_open_message(input: &mut Bytes) -> Result = vec![]; while input.remaining() >= 2 { - let param_type = input.get_u8(); + let param_type = input.read_u8()?; if first { // first parameter, check if it is extended length message if opt_params_len == 255 && param_type == 255 { @@ -146,7 +139,7 @@ pub fn parse_bgp_open_message(input: &mut Bytes) -> Result) -> std::fmt::Result { - match self { - ParserBmpError::InvalidOpenBmpHeader => { - write!(f, "Invalid OpenBMP header") - } - ParserBmpError::UnsupportedOpenBmpMessage => { - write!(f, "Unsupported OpenBMP message") - } - ParserBmpError::CorruptedBmpMessage => { - write!(f, "Corrupted BMP message") - } - ParserBmpError::TruncatedBmpMessage => { - write!(f, "Truncated BMP message") - } - } - } -} - -impl Error for ParserBmpError {} - -// TODO: These conversions make the error difficult to debug as they drop all of the error context -impl From for ParserBmpError { - fn from(_: std::io::Error) -> Self { - ParserBmpError::InvalidOpenBmpHeader - } -} - -impl From for ParserBmpError { - fn from(_: ParserError) -> Self { - ParserBmpError::CorruptedBmpMessage - } -} - -impl From> for ParserBmpError { - fn from(_: TryFromPrimitiveError) -> Self { - ParserBmpError::InvalidOpenBmpHeader - } -} - -impl From> for ParserBmpError { - fn from(_: TryFromPrimitiveError) -> Self { - ParserBmpError::InvalidOpenBmpHeader - } -} - -impl From> for ParserBmpError { - fn from(_: TryFromPrimitiveError) -> Self { - ParserBmpError::CorruptedBmpMessage - } -} - -impl From> for ParserBmpError { - fn from(_: TryFromPrimitiveError) -> Self { - ParserBmpError::CorruptedBmpMessage - } -} - -impl From> for ParserBmpError { - fn from(_: TryFromPrimitiveError) -> Self { - ParserBmpError::CorruptedBmpMessage +impl From> for ParserBmpError +where + T: TryFromPrimitive, + ParserError: From>, +{ + fn from(value: TryFromPrimitiveError) -> Self { + ParserBmpError::ParseError(ParserError::from(value)) } } diff --git a/src/parser/bmp/messages/headers.rs b/src/parser/bmp/messages/headers.rs index 327d83e..1c5b8c1 100644 --- a/src/parser/bmp/messages/headers.rs +++ b/src/parser/bmp/messages/headers.rs @@ -58,7 +58,7 @@ pub fn parse_bmp_common_header(data: &mut Bytes) -> Result Result 4 { - let info_type: InitiationTlvType = InitiationTlvType::try_from(data.get_u16())?; - let info_len = data.get_u16(); + let info_type: InitiationTlvType = InitiationTlvType::try_from(data.read_u16()?)?; + let info_len = data.read_u16()?; if data.remaining() < info_len as usize { // not enough bytes to read break; diff --git a/src/parser/bmp/messages/peer_down_notification.rs b/src/parser/bmp/messages/peer_down_notification.rs index 6053f51..5efb4d9 100644 --- a/src/parser/bmp/messages/peer_down_notification.rs +++ b/src/parser/bmp/messages/peer_down_notification.rs @@ -1,66 +1,51 @@ use crate::parser::bmp::error::ParserBmpError; use crate::parser::ReadUtils; use bytes::{Buf, Bytes}; +use num_enum::{IntoPrimitive, TryFromPrimitive}; + +#[derive(Debug, Hash, Eq, PartialEq, TryFromPrimitive, IntoPrimitive)] +#[repr(u8)] +pub enum PeerDownReason { + /// The local system closed the session. Following the reason is a BGP PDU containing a BGP + /// NOTIFICATION message that would have been sent to the peer. + LocalSystemClosedSession = 1, + /// The local system closed the session. No notification message was sent. Following the + /// reason code is a 2-byte field containing the code corresponding to the Finite State Machine + /// (FSM) Event that caused the system to close the session (see Section 8.1 of [RFC4271]). Two + /// bytes both set to 0 are used to indicate that no relevant Event code is defined. + LocalSystemClosedSessionWithoutNotification = 2, + /// The remote system closed the session with a notification message. Following the Reason is a + /// BGP PDU containing the BGP NOTIFICATION message as received from the peer. + RemoteSystemClosedSession = 3, + /// The remote system closed the session without a notification message. This includes any + /// unexpected termination of the transport session, so in some cases both the local and remote + /// systems might consider this to apply. + RemoteSystemClosedSessionWithoutNotification = 4, + /// Information for this peer will no longer be sent to the monitoring station for configuration + /// reasons. This does not, strictly speaking, indicate that the peer has gone down, but it + /// does indicate that the monitoring station will not receive updates for the peer. + DisabledDueToConfig = 5, +} #[derive(Debug)] pub struct PeerDownNotification { - pub reason: u8, + pub reason: PeerDownReason, pub data: Option>, } pub fn parse_peer_down_notification( data: &mut Bytes, ) -> Result { - let reason = data.read_u8()?; + let reason = PeerDownReason::try_from(data.read_u8()?)?; let bytes_left = data.remaining(); - let data: Option> = match reason { - 1 => { - /* - The local system closed the session. Following the - Reason is a BGP PDU containing a BGP NOTIFICATION message that - would have been sent to the peer. - */ - Some(data.read_n_bytes(bytes_left)?) - } - 2 => { - /* - The local system closed the session. No notification - message was sent. Following the reason code is a 2-byte field - containing the code corresponding to the Finite State Machine - (FSM) Event that caused the system to close the session (see - Section 8.1 of [RFC4271]). Two bytes both set to 0 are used to - indicate that no relevant Event code is defined. - */ + let data = match reason { + PeerDownReason::LocalSystemClosedSession => Some(data.read_n_bytes(bytes_left)?), + PeerDownReason::LocalSystemClosedSessionWithoutNotification => { Some(data.read_n_bytes(bytes_left)?) } - 3 => { - /* - The remote system closed the session with a notification - message. Following the Reason is a BGP PDU containing the BGP - NOTIFICATION message as received from the peer. - */ - Some(data.read_n_bytes(bytes_left)?) - } - 4 => { - /* - The remote system closed the session without a - notification message. This includes any unexpected termination of - the transport session, so in some cases both the local and remote - systems might consider this to apply. - */ - None - } - 5 => { - /* - Information for this peer will no longer be sent to the - monitoring station for configuration reasons. This does not, - strictly speaking, indicate that the peer has gone down, but it - does indicate that the monitoring station will not receive updates - for the peer. - */ - None - } - _ => return Err(ParserBmpError::CorruptedBmpMessage), + PeerDownReason::RemoteSystemClosedSession => Some(data.read_n_bytes(bytes_left)?), + PeerDownReason::RemoteSystemClosedSessionWithoutNotification => None, + PeerDownReason::DisabledDueToConfig => None, }; Ok(PeerDownNotification { reason, data }) } diff --git a/src/parser/bmp/messages/route_mirroring.rs b/src/parser/bmp/messages/route_mirroring.rs index 555e046..99a6c17 100644 --- a/src/parser/bmp/messages/route_mirroring.rs +++ b/src/parser/bmp/messages/route_mirroring.rs @@ -2,7 +2,7 @@ use crate::models::*; use crate::parser::bgp::messages::parse_bgp_update_message; use crate::parser::bmp::error::ParserBmpError; use crate::parser::ReadUtils; -use bytes::{Buf, Bytes}; +use bytes::Bytes; use num_enum::{IntoPrimitive, TryFromPrimitive}; use std::convert::TryFrom; @@ -23,7 +23,23 @@ pub enum RouteMirroringValue { Information(RouteMirroringInfo), } -#[derive(Debug, TryFromPrimitive, IntoPrimitive)] +impl RouteMirroringValue { + pub const fn mirroring_type(&self) -> RouteMirroringTlvType { + match self { + RouteMirroringValue::BgpMessage(_) => RouteMirroringTlvType::BgpMessage, + RouteMirroringValue::Information(_) => RouteMirroringTlvType::Information, + } + } +} + +#[derive(Debug, TryFromPrimitive, IntoPrimitive, Hash, Eq, PartialEq)] +#[repr(u16)] +pub enum RouteMirroringTlvType { + BgpMessage = 0, + Information = 1, +} + +#[derive(Debug, TryFromPrimitive, IntoPrimitive, Hash, Eq, PartialEq)] #[repr(u16)] pub enum RouteMirroringInfo { ErroredPdu = 0, @@ -35,9 +51,9 @@ pub fn parse_route_mirroring( asn_len: &AsnLength, ) -> Result { let mut tlvs = vec![]; - while data.remaining() > 4 { - match data.read_u16()? { - 0 => { + while !data.is_empty() { + match RouteMirroringTlvType::try_from(data.read_u16()?)? { + RouteMirroringTlvType::BgpMessage => { let info_len = data.read_u16()?; let bytes = data.split_to(info_len as usize); let value = parse_bgp_update_message(bytes, false, asn_len)?; @@ -46,7 +62,7 @@ pub fn parse_route_mirroring( value: RouteMirroringValue::BgpMessage(value), }); } - 1 => { + RouteMirroringTlvType::Information => { let info_len = data.read_u16()?; let value = RouteMirroringInfo::try_from(data.read_u16()?)?; tlvs.push(RouteMirroringTlv { @@ -54,7 +70,6 @@ pub fn parse_route_mirroring( value: RouteMirroringValue::Information(value), }); } - _ => return Err(ParserBmpError::CorruptedBmpMessage), } } Ok(RouteMirroring { tlvs }) diff --git a/src/parser/bmp/messages/stats_report.rs b/src/parser/bmp/messages/stats_report.rs index 914ae28..76f274d 100644 --- a/src/parser/bmp/messages/stats_report.rs +++ b/src/parser/bmp/messages/stats_report.rs @@ -30,7 +30,7 @@ pub fn parse_stats_report(data: &mut Bytes) -> Result StatsData::Counter(data.read_u32()?), 8 => StatsData::Gauge(data.read_u64()?), - _ => return Err(ParserBmpError::CorruptedBmpMessage), + x => return Err(ParserBmpError::InvalidStatsDataLength(x)), }; counters.push(StatCounter { stat_type, diff --git a/src/parser/filter.rs b/src/parser/filter.rs index ec0400e..218e540 100644 --- a/src/parser/filter.rs +++ b/src/parser/filter.rs @@ -24,19 +24,21 @@ and returns a new parser with specified filter added. See the example below. ```no_run use bgpkit_parser::BgpkitParser; +use bgpkit_parser::filter::Filter; +use bgpkit_parser::models::ElemType::ANNOUNCE; /// This example shows how to parse a MRT file and filter by prefix. env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); log::info!("downloading updates file"); let parser = BgpkitParser::new("http://archive.routeviews.org/bgpdata/2021.10/UPDATES/updates.20211001.0000.bz2").unwrap() - .add_filter("prefix", "211.98.251.0/24").unwrap() - .add_filter("type", "a").unwrap(); + .add_filter(Filter::prefix("211.98.251.0/24").unwrap()) + .add_filter(Filter::Type(ANNOUNCE)); // iterating through the parser. the iterator returns `BgpElem` one at a time. log::info!("parsing updates file"); for elem in parser { - log::info!("{}", &elem); + log::info!("{}", elem.unwrap()); } log::info!("done"); ``` @@ -52,8 +54,7 @@ later releases. */ use crate::models::*; -use crate::ParserError; -use crate::ParserError::FilterError; +use chrono::DateTime; use ipnet::IpNet; use regex::Regex; use std::net::IpAddr; @@ -89,114 +90,31 @@ pub enum PrefixMatchType { IncludeSuperSub, } -fn parse_time_str(time_str: &str) -> Option { - if let Ok(t) = time_str.parse::() { - return chrono::NaiveDateTime::from_timestamp_opt(t as i64, 0); +fn parse_time_str(time_str: &str) -> chrono::ParseResult { + if let Ok(unix_timestamp) = time_str.parse::() { + return Ok(unix_timestamp); } - if let Ok(t) = chrono::DateTime::parse_from_rfc3339(time_str) { - return Some(t.naive_utc()); - } - None + + DateTime::parse_from_rfc3339(time_str).map(|x| x.naive_utc().timestamp() as f64) } +/// Constructors provided for some backwards compatability with the removed `Filter::new`. impl Filter { - pub fn new(filter_type: &str, filter_value: &str) -> Result { - match filter_type { - "origin_asn" => match u32::from_str(filter_value) { - Ok(v) => Ok(Filter::OriginAsn(v)), - Err(_) => Err(FilterError(format!( - "cannot parse origin asn from {}", - filter_value - ))), - }, - "prefix" => match IpNet::from_str(filter_value) { - Ok(v) => Ok(Filter::Prefix(v, PrefixMatchType::Exact)), - Err(_) => Err(FilterError(format!( - "cannot parse prefix from {}", - filter_value - ))), - }, - "prefix_super" => match IpNet::from_str(filter_value) { - Ok(v) => Ok(Filter::Prefix(v, PrefixMatchType::IncludeSuper)), - Err(_) => Err(FilterError(format!( - "cannot parse prefix from {}", - filter_value - ))), - }, - "prefix_sub" => match IpNet::from_str(filter_value) { - Ok(v) => Ok(Filter::Prefix(v, PrefixMatchType::IncludeSub)), - Err(_) => Err(FilterError(format!( - "cannot parse prefix from {}", - filter_value - ))), - }, - "prefix_super_sub" => match IpNet::from_str(filter_value) { - Ok(v) => Ok(Filter::Prefix(v, PrefixMatchType::IncludeSuperSub)), - Err(_) => Err(FilterError(format!( - "cannot parse prefix from {}", - filter_value - ))), - }, - "peer_ip" => match IpAddr::from_str(filter_value) { - Ok(v) => Ok(Filter::PeerIp(v)), - Err(_) => Err(FilterError(format!( - "cannot parse peer IP from {}", - filter_value - ))), - }, - "peer_ips" => { - let mut ips = vec![]; - for ip_str in filter_value.replace(' ', "").split(',') { - match IpAddr::from_str(ip_str) { - Ok(v) => ips.push(v), - Err(_) => { - return Err(FilterError(format!( - "cannot parse peer IP from {}", - ip_str - ))) - } - } - } - Ok(Filter::PeerIps(ips)) - } - "peer_asn" => match u32::from_str(filter_value) { - Ok(v) => Ok(Filter::PeerAsn(v)), - Err(_) => Err(FilterError(format!( - "cannot parse peer asn from {}", - filter_value - ))), - }, - "type" => match filter_value { - "w" | "withdraw" | "withdrawal" => Ok(Filter::Type(ElemType::WITHDRAW)), - "a" | "announce" | "announcement" => Ok(Filter::Type(ElemType::ANNOUNCE)), - _ => Err(FilterError(format!( - "cannot parse elem type from {}", - filter_value - ))), - }, - "ts_start" | "start_ts" => match parse_time_str(filter_value) { - Some(t) => Ok(Filter::TsStart(t.timestamp() as f64)), - None => Err(FilterError(format!( - "cannot parse TsStart filter from {}", - filter_value - ))), - }, - "ts_end" | "end_ts" => match parse_time_str(filter_value) { - Some(t) => Ok(Filter::TsEnd(t.timestamp() as f64)), - None => Err(FilterError(format!( - "cannot parse TsEnd filter from {}", - filter_value - ))), - }, - "as_path" => match Regex::from_str(filter_value) { - Ok(v) => Ok(Filter::AsPath(v)), - Err(_) => Err(FilterError(format!( - "cannot parse AS path regex from {}", - filter_value - ))), - }, - _ => Err(FilterError(format!("unknown filter type: {}", filter_type))), - } + pub fn ts_start(time_str: &str) -> chrono::ParseResult { + parse_time_str(time_str).map(Filter::TsStart) + } + + pub fn ts_end(time_str: &str) -> chrono::ParseResult { + parse_time_str(time_str).map(Filter::TsEnd) + } + + #[allow(clippy::wrong_self_convention)] + pub fn as_path(regex: &str) -> Result { + Regex::new(regex).map(Filter::AsPath) + } + + pub fn prefix(prefix: &str) -> Result { + IpNet::from_str(prefix).map(|prefix| Filter::Prefix(prefix, PrefixMatchType::Exact)) } } @@ -205,54 +123,17 @@ pub trait Filterable { fn match_filters(&self, filters: &[Filter]) -> bool; } -const fn same_family(prefix_1: &IpNet, prefix_2: &IpNet) -> bool { - matches!( - (prefix_1, prefix_2), - (IpNet::V4(_), IpNet::V4(_)) | (IpNet::V6(_), IpNet::V6(_)) - ) -} - fn prefix_match(match_prefix: &IpNet, input_prefix: &IpNet, t: &PrefixMatchType) -> bool { - let exact = input_prefix.eq(match_prefix); + if input_prefix == match_prefix { + return true; + } + match t { - PrefixMatchType::Exact => exact, - PrefixMatchType::IncludeSuper => { - if exact { - exact - } else if !same_family(match_prefix, input_prefix) { - // version not match - false - } else { - // input_prefix is super prefix of match_prefix - match_prefix.addr() >= input_prefix.addr() - && match_prefix.broadcast() <= input_prefix.broadcast() - } - } - PrefixMatchType::IncludeSub => { - if exact { - exact - } else if !same_family(match_prefix, input_prefix) { - // version not match - false - } else { - // input_prefix is sub prefix of match_prefix - match_prefix.addr() <= input_prefix.addr() - && match_prefix.broadcast() >= input_prefix.broadcast() - } - } + PrefixMatchType::Exact => false, + PrefixMatchType::IncludeSuper => input_prefix.contains(match_prefix), + PrefixMatchType::IncludeSub => match_prefix.contains(input_prefix), PrefixMatchType::IncludeSuperSub => { - if exact { - exact - } else if !same_family(match_prefix, input_prefix) { - // version not match - false - } else { - // input_prefix is super prefix of match_prefix - (match_prefix.addr() >= input_prefix.addr() - && match_prefix.broadcast() <= input_prefix.broadcast()) - || (match_prefix.addr() <= input_prefix.addr() - && match_prefix.broadcast() >= input_prefix.broadcast()) - } + input_prefix.contains(match_prefix) || match_prefix.contains(input_prefix) } } } @@ -295,13 +176,16 @@ mod tests { use super::*; use crate::BgpkitParser; use anyhow::Result; + use chrono::NaiveDateTime; + use itertools::Itertools; + use std::net::{Ipv4Addr, Ipv6Addr}; use std::str::FromStr; #[test] fn test_filter() { let url = "https://spaces.bgpkit.org/parser/update-example.gz"; let parser = BgpkitParser::new(url).unwrap(); - let elems = parser.into_elem_iter().collect::>(); + let elems: Vec = parser.into_elem_iter().try_collect().unwrap(); let filters = vec![Filter::PeerIp(IpAddr::from_str("185.1.8.65").unwrap())]; let count = elems.iter().filter(|e| e.match_filters(&filters)).count(); @@ -383,29 +267,31 @@ mod tests { #[test] fn test_parsing_time_str() { - let ts = chrono::NaiveDateTime::from_str("2021-11-20T19:49:58").unwrap(); - assert_eq!(parse_time_str("1637437798"), Some(ts)); - assert_eq!(parse_time_str("2021-11-20T19:49:58Z"), Some(ts)); - assert_eq!(parse_time_str("2021-11-20T19:49:58+00:00"), Some(ts)); - - assert_eq!(parse_time_str("2021-11-20T19:49:58"), None); - assert_eq!(parse_time_str("2021-11-20T19:49:58ZDXV"), None); - assert_eq!(parse_time_str("2021-11-20 19:49:58"), None); - assert_eq!(parse_time_str("2021-11-20"), None); + let ts = NaiveDateTime::from_str("2021-11-20T19:49:58") + .unwrap() + .timestamp() as f64; + assert_eq!(parse_time_str("1637437798").ok(), Some(ts)); + assert_eq!(parse_time_str("2021-11-20T19:49:58Z").ok(), Some(ts)); + assert_eq!(parse_time_str("2021-11-20T19:49:58+00:00").ok(), Some(ts)); + + assert_eq!(parse_time_str("2021-11-20T19:49:58").ok(), None); + assert_eq!(parse_time_str("2021-11-20T19:49:58ZDXV").ok(), None); + assert_eq!(parse_time_str("2021-11-20 19:49:58").ok(), None); + assert_eq!(parse_time_str("2021-11-20").ok(), None); } #[test] fn test_filter_iter() -> Result<()> { let url = "https://spaces.bgpkit.org/parser/update-example.gz"; let parser = BgpkitParser::new(url)? - .add_filter("peer_ip", "185.1.8.50")? - .add_filter("type", "w")?; + .add_filter(Filter::PeerIp(Ipv4Addr::new(185, 1, 8, 50).into())) + .add_filter(Filter::Type(ElemType::WITHDRAW)); let count = parser.into_elem_iter().count(); assert_eq!(count, 39); let parser = BgpkitParser::new(url)? - .add_filter("ts_start", "1637437798")? - .add_filter("ts_end", "2021-11-20T19:49:58Z")?; + .add_filter(Filter::ts_start("1637437798")?) + .add_filter(Filter::ts_end("2021-11-20T19:49:58Z")?); let count = parser.into_elem_iter().count(); assert_eq!(count, 13); Ok(()) @@ -416,8 +302,10 @@ mod tests { let url = "https://spaces.bgpkit.org/parser/update-example.gz"; let parser = BgpkitParser::new(url) .unwrap() - .add_filter("peer_ips", "185.1.8.65, 2001:7f8:73:0:3:fa4:0:1") - .unwrap(); + .add_filter(Filter::PeerIps(vec![ + Ipv4Addr::new(185, 1, 8, 65).into(), + Ipv6Addr::from([0x2001, 0x7f8, 0x73, 0x0, 0x3, 0xfa4, 0x0, 0x1]).into(), + ])); let count = parser.into_elem_iter().count(); assert_eq!(count, 3393 + 834); } diff --git a/src/parser/iters.rs b/src/parser/iters.rs index dd6f193..4355021 100644 --- a/src/parser/iters.rs +++ b/src/parser/iters.rs @@ -5,12 +5,11 @@ use crate::error::ParserError; use crate::models::*; use crate::parser::BgpkitParser; use crate::{Elementor, Filterable}; -use log::{error, warn}; use std::io::Read; /// Use [ElemIterator] as the default iterator to return [BgpElem]s instead of [MrtRecord]s. impl IntoIterator for BgpkitParser { - type Item = BgpElem; + type Item = Result; type IntoIter = ElemIterator; fn into_iter(self) -> Self::IntoIter { @@ -35,6 +34,7 @@ pub struct RecordIterator { pub parser: BgpkitParser, pub count: u64, elementor: Elementor, + had_fatal_error: bool, } impl RecordIterator { @@ -43,86 +43,46 @@ impl RecordIterator { parser, count: 0, elementor: Elementor::new(), + had_fatal_error: false, } } } impl Iterator for RecordIterator { - type Item = MrtRecord; + type Item = Result; + + fn next(&mut self) -> Option { + if self.had_fatal_error { + return None; + } - fn next(&mut self) -> Option { - self.count += 1; loop { - return match self.parser.next_record() { - Ok(v) => { - // if None, the reaches EoF. - let filters = &self.parser.filters; - if filters.is_empty() { - Some(v) - } else { - if let MrtMessage::TableDumpV2Message(TableDumpV2Message::PeerIndexTable( - _, - )) = &v.message - { - let _ = self.elementor.record_to_elems(v.clone()); - return Some(v); - } - let elems = self.elementor.record_to_elems(v.clone()); - if elems.iter().any(|e| e.match_filters(&self.parser.filters)) { - Some(v) - } else { - continue; - } - } - } - Err(e) => { - match e.error { - ParserError::TruncatedMsg(err_str) | ParserError::Unsupported(err_str) => { - if self.parser.options.show_warnings { - warn!("parser warn: {}", err_str); - } - if let Some(bytes) = e.bytes { - std::fs::write("mrt_core_dump", bytes) - .expect("Unable to write to mrt_core_dump"); - } - continue; - } - ParserError::ParseError(err_str) => { - error!("parser error: {}", err_str); - if self.parser.core_dump { - if let Some(bytes) = e.bytes { - std::fs::write("mrt_core_dump", bytes) - .expect("Unable to write to mrt_core_dump"); - } - None - } else { - continue; - } - } - ParserError::EofExpected => { - // normal end of file - None - } - ParserError::IoError(err) | ParserError::EofError(err) => { - // when reaching IO error, stop iterating - error!("{:?}", err); - if self.parser.core_dump { - if let Some(bytes) = e.bytes { - std::fs::write("mrt_core_dump", bytes) - .expect("Unable to write to mrt_core_dump"); - } - } - None - } - ParserError::OneIoError(_) - | ParserError::FilterError(_) - | ParserError::IoNotEnoughBytes() => { - // this should not happen at this stage - None - } - } + self.count += 1; + let record = match self.parser.next_record() { + Ok(None) => return None, + Ok(Some(v)) => v, + Err(err @ (ParserError::IoError(_) | ParserError::UnrecognizedMrtType(_))) => { + self.had_fatal_error = true; + return Some(Err(err)); } + Err(err) => return Some(Err(err)), }; + + if self.parser.filters.is_empty() { + return Some(Ok(record)); + } + + if let MrtMessage::TableDumpV2Message(TableDumpV2Message::PeerIndexTable(_)) = + &record.message + { + let _ = self.elementor.record_to_elems(record.clone()); + return Some(Ok(record)); + } + + let elems = self.elementor.record_to_elems(record.clone()); + if elems.iter().any(|e| e.match_filters(&self.parser.filters)) { + return Some(Ok(record)); + } } } } @@ -132,64 +92,50 @@ BgpElem Iterator **********/ pub struct ElemIterator { - cache_elems: Vec, - record_iter: RecordIterator, + parser: BgpkitParser, + element_queue: Vec, elementor: Elementor, - count: u64, + had_fatal_error: bool, } impl ElemIterator { fn new(parser: BgpkitParser) -> Self { ElemIterator { - record_iter: RecordIterator::new(parser), - count: 0, - cache_elems: vec![], + parser, + element_queue: vec![], elementor: Elementor::new(), + had_fatal_error: false, } } } impl Iterator for ElemIterator { - type Item = BgpElem; - - fn next(&mut self) -> Option { - self.count += 1; + type Item = Result; + fn next(&mut self) -> Option { loop { - if self.cache_elems.is_empty() { - // refill cache elems - loop { - match self.record_iter.next() { - None => { - // no more records - return None; - } - Some(r) => { - let mut elems = self.elementor.record_to_elems(r); - if elems.is_empty() { - // somehow this record does not contain any elems, continue to parse next record - continue; - } else { - elems.reverse(); - self.cache_elems = elems; - break; - } - } - } - } - // when reaching here, the `self.cache_elems` has been refilled with some more elems + if let Some(element) = self.element_queue.pop() { + return Some(Ok(element)); } - // popping cached elems. note that the original elems order is preseved by reversing the - // vector before putting it on to cache_elems. - let elem = self.cache_elems.pop(); - match elem { - None => return None, - Some(e) => match e.match_filters(&self.record_iter.parser.filters) { - true => return Some(e), - false => continue, - }, + if self.had_fatal_error { + return None; } + + let record = match self.parser.next_record() { + Ok(None) => return None, + Ok(Some(v)) => v, + Err(err @ (ParserError::IoError(_) | ParserError::UnrecognizedMrtType(_))) => { + self.had_fatal_error = true; + return Some(Err(err)); + } + Err(err) => return Some(Err(err)), + }; + + let new_elements = self.elementor.record_to_elems(record); + self.element_queue.extend(new_elements.into_iter().rev()); + self.element_queue + .retain(|element| element.match_filters(&self.parser.filters)); } } } diff --git a/src/parser/mod.rs b/src/parser/mod.rs index f2f700f..458e51e 100644 --- a/src/parser/mod.rs +++ b/src/parser/mod.rs @@ -21,9 +21,9 @@ pub(crate) use mrt::{parse_bgp4mp, parse_table_dump_message, parse_table_dump_v2 use crate::models::MrtRecord; use filter::Filter; pub use mrt::mrt_elem::Elementor; -use oneio::{get_cache_reader, get_reader}; +use oneio::{get_cache_reader, get_reader, OneIoError}; -pub use crate::error::{ParserError, ParserErrorWithBytes}; +pub use crate::error::ParserError; pub use bmp::{parse_bmp_msg, parse_openbmp_header, parse_openbmp_msg}; pub use filter::*; pub use iters::*; @@ -52,7 +52,7 @@ impl Default for ParserOptions { impl BgpkitParser> { /// Creating a new parser from a object that implements [Read] trait. - pub fn new(path: &str) -> Result { + pub fn new(path: &str) -> Result { let reader = get_reader(path)?; Ok(BgpkitParser { reader, @@ -67,7 +67,7 @@ impl BgpkitParser> { /// The cache file name is generated by the following format: `cache--`. /// For example, the remote file `http://archive.routeviews.org/route-views.chile/bgpdata/2023.03/RIBS/rib.20230326.0600.bz2` /// will be cached as `cache-682cb1eb-rib.20230326.0600.bz2` in the cache directory. - pub fn new_cached(path: &str, cache_dir: &str) -> Result { + pub fn new_cached(path: &str, cache_dir: &str) -> Result { let file_name = path.rsplit('/').next().unwrap().to_string(); let new_file_name = format!( "cache-{}", @@ -108,8 +108,8 @@ impl BgpkitParser { } /// This is used in for loop `for item in parser{}` - pub fn next_record(&mut self) -> Result { - parse_mrt_record(&mut self.reader) + pub fn next_record(&mut self) -> Result, ParserError> { + try_parse_mrt_record(&mut self.reader) } } @@ -134,19 +134,9 @@ impl BgpkitParser { } } - pub fn add_filter( - self, - filter_type: &str, - filter_value: &str, - ) -> Result { - let mut filters = self.filters; - filters.push(Filter::new(filter_type, filter_value)?); - Ok(BgpkitParser { - reader: self.reader, - core_dump: self.core_dump, - filters, - options: self.options, - }) + pub fn add_filter(mut self, filter: Filter) -> Self { + self.filters.push(filter); + self } } diff --git a/src/parser/mrt/messages/bgp4mp.rs b/src/parser/mrt/messages/bgp4mp.rs index 7d1e6ee..0fc97e8 100644 --- a/src/parser/mrt/messages/bgp4mp.rs +++ b/src/parser/mrt/messages/bgp4mp.rs @@ -2,7 +2,7 @@ use crate::error::ParserError; use crate::models::*; use crate::parser::bgp::messages::parse_bgp_message; use crate::parser::ReadUtils; -use bytes::{Buf, Bytes}; +use bytes::Bytes; use std::convert::TryFrom; /// Parse MRT BGP4MP type @@ -82,13 +82,7 @@ pub fn parse_bgp4mp_message( let local_ip = data.read_address(&afi)?; let should_read = total_should_read(&afi, &asn_len, total_size); - if should_read != data.remaining() { - return Err(ParserError::TruncatedMsg(format!( - "truncated bgp4mp message: should read {} bytes, have {} bytes available", - should_read, - data.remaining() - ))); - } + data.expect_remaining_eq(should_read, "bgp4mp message")?; let bgp_message: BgpMessage = parse_bgp_message(&mut data, add_path, &asn_len)?; Ok(Bgp4MpMessage { diff --git a/src/parser/mrt/messages/table_dump_message.rs b/src/parser/mrt/messages/table_dump_message.rs index 46e7d55..b55f117 100644 --- a/src/parser/mrt/messages/table_dump_message.rs +++ b/src/parser/mrt/messages/table_dump_message.rs @@ -45,16 +45,7 @@ pub fn parse_table_dump_message( // - create data slice reader cursor // determine address family based on the sub_type value defined in the MRT [CommonHeader]. - let afi = match sub_type { - 1 => Afi::Ipv4, - 2 => Afi::Ipv6, - _ => { - return Err(ParserError::ParseError(format!( - "Invalid subtype found for TABLE_DUMP (V1) message: {}", - sub_type - ))) - } - }; + let afi = Afi::try_from(sub_type)?; // #### // Step 1. read simple fields @@ -90,7 +81,7 @@ pub fn parse_table_dump_message( let attr_parser = AttributeParser::new(false); - data.has_n_remaining(attribute_length)?; + data.require_n_remaining(attribute_length, "rib attributes")?; let attr_data_slice = data.split_to(attribute_length); // for TABLE_DUMP type, the AS number length is always 2-byte. diff --git a/src/parser/mrt/messages/table_dump_v2_message.rs b/src/parser/mrt/messages/table_dump_v2_message.rs index 8febcd4..681e017 100644 --- a/src/parser/mrt/messages/table_dump_v2_message.rs +++ b/src/parser/mrt/messages/table_dump_v2_message.rs @@ -1,8 +1,7 @@ use crate::error::ParserError; use crate::models::*; use crate::parser::{AttributeParser, ReadUtils}; -use bytes::{Buf, Bytes}; -use log::warn; +use bytes::Bytes; use std::collections::HashMap; use std::convert::TryFrom; use std::net::{IpAddr, Ipv4Addr}; @@ -43,9 +42,10 @@ pub fn parse_table_dump_v2_message( TableDumpV2Type::RibGeneric | TableDumpV2Type::RibGenericAddPath | TableDumpV2Type::GeoPeerTable => { - return Err(ParserError::Unsupported( - "TableDumpV2 RibGeneric and GeoPeerTable is not currently supported".to_string(), - )) + return Err(ParserError::UnsupportedMrtType { + mrt_type: EntryType::TABLE_DUMP_V2, + subtype: sub_type, + }); } }; @@ -56,7 +56,7 @@ pub fn parse_table_dump_v2_message( /// /// RFC: https://www.rfc-editor.org/rfc/rfc6396#section-4.3.1 pub fn parse_peer_index_table(mut data: Bytes) -> Result { - let collector_bgp_id = Ipv4Addr::from(data.read_u32()?); + let collector_bgp_id = data.read_ipv4_address()?; // read and ignore view name let view_name_length = data.read_u16()?; let view_name = @@ -108,31 +108,23 @@ pub fn parse_rib_afi_entries( data: &mut Bytes, rib_type: TableDumpV2Type, ) -> Result { - let afi: Afi; - let safi: Safi; - match rib_type { + let (afi, safi) = match rib_type { TableDumpV2Type::RibIpv4Unicast | TableDumpV2Type::RibIpv4UnicastAddPath => { - afi = Afi::Ipv4; - safi = Safi::Unicast + (Afi::Ipv4, Safi::Unicast) } TableDumpV2Type::RibIpv4Multicast | TableDumpV2Type::RibIpv4MulticastAddPath => { - afi = Afi::Ipv4; - safi = Safi::Multicast + (Afi::Ipv4, Safi::Multicast) } TableDumpV2Type::RibIpv6Unicast | TableDumpV2Type::RibIpv6UnicastAddPath => { - afi = Afi::Ipv6; - safi = Safi::Unicast + (Afi::Ipv6, Safi::Unicast) } TableDumpV2Type::RibIpv6Multicast | TableDumpV2Type::RibIpv6MulticastAddPath => { - afi = Afi::Ipv6; - safi = Safi::Multicast - } - _ => { - return Err(ParserError::ParseError(format!( - "wrong RIB type for parsing: {:?}", - rib_type - ))) + (Afi::Ipv6, Safi::Multicast) } + ty => panic!( + "Invalid TableDumpV2Type {:?} passed to parse_rib_afi_entries", + ty + ), }; let add_path = matches!( @@ -157,14 +149,7 @@ pub fn parse_rib_afi_entries( // let attr_data_slice = &input.into_inner()[(input.position() as usize)..]; for _i in 0..entry_count { - let entry = match parse_rib_entry(data, add_path, &afi, &safi, &prefixes) { - Ok(entry) => entry, - Err(e) => { - warn!("early break due to error {}", e.to_string()); - break; - } - }; - rib_entries.push(entry); + rib_entries.push(parse_rib_entry(data, add_path, &afi, &safi, &prefixes)?); } Ok(RibAfiEntries { @@ -182,22 +167,19 @@ pub fn parse_rib_entry( safi: &Safi, prefixes: &[NetworkPrefix], ) -> Result { - if input.remaining() < 8 { - // total length - current position less than 16 -- - // meaning less than 16 bytes available to read - return Err(ParserError::TruncatedMsg("truncated msg".to_string())); - } + // total length - current position less than 16 -- + // meaning less than 16 bytes available to read + input.require_n_remaining(8, "rib entry")?; let peer_index = input.read_u16()?; let originated_time = input.read_u32()?; if add_path { + // TODO: Why is this value unused? let _path_id = input.read_u32()?; } let attribute_length = input.read_u16()? as usize; - if input.remaining() < attribute_length { - return Err(ParserError::TruncatedMsg("truncated msg".to_string())); - } + input.require_n_remaining(attribute_length, "rib entry attributes")?; let attr_parser = AttributeParser::new(add_path); diff --git a/src/parser/mrt/mod.rs b/src/parser/mrt/mod.rs index dff117d..6836259 100644 --- a/src/parser/mrt/mod.rs +++ b/src/parser/mrt/mod.rs @@ -10,4 +10,4 @@ pub mod mrt_record; pub use messages::bgp4mp::parse_bgp4mp; pub use messages::table_dump_message::parse_table_dump_message; pub use messages::table_dump_v2_message::parse_table_dump_v2_message; -pub use mrt_record::parse_mrt_record; +pub use mrt_record::{parse_mrt_record, try_parse_mrt_record}; diff --git a/src/parser/mrt/mrt_record.rs b/src/parser/mrt/mrt_record.rs index 6c80197..647369d 100644 --- a/src/parser/mrt/mrt_record.rs +++ b/src/parser/mrt/mrt_record.rs @@ -1,9 +1,9 @@ use crate::error::ParserError; use crate::models::*; use crate::parser::{ - parse_bgp4mp, parse_table_dump_message, parse_table_dump_v2_message, ParserErrorWithBytes, + parse_bgp4mp, parse_table_dump_message, parse_table_dump_v2_message, ReadUtils, }; -use bytes::{Buf, Bytes, BytesMut}; +use bytes::{Bytes, BytesMut}; use std::convert::TryFrom; use std::io::Read; @@ -46,18 +46,22 @@ pub fn parse_common_header(input: &mut T) -> Result v, + Err(_) => return Err(ParserError::UnrecognizedMrtType(entry_type_raw)), + }; + let entry_subtype = data.read_u16()?; + let mut length = data.read_u32()?; let microsecond_timestamp = match &entry_type { EntryType::BGP4MP_ET => { + // TODO: Error if length < 4 length -= 4; let mut raw_bytes: [u8; 4] = [0; 4]; input.read_exact(&mut raw_bytes)?; - Some(BytesMut::from(&raw_bytes[..]).get_u32()) + Some(BytesMut::from(&raw_bytes[..]).read_u32()?) } _ => None, }; @@ -71,66 +75,63 @@ pub fn parse_common_header(input: &mut T) -> Result Result { - // parse common header - let common_header = match parse_common_header(input) { - Ok(v) => v, - Err(e) => { - if let ParserError::EofError(e) = &e { - if e.kind() == std::io::ErrorKind::UnexpectedEof { - return Err(ParserErrorWithBytes::from(ParserError::EofExpected)); - } - } - return Err(ParserErrorWithBytes { - error: e, - bytes: None, - }); +/// An alternative to [parse_common_header] which returns `None` if the end of the file is reached +/// upon beginning to read the header. +pub fn try_parse_common_header( + input: &mut T, +) -> Result, ParserError> { + let mut first_byte = [0]; + match input.read(&mut first_byte)? { + 0 => Ok(None), + 1 => { + let mut reader = &first_byte[..]; + parse_common_header(&mut Read::chain(&mut reader, input)).map(Some) } + _ => unreachable!("Can only read 0 or 1 bytes into buffer of length 1 "), + } +} + +pub fn try_parse_mrt_record(input: &mut T) -> Result, ParserError> { + // parse common header + let common_header = match try_parse_common_header(input)? { + Some(v) => v, + None => return Ok(None), }; // read the whole message bytes to buffer - let mut buffer = BytesMut::with_capacity(common_header.length as usize); - buffer.resize(common_header.length as usize, 0); - match input - .take(common_header.length as u64) - .read_exact(&mut buffer) - { - Ok(_) => {} - Err(e) => { - return Err(ParserErrorWithBytes { - error: ParserError::IoError(e), - bytes: None, - }) - } - } + let mut buffer = BytesMut::zeroed(common_header.length as usize); + input.read_exact(&mut buffer)?; - match parse_mrt_body( - common_header.entry_type as u16, + let message = parse_mrt_body( + common_header.entry_type, common_header.entry_subtype, buffer.freeze(), // freeze the BytesMute to Bytes - ) { - Ok(message) => Ok(MrtRecord { - common_header, - message, - }), - Err(e) => { - // TODO: find more efficient way to preserve the bytes during error - // let mut total_bytes = vec![]; - // if common_header.write_header(&mut total_bytes).is_err() { - // unreachable!("Vec will never produce errors when used as a std::io::Write") - // } + )?; - // total_bytes.extend(buffer); - // Err(ParserErrorWithBytes { - // error: e, - // bytes: Some(total_bytes), - // }) - Err(ParserErrorWithBytes { - error: e, - bytes: None, - }) - } - } + Ok(Some(MrtRecord { + common_header, + message, + })) +} + +pub fn parse_mrt_record(input: &mut T) -> Result { + // parse common header + let common_header = parse_common_header(input)?; + + // read the whole message bytes to buffer + let mut buffer = BytesMut::zeroed(common_header.length as usize); + input.read_exact(&mut buffer)?; + + let message = parse_mrt_body( + common_header.entry_type, + common_header.entry_subtype, + buffer.freeze(), // freeze the BytesMute to Bytes + )?; + + Ok(MrtRecord { + common_header, + message, + }) } /// Parse MRT message body with given entry type and subtype. @@ -139,47 +140,29 @@ pub fn parse_mrt_record(input: &mut impl Read) -> Result Result { - let etype = EntryType::try_from(entry_type)?; - - let message: MrtMessage = match &etype { + match entry_type { EntryType::TABLE_DUMP => { - let msg = parse_table_dump_message(entry_subtype, data); - match msg { - Ok(msg) => MrtMessage::TableDumpMessage(msg), - Err(e) => { - return Err(e); - } - } + let msg = parse_table_dump_message(entry_subtype, data)?; + Ok(MrtMessage::TableDumpMessage(msg)) } EntryType::TABLE_DUMP_V2 => { - let msg = parse_table_dump_v2_message(entry_subtype, data); - match msg { - Ok(msg) => MrtMessage::TableDumpV2Message(msg), - Err(e) => { - return Err(e); - } - } + let msg = parse_table_dump_v2_message(entry_subtype, data)?; + Ok(MrtMessage::TableDumpV2Message(msg)) } EntryType::BGP4MP | EntryType::BGP4MP_ET => { - let msg = parse_bgp4mp(entry_subtype, data); - match msg { - Ok(msg) => MrtMessage::Bgp4Mp(msg), - Err(e) => { - return Err(e); - } - } + let msg = parse_bgp4mp(entry_subtype, data)?; + Ok(MrtMessage::Bgp4Mp(msg)) } - v => { + mrt_type => { // deprecated - return Err(ParserError::Unsupported(format!( - "unsupported MRT type: {:?}", - v - ))); + Err(ParserError::UnsupportedMrtType { + mrt_type, + subtype: entry_subtype, + }) } - }; - Ok(message) + } } diff --git a/src/parser/rislive/error.rs b/src/parser/rislive/error.rs index a64687c..dd4c855 100644 --- a/src/parser/rislive/error.rs +++ b/src/parser/rislive/error.rs @@ -1,58 +1,22 @@ -use std::convert; -use std::error::Error; -use std::fmt::{Display, Formatter}; +use crate::ParserError; +use thiserror::Error; -#[derive(Debug)] +#[derive(Debug, Error)] pub enum ParserRisliveError { - IncorrectJson(String), - IncorrectRawBytes, - IrregularRisLiveFormat, - UnsupportedMessage, + #[error(transparent)] + IncorrectJson(#[from] serde_json::Error), + #[error("unable to parse aggregator attribute {0:?}")] + UnableToParseAggregator(String), + #[error("unable to parse raw bytes: {0}")] + UnableToParseRawBytes(ParserError), + #[error("unknown message type: {0:?}")] + UnknownMessageType(Option), + #[error("unsupported message type: {0}")] + UnsupportedMessage(String), + #[error("found 'eor' (End of RIB) prefix")] ElemEndOfRibPrefix, - ElemUnknownOriginType(String), - ElemIncorrectAggregator(String), - ElemIncorrectPrefix(String), - ElemIncorrectIp(String), + #[error("unknown origin type: {0}")] + UnknownOriginType(String), + #[error("unable to parse prefix: {0:?}")] + UnableToParsePrefix(String), } - -impl Display for ParserRisliveError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - ParserRisliveError::IncorrectJson(msg) => { - write!(f, "incorrect json message: {}", msg) - } - ParserRisliveError::IncorrectRawBytes => { - write!(f, "incorrect raw bytes") - } - ParserRisliveError::UnsupportedMessage => { - write!(f, "unsupported message") - } - ParserRisliveError::IrregularRisLiveFormat => { - write!(f, "irregular ris live format") - } - ParserRisliveError::ElemIncorrectPrefix(msg) => { - write!(f, "incorrect prefix string: {}", msg) - } - ParserRisliveError::ElemUnknownOriginType(msg) => { - write!(f, "unknown origin type: {}", msg) - } - ParserRisliveError::ElemIncorrectAggregator(msg) => { - write!(f, "incorrect aggregator string: {}", msg) - } - ParserRisliveError::ElemIncorrectIp(msg) => { - write!(f, "incorrect IP string: {}", msg) - } - ParserRisliveError::ElemEndOfRibPrefix => { - write!(f, "found 'eor' (End of RIB) prefix") - } - } - } -} - -impl convert::From for ParserRisliveError { - fn from(_: serde_json::Error) -> Self { - ParserRisliveError::IncorrectJson("serde_json error".to_string()) - } -} - -impl Error for ParserRisliveError {} diff --git a/src/parser/rislive/messages/raw_bytes.rs b/src/parser/rislive/messages/raw_bytes.rs index 06c58e8..29b3e4c 100644 --- a/src/parser/rislive/messages/raw_bytes.rs +++ b/src/parser/rislive/messages/raw_bytes.rs @@ -10,16 +10,20 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; pub fn parse_raw_bytes(msg_str: &str) -> Result, ParserRisliveError> { let msg: Value = serde_json::from_str(msg_str)?; let msg_type = match msg.get("type") { - None => return Err(ParserRisliveError::IrregularRisLiveFormat), + None => return Err(ParserRisliveError::UnknownMessageType(None)), Some(t) => t.as_str().unwrap(), }; match msg_type { "ris_message" => {} "ris_error" | "ris_rrc_list" | "ris_subscribe_ok" | "pong" => { - return Err(ParserRisliveError::UnsupportedMessage) + return Err(ParserRisliveError::UnsupportedMessage(msg_type.to_string())) + } + _ => { + return Err(ParserRisliveError::UnknownMessageType(Some( + msg_type.to_string(), + ))) } - _ => return Err(ParserRisliveError::IrregularRisLiveFormat), } let data = msg.get("data").unwrap().as_object().unwrap(); @@ -43,7 +47,7 @@ pub fn parse_raw_bytes(msg_str: &str) -> Result, ParserRisliveError Ok(m) => m, Err(_) => match parse_bgp_message(&mut bytes, false, &AsnLength::Bits16) { Ok(m) => m, - Err(_) => return Err(ParserRisliveError::IncorrectRawBytes), + Err(err) => return Err(ParserRisliveError::UnableToParseRawBytes(err)), }, }; diff --git a/src/parser/rislive/mod.rs b/src/parser/rislive/mod.rs index 917048f..18c9d91 100644 --- a/src/parser/rislive/mod.rs +++ b/src/parser/rislive/mod.rs @@ -48,25 +48,10 @@ use std::net::Ipv4Addr; pub mod error; pub mod messages; -// simple macro to make the code look a bit nicer -macro_rules! unwrap_or_return { - ( $e:expr, $msg_string:expr ) => { - match $e { - Ok(x) => x, - Err(_) => return Err(ParserRisliveError::IncorrectJson($msg_string)), - } - }; -} - /// This function parses one message and returns a result of a vector of [BgpElem]s or an error pub fn parse_ris_live_message(msg_str: &str) -> Result, ParserRisliveError> { - let msg_string = msg_str.to_string(); - // parse RIS Live message to internal struct using serde. - let msg: RisLiveMessage = match serde_json::from_str(msg_str) { - Ok(m) => m, - Err(_e) => return Err(ParserRisliveError::IncorrectJson(msg_string)), - }; + let msg: RisLiveMessage = serde_json::from_str(msg_str)?; match msg { RisLiveMessage::RisMessage(ris_msg) => { @@ -107,7 +92,7 @@ pub fn parse_ris_live_message(msg_str: &str) -> Result, ParserRisli "egp" | "EGP" => Origin::EGP, "incomplete" | "INCOMPLETE" => Origin::INCOMPLETE, other => { - return Err(ParserRisliveError::ElemUnknownOriginType( + return Err(ParserRisliveError::UnknownOriginType( other.to_string(), )); } @@ -120,15 +105,19 @@ pub fn parse_ris_live_message(msg_str: &str) -> Result, ParserRisli Some(aggr_str) => { let (asn_str, ip_str) = match aggr_str.split_once(':') { None => { - return Err(ParserRisliveError::ElemIncorrectAggregator( + return Err(ParserRisliveError::UnableToParseAggregator( aggr_str, )) } Some(v) => v, }; - let asn = unwrap_or_return!(asn_str.parse::(), msg_string); - let ip = unwrap_or_return!(ip_str.parse::(), msg_string); + let asn = asn_str.parse::().map_err(|_| { + ParserRisliveError::UnableToParseAggregator(aggr_str.to_owned()) + })?; + let ip = ip_str.parse::().map_err(|_| { + ParserRisliveError::UnableToParseAggregator(aggr_str) + })?; (Some(asn), Some(ip)) } }; @@ -143,7 +132,7 @@ pub fn parse_ris_live_message(msg_str: &str) -> Result, ParserRisli if prefix == "eor" { return Err(ParserRisliveError::ElemEndOfRibPrefix); } - return Err(ParserRisliveError::ElemIncorrectPrefix( + return Err(ParserRisliveError::UnableToParsePrefix( prefix.to_string(), )); } diff --git a/src/parser/utils.rs b/src/parser/utils.rs index 7b10f83..b8b5b70 100644 --- a/src/parser/utils.rs +++ b/src/parser/utils.rs @@ -3,10 +3,7 @@ Provides IO utility functions for read bytes of different length and converting */ use ipnet::{IpNet, Ipv4Net, Ipv6Net}; use std::convert::TryFrom; -use std::{ - io, - net::{Ipv4Addr, Ipv6Addr}, -}; +use std::net::{Ipv4Addr, Ipv6Addr}; use crate::models::*; use bytes::{Buf, Bytes}; @@ -14,97 +11,104 @@ use log::debug; use std::net::IpAddr; use crate::error::ParserError; -use crate::ParserError::IoNotEnoughBytes; -impl ReadUtils for Bytes {} +impl ReadUtils for B {} // Allow reading IPs from Reads pub trait ReadUtils: Buf { - #[inline] - fn has_n_remaining(&self, n: usize) -> Result<(), ParserError> { - if self.remaining() < n { - Err(IoNotEnoughBytes()) - } else { - Ok(()) + #[inline(always)] + fn require_n_remaining(&self, n: usize, target: &'static str) -> Result<(), ParserError> { + if self.remaining() >= n { + return Ok(()); + } + + Err(ParserError::InconsistentFieldLength { + name: target, + expected: n, + found: self.remaining(), + }) + } + + #[inline(always)] + fn expect_remaining_eq(&self, n: usize, target: &'static str) -> Result<(), ParserError> { + if self.remaining() == n { + return Ok(()); } + + Err(ParserError::InconsistentFieldLength { + name: target, + expected: n, + found: self.remaining(), + }) } #[inline] + #[track_caller] fn read_u8(&mut self) -> Result { - self.has_n_remaining(1)?; + self.require_n_remaining(1, file!())?; Ok(self.get_u8()) } #[inline] + #[track_caller] fn read_u16(&mut self) -> Result { - self.has_n_remaining(2)?; + self.require_n_remaining(2, file!())?; Ok(self.get_u16()) } #[inline] + #[track_caller] fn read_u32(&mut self) -> Result { - self.has_n_remaining(4)?; + self.require_n_remaining(4, file!())?; Ok(self.get_u32()) } #[inline] + #[track_caller] fn read_u64(&mut self) -> Result { - self.has_n_remaining(8)?; + self.require_n_remaining(8, file!())?; Ok(self.get_u64()) } + #[inline] + #[track_caller] fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ParserError> { - self.has_n_remaining(buf.len())?; + self.require_n_remaining(buf.len(), file!())?; self.copy_to_slice(buf); Ok(()) } - fn read_address(&mut self, afi: &Afi) -> io::Result { + fn read_address(&mut self, afi: &Afi) -> Result { match afi { - Afi::Ipv4 => match self.read_ipv4_address() { - Ok(ip) => Ok(IpAddr::V4(ip)), - _ => Err(io::Error::new( - io::ErrorKind::Other, - "Cannot parse IPv4 address".to_string(), - )), - }, - Afi::Ipv6 => match self.read_ipv6_address() { - Ok(ip) => Ok(IpAddr::V6(ip)), - _ => Err(io::Error::new( - io::ErrorKind::Other, - "Cannot parse IPv6 address".to_string(), - )), - }, + Afi::Ipv4 => self.read_ipv4_address().map(IpAddr::V4), + Afi::Ipv6 => self.read_ipv6_address().map(IpAddr::V6), } } fn read_ipv4_address(&mut self) -> Result { + self.require_n_remaining(4, "IPv4 Address")?; let addr = self.read_u32()?; Ok(Ipv4Addr::from(addr)) } fn read_ipv6_address(&mut self) -> Result { - self.has_n_remaining(16)?; + self.require_n_remaining(16, "IPv6 Address")?; let buf = self.get_u128(); Ok(Ipv6Addr::from(buf)) } fn read_ipv4_prefix(&mut self) -> Result { + self.require_n_remaining(5, "IPv4 Prefix")?; let addr = self.read_ipv4_address()?; let mask = self.read_u8()?; - match Ipv4Net::new(addr, mask) { - Ok(n) => Ok(n), - Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Invalid prefix mask").into()), - } + Ipv4Net::new(addr, mask).map_err(ParserError::from) } fn read_ipv6_prefix(&mut self) -> Result { + self.require_n_remaining(17, "IPv6 Prefix")?; let addr = self.read_ipv6_address()?; let mask = self.read_u8()?; - match Ipv6Net::new(addr, mask) { - Ok(n) => Ok(n), - Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Invalid prefix mask").into()), - } + Ipv6Net::new(addr, mask).map_err(ParserError::from) } #[inline] @@ -120,13 +124,13 @@ pub trait ReadUtils: Buf { match as_length { AsnLength::Bits16 => { - self.has_n_remaining(count * 2)?; // 2 bytes for 16-bit ASN + self.require_n_remaining(count * 2, "16bit ASNs")?; // 2 bytes for 16-bit ASN for _ in 0..count { path.push(Asn::new_16bit(self.read_u16()?)); } } AsnLength::Bits32 => { - self.has_n_remaining(count * 4)?; // 4 bytes for 32-bit ASN + self.require_n_remaining(count * 4, "32bit ASNs")?; // 4 bytes for 32-bit ASN for _ in 0..count { path.push(Asn::new_32bit(self.read_u32()?)); } @@ -165,13 +169,10 @@ pub trait ReadUtils: Buf { Afi::Ipv4 => { // 4 bytes -- u32 if byte_len > 4 { - return Err(ParserError::ParseError(format!( - "Invalid byte length for IPv4 prefix. byte_len: {}, bit_len: {}", - byte_len, bit_len - ))); + return Err(ParserError::InvalidPrefixLength(ipnet::PrefixLenError)); } let mut buff = [0; 4]; - self.has_n_remaining(byte_len)?; + self.require_n_remaining(byte_len, "IPv4 NLRI Prefix")?; for i in 0..byte_len { buff[i] = self.get_u8(); } @@ -180,12 +181,9 @@ pub trait ReadUtils: Buf { Afi::Ipv6 => { // 16 bytes if byte_len > 16 { - return Err(ParserError::ParseError(format!( - "Invalid byte length for IPv6 prefix. byte_len: {}, bit_len: {}", - byte_len, bit_len - ))); + return Err(ParserError::InvalidPrefixLength(ipnet::PrefixLenError)); } - self.has_n_remaining(byte_len)?; + self.require_n_remaining(byte_len, "IPv6 NLRI Prefix")?; let mut buff = [0; 16]; for i in 0..byte_len { buff[i] = self.get_u8(); @@ -193,21 +191,13 @@ pub trait ReadUtils: Buf { IpAddr::V6(Ipv6Addr::from(buff)) } }; - let prefix = match IpNet::new(addr, bit_len) { - Ok(p) => p, - Err(_) => { - return Err(ParserError::ParseError(format!( - "Invalid network prefix length: {}", - bit_len - ))) - } - }; + let prefix = IpNet::new(addr, bit_len)?; Ok(NetworkPrefix::new(prefix, path_id)) } fn read_n_bytes(&mut self, n_bytes: usize) -> Result, ParserError> { - self.has_n_remaining(n_bytes)?; + self.require_n_remaining(n_bytes, "raw bytes")?; Ok(self.copy_to_bytes(n_bytes).into()) } diff --git a/tests/record_parse.rs b/tests/record_parse.rs new file mode 100644 index 0000000..a2de199 --- /dev/null +++ b/tests/record_parse.rs @@ -0,0 +1,21 @@ +//! This integration test simply checks that no errors are encountered when attempting to parse a +//! update dump and a RIB dump. +use bgpkit_parser::BgpkitParser; + +#[test] +fn parse_updates() { + check_file("https://spaces.bgpkit.org/parser/update-example.gz"); +} + +#[test] +fn parse_rib_dump() { + check_file("https://spaces.bgpkit.org/parser/rib-example-small.bz2"); +} + +fn check_file(url: &str) { + let parser = BgpkitParser::new(url).unwrap(); + + for record in parser.into_record_iter() { + assert!(record.is_ok(), "{}", record.unwrap_err()); + } +}