diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index c080ad64d7..97bb269265 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -524,6 +524,15 @@ // }, // ], + /// Enable stats per key expression. + // stats: { + // filters: [ + // { + // key: "some/key/expression/**", + // } + // ], + // }, + /// Configure internal transport parameters transport: { unicast: { diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 5222ff89b0..8ce1a0b8f7 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -404,6 +404,11 @@ pub enum AutoConnectStrategy { GreaterZid, } +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +pub struct StatsFilterConfig { + pub key: OwnedKeyExpr, +} + pub trait ConfigValidator: Send + Sync { fn check_config( &self, @@ -886,6 +891,11 @@ validated_struct::validator! { /// Configuration of the low-pass filter pub low_pass_filter: Vec, + /// Configuration of the stats per keyexpr + pub stats: #[derive(Default, PartialEq, Eq)] StatsConfig { + filters: Vec, + }, + /// A list of directories where plugins may be searched for if no `__path__` was specified for them. /// The executable's current directory will be added to the search paths. pub plugins_loading: #[derive(Default)] diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml index 171efe1bce..d39eebbb56 100644 --- a/io/zenoh-transport/Cargo.toml +++ b/io/zenoh-transport/Cargo.toml @@ -52,6 +52,7 @@ transport_ws = ["zenoh-link/transport_ws"] unstable = ["zenoh-config/unstable", "zenoh-protocol/unstable"] [dependencies] +arc-swap = { workspace = true } async-trait = { workspace = true } crossbeam-utils = { workspace = true } flume = { workspace = true } diff --git a/io/zenoh-transport/src/common/stats.rs b/io/zenoh-transport/src/common/stats.rs index d69faa6703..3e82de5045 100644 --- a/io/zenoh-transport/src/common/stats.rs +++ b/io/zenoh-transport/src/common/stats.rs @@ -11,6 +11,14 @@ // Contributors: // ZettaScale Zenoh Team, // + +macro_rules! ifdef { + (() $code:tt) => {}; + (($($target:tt)+) $code:tt) => { + $code + }; +} + macro_rules! stats_struct { (@field_type ) => {AtomicUsize}; (@field_type $field_type:ident) => {std::sync::Arc<$field_type>}; @@ -18,6 +26,8 @@ macro_rules! stats_struct { (@report_field_type $field_type:ident) => {paste::paste! {[<$field_type Report>]}}; (@new($parent:expr, $id:expr) ) => {AtomicUsize::new(0)}; (@new($parent:expr, $id:expr) $field_type:ident) => {$field_type::new($parent, $id)}; + (@from($parent:expr, $field_name:ident) ) => {AtomicUsize::new($parent.$field_name.load(std::sync::atomic::Ordering::Relaxed))}; + (@from($parent:expr, $field_name:ident) $field_type:ident) => {std::sync::Arc::new($field_type::from(&*$parent.$field_name))}; (@report_default ) => {0}; (@report_default $field_type:ident) => {paste::paste! {[<$field_type Report>]::default()}}; (@get $vis:vis $field_name:ident) => { @@ -84,7 +94,9 @@ macro_rules! stats_struct { ( $(#[$meta:meta])* $vis:vis struct $struct_name:ident { + # PARENT $parent_type:ident $(# DISCRIMINANT $discriminant:literal)? + $(# RECURSIVE $recursive:tt)? $( $(# HELP $help:literal)? $(# TYPE $type:literal)? @@ -96,20 +108,24 @@ macro_rules! stats_struct { paste::paste! { $vis struct $struct_name { labels: std::collections::HashMap, - parent: Option>, + parent: Option>, children: std::sync::Arc>>>, + filtered: arc_swap::ArcSwap>, $( $(#[$field_meta])* $field_vis $field_name: stats_struct!(@field_type $($field_type)?), )* } + $(#[$meta])* $vis struct [<$struct_name Report>] { #[serde(skip)] labels: std::collections::HashMap, #[serde(skip)] children: std::vec::Vec<[<$struct_name Report>]>, + #[serde(skip)] + filtered: Vec, $( $(#[$field_meta])* $field_vis $field_name: stats_struct!(@report_field_type $($field_type)?), @@ -118,7 +134,7 @@ macro_rules! stats_struct { impl $struct_name { $(const DISCRIMINANT: &str = $discriminant;)? - $vis fn new(parent: Option>, labels: std::collections::HashMap) -> std::sync::Arc { + $vis fn new(parent: Option>, labels: std::collections::HashMap) -> std::sync::Arc { let s = $struct_name { labels: labels.clone(), parent: parent.clone(), @@ -126,21 +142,32 @@ macro_rules! stats_struct { ..Default::default() }; let a = std::sync::Arc::new(s); - match parent.and_then(|p| p.upgrade()) { - Some(p) => p.children.lock().unwrap().push(a.clone()), - None => {} - }; + ifdef!(($($recursive)?) { + match parent.and_then(|p| p.upgrade()) { + Some(p) => p.children.lock().unwrap().push(a.clone()), + None => {} + }; + }); a } - $vis fn parent(&self) -> &Option> { + $vis fn parent(&self) -> &Option> { &self.parent } + $vis fn labels(&self) -> &std::collections::HashMap { + &self.labels + } + + $vis fn filtered(&self) -> &arc_swap::ArcSwap> { + &self.filtered + } + $vis fn report(&self) -> [<$struct_name Report>] { let report = [<$struct_name Report>] { labels: self.labels.clone(), children: self.children.lock().unwrap().iter().map(|c| c.report()).collect(), + filtered: self.filtered.load().iter().map(|f| FilteredStatsReport {key: f.key_expr.clone(), stats: f.stats.report()}).collect(), $($field_name: self.[](),)* }; // remove already dropped children @@ -150,23 +177,39 @@ macro_rules! stats_struct { } $( - stats_struct!(@get $vis $field_name $($field_type)?); - stats_struct!(@increment $vis $field_name $($field_type)?); + stats_struct!(@get $field_vis $field_name $($field_type)?); + stats_struct!(@increment $field_vis $field_name $($field_type)?); )* } impl Default for $struct_name { fn default() -> Self { Self { - labels: std::collections::HashMap::default(), + labels: Default::default(), + parent: Default::default(), + children: Default::default(), + filtered: Default::default(), + $($field_name: stats_struct!(@new(Default::default(), Default::default()) $($field_type)?),)* + } + } + } + + impl From<&$parent_type> for $struct_name { + fn from(v: &$parent_type) -> Self { + $struct_name { + labels: std::collections::HashMap::new(), parent: None, - children: std::sync::Arc::new(std::sync::Mutex::new(std::vec::Vec::new())), - $($field_name: stats_struct!(@new(None, std::collections::HashMap::default()) $($field_type)?),)* + $($field_name: stats_struct!(@from(v, $field_name) $($field_type)?),)* + ..Default::default() } } } impl [<$struct_name Report>] { + $vis fn filtered(&self) -> &[FilteredStatsReport] { + &self.filtered + } + #[allow(dead_code)] fn discriminated_openmetrics_text(&self, prefix: &str, disc: &str) -> String { let mut s = String::new(); @@ -209,7 +252,7 @@ macro_rules! stats_struct { s } - $vis fn openmetrics_text(&self) -> String { + fn _openmetrics_text(&self, unlabelled: bool) -> String { let mut s = String::new(); $( $( @@ -226,20 +269,30 @@ macro_rules! stats_struct { s.push_str($type); s.push_str("\n"); )? - stats_struct!(@openmetrics(self, s) $field_name $($field_type)?); + if unlabelled { + stats_struct!(@openmetrics(self, s) $field_name $($field_type)?); + } for c in &self.children { stats_struct!(@openmetrics_labels(c, s, c.labels) $field_name $($field_type)?) } )* + for f in &self.filtered { + s.push_str(&f.stats._openmetrics_text(false)); + } s } + + $vis fn openmetrics_text(&self) -> String { + self._openmetrics_text(true) + } } impl Default for [<$struct_name Report>] { fn default() -> Self { Self { - labels: std::collections::HashMap::default(), - children: std::vec::Vec::default(), + labels: Default::default(), + children: Default::default(), + filtered: Default::default(), $($field_name: stats_struct!(@report_default $($field_type)?),)* } } @@ -251,9 +304,12 @@ macro_rules! stats_struct { use std::sync::atomic::{AtomicUsize, Ordering}; use serde::{Deserialize, Serialize}; +use zenoh_protocol::core::key_expr::OwnedKeyExpr; + stats_struct! { #[derive(Clone, Debug, Deserialize, Serialize)] pub struct AdminStats { + # PARENT AdminStats # DISCRIMINANT "space" pub user, pub admin, @@ -263,6 +319,7 @@ stats_struct! { stats_struct! { #[derive(Clone, Debug, Deserialize, Serialize)] pub struct SHMStats { + # PARENT SHMStats # DISCRIMINANT "medium" pub net, pub shm, @@ -272,6 +329,9 @@ stats_struct! { stats_struct! { #[derive(Clone, Debug, Deserialize, Serialize)] pub struct TransportStats { + # PARENT TransportStats + # RECURSIVE true + # HELP "Counter of sent bytes." # TYPE "counter" pub tx_bytes, @@ -389,3 +449,73 @@ stats_struct! { pub tx_low_pass_dropped_msgs, } } + +stats_struct! { + #[derive(Clone, Debug, Deserialize, Serialize)] + pub struct LinkStats { + # PARENT TransportStats + pub tx_bytes, + pub tx_t_msgs, + pub rx_bytes, + pub rx_t_msgs, + } +} + +stats_struct! { + #[derive(Clone, Debug, Deserialize, Serialize)] + pub struct MessageStats { + # PARENT MessageStats + # RECURSIVE true + + pub tx_z_put_msgs, + pub tx_z_put_pl_bytes, + pub tx_z_del_msgs, + pub tx_z_del_pl_bytes, + pub tx_z_query_msgs, + pub tx_z_query_pl_bytes, + pub tx_z_reply_msgs, + pub tx_z_reply_pl_bytes, + pub rx_z_put_msgs, + pub rx_z_put_pl_bytes, + pub rx_z_del_msgs, + pub rx_z_del_pl_bytes, + pub rx_z_query_msgs, + pub rx_z_query_pl_bytes, + pub rx_z_reply_msgs, + pub rx_z_reply_pl_bytes, + } +} + +pub struct FilteredStats { + key_expr: OwnedKeyExpr, + stats: std::sync::Arc, +} + +impl FilteredStats { + pub fn new( + key_expr: OwnedKeyExpr, + parent: Option>, + labels: impl Into>, + ) -> Self { + let mut labels = labels.into(); + labels.insert("key_expr".to_string(), key_expr.to_string()); + Self { + key_expr, + stats: MessageStats::new(parent, labels), + } + } + + pub fn key_expr(&self) -> &OwnedKeyExpr { + &self.key_expr + } + + pub fn stats(&self) -> &std::sync::Arc { + &self.stats + } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct FilteredStatsReport { + key: OwnedKeyExpr, + stats: MessageStatsReport, +} diff --git a/io/zenoh-transport/src/unicast/lowlatency/transport.rs b/io/zenoh-transport/src/unicast/lowlatency/transport.rs index 7bbc16b715..93ea917bc8 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/transport.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/transport.rs @@ -33,7 +33,7 @@ use zenoh_result::{zerror, ZResult}; #[cfg(feature = "shared-memory")] use crate::shm_context::UnicastTransportShmContext; #[cfg(feature = "stats")] -use crate::stats::TransportStats; +use crate::stats::{LinkStats, TransportStats}; use crate::{ unicast::{ authentication::TransportAuthId, @@ -231,10 +231,10 @@ impl TransportUnicastTrait for TransportUnicastLowlatency { } #[cfg(feature = "stats")] - fn get_link_stats(&self) -> Vec<(Link, Arc)> { + fn get_link_stats(&self) -> Vec<(Link, Arc)> { self.get_links() .into_iter() - .map(|l| (l, self.stats.clone())) + .map(|l| (l, std::sync::Arc::new(LinkStats::from(&*self.stats)))) .collect() } diff --git a/io/zenoh-transport/src/unicast/mod.rs b/io/zenoh-transport/src/unicast/mod.rs index 496152d05d..8aa268d40e 100644 --- a/io/zenoh-transport/src/unicast/mod.rs +++ b/io/zenoh-transport/src/unicast/mod.rs @@ -43,7 +43,7 @@ use super::{TransportPeer, TransportPeerEventHandler}; #[cfg(feature = "shared-memory")] use crate::shm::TransportShmConfig; #[cfg(feature = "stats")] -use crate::stats::TransportStats; +use crate::stats::LinkStats; use crate::unicast::authentication::TransportAuthId; #[cfg(feature = "auth_usrpwd")] use crate::unicast::establishment::ext::auth::UsrPwdId; @@ -151,7 +151,7 @@ impl TransportUnicast { } #[cfg(feature = "stats")] - pub fn get_link_stats(&self) -> ZResult)>> { + pub fn get_link_stats(&self) -> ZResult)>> { let transport = self.get_inner()?; Ok(transport.get_link_stats()) } diff --git a/io/zenoh-transport/src/unicast/transport_unicast_inner.rs b/io/zenoh-transport/src/unicast/transport_unicast_inner.rs index e130dc4088..2caf4a989e 100644 --- a/io/zenoh-transport/src/unicast/transport_unicast_inner.rs +++ b/io/zenoh-transport/src/unicast/transport_unicast_inner.rs @@ -26,7 +26,7 @@ use zenoh_result::ZResult; use super::link::{LinkUnicastWithOpenAck, MaybeOpenAck}; #[cfg(feature = "stats")] -use crate::stats::TransportStats; +use crate::stats::{LinkStats, TransportStats}; use crate::{ unicast::{link::TransportLinkUnicast, TransportConfigUnicast}, TransportPeerEventHandler, @@ -73,7 +73,7 @@ pub(crate) trait TransportUnicastTrait: Send + Sync { #[cfg(feature = "stats")] fn stats(&self) -> Arc; #[cfg(feature = "stats")] - fn get_link_stats(&self) -> Vec<(Link, Arc)>; + fn get_link_stats(&self) -> Vec<(Link, Arc)>; /*************************************/ /* LINK */ diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index d15fbafe9e..3e8cf2ff4a 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -20,7 +20,7 @@ use zenoh_protocol::transport::{KeepAlive, TransportMessage}; use zenoh_result::{zerror, ZResult}; use zenoh_sync::{RecyclingObject, RecyclingObjectPool}; #[cfg(feature = "stats")] -use {crate::common::stats::TransportStats, std::sync::Arc}; +use {crate::common::stats::LinkStats, std::sync::Arc}; use super::transport::TransportUnicastUniversal; use crate::{ @@ -45,7 +45,7 @@ pub(super) struct TransportLinkUnicastUniversal { tracker: TaskTracker, token: CancellationToken, #[cfg(feature = "stats")] - pub(super) stats: Arc, + pub(super) stats: Arc, } impl TransportLinkUnicastUniversal { @@ -81,7 +81,7 @@ impl TransportLinkUnicastUniversal { tracker: TaskTracker::new(), token: CancellationToken::new(), #[cfg(feature = "stats")] - stats: TransportStats::new(Some(Arc::downgrade(&transport.stats)), Default::default()), + stats: LinkStats::new(Some(Arc::downgrade(&transport.stats)), Default::default()), }; (result, consumer) @@ -188,7 +188,7 @@ async fn tx_task( link: &mut TransportLinkUnicastTx, keep_alive: Duration, token: CancellationToken, - #[cfg(feature = "stats")] stats: Arc, + #[cfg(feature = "stats")] stats: Arc, ) -> ZResult<()> { loop { tokio::select! { @@ -254,7 +254,7 @@ async fn rx_task( lease: Duration, rx_buffer_size: usize, token: CancellationToken, - #[cfg(feature = "stats")] stats: Arc, + #[cfg(feature = "stats")] stats: Arc, ) -> ZResult<()> { async fn read( link: &mut TransportLinkUnicastRx, diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index 3785aaad02..5743729bb6 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -26,7 +26,7 @@ use zenoh_result::{bail, zerror, ZResult}; use super::transport::TransportUnicastUniversal; #[cfg(feature = "stats")] -use crate::stats::TransportStats; +use crate::stats::LinkStats; use crate::{ common::{ batch::{Decode, RBatch}, @@ -228,7 +228,7 @@ impl TransportUnicastUniversal { &self, mut batch: RBatch, link: &Link, - #[cfg(feature = "stats")] stats: &TransportStats, + #[cfg(feature = "stats")] stats: &LinkStats, ) -> ZResult<()> { while !batch.is_empty() { if let Ok(frame) = batch.decode() { diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs index 266044b161..90ec14b5a7 100644 --- a/io/zenoh-transport/src/unicast/universal/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -33,7 +33,7 @@ use zenoh_sync::{event, Notifier, Waiter}; #[cfg(feature = "shared-memory")] use crate::shm_context::UnicastTransportShmContext; #[cfg(feature = "stats")] -use crate::stats::TransportStats; +use crate::stats::{LinkStats, TransportStats}; use crate::{ common::priority::{TransportPriorityRx, TransportPriorityTx}, unicast::{ @@ -375,7 +375,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal { } #[cfg(feature = "stats")] - fn get_link_stats(&self) -> Vec<(Link, Arc)> { + fn get_link_stats(&self) -> Vec<(Link, Arc)> { zread!(self.links) .iter() .map(|l| (l.link.link(), l.stats.clone())) diff --git a/zenoh/src/net/routing/interceptor/mod.rs b/zenoh/src/net/routing/interceptor/mod.rs index f0f67e63f1..1afed77c9b 100644 --- a/zenoh/src/net/routing/interceptor/mod.rs +++ b/zenoh/src/net/routing/interceptor/mod.rs @@ -18,32 +18,36 @@ //! //! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh) //! -mod access_control; -use access_control::acl_interceptor_factories; -use nonempty_collections::NEVec; -use zenoh_link::LinkAuthId; - -mod authorization; use std::any::Any; -mod low_pass; -use low_pass::low_pass_interceptor_factories; +use nonempty_collections::NEVec; use zenoh_config::{Config, InterceptorFlow, InterceptorLink}; use zenoh_keyexpr::{keyexpr, OwnedKeyExpr}; +use zenoh_link::LinkAuthId; use zenoh_protocol::network::NetworkMessageMut; use zenoh_result::ZResult; use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast}; -pub mod downsampling; use crate::{ key_expr::KeyExpr, net::routing::{ - dispatcher::face::Face, interceptor::downsampling::downsampling_interceptor_factories, + dispatcher::face::Face, + interceptor::{ + access_control::acl_interceptor_factories, + downsampling::downsampling_interceptor_factories, + low_pass::low_pass_interceptor_factories, + qos_overwrite::qos_overwrite_interceptor_factories, + }, }, }; -pub mod qos_overwrite; -use crate::net::routing::interceptor::qos_overwrite::qos_overwrite_interceptor_factories; +mod access_control; +mod authorization; +mod downsampling; +mod low_pass; +mod qos_overwrite; +#[cfg(feature = "stats")] +mod stats; #[derive(Default, Debug)] pub struct InterfaceEnabled { @@ -134,6 +138,12 @@ pub(crate) fn interceptor_factories(config: &Config) -> ZResult +// + +//! ⚠️ WARNING ⚠️ +//! +//! This module is intended for Zenoh's internal use. +//! +//! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh) + +use std::{ + borrow::Cow, + sync::{Arc, Weak}, +}; + +use zenoh_buffers::buffer::Buffer; +use zenoh_config::StatsConfig; +use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeMut, IKeyExprTreeNode, KeBoxTree}; +use zenoh_protocol::{ + network::NetworkBodyMut, + zenoh::{ext::AttachmentType, reply::ReplyBody, PushBody, RequestBody, ResponseBody}, +}; +use zenoh_result::ZResult; +use zenoh_transport::stats::{FilteredStats, MessageStats, TransportStats}; + +use crate::net::routing::interceptor::*; + +pub(crate) fn stats_interceptor_factories( + config: &StatsConfig, +) -> ZResult> { + if config.filters().is_empty() { + return Ok(vec![]); + } + Ok(vec![Box::new(StatsInterceptorFactory::new(config))]) +} + +struct StatsInterceptorFactory { + parent_stats: Arc>, +} + +impl StatsInterceptorFactory { + fn new(conf: &StatsConfig) -> Self { + Self { + parent_stats: Arc::new( + conf.filters() + .iter() + .map(|filter| FilteredStats::new(filter.key.clone(), None, [])) + .collect(), + ), + } + } + + fn register_filtered_stats(&self, stats: &TransportStats) { + if let Some(parent) = stats.parent().as_ref().and_then(Weak::upgrade) { + parent.filtered().store(self.parent_stats.clone()) + } + stats.filtered().store(Arc::new( + self.parent_stats + .iter() + .map(|s| { + FilteredStats::new( + s.key_expr().clone(), + Some(Arc::downgrade(s.stats())), + stats.labels().clone(), + ) + }) + .collect(), + )) + } + + fn make_interceptor(&self, stats: &TransportStats, flow: InterceptorFlow) -> Interceptor { + if let Some(parent) = stats.parent().as_ref().and_then(Weak::upgrade) { + parent.filtered().store(self.parent_stats.clone()) + } + let mut filters_tree = KeBoxTree::new(); + for s in stats.filtered().load().iter() { + filters_tree.insert(s.key_expr(), s.stats().clone()); + } + Box::new(StatsInterceptor { filters_tree, flow }) + } +} + +impl InterceptorFactoryTrait for StatsInterceptorFactory { + fn new_transport_unicast( + &self, + transport: &TransportUnicast, + ) -> (Option, Option) { + let Ok(stats) = transport.get_stats() else { + return (None, None); + }; + self.register_filtered_stats(&stats); + ( + Some(self.make_interceptor(&stats, InterceptorFlow::Ingress)), + Some(self.make_interceptor(&stats, InterceptorFlow::Egress)), + ) + } + + fn new_transport_multicast(&self, transport: &TransportMulticast) -> Option { + let stats = transport.get_stats().ok()?; + self.register_filtered_stats(&stats); + Some(self.make_interceptor(&stats, InterceptorFlow::Egress)) + } + + fn new_peer_multicast(&self, transport: &TransportMulticast) -> Option { + let stats = transport.get_stats().ok()?; + Some(self.make_interceptor(&stats, InterceptorFlow::Ingress)) + } +} + +struct StatsInterceptor { + filters_tree: KeBoxTree>, + flow: InterceptorFlow, +} + +impl StatsInterceptor { + fn compute_filtered_stats(&self, key_expr: &keyexpr) -> Vec> { + self.filters_tree + .intersecting_nodes(key_expr) + .filter_map(|n| n.weight().cloned()) + .collect() + } + + fn get_or_compute_filtered_stats<'a>( + &self, + msg: &NetworkMessageMut, + ctx: &'a dyn InterceptorContext, + ) -> Cow<'a, [Arc]> { + match ctx + .get_cache(msg) + .and_then(|c| c.downcast_ref::>>()) + { + Some(v) => Cow::Borrowed(v), + None => ctx.full_keyexpr(msg).map_or_else(Cow::default, |k| { + Cow::Owned(self.compute_filtered_stats(&k)) + }), + } + } + + fn incr_stats( + &self, + filtered_stats: &[Arc], + payload_bytes: usize, + ingress: (IncMsgs, IncPlBytes), + egress: (IncMsgs, IncPlBytes), + ) { + for stats in filtered_stats { + match self.flow { + InterceptorFlow::Ingress => { + let (rx_msgs, rx_pl_bytes) = ingress; + rx_msgs(stats, 1); + rx_pl_bytes(stats, payload_bytes); + } + InterceptorFlow::Egress => { + let (tx_msgs, tx_pl_bytes) = egress; + tx_msgs(stats, 1); + tx_pl_bytes(stats, payload_bytes); + } + } + } + } +} + +type IncMsgs = fn(&MessageStats, usize); +type IncPlBytes = fn(&MessageStats, usize); + +impl InterceptorTrait for StatsInterceptor { + fn compute_keyexpr_cache(&self, key_expr: &keyexpr) -> Option> { + Some(Box::new(self.compute_filtered_stats(key_expr))) + } + + fn intercept(&self, msg: &mut NetworkMessageMut, ctx: &mut dyn InterceptorContext) -> bool { + fn attachment_size(attachment: &Option>) -> usize { + attachment.as_ref().map_or(0, |a| a.buffer.len()) + } + let filtered_stats = || self.get_or_compute_filtered_stats(msg, ctx); + match &msg.body { + NetworkBodyMut::Push(msg) => match &msg.payload { + PushBody::Put(put) => self.incr_stats( + &filtered_stats(), + put.payload.len() + attachment_size(&put.ext_attachment), + ( + MessageStats::inc_rx_z_put_msgs, + MessageStats::inc_rx_z_put_pl_bytes, + ), + ( + MessageStats::inc_tx_z_put_msgs, + MessageStats::inc_tx_z_put_pl_bytes, + ), + ), + PushBody::Del(del) => self.incr_stats( + &filtered_stats(), + attachment_size(&del.ext_attachment), + ( + MessageStats::inc_rx_z_del_msgs, + MessageStats::inc_rx_z_del_pl_bytes, + ), + ( + MessageStats::inc_tx_z_del_msgs, + MessageStats::inc_tx_z_del_pl_bytes, + ), + ), + }, + + NetworkBodyMut::Request(msg) => match &msg.payload { + RequestBody::Query(query) => self.incr_stats( + &filtered_stats(), + attachment_size(&query.ext_attachment), + ( + MessageStats::inc_rx_z_query_msgs, + MessageStats::inc_rx_z_query_pl_bytes, + ), + ( + MessageStats::inc_tx_z_query_msgs, + MessageStats::inc_tx_z_query_pl_bytes, + ), + ), + }, + NetworkBodyMut::Response(msg) => { + let payload_bytes = match &msg.payload { + ResponseBody::Reply(reply) => match &reply.payload { + ReplyBody::Put(put) => { + put.payload.len() + attachment_size(&put.ext_attachment) + } + ReplyBody::Del(del) => attachment_size(&del.ext_attachment), + }, + ResponseBody::Err(err) => err.payload.len(), + }; + self.incr_stats( + &filtered_stats(), + payload_bytes, + ( + MessageStats::inc_rx_z_reply_msgs, + MessageStats::inc_rx_z_reply_pl_bytes, + ), + ( + MessageStats::inc_tx_z_reply_msgs, + MessageStats::inc_tx_z_reply_pl_bytes, + ), + ) + } + NetworkBodyMut::ResponseFinal(_) + | NetworkBodyMut::Interest(_) + | NetworkBodyMut::Declare(_) + | NetworkBodyMut::OAM(_) => {} + } + true + } +} diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 77f34ac3e0..345f1ae677 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -40,7 +40,7 @@ use zenoh_protocol::{ }; use zenoh_result::ZResult; #[cfg(feature = "stats")] -use zenoh_transport::stats::TransportStats; +use zenoh_transport::stats::{LinkStats, TransportStats}; use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast, TransportPeer}; use super::{routing::dispatcher::face::Face, Runtime}; @@ -580,7 +580,21 @@ fn local_data(context: &AdminContext, query: Query) { .iter() .any(|(k, v)| k == "_stats" && v != "false"); #[cfg(feature = "stats")] - let insert_stats = |mut json: serde_json::Value, stats: Option<&Arc>| { + let insert_transportstats = + |mut json: serde_json::Value, stats: Option<&Arc>| { + if export_stats { + let report = stats.map(|s| s.report()); + let map = json.as_object_mut().unwrap(); + map.insert("stats".into(), json!(report)); + map.insert( + "filtered_stats".into(), + json!(report.as_ref().map(|r| r.filtered())), + ); + } + json + }; + #[cfg(feature = "stats")] + let insert_linkstats = |mut json: serde_json::Value, stats: Option<&Arc>| { if export_stats { json.as_object_mut() .unwrap() @@ -629,7 +643,7 @@ fn local_data(context: &AdminContext, query: Query) { .get_link_stats() .unwrap_or_default() .iter() - .map(|(link, stats)| insert_stats(link_to_json(link), Some(stats))) + .map(|(link, stats)| insert_linkstats(link_to_json(link), Some(stats))) .collect_vec(); #[cfg(feature = "shared-memory")] let shm = transport.is_shm().unwrap_or_default(); @@ -643,7 +657,7 @@ fn local_data(context: &AdminContext, query: Query) { "shm": shm, }); #[cfg(feature = "stats")] - let json = insert_stats(json, transport.get_stats().ok().as_ref()); + let json = insert_transportstats(json, transport.get_stats().ok().as_ref()); json }; let transport_multicast_peer_to_json = @@ -667,7 +681,7 @@ fn local_data(context: &AdminContext, query: Query) { "links": links, }); #[cfg(feature = "stats")] - let json = insert_stats(json, transport.get_stats().ok().as_ref()); + let json = insert_transportstats(json, transport.get_stats().ok().as_ref()); json }; let mut transports: Vec = vec![]; @@ -695,7 +709,7 @@ fn local_data(context: &AdminContext, query: Query) { "plugins": plugins, }); #[cfg(feature = "stats")] - let json = insert_stats(json, Some(&transport_mgr.get_stats())); + let json = insert_transportstats(json, Some(&transport_mgr.get_stats())); tracing::trace!("AdminSpace router_data: {:?}", json); let payload = match serde_json::to_vec(&json) {