From 55b440eedbed977d4bbf6f55452b41d0153f20c2 Mon Sep 17 00:00:00 2001 From: m09526 Date: Wed, 9 Jul 2025 12:15:00 +0000 Subject: [PATCH 1/8] LoggingStore added --- Cargo.toml | 4 +++- src/lib.rs | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 1264eb7..d8a505f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ futures = "0.3" http = "1.2.0" humantime = "2.1" itertools = "0.14.0" +log = "0.4.27" parking_lot = { version = "0.12" } percent-encoding = "2.1" thiserror = "2.0.2" @@ -83,10 +84,11 @@ integration = ["rand"] hyper = { version = "1.2", features = ["server"] } hyper-util = "0.1" rand = "0.9" -tempfile = "3.1.0" regex = "1.11.1" # The "gzip" feature for reqwest is enabled for an integration test. reqwest = { version = "0.12", features = ["gzip"] } +tempfile = "3.1.0" +testing_logger = "0.1.1" [target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dev-dependencies] wasm-bindgen-test = "0.3.50" diff --git a/src/lib.rs b/src/lib.rs index 06edd33..69713d6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -521,6 +521,7 @@ pub mod http; pub mod limit; #[cfg(all(feature = "fs", not(target_arch = "wasm32")))] pub mod local; +pub mod logging; pub mod memory; pub mod path; pub mod prefix; From bf8f73179a46ba877a00bef4526adc088113116e Mon Sep 17 00:00:00 2001 From: m09526 Date: Wed, 9 Jul 2025 12:33:33 +0000 Subject: [PATCH 2/8] Extra file added --- src/logging.rs | 635 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 635 insertions(+) create mode 100644 src/logging.rs diff --git a/src/logging.rs b/src/logging.rs new file mode 100644 index 0000000..e19f8bd --- /dev/null +++ b/src/logging.rs @@ -0,0 +1,635 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! An object store that logs calls to the wrapped implementation. + +use crate::{ + path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, +}; +use async_trait::async_trait; +use futures::stream::BoxStream; +use log::info; + +/// An [`ObjectStore`] wrapper that logs operations made to the wrapped store. +#[derive(Debug)] +pub struct LoggingStore { + store: T, + prefix: String, + path_prefix: String, +} + +impl LoggingStore { + /// Create a new logging store by wrapping an inner store. + #[must_use] + pub fn new(inner: T, prefix: impl Into, path_prefix: impl Into) -> Self { + Self { + store: inner, + prefix: prefix.into(), + path_prefix: path_prefix.into(), + } + } +} + +impl std::fmt::Display for LoggingStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "LoggingStore \"{}\" path prefix: \"{}\" ({})", + self.prefix, self.path_prefix, self.store + ) + } +} + +#[async_trait] +impl ObjectStore for LoggingStore { + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + if !options.head { + match &options.range { + Some(GetRange::Bounded(get_range)) => { + let len = get_range + .end + .checked_sub(get_range.start) + .expect("Get range length is negative"); + info!( + "{} get request for {}/{} byte range {} to {} = {} bytes", + self.prefix, + self.path_prefix, + location, + get_range.start, + get_range.end, + len, + ); + } + Some(GetRange::Offset(start_pos)) => { + info!( + "{} get request for {}/{} for byte {} to EOF", + self.prefix, self.path_prefix, location, start_pos, + ); + } + Some(GetRange::Suffix(pos)) => { + info!( + "{} get request for {}/{} for last {} bytes of object", + self.prefix, self.path_prefix, location, pos, + ); + } + None => { + info!( + "{} get request for {}/{} for complete file range", + self.prefix, self.path_prefix, location + ); + } + } + } + self.store.get_opts(location, options).await + } + + async fn head(&self, location: &Path) -> Result { + info!( + "{} head request for {}/{}", + self.prefix, self.path_prefix, location + ); + self.store.head(location).await + } + + async fn delete(&self, location: &Path) -> Result<()> { + info!( + "{} delete request for {}/{}", + self.prefix, self.path_prefix, location + ); + self.store.delete(location).await + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { + info!( + "{} list request for {}/{}", + self.prefix, + self.path_prefix, + prefix.unwrap_or(&Path::default()) + ); + self.store.list(prefix) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { + info!( + "{} list_with_delimeter request for {}/{}", + self.prefix, + self.path_prefix, + prefix.unwrap_or(&Path::default()) + ); + self.store.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + info!( + "{} copy request from {}/{} to {}/{}", + self.prefix, self.path_prefix, from, self.path_prefix, to + ); + self.store.copy(from, to).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + info!( + "{} copy_if_not_exists request from {}/{} to {}/{}", + self.prefix, self.path_prefix, from, self.path_prefix, to + ); + self.store.copy_if_not_exists(from, to).await + } + + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> Result { + info!( + "{} put request for {}/{} of {} bytes", + self.prefix, + self.path_prefix, + location, + payload.content_length() + ); + self.store.put_opts(location, payload, opts).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> Result> { + info!( + "{} put multipart request for {}/{}", + self.prefix, self.path_prefix, location + ); + let part_upload = self.store.put_multipart_opts(location, opts).await?; + Ok(Box::new(LoggingMultipartUpload::new( + part_upload, + &self.prefix, + format!("{}/{}", self.path_prefix, location), + )) as Box) + } +} + +#[derive(Debug)] +struct LoggingMultipartUpload { + inner: Box, + prefix: String, + path: String, +} + +impl LoggingMultipartUpload { + fn new( + inner: Box, + prefix: impl Into, + path: impl Into, + ) -> Self { + Self { + inner, + prefix: prefix.into(), + path: path.into(), + } + } +} + +#[async_trait] +impl MultipartUpload for LoggingMultipartUpload { + fn put_part(&mut self, data: PutPayload) -> UploadPart { + info!( + "{} put_part request for {} of {} bytes", + self.prefix, + self.path, + data.content_length() + ); + self.inner.put_part(data) + } + + async fn complete(&mut self) -> Result { + info!("multipart complete for {}", self.path); + self.inner.complete().await + } + + async fn abort(&mut self) -> Result<()> { + self.inner.abort().await + } +} + +#[cfg(test)] +mod tests { + use crate::{ + integration::*, logging::LoggingStore, memory::InMemory, GetOptions, GetRange, ObjectStore, + PutOptions, Result, + }; + use log::Level; + + #[tokio::test] + async fn log_test() { + let integration = make_store(); + + put_get_delete_list(&integration).await; + get_opts(&integration).await; + list_uses_directories_correctly(&integration).await; + list_with_delimiter(&integration).await; + rename_and_copy(&integration).await; + copy_if_not_exists(&integration).await; + stream_get(&integration).await; + put_opts(&integration, true).await; + put_get_attributes(&integration).await; + } + + fn make_store() -> LoggingStore { + let inner = InMemory::new(); + LoggingStore::new(inner, "TEST", "memory:/") + } + + #[tokio::test] + async fn zero_log() { + // Given + testing_logger::setup(); + let _store = make_store(); + + // When + // no-op + + // Then + testing_logger::validate(|captured_logs| { + assert_eq!(captured_logs.len(), 0); + }); + } + + #[tokio::test] + async fn ranged_get_log() -> Result<()> { + // Given + testing_logger::setup(); + let store = make_store(); + store.put(&"test_file".into(), "some_data".into()).await?; + + // When + store.get_range(&"test_file".into(), 1..5).await?; + + // Then + testing_logger::validate(|captured_logs| { + assert_eq!(captured_logs.len(), 2); + assert_eq!( + captured_logs[1].body, + "TEST get request for memory://test_file byte range 1 to 5 = 4 bytes" + ); + assert_eq!(captured_logs[1].level, Level::Info); + }); + + Ok(()) + } + + #[tokio::test] + async fn offset_get_log() -> Result<()> { + // Given + testing_logger::setup(); + let store = make_store(); + store.put(&"test_file".into(), "some_data".into()).await?; + + // When + let opts = GetOptions { + range: Some(GetRange::Offset(3)), + ..Default::default() + }; + store.get_opts(&"test_file".into(), opts).await?; + + // Then + testing_logger::validate(|captured_logs| { + assert_eq!(captured_logs.len(), 2); + assert_eq!( + captured_logs[1].body, + "TEST get request for memory://test_file for byte 3 to EOF" + ); + assert_eq!(captured_logs[1].level, Level::Info); + }); + + Ok(()) + } + + #[tokio::test] + async fn suffix_get_log() -> Result<()> { + // Given + testing_logger::setup(); + let store = make_store(); + store.put(&"test_file".into(), "some_data".into()).await?; + + // When + let opts = GetOptions { + range: Some(GetRange::Suffix(3)), + ..Default::default() + }; + store.get_opts(&"test_file".into(), opts).await?; + + // Then + testing_logger::validate(|captured_logs| { + assert_eq!(captured_logs.len(), 2); + assert_eq!( + captured_logs[1].body, + "TEST get request for memory://test_file for last 3 bytes of object" + ); + assert_eq!(captured_logs[1].level, Level::Info); + }); + + Ok(()) + } + + #[tokio::test] + async fn no_range_get_log() -> Result<()> { + // Given + testing_logger::setup(); + let store = make_store(); + store.put(&"test_file".into(), "some_data".into()).await?; + + // When + let opts = GetOptions::default(); + store.get_opts(&"test_file".into(), opts).await?; + + // Then + testing_logger::validate(|captured_logs| { + assert_eq!(captured_logs.len(), 2); + assert_eq!( + captured_logs[1].body, + "TEST get request for memory://test_file for complete file range" + ); + assert_eq!(captured_logs[1].level, Level::Info); + }); + + Ok(()) + } + + #[tokio::test] + async fn head_log() -> Result<()> { + // Given + testing_logger::setup(); + let store = make_store(); + store.put(&"test_file".into(), "some_data".into()).await?; + + // When + store.head(&"test_file".into()).await?; + + // Then + testing_logger::validate(|captured_logs| { + assert_eq!(captured_logs.len(), 2); + assert_eq!( + captured_logs[1].body, + "TEST head request for memory://test_file" + ); + assert_eq!(captured_logs[1].level, Level::Info); + }); + + Ok(()) + } + + #[tokio::test] + async fn delete_log() -> Result<()> { + // Given + testing_logger::setup(); + let store = make_store(); + store.put(&"test_file".into(), "some_data".into()).await?; + + // When + store.delete(&"test_file".into()).await?; + + // Then + testing_logger::validate(|captured_logs| { + assert_eq!(captured_logs.len(), 2); + assert_eq!( + captured_logs[1].body, + "TEST delete request for memory://test_file" + ); + assert_eq!(captured_logs[1].level, Level::Info); + }); + + Ok(()) + } + + #[tokio::test] + async fn list_log() -> Result<()> { + // Given + testing_logger::setup(); + let store = make_store(); + + // When + #[allow(unused_must_use)] + store.list(Some(&"foo".into())); + + // Then + testing_logger::validate(|captured_logs| { + assert_eq!(captured_logs.len(), 1); + assert_eq!(captured_logs[0].body, "TEST list request for memory://foo"); + assert_eq!(captured_logs[0].level, Level::Info); + }); + + Ok(()) + } + + #[tokio::test] + async fn list_with_delimeter_log() -> Result<()> { + // Given + testing_logger::setup(); + let store = make_store(); + + // When + #[allow(unused_must_use)] + store.list_with_delimiter(Some(&"foo".into())).await?; + + // Then + testing_logger::validate(|captured_logs| { + assert_eq!(captured_logs.len(), 1); + assert_eq!( + captured_logs[0].body, + "TEST list_with_delimeter request for memory://foo" + ); + assert_eq!(captured_logs[0].level, Level::Info); + }); + + Ok(()) + } + + #[tokio::test] + async fn list_path_none_log() -> Result<()> { + // Given + testing_logger::setup(); + let store = make_store(); + + // When + #[allow(unused_must_use)] + store.list(None); + + // Then + testing_logger::validate(|captured_logs| { + assert_eq!(captured_logs.len(), 1); + assert_eq!(captured_logs[0].body, "TEST list request for memory://"); + assert_eq!(captured_logs[0].level, Level::Info); + }); + + Ok(()) + } + + #[tokio::test] + async fn list_with_delimeter_path_none_log() -> Result<()> { + // Given + testing_logger::setup(); + let store = make_store(); + + // When + #[allow(unused_must_use)] + store.list_with_delimiter(None).await?; + + // Then + testing_logger::validate(|captured_logs| { + assert_eq!(captured_logs.len(), 1); + assert_eq!( + captured_logs[0].body, + "TEST list_with_delimeter request for memory://" + ); + assert_eq!(captured_logs[0].level, Level::Info); + }); + + Ok(()) + } + + #[tokio::test] + async fn copy_log() -> Result<()> { + // Given + testing_logger::setup(); + let store = make_store(); + store.put(&"test_file".into(), "some_data".into()).await?; + + // When + store + .copy(&"test_file".into(), &"test_file2".into()) + .await?; + + // Then + testing_logger::validate(|captured_logs| { + assert_eq!(captured_logs.len(), 2); + assert_eq!( + captured_logs[1].body, + "TEST copy request from memory://test_file to memory://test_file2" + ); + assert_eq!(captured_logs[1].level, Level::Info); + }); + + Ok(()) + } + + #[tokio::test] + async fn copy_if_not_exists_log() -> Result<()> { + // Given + testing_logger::setup(); + let store = make_store(); + store.put(&"test_file".into(), "some_data".into()).await?; + + // When + store + .copy_if_not_exists(&"test_file".into(), &"test_file2".into()) + .await?; + + // Then + testing_logger::validate(|captured_logs| { + assert_eq!(captured_logs.len(), 2); + assert_eq!( + captured_logs[1].body, + "TEST copy_if_not_exists request from memory://test_file to memory://test_file2" + ); + assert_eq!(captured_logs[1].level, Level::Info); + }); + + Ok(()) + } + + #[tokio::test] + async fn put_log() -> Result<()> { + // Given + testing_logger::setup(); + let store = make_store(); + + // When + store + .put_opts(&"test_file".into(), "foo".into(), PutOptions::default()) + .await?; + + // Then + testing_logger::validate(|captured_logs| { + assert_eq!(captured_logs.len(), 1); + assert_eq!( + captured_logs[0].body, + "TEST put request for memory://test_file of 3 bytes" + ); + assert_eq!(captured_logs[0].level, Level::Info); + }); + + Ok(()) + } + + #[tokio::test] + async fn put_multipart_log() -> Result<()> { + // Given + testing_logger::setup(); + let store = make_store(); + + // When + let mut part = store.put_multipart(&"test_file".into()).await?; + part.put_part("foo".into()).await?; + part.put_part("foo1".into()).await?; + part.put_part("foo12".into()).await?; + part.complete().await?; + + // Then + testing_logger::validate(|captured_logs| { + assert_eq!(captured_logs.len(), 5); + assert_eq!( + captured_logs[0].body, + "TEST put multipart request for memory://test_file" + ); + assert_eq!(captured_logs[0].level, Level::Info); + assert_eq!( + captured_logs[1].body, + "TEST put_part request for memory://test_file of 3 bytes" + ); + assert_eq!(captured_logs[1].level, Level::Info); + assert_eq!( + captured_logs[2].body, + "TEST put_part request for memory://test_file of 4 bytes" + ); + assert_eq!(captured_logs[2].level, Level::Info); + assert_eq!( + captured_logs[3].body, + "TEST put_part request for memory://test_file of 5 bytes" + ); + assert_eq!(captured_logs[3].level, Level::Info); + assert_eq!( + captured_logs[4].body, + "multipart complete for memory://test_file" + ); + assert_eq!(captured_logs[4].level, Level::Info); + }); + + let retrieved_data = String::from_utf8( + store + .get(&"test_file".into()) + .await? + .bytes() + .await? + .to_vec(), + ) + .expect("String should be valid UTF-8"); + assert_eq!(retrieved_data, "foofoo1foo12"); + Ok(()) + } +} From 770499f51096eeab095c2df868ae547ac0893158 Mon Sep 17 00:00:00 2001 From: m09526 Date: Wed, 9 Jul 2025 12:44:26 +0000 Subject: [PATCH 3/8] Change to debug level --- src/logging.rs | 71 +++++++++++++++++++++++++------------------------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/src/logging.rs b/src/logging.rs index e19f8bd..cf4864a 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -16,16 +16,17 @@ // under the License. //! An object store that logs calls to the wrapped implementation. - use crate::{ path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, }; use async_trait::async_trait; use futures::stream::BoxStream; -use log::info; +use log::debug; -/// An [`ObjectStore`] wrapper that logs operations made to the wrapped store. +/// An [`ObjectStore`] wrapper that logs operations made to the wrapped store. The logs are written using the ['log'] crate. +/// +/// Logs are written at the "debug" logging level. #[derive(Debug)] pub struct LoggingStore { store: T, @@ -65,7 +66,7 @@ impl ObjectStore for LoggingStore { .end .checked_sub(get_range.start) .expect("Get range length is negative"); - info!( + debug!( "{} get request for {}/{} byte range {} to {} = {} bytes", self.prefix, self.path_prefix, @@ -76,19 +77,19 @@ impl ObjectStore for LoggingStore { ); } Some(GetRange::Offset(start_pos)) => { - info!( + debug!( "{} get request for {}/{} for byte {} to EOF", self.prefix, self.path_prefix, location, start_pos, ); } Some(GetRange::Suffix(pos)) => { - info!( + debug!( "{} get request for {}/{} for last {} bytes of object", self.prefix, self.path_prefix, location, pos, ); } None => { - info!( + debug!( "{} get request for {}/{} for complete file range", self.prefix, self.path_prefix, location ); @@ -99,7 +100,7 @@ impl ObjectStore for LoggingStore { } async fn head(&self, location: &Path) -> Result { - info!( + debug!( "{} head request for {}/{}", self.prefix, self.path_prefix, location ); @@ -107,7 +108,7 @@ impl ObjectStore for LoggingStore { } async fn delete(&self, location: &Path) -> Result<()> { - info!( + debug!( "{} delete request for {}/{}", self.prefix, self.path_prefix, location ); @@ -115,7 +116,7 @@ impl ObjectStore for LoggingStore { } fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { - info!( + debug!( "{} list request for {}/{}", self.prefix, self.path_prefix, @@ -125,7 +126,7 @@ impl ObjectStore for LoggingStore { } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { - info!( + debug!( "{} list_with_delimeter request for {}/{}", self.prefix, self.path_prefix, @@ -135,7 +136,7 @@ impl ObjectStore for LoggingStore { } async fn copy(&self, from: &Path, to: &Path) -> Result<()> { - info!( + debug!( "{} copy request from {}/{} to {}/{}", self.prefix, self.path_prefix, from, self.path_prefix, to ); @@ -143,7 +144,7 @@ impl ObjectStore for LoggingStore { } async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - info!( + debug!( "{} copy_if_not_exists request from {}/{} to {}/{}", self.prefix, self.path_prefix, from, self.path_prefix, to ); @@ -156,7 +157,7 @@ impl ObjectStore for LoggingStore { payload: PutPayload, opts: PutOptions, ) -> Result { - info!( + debug!( "{} put request for {}/{} of {} bytes", self.prefix, self.path_prefix, @@ -171,7 +172,7 @@ impl ObjectStore for LoggingStore { location: &Path, opts: PutMultipartOptions, ) -> Result> { - info!( + debug!( "{} put multipart request for {}/{}", self.prefix, self.path_prefix, location ); @@ -208,7 +209,7 @@ impl LoggingMultipartUpload { #[async_trait] impl MultipartUpload for LoggingMultipartUpload { fn put_part(&mut self, data: PutPayload) -> UploadPart { - info!( + debug!( "{} put_part request for {} of {} bytes", self.prefix, self.path, @@ -218,7 +219,7 @@ impl MultipartUpload for LoggingMultipartUpload { } async fn complete(&mut self) -> Result { - info!("multipart complete for {}", self.path); + debug!("multipart complete for {}", self.path); self.inner.complete().await } @@ -287,7 +288,7 @@ mod tests { captured_logs[1].body, "TEST get request for memory://test_file byte range 1 to 5 = 4 bytes" ); - assert_eq!(captured_logs[1].level, Level::Info); + assert_eq!(captured_logs[1].level, Level::Debug); }); Ok(()) @@ -314,7 +315,7 @@ mod tests { captured_logs[1].body, "TEST get request for memory://test_file for byte 3 to EOF" ); - assert_eq!(captured_logs[1].level, Level::Info); + assert_eq!(captured_logs[1].level, Level::Debug); }); Ok(()) @@ -341,7 +342,7 @@ mod tests { captured_logs[1].body, "TEST get request for memory://test_file for last 3 bytes of object" ); - assert_eq!(captured_logs[1].level, Level::Info); + assert_eq!(captured_logs[1].level, Level::Debug); }); Ok(()) @@ -365,7 +366,7 @@ mod tests { captured_logs[1].body, "TEST get request for memory://test_file for complete file range" ); - assert_eq!(captured_logs[1].level, Level::Info); + assert_eq!(captured_logs[1].level, Level::Debug); }); Ok(()) @@ -388,7 +389,7 @@ mod tests { captured_logs[1].body, "TEST head request for memory://test_file" ); - assert_eq!(captured_logs[1].level, Level::Info); + assert_eq!(captured_logs[1].level, Level::Debug); }); Ok(()) @@ -411,7 +412,7 @@ mod tests { captured_logs[1].body, "TEST delete request for memory://test_file" ); - assert_eq!(captured_logs[1].level, Level::Info); + assert_eq!(captured_logs[1].level, Level::Debug); }); Ok(()) @@ -431,7 +432,7 @@ mod tests { testing_logger::validate(|captured_logs| { assert_eq!(captured_logs.len(), 1); assert_eq!(captured_logs[0].body, "TEST list request for memory://foo"); - assert_eq!(captured_logs[0].level, Level::Info); + assert_eq!(captured_logs[0].level, Level::Debug); }); Ok(()) @@ -454,7 +455,7 @@ mod tests { captured_logs[0].body, "TEST list_with_delimeter request for memory://foo" ); - assert_eq!(captured_logs[0].level, Level::Info); + assert_eq!(captured_logs[0].level, Level::Debug); }); Ok(()) @@ -474,7 +475,7 @@ mod tests { testing_logger::validate(|captured_logs| { assert_eq!(captured_logs.len(), 1); assert_eq!(captured_logs[0].body, "TEST list request for memory://"); - assert_eq!(captured_logs[0].level, Level::Info); + assert_eq!(captured_logs[0].level, Level::Debug); }); Ok(()) @@ -497,7 +498,7 @@ mod tests { captured_logs[0].body, "TEST list_with_delimeter request for memory://" ); - assert_eq!(captured_logs[0].level, Level::Info); + assert_eq!(captured_logs[0].level, Level::Debug); }); Ok(()) @@ -522,7 +523,7 @@ mod tests { captured_logs[1].body, "TEST copy request from memory://test_file to memory://test_file2" ); - assert_eq!(captured_logs[1].level, Level::Info); + assert_eq!(captured_logs[1].level, Level::Debug); }); Ok(()) @@ -547,7 +548,7 @@ mod tests { captured_logs[1].body, "TEST copy_if_not_exists request from memory://test_file to memory://test_file2" ); - assert_eq!(captured_logs[1].level, Level::Info); + assert_eq!(captured_logs[1].level, Level::Debug); }); Ok(()) @@ -571,7 +572,7 @@ mod tests { captured_logs[0].body, "TEST put request for memory://test_file of 3 bytes" ); - assert_eq!(captured_logs[0].level, Level::Info); + assert_eq!(captured_logs[0].level, Level::Debug); }); Ok(()) @@ -597,27 +598,27 @@ mod tests { captured_logs[0].body, "TEST put multipart request for memory://test_file" ); - assert_eq!(captured_logs[0].level, Level::Info); + assert_eq!(captured_logs[0].level, Level::Debug); assert_eq!( captured_logs[1].body, "TEST put_part request for memory://test_file of 3 bytes" ); - assert_eq!(captured_logs[1].level, Level::Info); + assert_eq!(captured_logs[1].level, Level::Debug); assert_eq!( captured_logs[2].body, "TEST put_part request for memory://test_file of 4 bytes" ); - assert_eq!(captured_logs[2].level, Level::Info); + assert_eq!(captured_logs[2].level, Level::Debug); assert_eq!( captured_logs[3].body, "TEST put_part request for memory://test_file of 5 bytes" ); - assert_eq!(captured_logs[3].level, Level::Info); + assert_eq!(captured_logs[3].level, Level::Debug); assert_eq!( captured_logs[4].body, "multipart complete for memory://test_file" ); - assert_eq!(captured_logs[4].level, Level::Info); + assert_eq!(captured_logs[4].level, Level::Debug); }); let retrieved_data = String::from_utf8( From 39bd0a56066d7adb54d0c0b02234f6d1dd6f2f1a Mon Sep 17 00:00:00 2001 From: m09526 Date: Wed, 9 Jul 2025 12:46:23 +0000 Subject: [PATCH 4/8] feat: Add LoggingStore wrapper (#380) --- src/logging.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/logging.rs b/src/logging.rs index cf4864a..9171b87 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -24,8 +24,8 @@ use async_trait::async_trait; use futures::stream::BoxStream; use log::debug; -/// An [`ObjectStore`] wrapper that logs operations made to the wrapped store. The logs are written using the ['log'] crate. -/// +/// An [`ObjectStore`] wrapper that logs operations made to the wrapped store. The logs are written using the [`log`] crate. +/// /// Logs are written at the "debug" logging level. #[derive(Debug)] pub struct LoggingStore { From 9cf1d7761272883d4f53927dff0fb08d9995b877 Mon Sep 17 00:00:00 2001 From: m09526 Date: Thu, 10 Jul 2025 09:35:19 +0000 Subject: [PATCH 5/8] Switch to using tracing module --- Cargo.toml | 3 +- src/lib.rs | 2 +- src/{logging.rs => trace.rs} | 228 +++++++++++++---------------------- 3 files changed, 87 insertions(+), 146 deletions(-) rename src/{logging.rs => trace.rs} (68%) diff --git a/Cargo.toml b/Cargo.toml index d8a505f..8909853 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,6 @@ futures = "0.3" http = "1.2.0" humantime = "2.1" itertools = "0.14.0" -log = "0.4.27" parking_lot = { version = "0.12" } percent-encoding = "2.1" thiserror = "2.0.2" @@ -88,7 +87,7 @@ regex = "1.11.1" # The "gzip" feature for reqwest is enabled for an integration test. reqwest = { version = "0.12", features = ["gzip"] } tempfile = "3.1.0" -testing_logger = "0.1.1" +tracing-test = "0.2.5" [target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dev-dependencies] wasm-bindgen-test = "0.3.50" diff --git a/src/lib.rs b/src/lib.rs index 69713d6..4d24b96 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -521,7 +521,7 @@ pub mod http; pub mod limit; #[cfg(all(feature = "fs", not(target_arch = "wasm32")))] pub mod local; -pub mod logging; +pub mod trace; pub mod memory; pub mod path; pub mod prefix; diff --git a/src/logging.rs b/src/trace.rs similarity index 68% rename from src/logging.rs rename to src/trace.rs index 9171b87..ef8c0c7 100644 --- a/src/logging.rs +++ b/src/trace.rs @@ -15,26 +15,24 @@ // specific language governing permissions and limitations // under the License. -//! An object store that logs calls to the wrapped implementation. +//! An object store that traces calls to the wrapped implementation. use crate::{ path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, }; use async_trait::async_trait; use futures::stream::BoxStream; -use log::debug; +use tracing::debug; -/// An [`ObjectStore`] wrapper that logs operations made to the wrapped store. The logs are written using the [`log`] crate. -/// -/// Logs are written at the "debug" logging level. +/// An [`ObjectStore`] wrapper that traces operations made to the wrapped store. #[derive(Debug)] -pub struct LoggingStore { +pub struct TracingStore { store: T, prefix: String, path_prefix: String, } -impl LoggingStore { +impl TracingStore { /// Create a new logging store by wrapping an inner store. #[must_use] pub fn new(inner: T, prefix: impl Into, path_prefix: impl Into) -> Self { @@ -46,7 +44,7 @@ impl LoggingStore { } } -impl std::fmt::Display for LoggingStore { +impl std::fmt::Display for TracingStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, @@ -57,7 +55,7 @@ impl std::fmt::Display for LoggingStore { } #[async_trait] -impl ObjectStore for LoggingStore { +impl ObjectStore for TracingStore { async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { if !options.head { match &options.range { @@ -177,7 +175,7 @@ impl ObjectStore for LoggingStore { self.prefix, self.path_prefix, location ); let part_upload = self.store.put_multipart_opts(location, opts).await?; - Ok(Box::new(LoggingMultipartUpload::new( + Ok(Box::new(TracingMultipartUpload::new( part_upload, &self.prefix, format!("{}/{}", self.path_prefix, location), @@ -186,13 +184,13 @@ impl ObjectStore for LoggingStore { } #[derive(Debug)] -struct LoggingMultipartUpload { +struct TracingMultipartUpload { inner: Box, prefix: String, path: String, } -impl LoggingMultipartUpload { +impl TracingMultipartUpload { fn new( inner: Box, prefix: impl Into, @@ -207,7 +205,7 @@ impl LoggingMultipartUpload { } #[async_trait] -impl MultipartUpload for LoggingMultipartUpload { +impl MultipartUpload for TracingMultipartUpload { fn put_part(&mut self, data: PutPayload) -> UploadPart { debug!( "{} put_part request for {} of {} bytes", @@ -230,11 +228,12 @@ impl MultipartUpload for LoggingMultipartUpload { #[cfg(test)] mod tests { + use tracing_test::traced_test; + use crate::{ - integration::*, logging::LoggingStore, memory::InMemory, GetOptions, GetRange, ObjectStore, + integration::*, memory::InMemory, trace::TracingStore, GetOptions, GetRange, ObjectStore, PutOptions, Result, }; - use log::Level; #[tokio::test] async fn log_test() { @@ -251,30 +250,15 @@ mod tests { put_get_attributes(&integration).await; } - fn make_store() -> LoggingStore { + fn make_store() -> TracingStore { let inner = InMemory::new(); - LoggingStore::new(inner, "TEST", "memory:/") - } - - #[tokio::test] - async fn zero_log() { - // Given - testing_logger::setup(); - let _store = make_store(); - - // When - // no-op - - // Then - testing_logger::validate(|captured_logs| { - assert_eq!(captured_logs.len(), 0); - }); + TracingStore::new(inner, "TEST", "memory:/") } + #[traced_test] #[tokio::test] async fn ranged_get_log() -> Result<()> { // Given - testing_logger::setup(); let store = make_store(); store.put(&"test_file".into(), "some_data".into()).await?; @@ -282,22 +266,20 @@ mod tests { store.get_range(&"test_file".into(), 1..5).await?; // Then - testing_logger::validate(|captured_logs| { + logs_assert(|captured_logs| { assert_eq!(captured_logs.len(), 2); - assert_eq!( - captured_logs[1].body, - "TEST get request for memory://test_file byte range 1 to 5 = 4 bytes" - ); - assert_eq!(captured_logs[1].level, Level::Debug); + assert!(captured_logs[1] + .contains("TEST get request for memory://test_file byte range 1 to 5 = 4 bytes")); + Ok(()) }); Ok(()) } + #[traced_test] #[tokio::test] async fn offset_get_log() -> Result<()> { // Given - testing_logger::setup(); let store = make_store(); store.put(&"test_file".into(), "some_data".into()).await?; @@ -309,22 +291,20 @@ mod tests { store.get_opts(&"test_file".into(), opts).await?; // Then - testing_logger::validate(|captured_logs| { + logs_assert(|captured_logs| { assert_eq!(captured_logs.len(), 2); - assert_eq!( - captured_logs[1].body, - "TEST get request for memory://test_file for byte 3 to EOF" - ); - assert_eq!(captured_logs[1].level, Level::Debug); + assert!(captured_logs[1] + .contains("TEST get request for memory://test_file for byte 3 to EOF")); + Ok(()) }); Ok(()) } + #[traced_test] #[tokio::test] async fn suffix_get_log() -> Result<()> { // Given - testing_logger::setup(); let store = make_store(); store.put(&"test_file".into(), "some_data".into()).await?; @@ -336,22 +316,20 @@ mod tests { store.get_opts(&"test_file".into(), opts).await?; // Then - testing_logger::validate(|captured_logs| { + logs_assert(|captured_logs| { assert_eq!(captured_logs.len(), 2); - assert_eq!( - captured_logs[1].body, - "TEST get request for memory://test_file for last 3 bytes of object" - ); - assert_eq!(captured_logs[1].level, Level::Debug); + assert!(captured_logs[1] + .contains("TEST get request for memory://test_file for last 3 bytes of object")); + Ok(()) }); Ok(()) } + #[traced_test] #[tokio::test] async fn no_range_get_log() -> Result<()> { // Given - testing_logger::setup(); let store = make_store(); store.put(&"test_file".into(), "some_data".into()).await?; @@ -360,22 +338,20 @@ mod tests { store.get_opts(&"test_file".into(), opts).await?; // Then - testing_logger::validate(|captured_logs| { + logs_assert(|captured_logs| { assert_eq!(captured_logs.len(), 2); - assert_eq!( - captured_logs[1].body, - "TEST get request for memory://test_file for complete file range" - ); - assert_eq!(captured_logs[1].level, Level::Debug); + assert!(captured_logs[1] + .contains("TEST get request for memory://test_file for complete file range")); + Ok(()) }); Ok(()) } + #[traced_test] #[tokio::test] async fn head_log() -> Result<()> { // Given - testing_logger::setup(); let store = make_store(); store.put(&"test_file".into(), "some_data".into()).await?; @@ -383,22 +359,19 @@ mod tests { store.head(&"test_file".into()).await?; // Then - testing_logger::validate(|captured_logs| { + logs_assert(|captured_logs| { assert_eq!(captured_logs.len(), 2); - assert_eq!( - captured_logs[1].body, - "TEST head request for memory://test_file" - ); - assert_eq!(captured_logs[1].level, Level::Debug); + assert!(captured_logs[1].contains("TEST head request for memory://test_file")); + Ok(()) }); Ok(()) } + #[traced_test] #[tokio::test] async fn delete_log() -> Result<()> { // Given - testing_logger::setup(); let store = make_store(); store.put(&"test_file".into(), "some_data".into()).await?; @@ -406,22 +379,19 @@ mod tests { store.delete(&"test_file".into()).await?; // Then - testing_logger::validate(|captured_logs| { + logs_assert(|captured_logs| { assert_eq!(captured_logs.len(), 2); - assert_eq!( - captured_logs[1].body, - "TEST delete request for memory://test_file" - ); - assert_eq!(captured_logs[1].level, Level::Debug); + assert!(captured_logs[1].contains("TEST delete request for memory://test_file")); + Ok(()) }); Ok(()) } + #[traced_test] #[tokio::test] async fn list_log() -> Result<()> { // Given - testing_logger::setup(); let store = make_store(); // When @@ -429,19 +399,19 @@ mod tests { store.list(Some(&"foo".into())); // Then - testing_logger::validate(|captured_logs| { + logs_assert(|captured_logs| { assert_eq!(captured_logs.len(), 1); - assert_eq!(captured_logs[0].body, "TEST list request for memory://foo"); - assert_eq!(captured_logs[0].level, Level::Debug); + assert!(captured_logs[0].contains("TEST list request for memory://foo")); + Ok(()) }); Ok(()) } + #[traced_test] #[tokio::test] async fn list_with_delimeter_log() -> Result<()> { // Given - testing_logger::setup(); let store = make_store(); // When @@ -449,22 +419,19 @@ mod tests { store.list_with_delimiter(Some(&"foo".into())).await?; // Then - testing_logger::validate(|captured_logs| { + logs_assert(|captured_logs| { assert_eq!(captured_logs.len(), 1); - assert_eq!( - captured_logs[0].body, - "TEST list_with_delimeter request for memory://foo" - ); - assert_eq!(captured_logs[0].level, Level::Debug); + assert!(captured_logs[0].contains("TEST list_with_delimeter request for memory://foo")); + Ok(()) }); Ok(()) } + #[traced_test] #[tokio::test] async fn list_path_none_log() -> Result<()> { // Given - testing_logger::setup(); let store = make_store(); // When @@ -472,19 +439,19 @@ mod tests { store.list(None); // Then - testing_logger::validate(|captured_logs| { + logs_assert(|captured_logs| { assert_eq!(captured_logs.len(), 1); - assert_eq!(captured_logs[0].body, "TEST list request for memory://"); - assert_eq!(captured_logs[0].level, Level::Debug); + assert!(captured_logs[0].contains("TEST list request for memory://")); + Ok(()) }); Ok(()) } + #[traced_test] #[tokio::test] async fn list_with_delimeter_path_none_log() -> Result<()> { // Given - testing_logger::setup(); let store = make_store(); // When @@ -492,22 +459,19 @@ mod tests { store.list_with_delimiter(None).await?; // Then - testing_logger::validate(|captured_logs| { + logs_assert(|captured_logs| { assert_eq!(captured_logs.len(), 1); - assert_eq!( - captured_logs[0].body, - "TEST list_with_delimeter request for memory://" - ); - assert_eq!(captured_logs[0].level, Level::Debug); + assert!(captured_logs[0].contains("TEST list_with_delimeter request for memory://")); + Ok(()) }); Ok(()) } + #[traced_test] #[tokio::test] async fn copy_log() -> Result<()> { // Given - testing_logger::setup(); let store = make_store(); store.put(&"test_file".into(), "some_data".into()).await?; @@ -517,22 +481,20 @@ mod tests { .await?; // Then - testing_logger::validate(|captured_logs| { + logs_assert(|captured_logs| { assert_eq!(captured_logs.len(), 2); - assert_eq!( - captured_logs[1].body, - "TEST copy request from memory://test_file to memory://test_file2" - ); - assert_eq!(captured_logs[1].level, Level::Debug); + assert!(captured_logs[1] + .contains("TEST copy request from memory://test_file to memory://test_file2")); + Ok(()) }); Ok(()) } + #[traced_test] #[tokio::test] async fn copy_if_not_exists_log() -> Result<()> { // Given - testing_logger::setup(); let store = make_store(); store.put(&"test_file".into(), "some_data".into()).await?; @@ -542,22 +504,21 @@ mod tests { .await?; // Then - testing_logger::validate(|captured_logs| { + logs_assert(|captured_logs| { assert_eq!(captured_logs.len(), 2); - assert_eq!( - captured_logs[1].body, + assert!(captured_logs[1].contains( "TEST copy_if_not_exists request from memory://test_file to memory://test_file2" - ); - assert_eq!(captured_logs[1].level, Level::Debug); + )); + Ok(()) }); Ok(()) } + #[traced_test] #[tokio::test] async fn put_log() -> Result<()> { // Given - testing_logger::setup(); let store = make_store(); // When @@ -566,22 +527,19 @@ mod tests { .await?; // Then - testing_logger::validate(|captured_logs| { + logs_assert(|captured_logs| { assert_eq!(captured_logs.len(), 1); - assert_eq!( - captured_logs[0].body, - "TEST put request for memory://test_file of 3 bytes" - ); - assert_eq!(captured_logs[0].level, Level::Debug); + assert!(captured_logs[0].contains("TEST put request for memory://test_file of 3 bytes")); + Ok(()) }); Ok(()) } + #[traced_test] #[tokio::test] async fn put_multipart_log() -> Result<()> { // Given - testing_logger::setup(); let store = make_store(); // When @@ -592,33 +550,17 @@ mod tests { part.complete().await?; // Then - testing_logger::validate(|captured_logs| { + logs_assert(|captured_logs| { assert_eq!(captured_logs.len(), 5); - assert_eq!( - captured_logs[0].body, - "TEST put multipart request for memory://test_file" - ); - assert_eq!(captured_logs[0].level, Level::Debug); - assert_eq!( - captured_logs[1].body, - "TEST put_part request for memory://test_file of 3 bytes" - ); - assert_eq!(captured_logs[1].level, Level::Debug); - assert_eq!( - captured_logs[2].body, - "TEST put_part request for memory://test_file of 4 bytes" - ); - assert_eq!(captured_logs[2].level, Level::Debug); - assert_eq!( - captured_logs[3].body, - "TEST put_part request for memory://test_file of 5 bytes" - ); - assert_eq!(captured_logs[3].level, Level::Debug); - assert_eq!( - captured_logs[4].body, - "multipart complete for memory://test_file" - ); - assert_eq!(captured_logs[4].level, Level::Debug); + assert!(captured_logs[0].contains("TEST put multipart request for memory://test_file")); + assert!(captured_logs[1] + .contains("TEST put_part request for memory://test_file of 3 bytes")); + assert!(captured_logs[2] + .contains("TEST put_part request for memory://test_file of 4 bytes")); + assert!(captured_logs[3] + .contains("TEST put_part request for memory://test_file of 5 bytes")); + assert!(captured_logs[4].contains("multipart complete for memory://test_file")); + Ok(()) }); let retrieved_data = String::from_utf8( From f8728547c7ebfd7eaf5b369579580a6bfd9f9b26 Mon Sep 17 00:00:00 2001 From: m09526 Date: Thu, 10 Jul 2025 09:37:10 +0000 Subject: [PATCH 6/8] minor: fix typo in debug string --- src/trace.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/trace.rs b/src/trace.rs index ef8c0c7..f2f8f96 100644 --- a/src/trace.rs +++ b/src/trace.rs @@ -33,7 +33,7 @@ pub struct TracingStore { } impl TracingStore { - /// Create a new logging store by wrapping an inner store. + /// Create a new tracing store by wrapping an inner store. #[must_use] pub fn new(inner: T, prefix: impl Into, path_prefix: impl Into) -> Self { Self { @@ -48,7 +48,7 @@ impl std::fmt::Display for TracingStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "LoggingStore \"{}\" path prefix: \"{}\" ({})", + "TracingStore \"{}\" path prefix: \"{}\" ({})", self.prefix, self.path_prefix, self.store ) } From 1b0563c6d292c19db6022cc35fe0d26504a85547 Mon Sep 17 00:00:00 2001 From: m09526 Date: Thu, 10 Jul 2025 16:19:16 +0000 Subject: [PATCH 7/8] Cargo fmt fix --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 4d24b96..44d787c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -521,7 +521,6 @@ pub mod http; pub mod limit; #[cfg(all(feature = "fs", not(target_arch = "wasm32")))] pub mod local; -pub mod trace; pub mod memory; pub mod path; pub mod prefix; @@ -529,6 +528,7 @@ pub mod registry; #[cfg(feature = "cloud")] pub mod signer; pub mod throttle; +pub mod trace; #[cfg(feature = "cloud")] pub mod client; From 59c8d98cccaf566d1546c4848626ddd9d072cfd2 Mon Sep 17 00:00:00 2001 From: m09526 Date: Thu, 17 Jul 2025 12:07:14 +0000 Subject: [PATCH 8/8] Use spans and remove unneeded unit tests --- Cargo.toml | 1 - src/trace.rs | 448 ++++++--------------------------------------------- 2 files changed, 51 insertions(+), 398 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 93e5146..1e8f448 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,7 +87,6 @@ regex = "1.11.1" # The "gzip" feature for reqwest is enabled for an integration test. reqwest = { version = "0.12", features = ["gzip"] } tempfile = "3.1.0" -tracing-test = "0.2.5" [target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dev-dependencies] wasm-bindgen-test = "0.3.50" diff --git a/src/trace.rs b/src/trace.rs index f2f8f96..b2b1a4a 100644 --- a/src/trace.rs +++ b/src/trace.rs @@ -22,7 +22,7 @@ use crate::{ }; use async_trait::async_trait; use futures::stream::BoxStream; -use tracing::debug; +use tracing::instrument; /// An [`ObjectStore`] wrapper that traces operations made to the wrapped store. #[derive(Debug)] @@ -56,124 +56,107 @@ impl std::fmt::Display for TracingStore { #[async_trait] impl ObjectStore for TracingStore { + #[instrument(level = "debug", skip_all, fields(store = self.prefix, location, range))] async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { - if !options.head { + tracing::Span::current().record("location", format!("{}/{}", self.path_prefix, location)); + let range = if options.head { + "N/A: HEAD only request".to_owned() + } else { match &options.range { Some(GetRange::Bounded(get_range)) => { let len = get_range .end .checked_sub(get_range.start) .expect("Get range length is negative"); - debug!( - "{} get request for {}/{} byte range {} to {} = {} bytes", - self.prefix, - self.path_prefix, - location, - get_range.start, - get_range.end, - len, - ); + format!( + "bytes {} to {}, len {}", + get_range.start, get_range.end, len + ) } Some(GetRange::Offset(start_pos)) => { - debug!( - "{} get request for {}/{} for byte {} to EOF", - self.prefix, self.path_prefix, location, start_pos, - ); + format!("byte {start_pos} to EOF") } Some(GetRange::Suffix(pos)) => { - debug!( - "{} get request for {}/{} for last {} bytes of object", - self.prefix, self.path_prefix, location, pos, - ); - } - None => { - debug!( - "{} get request for {}/{} for complete file range", - self.prefix, self.path_prefix, location - ); + format!("last {pos} bytes of object") } + None => "complete file range".to_owned(), } - } + }; + tracing::Span::current().record("range", &range); self.store.get_opts(location, options).await } + #[instrument(level = "debug", skip_all, fields(store = self.prefix, location))] async fn head(&self, location: &Path) -> Result { - debug!( - "{} head request for {}/{}", - self.prefix, self.path_prefix, location - ); + tracing::Span::current().record("location", format!("{}/{}", self.path_prefix, location)); self.store.head(location).await } + #[instrument(level = "debug", skip_all, fields(store = self.prefix, location))] async fn delete(&self, location: &Path) -> Result<()> { - debug!( - "{} delete request for {}/{}", - self.prefix, self.path_prefix, location - ); + tracing::Span::current().record("location", format!("{}/{}", self.path_prefix, location)); self.store.delete(location).await } + #[instrument(level = "debug", skip_all, fields(store = self.prefix, prefix))] fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { - debug!( - "{} list request for {}/{}", - self.prefix, - self.path_prefix, - prefix.unwrap_or(&Path::default()) + tracing::Span::current().record( + "prefix", + format!( + "{}/{}", + self.path_prefix, + prefix.unwrap_or(&Path::default()) + ), ); self.store.list(prefix) } + #[instrument(level = "debug", skip_all, fields(store = self.prefix, prefix))] async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { - debug!( - "{} list_with_delimeter request for {}/{}", - self.prefix, - self.path_prefix, - prefix.unwrap_or(&Path::default()) + tracing::Span::current().record( + "prefix", + format!( + "{}/{}", + self.path_prefix, + prefix.unwrap_or(&Path::default()) + ), ); self.store.list_with_delimiter(prefix).await } + #[instrument(level = "debug", skip_all, fields(store = self.prefix, from, to))] async fn copy(&self, from: &Path, to: &Path) -> Result<()> { - debug!( - "{} copy request from {}/{} to {}/{}", - self.prefix, self.path_prefix, from, self.path_prefix, to - ); + tracing::Span::current().record("from", format!("{}/{}", self.path_prefix, from)); + tracing::Span::current().record("to", format!("{}/{}", self.path_prefix, to)); self.store.copy(from, to).await } + #[instrument(level = "debug", skip_all, fields(store = self.prefix, from, to))] async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - debug!( - "{} copy_if_not_exists request from {}/{} to {}/{}", - self.prefix, self.path_prefix, from, self.path_prefix, to - ); + tracing::Span::current().record("from", format!("{}/{}", self.path_prefix, from)); + tracing::Span::current().record("to", format!("{}/{}", self.path_prefix, to)); self.store.copy_if_not_exists(from, to).await } + #[instrument(level = "debug", skip_all, fields(store = self.prefix, location, length))] async fn put_opts( &self, location: &Path, payload: PutPayload, opts: PutOptions, ) -> Result { - debug!( - "{} put request for {}/{} of {} bytes", - self.prefix, - self.path_prefix, - location, - payload.content_length() - ); + tracing::Span::current().record("location", format!("{}/{}", self.path_prefix, location)); + tracing::Span::current().record("length", payload.content_length()); self.store.put_opts(location, payload, opts).await } + #[instrument(level = "debug", skip_all, fields(store = self.prefix, location))] async fn put_multipart_opts( &self, location: &Path, opts: PutMultipartOptions, ) -> Result> { - debug!( - "{} put multipart request for {}/{}", - self.prefix, self.path_prefix, location - ); + tracing::Span::current().record("location", format!("{}/{}", self.path_prefix, location)); let part_upload = self.store.put_multipart_opts(location, opts).await?; Ok(Box::new(TracingMultipartUpload::new( part_upload, @@ -206,21 +189,18 @@ impl TracingMultipartUpload { #[async_trait] impl MultipartUpload for TracingMultipartUpload { + #[instrument(level = "debug", skip_all, fields(store = self.prefix, location = self.path, length))] fn put_part(&mut self, data: PutPayload) -> UploadPart { - debug!( - "{} put_part request for {} of {} bytes", - self.prefix, - self.path, - data.content_length() - ); + tracing::Span::current().record("length", data.content_length()); self.inner.put_part(data) } + #[instrument(level = "debug", skip_all, fields(store = self.prefix, location = self.path))] async fn complete(&mut self) -> Result { - debug!("multipart complete for {}", self.path); self.inner.complete().await } + #[instrument(level = "debug", skip_all, fields(store = self.prefix, location = self.path))] async fn abort(&mut self) -> Result<()> { self.inner.abort().await } @@ -228,12 +208,7 @@ impl MultipartUpload for TracingMultipartUpload { #[cfg(test)] mod tests { - use tracing_test::traced_test; - - use crate::{ - integration::*, memory::InMemory, trace::TracingStore, GetOptions, GetRange, ObjectStore, - PutOptions, Result, - }; + use crate::{integration::*, memory::InMemory, trace::TracingStore}; #[tokio::test] async fn log_test() { @@ -254,325 +229,4 @@ mod tests { let inner = InMemory::new(); TracingStore::new(inner, "TEST", "memory:/") } - - #[traced_test] - #[tokio::test] - async fn ranged_get_log() -> Result<()> { - // Given - let store = make_store(); - store.put(&"test_file".into(), "some_data".into()).await?; - - // When - store.get_range(&"test_file".into(), 1..5).await?; - - // Then - logs_assert(|captured_logs| { - assert_eq!(captured_logs.len(), 2); - assert!(captured_logs[1] - .contains("TEST get request for memory://test_file byte range 1 to 5 = 4 bytes")); - Ok(()) - }); - - Ok(()) - } - - #[traced_test] - #[tokio::test] - async fn offset_get_log() -> Result<()> { - // Given - let store = make_store(); - store.put(&"test_file".into(), "some_data".into()).await?; - - // When - let opts = GetOptions { - range: Some(GetRange::Offset(3)), - ..Default::default() - }; - store.get_opts(&"test_file".into(), opts).await?; - - // Then - logs_assert(|captured_logs| { - assert_eq!(captured_logs.len(), 2); - assert!(captured_logs[1] - .contains("TEST get request for memory://test_file for byte 3 to EOF")); - Ok(()) - }); - - Ok(()) - } - - #[traced_test] - #[tokio::test] - async fn suffix_get_log() -> Result<()> { - // Given - let store = make_store(); - store.put(&"test_file".into(), "some_data".into()).await?; - - // When - let opts = GetOptions { - range: Some(GetRange::Suffix(3)), - ..Default::default() - }; - store.get_opts(&"test_file".into(), opts).await?; - - // Then - logs_assert(|captured_logs| { - assert_eq!(captured_logs.len(), 2); - assert!(captured_logs[1] - .contains("TEST get request for memory://test_file for last 3 bytes of object")); - Ok(()) - }); - - Ok(()) - } - - #[traced_test] - #[tokio::test] - async fn no_range_get_log() -> Result<()> { - // Given - let store = make_store(); - store.put(&"test_file".into(), "some_data".into()).await?; - - // When - let opts = GetOptions::default(); - store.get_opts(&"test_file".into(), opts).await?; - - // Then - logs_assert(|captured_logs| { - assert_eq!(captured_logs.len(), 2); - assert!(captured_logs[1] - .contains("TEST get request for memory://test_file for complete file range")); - Ok(()) - }); - - Ok(()) - } - - #[traced_test] - #[tokio::test] - async fn head_log() -> Result<()> { - // Given - let store = make_store(); - store.put(&"test_file".into(), "some_data".into()).await?; - - // When - store.head(&"test_file".into()).await?; - - // Then - logs_assert(|captured_logs| { - assert_eq!(captured_logs.len(), 2); - assert!(captured_logs[1].contains("TEST head request for memory://test_file")); - Ok(()) - }); - - Ok(()) - } - - #[traced_test] - #[tokio::test] - async fn delete_log() -> Result<()> { - // Given - let store = make_store(); - store.put(&"test_file".into(), "some_data".into()).await?; - - // When - store.delete(&"test_file".into()).await?; - - // Then - logs_assert(|captured_logs| { - assert_eq!(captured_logs.len(), 2); - assert!(captured_logs[1].contains("TEST delete request for memory://test_file")); - Ok(()) - }); - - Ok(()) - } - - #[traced_test] - #[tokio::test] - async fn list_log() -> Result<()> { - // Given - let store = make_store(); - - // When - #[allow(unused_must_use)] - store.list(Some(&"foo".into())); - - // Then - logs_assert(|captured_logs| { - assert_eq!(captured_logs.len(), 1); - assert!(captured_logs[0].contains("TEST list request for memory://foo")); - Ok(()) - }); - - Ok(()) - } - - #[traced_test] - #[tokio::test] - async fn list_with_delimeter_log() -> Result<()> { - // Given - let store = make_store(); - - // When - #[allow(unused_must_use)] - store.list_with_delimiter(Some(&"foo".into())).await?; - - // Then - logs_assert(|captured_logs| { - assert_eq!(captured_logs.len(), 1); - assert!(captured_logs[0].contains("TEST list_with_delimeter request for memory://foo")); - Ok(()) - }); - - Ok(()) - } - - #[traced_test] - #[tokio::test] - async fn list_path_none_log() -> Result<()> { - // Given - let store = make_store(); - - // When - #[allow(unused_must_use)] - store.list(None); - - // Then - logs_assert(|captured_logs| { - assert_eq!(captured_logs.len(), 1); - assert!(captured_logs[0].contains("TEST list request for memory://")); - Ok(()) - }); - - Ok(()) - } - - #[traced_test] - #[tokio::test] - async fn list_with_delimeter_path_none_log() -> Result<()> { - // Given - let store = make_store(); - - // When - #[allow(unused_must_use)] - store.list_with_delimiter(None).await?; - - // Then - logs_assert(|captured_logs| { - assert_eq!(captured_logs.len(), 1); - assert!(captured_logs[0].contains("TEST list_with_delimeter request for memory://")); - Ok(()) - }); - - Ok(()) - } - - #[traced_test] - #[tokio::test] - async fn copy_log() -> Result<()> { - // Given - let store = make_store(); - store.put(&"test_file".into(), "some_data".into()).await?; - - // When - store - .copy(&"test_file".into(), &"test_file2".into()) - .await?; - - // Then - logs_assert(|captured_logs| { - assert_eq!(captured_logs.len(), 2); - assert!(captured_logs[1] - .contains("TEST copy request from memory://test_file to memory://test_file2")); - Ok(()) - }); - - Ok(()) - } - - #[traced_test] - #[tokio::test] - async fn copy_if_not_exists_log() -> Result<()> { - // Given - let store = make_store(); - store.put(&"test_file".into(), "some_data".into()).await?; - - // When - store - .copy_if_not_exists(&"test_file".into(), &"test_file2".into()) - .await?; - - // Then - logs_assert(|captured_logs| { - assert_eq!(captured_logs.len(), 2); - assert!(captured_logs[1].contains( - "TEST copy_if_not_exists request from memory://test_file to memory://test_file2" - )); - Ok(()) - }); - - Ok(()) - } - - #[traced_test] - #[tokio::test] - async fn put_log() -> Result<()> { - // Given - let store = make_store(); - - // When - store - .put_opts(&"test_file".into(), "foo".into(), PutOptions::default()) - .await?; - - // Then - logs_assert(|captured_logs| { - assert_eq!(captured_logs.len(), 1); - assert!(captured_logs[0].contains("TEST put request for memory://test_file of 3 bytes")); - Ok(()) - }); - - Ok(()) - } - - #[traced_test] - #[tokio::test] - async fn put_multipart_log() -> Result<()> { - // Given - let store = make_store(); - - // When - let mut part = store.put_multipart(&"test_file".into()).await?; - part.put_part("foo".into()).await?; - part.put_part("foo1".into()).await?; - part.put_part("foo12".into()).await?; - part.complete().await?; - - // Then - logs_assert(|captured_logs| { - assert_eq!(captured_logs.len(), 5); - assert!(captured_logs[0].contains("TEST put multipart request for memory://test_file")); - assert!(captured_logs[1] - .contains("TEST put_part request for memory://test_file of 3 bytes")); - assert!(captured_logs[2] - .contains("TEST put_part request for memory://test_file of 4 bytes")); - assert!(captured_logs[3] - .contains("TEST put_part request for memory://test_file of 5 bytes")); - assert!(captured_logs[4].contains("multipart complete for memory://test_file")); - Ok(()) - }); - - let retrieved_data = String::from_utf8( - store - .get(&"test_file".into()) - .await? - .bytes() - .await? - .to_vec(), - ) - .expect("String should be valid UTF-8"); - assert_eq!(retrieved_data, "foofoo1foo12"); - Ok(()) - } }