Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,15 @@
// },
// ],

/// Enable stats per key expression.
// stats: {
// filters: [
// {
// key: "some/key/expression/**",
// }
// ],
// },

/// Configure internal transport parameters
transport: {
unicast: {
Expand Down
10 changes: 10 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,11 @@ pub enum AutoConnectStrategy {
GreaterZid,
}

#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct StatsFilterConfig {
pub key: OwnedKeyExpr,
}

pub trait ConfigValidator: Send + Sync {
fn check_config(
&self,
Expand Down Expand Up @@ -886,6 +891,11 @@ validated_struct::validator! {
/// Configuration of the low-pass filter
pub low_pass_filter: Vec<LowPassFilterConf>,

/// Configuration of the stats per keyexpr
pub stats: #[derive(Default, PartialEq, Eq)] StatsConfig {
filters: Vec<StatsFilterConfig>,
},

/// A list of directories where plugins may be searched for if no `__path__` was specified for them.
/// The executable's current directory will be added to the search paths.
pub plugins_loading: #[derive(Default)]
Expand Down
1 change: 1 addition & 0 deletions io/zenoh-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ transport_ws = ["zenoh-link/transport_ws"]
unstable = ["zenoh-config/unstable", "zenoh-protocol/unstable"]

[dependencies]
arc-swap = { workspace = true }
async-trait = { workspace = true }
crossbeam-utils = { workspace = true }
flume = { workspace = true }
Expand Down
162 changes: 146 additions & 16 deletions io/zenoh-transport/src/common/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,23 @@
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

macro_rules! ifdef {
(() $code:tt) => {};
(($($target:tt)+) $code:tt) => {
$code
};
}

macro_rules! stats_struct {
(@field_type ) => {AtomicUsize};
(@field_type $field_type:ident) => {std::sync::Arc<$field_type>};
(@report_field_type ) => {usize};
(@report_field_type $field_type:ident) => {paste::paste! {[<$field_type Report>]}};
(@new($parent:expr, $id:expr) ) => {AtomicUsize::new(0)};
(@new($parent:expr, $id:expr) $field_type:ident) => {$field_type::new($parent, $id)};
(@from($parent:expr, $field_name:ident) ) => {AtomicUsize::new($parent.$field_name.load(std::sync::atomic::Ordering::Relaxed))};
(@from($parent:expr, $field_name:ident) $field_type:ident) => {std::sync::Arc::new($field_type::from(&*$parent.$field_name))};
(@report_default ) => {0};
(@report_default $field_type:ident) => {paste::paste! {[<$field_type Report>]::default()}};
(@get $vis:vis $field_name:ident) => {
Expand Down Expand Up @@ -84,7 +94,9 @@ macro_rules! stats_struct {
(
$(#[$meta:meta])*
$vis:vis struct $struct_name:ident {
# PARENT $parent_type:ident
$(# DISCRIMINANT $discriminant:literal)?
$(# RECURSIVE $recursive:tt)?
$(
$(# HELP $help:literal)?
$(# TYPE $type:literal)?
Expand All @@ -96,20 +108,24 @@ macro_rules! stats_struct {
paste::paste! {
$vis struct $struct_name {
labels: std::collections::HashMap<String, String>,
parent: Option<std::sync::Weak<$struct_name>>,
parent: Option<std::sync::Weak<$parent_type>>,
children: std::sync::Arc<std::sync::Mutex<std::vec::Vec<std::sync::Arc<$struct_name>>>>,
filtered: arc_swap::ArcSwap<Vec<FilteredStats>>,
$(
$(#[$field_meta])*
$field_vis $field_name: stats_struct!(@field_type $($field_type)?),
)*
}


$(#[$meta])*
$vis struct [<$struct_name Report>] {
#[serde(skip)]
labels: std::collections::HashMap<String, String>,
#[serde(skip)]
children: std::vec::Vec<[<$struct_name Report>]>,
#[serde(skip)]
filtered: Vec<FilteredStatsReport>,
$(
$(#[$field_meta])*
$field_vis $field_name: stats_struct!(@report_field_type $($field_type)?),
Expand All @@ -118,29 +134,40 @@ macro_rules! stats_struct {

impl $struct_name {
$(const DISCRIMINANT: &str = $discriminant;)?
$vis fn new(parent: Option<std::sync::Weak<$struct_name>>, labels: std::collections::HashMap<String, String>) -> std::sync::Arc<Self> {
$vis fn new(parent: Option<std::sync::Weak<$parent_type>>, labels: std::collections::HashMap<String, String>) -> std::sync::Arc<Self> {
let s = $struct_name {
labels: labels.clone(),
parent: parent.clone(),
$($field_name: stats_struct!(@new(parent.as_ref().and_then(|p| p.upgrade()).map(|p| std::sync::Arc::downgrade(&p.$field_name)), labels.clone()) $($field_type)?),)*
..Default::default()
};
let a = std::sync::Arc::new(s);
match parent.and_then(|p| p.upgrade()) {
Some(p) => p.children.lock().unwrap().push(a.clone()),
None => {}
};
ifdef!(($($recursive)?) {
match parent.and_then(|p| p.upgrade()) {
Some(p) => p.children.lock().unwrap().push(a.clone()),
None => {}
};
});
a
}

$vis fn parent(&self) -> &Option<std::sync::Weak<$struct_name>> {
$vis fn parent(&self) -> &Option<std::sync::Weak<$parent_type>> {
&self.parent
}

$vis fn labels(&self) -> &std::collections::HashMap<String, String> {
&self.labels
}

$vis fn filtered(&self) -> &arc_swap::ArcSwap<Vec<FilteredStats>> {
&self.filtered
}

$vis fn report(&self) -> [<$struct_name Report>] {
let report = [<$struct_name Report>] {
labels: self.labels.clone(),
children: self.children.lock().unwrap().iter().map(|c| c.report()).collect(),
filtered: self.filtered.load().iter().map(|f| FilteredStatsReport {key: f.key_expr.clone(), stats: f.stats.report()}).collect(),
$($field_name: self.[<get_ $field_name>](),)*
};
// remove already dropped children
Expand All @@ -150,23 +177,39 @@ macro_rules! stats_struct {
}

$(
stats_struct!(@get $vis $field_name $($field_type)?);
stats_struct!(@increment $vis $field_name $($field_type)?);
stats_struct!(@get $field_vis $field_name $($field_type)?);
stats_struct!(@increment $field_vis $field_name $($field_type)?);
)*
}

impl Default for $struct_name {
fn default() -> Self {
Self {
labels: std::collections::HashMap::default(),
labels: Default::default(),
parent: Default::default(),
children: Default::default(),
filtered: Default::default(),
$($field_name: stats_struct!(@new(Default::default(), Default::default()) $($field_type)?),)*
}
}
}

impl From<&$parent_type> for $struct_name {
fn from(v: &$parent_type) -> Self {
$struct_name {
labels: std::collections::HashMap::new(),
parent: None,
children: std::sync::Arc::new(std::sync::Mutex::new(std::vec::Vec::new())),
$($field_name: stats_struct!(@new(None, std::collections::HashMap::default()) $($field_type)?),)*
$($field_name: stats_struct!(@from(v, $field_name) $($field_type)?),)*
..Default::default()
}
}
}

impl [<$struct_name Report>] {
$vis fn filtered(&self) -> &[FilteredStatsReport] {
&self.filtered
}

#[allow(dead_code)]
fn discriminated_openmetrics_text(&self, prefix: &str, disc: &str) -> String {
let mut s = String::new();
Expand Down Expand Up @@ -209,7 +252,7 @@ macro_rules! stats_struct {
s
}

$vis fn openmetrics_text(&self) -> String {
fn _openmetrics_text(&self, unlabelled: bool) -> String {
let mut s = String::new();
$(
$(
Expand All @@ -226,20 +269,30 @@ macro_rules! stats_struct {
s.push_str($type);
s.push_str("\n");
)?
stats_struct!(@openmetrics(self, s) $field_name $($field_type)?);
if unlabelled {
stats_struct!(@openmetrics(self, s) $field_name $($field_type)?);
}
for c in &self.children {
stats_struct!(@openmetrics_labels(c, s, c.labels) $field_name $($field_type)?)
}
)*
for f in &self.filtered {
s.push_str(&f.stats._openmetrics_text(false));
}
s
}

$vis fn openmetrics_text(&self) -> String {
self._openmetrics_text(true)
}
}

impl Default for [<$struct_name Report>] {
fn default() -> Self {
Self {
labels: std::collections::HashMap::default(),
children: std::vec::Vec::default(),
labels: Default::default(),
children: Default::default(),
filtered: Default::default(),
$($field_name: stats_struct!(@report_default $($field_type)?),)*
}
}
Expand All @@ -251,9 +304,12 @@ macro_rules! stats_struct {
use std::sync::atomic::{AtomicUsize, Ordering};

use serde::{Deserialize, Serialize};
use zenoh_protocol::core::key_expr::OwnedKeyExpr;

stats_struct! {
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct AdminStats {
# PARENT AdminStats
# DISCRIMINANT "space"
pub user,
pub admin,
Expand All @@ -263,6 +319,7 @@ stats_struct! {
stats_struct! {
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SHMStats {
# PARENT SHMStats
# DISCRIMINANT "medium"
pub net,
pub shm,
Expand All @@ -272,6 +329,9 @@ stats_struct! {
stats_struct! {
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct TransportStats {
# PARENT TransportStats
# RECURSIVE true

# HELP "Counter of sent bytes."
# TYPE "counter"
pub tx_bytes,
Expand Down Expand Up @@ -389,3 +449,73 @@ stats_struct! {
pub tx_low_pass_dropped_msgs,
}
}

stats_struct! {
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct LinkStats {
# PARENT TransportStats
pub tx_bytes,
pub tx_t_msgs,
pub rx_bytes,
pub rx_t_msgs,
}
}

stats_struct! {
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct MessageStats {
# PARENT MessageStats
# RECURSIVE true

pub tx_z_put_msgs,
pub tx_z_put_pl_bytes,
pub tx_z_del_msgs,
pub tx_z_del_pl_bytes,
pub tx_z_query_msgs,
pub tx_z_query_pl_bytes,
pub tx_z_reply_msgs,
pub tx_z_reply_pl_bytes,
pub rx_z_put_msgs,
pub rx_z_put_pl_bytes,
pub rx_z_del_msgs,
pub rx_z_del_pl_bytes,
pub rx_z_query_msgs,
pub rx_z_query_pl_bytes,
pub rx_z_reply_msgs,
pub rx_z_reply_pl_bytes,
}
}

pub struct FilteredStats {
key_expr: OwnedKeyExpr,
stats: std::sync::Arc<MessageStats>,
}

impl FilteredStats {
pub fn new(
key_expr: OwnedKeyExpr,
parent: Option<std::sync::Weak<MessageStats>>,
labels: impl Into<std::collections::HashMap<String, String>>,
) -> Self {
let mut labels = labels.into();
labels.insert("key_expr".to_string(), key_expr.to_string());
Self {
key_expr,
stats: MessageStats::new(parent, labels),
}
}

pub fn key_expr(&self) -> &OwnedKeyExpr {
&self.key_expr
}

pub fn stats(&self) -> &std::sync::Arc<MessageStats> {
&self.stats
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct FilteredStatsReport {
key: OwnedKeyExpr,
stats: MessageStatsReport,
}
6 changes: 3 additions & 3 deletions io/zenoh-transport/src/unicast/lowlatency/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use zenoh_result::{zerror, ZResult};
#[cfg(feature = "shared-memory")]
use crate::shm_context::UnicastTransportShmContext;
#[cfg(feature = "stats")]
use crate::stats::TransportStats;
use crate::stats::{LinkStats, TransportStats};
use crate::{
unicast::{
authentication::TransportAuthId,
Expand Down Expand Up @@ -231,10 +231,10 @@ impl TransportUnicastTrait for TransportUnicastLowlatency {
}

#[cfg(feature = "stats")]
fn get_link_stats(&self) -> Vec<(Link, Arc<TransportStats>)> {
fn get_link_stats(&self) -> Vec<(Link, Arc<LinkStats>)> {
self.get_links()
.into_iter()
.map(|l| (l, self.stats.clone()))
.map(|l| (l, std::sync::Arc::new(LinkStats::from(&*self.stats))))
.collect()
}

Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-transport/src/unicast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use super::{TransportPeer, TransportPeerEventHandler};
#[cfg(feature = "shared-memory")]
use crate::shm::TransportShmConfig;
#[cfg(feature = "stats")]
use crate::stats::TransportStats;
use crate::stats::LinkStats;
use crate::unicast::authentication::TransportAuthId;
#[cfg(feature = "auth_usrpwd")]
use crate::unicast::establishment::ext::auth::UsrPwdId;
Expand Down Expand Up @@ -151,7 +151,7 @@ impl TransportUnicast {
}

#[cfg(feature = "stats")]
pub fn get_link_stats(&self) -> ZResult<Vec<(Link, Arc<TransportStats>)>> {
pub fn get_link_stats(&self) -> ZResult<Vec<(Link, Arc<LinkStats>)>> {
let transport = self.get_inner()?;
Ok(transport.get_link_stats())
}
Expand Down
Loading
Loading