From 5f92f5c49d4c2c24906858f8b782748b9663a487 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 16 Oct 2025 18:29:39 +0900 Subject: [PATCH 01/47] feat: add backon library --- Cargo.lock | 24 ++++++++++++++++++++++++ Cargo.toml | 2 ++ 2 files changed, 26 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 3b0e4753..3d612261 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1034,6 +1034,17 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "backon" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "592277618714fbcecda9a02ba7a8781f319d26532a88553bbacc77ba5d2b3a8d" +dependencies = [ + "fastrand", + "gloo-timers", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.75" @@ -1590,6 +1601,7 @@ dependencies = [ "alloy-node-bindings", "anyhow", "async-trait", + "backon", "chrono", "serde", "serde_json", @@ -1839,6 +1851,18 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "group" version = "0.13.0" diff --git a/Cargo.toml b/Cargo.toml index a27bb890..59b907d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ tokio-stream = "0.1.17" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } hex = "0.4" +backon = "1.5.2" [package] name = "event-scanner" @@ -65,6 +66,7 @@ alloy-node-bindings.workspace = true tokio-stream.workspace = true tracing.workspace = true tracing-subscriber.workspace = true +backon.workspace = true [lints] workspace = true From 2ad65163818913773bd4de8734fa6af3b81be8c8 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 16 Oct 2025 21:11:26 +0900 Subject: [PATCH 02/47] feat: add provider wrapper doc doc --- src/lib.rs | 8 +++ src/safe_provider.rs | 150 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+) create mode 100644 src/safe_provider.rs diff --git a/src/lib.rs b/src/lib.rs index 38ed609a..d687f03f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,14 @@ pub mod block_range_scanner; pub mod event_filter; pub mod event_listener; pub mod event_scanner; +pub mod safe_provider; pub mod types; +pub use block_range_scanner::{ + BlockRangeMessage, BlockRangeScanner, BlockRangeScannerClient, BlockRangeScannerError, + DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_BLOCKS_READ_PER_EPOCH, +}; pub use event_filter::EventFilter; +pub use event_scanner::{EventScanner, EventScannerError, EventScannerMessage}; +pub use safe_provider::{SafeProvider, SafeProviderError}; + diff --git a/src/safe_provider.rs b/src/safe_provider.rs new file mode 100644 index 00000000..fce66a3f --- /dev/null +++ b/src/safe_provider.rs @@ -0,0 +1,150 @@ +//! Safe provider wrapper with built-in retry and timeout mechanisms. +//! +//! This module provides a wrapper around Alloy providers that automatically +//! handles retries, timeouts, and error logging for RPC calls. +//! +//! # Example +//! +//! ```rust,no_run +//! use alloy::{ +//! network::Ethereum, +//! providers::{RootProvider, WsConnect}, +//! rpc::client::ClientBuilder, +//! }; +//! 
use event_scanner::safe_provider::SafeProvider; +//! use std::time::Duration; +//! +//! async fn example() -> Result<(), Box> { +//! let provider = RootProvider::::new( +//! ClientBuilder::default().ws(WsConnect::new("wss://localhost:8000")).await?, +//! ); +//! let safe_provider = +//! SafeProvider::new(provider).with_timeout(Duration::from_secs(30)).with_max_retries(5); +//! +//! let block = safe_provider.get_block_by_number(12345.into()).await?; +//! Ok(()) +//! } +//! ``` + +use std::{sync::Arc, time::Duration}; + +use alloy::{ + eips::BlockNumberOrTag, + network::Network, + providers::{Provider, RootProvider}, + transports::{RpcError, TransportErrorKind}, +}; +use backon::{ExponentialBuilder, Retryable}; +use thiserror::Error; +use tokio::time::timeout; +use tracing::warn; + +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); +const DEFAULT_MAX_RETRIES: usize = 5; +const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(1); + +#[derive(Error, Debug, Clone)] +pub enum SafeProviderError { + #[error("RPC error: {0}")] + RpcError(Arc>), + + #[error("Request timeout after {0:?}")] + Timeout(Duration), + + #[error("Block not found: {0}")] + BlockNotFound(BlockNumberOrTag), + + #[error("All retry attempts exhausted")] + RetryExhausted, +} + +impl From> for SafeProviderError { + fn from(error: RpcError) -> Self { + SafeProviderError::RpcError(Arc::new(error)) + } +} + +#[derive(Clone)] +pub struct SafeProvider { + provider: RootProvider, + timeout: Duration, + max_retries: usize, + retry_interval: Duration, +} + +impl SafeProvider { + #[must_use] + pub fn new(provider: RootProvider) -> Self { + Self { + provider, + timeout: DEFAULT_TIMEOUT, + max_retries: DEFAULT_MAX_RETRIES, + retry_interval: DEFAULT_RETRY_INTERVAL, + } + } + + #[must_use] + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } + + #[must_use] + pub fn with_max_retries(mut self, max_retries: usize) -> Self { + self.max_retries = max_retries; + self + } + + #[must_use] + pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self { + self.retry_interval = retry_interval; + self + } + + #[must_use] + pub fn inner(&self) -> &RootProvider { + &self.provider + } + + #[allow(clippy::missing_errors_doc)] + pub async fn get_block_by_number( + &self, + number: BlockNumberOrTag, + ) -> Result, SafeProviderError> { + let timeout_duration = self.timeout; + let provider = self.provider.clone(); + + let operation = || async { + let result = timeout(timeout_duration, provider.get_block_by_number(number)).await; + + match result { + Ok(Ok(block)) => Ok(block), + Ok(Err(e)) => { + warn!("RPC error fetching block {number}: {e}"); + Err(SafeProviderError::from(e)) + } + Err(_) => { + warn!("Timeout fetching block {number} after {timeout_duration:?}"); + Err(SafeProviderError::Timeout(timeout_duration)) + } + } + }; + + let retry_strategy = ExponentialBuilder::default() + .with_max_times(self.max_retries) + .with_min_delay(self.retry_interval); + + operation.retry(retry_strategy).sleep(tokio::time::sleep).await + } + + // pub async fn get_block_number(&self) -> Result { + // Ok(result) + // } + // + // pub async fn get_block_by_hash( + // &self, + // hash: alloy::primitives::BlockHash, + // ) -> Result, SafeProviderError> { + // Ok(result) + // } +} From 22c2448d673a363e4139ab943ae997a4407fd456 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 16 Oct 2025 21:43:16 +0900 Subject: [PATCH 03/47] feat: use internal --- src/safe_provider.rs | 59 
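For readers unfamiliar with `backon`, the wrapper introduced above is built from two pieces: a closure returning a future of `Result`, which `backon::Retryable` extends with a `.retry(...)` adapter driven by an `ExponentialBuilder` backoff, plus a per-attempt `tokio::time::timeout` whose expiry is mapped into an ordinary error so it is retried like any other failed attempt. A minimal standalone sketch of that combination follows; the function names, error type, and durations are illustrative, not the crate's actual API (which returns `RpcError<TransportErrorKind>`).

```rust
use std::time::Duration;

use backon::{ExponentialBuilder, Retryable};
use tokio::time::timeout;

// Stand-in for a single RPC attempt; in the crate this is a provider call.
async fn fetch_latest_block() -> Result<u64, std::io::Error> {
    Ok(42)
}

async fn fetch_with_retries() -> Result<u64, std::io::Error> {
    let per_attempt = Duration::from_secs(30);

    // Each attempt is capped individually; a timeout is converted into an
    // error so backon backs off and retries it like any other failure.
    let one_attempt = || async {
        timeout(per_attempt, fetch_latest_block())
            .await
            .map_err(|_| {
                std::io::Error::new(std::io::ErrorKind::TimedOut, "attempt timed out")
            })?
    };

    one_attempt
        .retry(
            ExponentialBuilder::default()
                .with_max_times(5)
                .with_min_delay(Duration::from_secs(1)),
        )
        .sleep(tokio::time::sleep)
        .await
}
```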
++++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index fce66a3f..f418f113 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -37,7 +37,6 @@ use alloy::{ use backon::{ExponentialBuilder, Retryable}; use thiserror::Error; use tokio::time::timeout; -use tracing::warn; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); const DEFAULT_MAX_RETRIES: usize = 5; @@ -111,22 +110,38 @@ impl SafeProvider { &self, number: BlockNumberOrTag, ) -> Result, SafeProviderError> { - let timeout_duration = self.timeout; let provider = self.provider.clone(); + self.retry_with_timeout(|| async { provider.get_block_by_number(number).await }).await + } - let operation = || async { - let result = timeout(timeout_duration, provider.get_block_by_number(number)).await; - - match result { - Ok(Ok(block)) => Ok(block), - Ok(Err(e)) => { - warn!("RPC error fetching block {number}: {e}"); - Err(SafeProviderError::from(e)) - } - Err(_) => { - warn!("Timeout fetching block {number} after {timeout_duration:?}"); - Err(SafeProviderError::Timeout(timeout_duration)) - } + #[allow(clippy::missing_errors_doc)] + pub async fn get_block_number(&self) -> Result { + let provider = self.provider.clone(); + self.retry_with_timeout(|| async { provider.get_block_number().await }).await + } + + #[allow(clippy::missing_errors_doc)] + pub async fn get_block_by_hash( + &self, + hash: alloy::primitives::BlockHash, + ) -> Result, SafeProviderError> { + let provider = self.provider.clone(); + self.retry_with_timeout(|| async { provider.get_block_by_hash(hash).await }).await + } + + #[allow(clippy::missing_errors_doc)] + async fn retry_with_timeout(&self, operation: F) -> Result + where + F: Fn() -> Fut, + Fut: Future>>, + { + let timeout_duration = self.timeout; + + let wrapped_operation = || async { + match timeout(timeout_duration, operation()).await { + Ok(Ok(result)) => Ok(result), + Ok(Err(e)) => Err(SafeProviderError::from(e)), + Err(_) => Err(SafeProviderError::Timeout(timeout_duration)), } }; @@ -134,17 +149,7 @@ impl SafeProvider { .with_max_times(self.max_retries) .with_min_delay(self.retry_interval); - operation.retry(retry_strategy).sleep(tokio::time::sleep).await + wrapped_operation.retry(retry_strategy).sleep(tokio::time::sleep).await } - - // pub async fn get_block_number(&self) -> Result { - // Ok(result) - // } - // - // pub async fn get_block_by_hash( - // &self, - // hash: alloy::primitives::BlockHash, - // ) -> Result, SafeProviderError> { - // Ok(result) - // } } + From d738ab291a97c88bccdedbc9010acddf143fd7a1 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 16 Oct 2025 21:56:16 +0900 Subject: [PATCH 04/47] feat: add get logs to safe provider --- src/safe_provider.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index f418f113..bdbc215c 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -26,12 +26,13 @@ //! } //! 
``` -use std::{sync::Arc, time::Duration}; +use std::{future::Future, sync::Arc, time::Duration}; use alloy::{ eips::BlockNumberOrTag, network::Network, providers::{Provider, RootProvider}, + rpc::types::{Filter, Log}, transports::{RpcError, TransportErrorKind}, }; use backon::{ExponentialBuilder, Retryable}; @@ -129,6 +130,13 @@ impl SafeProvider { self.retry_with_timeout(|| async { provider.get_block_by_hash(hash).await }).await } + #[allow(clippy::missing_errors_doc)] + pub async fn get_logs(&self, filter: &Filter) -> Result, SafeProviderError> { + let provider = self.provider.clone(); + let filter = filter.clone(); + self.retry_with_timeout(|| async { provider.get_logs(&filter).await }).await + } + #[allow(clippy::missing_errors_doc)] async fn retry_with_timeout(&self, operation: F) -> Result where @@ -152,4 +160,3 @@ impl SafeProvider { wrapped_operation.retry(retry_strategy).sleep(tokio::time::sleep).await } } - From d068c8e589564ce8cc2819c5538983e9c2eee8b3 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 16 Oct 2025 22:06:32 +0900 Subject: [PATCH 05/47] chore: made var public --- src/safe_provider.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index bdbc215c..1baf129e 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -39,9 +39,9 @@ use backon::{ExponentialBuilder, Retryable}; use thiserror::Error; use tokio::time::timeout; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); -const DEFAULT_MAX_RETRIES: usize = 5; -const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(1); +pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); +pub const DEFAULT_MAX_RETRIES: usize = 5; +pub const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(1); #[derive(Error, Debug, Clone)] pub enum SafeProviderError { From d1e12e5a8922d5ce7a4be21bcdf9b355dcc9c486 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 16 Oct 2025 22:06:54 +0900 Subject: [PATCH 06/47] feat: added safe provider conifgs to block range scanner --- src/block_range_scanner.rs | 89 ++++++++++++++++++++++++++++++-------- 1 file changed, 70 insertions(+), 19 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index eab3ee6f..34216bbe 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -64,7 +64,7 @@ //! } //! 
``` -use std::{cmp::Ordering, ops::RangeInclusive, sync::Arc}; +use std::{cmp::Ordering, ops::RangeInclusive, sync::Arc, time::Duration}; use tokio::{ join, @@ -72,7 +72,13 @@ use tokio::{ }; use tokio_stream::{StreamExt, wrappers::ReceiverStream}; -use crate::types::{ScannerMessage, ScannerStatus}; +use crate::{ + safe_provider::{ + DEFAULT_MAX_RETRIES, DEFAULT_RETRY_INTERVAL, DEFAULT_TIMEOUT, SafeProvider, + SafeProviderError, + }, + types::{ScannerMessage, ScannerStatus}, +}; use alloy::{ consensus::BlockHeader, eips::BlockNumberOrTag, @@ -82,7 +88,7 @@ use alloy::{ pubsub::Subscription, rpc::client::ClientBuilder, transports::{ - RpcError, TransportErrorKind, TransportResult, + TransportResult, http::reqwest::{self, Url}, ws::WsConnect, }, @@ -101,6 +107,11 @@ pub const MAX_BUFFERED_MESSAGES: usize = 50000; // is considered final) pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 64; +// RPC retry and timeout settings +pub const DEFAULT_RPC_TIMEOUT: Duration = Duration::from_secs(30); +pub const DEFAULT_RPC_MAX_RETRIES: usize = 5; +pub const DEFAULT_RPC_RETRY_INTERVAL: Duration = Duration::from_secs(1); + // // State sync aware retry settings // const STATE_SYNC_RETRY_INTERVAL: Duration = Duration::from_secs(30); // const STATE_SYNC_MAX_RETRIES: u64 = 12; @@ -138,8 +149,8 @@ pub enum BlockRangeScannerError { #[error("Serialization error: {0}")] SerializationError(Arc), - #[error("RPC error: {0}")] - RpcError(Arc>), + #[error("Safe provider error: {0}")] + SafeProviderError(Arc), #[error("Channel send error")] ChannelError, @@ -175,9 +186,9 @@ impl From for BlockRangeScannerError { } } -impl From> for BlockRangeScannerError { - fn from(error: RpcError) -> Self { - BlockRangeScannerError::RpcError(Arc::new(error)) +impl From for BlockRangeScannerError { + fn from(error: SafeProviderError) -> Self { + BlockRangeScannerError::SafeProviderError(Arc::new(error)) } } @@ -233,6 +244,9 @@ pub struct BlockRangeScanner { blocks_read_per_epoch: usize, max_reorg_depth: u64, block_confirmations: u64, + timeout: Duration, + max_retries: usize, + retry_interval: Duration, } impl Default for BlockRangeScanner { @@ -248,6 +262,9 @@ impl BlockRangeScanner { blocks_read_per_epoch: DEFAULT_BLOCKS_READ_PER_EPOCH, max_reorg_depth: DEFAULT_REORG_REWIND_DEPTH, block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS, + timeout: DEFAULT_TIMEOUT, + max_retries: DEFAULT_MAX_RETRIES, + retry_interval: DEFAULT_RETRY_INTERVAL, } } @@ -269,6 +286,24 @@ impl BlockRangeScanner { self } + #[must_use] + pub fn with_timeout(mut self, rpc_timeout: Duration) -> Self { + self.timeout = rpc_timeout; + self + } + + #[must_use] + pub fn with_max_retries(mut self, rpc_max_retries: usize) -> Self { + self.max_retries = rpc_max_retries; + self + } + + #[must_use] + pub fn with_retry_interval(mut self, rpc_retry_interval: Duration) -> Self { + self.retry_interval = rpc_retry_interval; + self + } + /// Connects to the provider via WebSocket /// /// # Errors @@ -305,8 +340,13 @@ impl BlockRangeScanner { self, provider: RootProvider, ) -> TransportResult> { + let safe_provider = SafeProvider::new(provider) + .with_timeout(self.timeout) + .with_max_retries(self.max_retries) + .with_retry_interval(self.retry_interval); + Ok(ConnectedBlockRangeScanner { - provider, + provider: safe_provider, config: Config { blocks_read_per_epoch: self.blocks_read_per_epoch, reorg_rewind_depth: self.max_reorg_depth, @@ -317,14 +357,14 @@ impl BlockRangeScanner { } pub struct ConnectedBlockRangeScanner { - provider: RootProvider, + provider: SafeProvider, config: 
Config, } impl ConnectedBlockRangeScanner { - /// Returns the underlying Provider. + /// Returns the `SafeProvider` #[must_use] - pub fn provider(&self) -> &RootProvider { + pub fn provider(&self) -> &SafeProvider { &self.provider } @@ -344,7 +384,7 @@ impl ConnectedBlockRangeScanner { struct Service { config: Config, - provider: RootProvider, + provider: SafeProvider, subscriber: Option>, websocket_connected: bool, processed_count: u64, @@ -354,7 +394,7 @@ struct Service { } impl Service { - pub fn new(config: Config, provider: RootProvider) -> (Self, mpsc::Sender) { + pub fn new(config: Config, provider: SafeProvider) -> (Self, mpsc::Sender) { let (cmd_tx, cmd_rx) = mpsc::channel(100); let service = Self { @@ -450,7 +490,13 @@ impl Service { let range_start = (latest + 1).saturating_sub(block_confirmations); tokio::spawn(async move { - Self::stream_live_blocks(range_start, provider, sender, block_confirmations).await; + Self::stream_live_blocks( + range_start, + provider.inner().clone(), + sender, + block_confirmations, + ) + .await; }); Ok(()) @@ -534,7 +580,7 @@ impl Service { let sender = self.subscriber.clone().ok_or_else(|| BlockRangeScannerError::ServiceShutdown)?; - let provider = self.provider.clone(); + let provider = self.provider.inner().clone(); tokio::spawn(async move { Self::stream_live_blocks(start_block_num, provider, sender, block_confirmations) .await; @@ -554,7 +600,7 @@ impl Service { let (live_block_buffer_sender, live_block_buffer_receiver) = mpsc::channel::(MAX_BUFFERED_MESSAGES); - let provider = self.provider.clone(); + let provider = self.provider.inner().clone(); // The cutoff is the last block we have synced historically // Any block > cutoff will come from the live stream @@ -1104,8 +1150,12 @@ mod tests { Config { blocks_read_per_epoch: 5, reorg_rewind_depth: 5, block_confirmations: 0 } } - fn mocked_provider(asserter: Asserter) -> RootProvider { - RootProvider::new(RpcClient::mocked(asserter)) + fn mocked_provider(asserter: Asserter) -> SafeProvider { + let root_provider = RootProvider::new(RpcClient::mocked(asserter)); + SafeProvider::new(root_provider) + .with_timeout(DEFAULT_RPC_TIMEOUT) + .with_max_retries(DEFAULT_RPC_MAX_RETRIES) + .with_retry_interval(DEFAULT_RPC_RETRY_INTERVAL) } #[test] @@ -1970,3 +2020,4 @@ mod tests { Ok(()) } } + From 0e9aa3bb07e0465d223f8f95bc4f91ade5d33782 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 16 Oct 2025 22:30:14 +0900 Subject: [PATCH 07/47] feat: use safe provider in event scanner --- src/event_scanner.rs | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/src/event_scanner.rs b/src/event_scanner.rs index 3b5c063d..b3bef118 100644 --- a/src/event_scanner.rs +++ b/src/event_scanner.rs @@ -7,12 +7,13 @@ use crate::{ }, event_filter::EventFilter, event_listener::EventListener, + safe_provider::{SafeProvider, SafeProviderError}, types::ScannerMessage, }; use alloy::{ eips::BlockNumberOrTag, network::Network, - providers::{Provider, RootProvider}, + providers::RootProvider, rpc::types::{Filter, Log}, sol_types::SolEvent, transports::{RpcError, TransportErrorKind, http::reqwest::Url}, @@ -80,6 +81,8 @@ pub enum EventScannerError { BlockRangeScanner(#[from] BlockRangeScannerError), #[error("Provider error: {0}")] Provider(Arc>), + #[error("Safe provider error: {0}")] + SafeProvider(Arc), } impl From> for EventScannerError { @@ -88,6 +91,12 @@ impl From> for EventScannerError { } } +impl From for EventScannerError { + fn from(e: SafeProviderError) -> Self { + 
EventScannerError::SafeProvider(Arc::new(e)) + } +} + impl From> for EventScannerMessage { fn from(e: RpcError) -> Self { EventScannerMessage::Error(e.into()) @@ -100,6 +109,12 @@ impl From for EventScannerMessage { } } +impl From for EventScannerMessage { + fn from(e: EventScannerError) -> Self { + EventScannerMessage::Error(e) + } +} + impl Default for EventScanner { fn default() -> Self { Self::new() @@ -376,8 +391,8 @@ impl ConnectedEventScanner { range: RangeInclusive, event_filter: &EventFilter, log_filter: &Filter, - provider: &RootProvider, - ) -> Result, RpcError> { + provider: &SafeProvider, + ) -> Result, EventScannerError> { let log_filter = log_filter.clone().from_block(*range.start()).to_block(*range.end()); match provider.get_logs(&log_filter).await { @@ -403,7 +418,7 @@ impl ConnectedEventScanner { "failed to get logs for block range" ); - Err(e) + Err(e.into()) } } } From 319940adc50713e9dcbd15a456876acb34e6564f Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 16 Oct 2025 23:20:14 +0900 Subject: [PATCH 08/47] feat: undo safe provider errors --- src/block_range_scanner.rs | 24 +++++++++----------- src/event_scanner.rs | 10 ++------- src/lib.rs | 2 +- src/safe_provider.rs | 45 ++++++++------------------------------ 4 files changed, 22 insertions(+), 59 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 34216bbe..d66555e4 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -73,10 +73,7 @@ use tokio::{ use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use crate::{ - safe_provider::{ - DEFAULT_MAX_RETRIES, DEFAULT_RETRY_INTERVAL, DEFAULT_TIMEOUT, SafeProvider, - SafeProviderError, - }, + safe_provider::{DEFAULT_MAX_RETRIES, DEFAULT_RETRY_INTERVAL, DEFAULT_TIMEOUT, SafeProvider}, types::{ScannerMessage, ScannerStatus}, }; use alloy::{ @@ -88,7 +85,7 @@ use alloy::{ pubsub::Subscription, rpc::client::ClientBuilder, transports::{ - TransportResult, + RpcError, TransportErrorKind, TransportResult, http::reqwest::{self, Url}, ws::WsConnect, }, @@ -149,8 +146,11 @@ pub enum BlockRangeScannerError { #[error("Serialization error: {0}")] SerializationError(Arc), - #[error("Safe provider error: {0}")] - SafeProviderError(Arc), + #[error("Provider error: {0}")] + Provider(Arc>), + + #[error("Block not found, block number: {0}")] + BlockNotFound(BlockNumberOrTag), #[error("Channel send error")] ChannelError, @@ -169,9 +169,6 @@ pub enum BlockRangeScannerError { #[error("WebSocket connection failed after {0} attempts")] WebSocketConnectionFailed(usize), - - #[error("Block not found, block number: {0}")] - BlockNotFound(BlockNumberOrTag), } impl From for BlockRangeScannerError { @@ -186,9 +183,9 @@ impl From for BlockRangeScannerError { } } -impl From for BlockRangeScannerError { - fn from(error: SafeProviderError) -> Self { - BlockRangeScannerError::SafeProviderError(Arc::new(error)) +impl From> for BlockRangeScannerError { + fn from(error: RpcError) -> Self { + BlockRangeScannerError::Provider(Arc::new(error)) } } @@ -2020,4 +2017,3 @@ mod tests { Ok(()) } } - diff --git a/src/event_scanner.rs b/src/event_scanner.rs index b3bef118..9ae703ca 100644 --- a/src/event_scanner.rs +++ b/src/event_scanner.rs @@ -7,7 +7,7 @@ use crate::{ }, event_filter::EventFilter, event_listener::EventListener, - safe_provider::{SafeProvider, SafeProviderError}, + safe_provider::SafeProvider, types::ScannerMessage, }; use alloy::{ @@ -81,8 +81,7 @@ pub enum EventScannerError { BlockRangeScanner(#[from] BlockRangeScannerError), #[error("Provider 
error: {0}")] Provider(Arc>), - #[error("Safe provider error: {0}")] - SafeProvider(Arc), + } impl From> for EventScannerError { @@ -91,11 +90,6 @@ impl From> for EventScannerError { } } -impl From for EventScannerError { - fn from(e: SafeProviderError) -> Self { - EventScannerError::SafeProvider(Arc::new(e)) - } -} impl From> for EventScannerMessage { fn from(e: RpcError) -> Self { diff --git a/src/lib.rs b/src/lib.rs index c2fd0070..4bd99c6b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,4 +13,4 @@ pub use block_range_scanner::{ }; pub use event_filter::EventFilter; pub use event_scanner::{EventScanner, EventScannerError, EventScannerMessage}; -pub use safe_provider::{SafeProvider, SafeProviderError}; +pub use safe_provider::SafeProvider; diff --git a/src/safe_provider.rs b/src/safe_provider.rs index 1baf129e..46a29a3a 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -26,7 +26,7 @@ //! } //! ``` -use std::{future::Future, sync::Arc, time::Duration}; +use std::{future::Future, time::Duration}; use alloy::{ eips::BlockNumberOrTag, @@ -36,33 +36,14 @@ use alloy::{ transports::{RpcError, TransportErrorKind}, }; use backon::{ExponentialBuilder, Retryable}; -use thiserror::Error; -use tokio::time::timeout; + + pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); pub const DEFAULT_MAX_RETRIES: usize = 5; pub const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(1); -#[derive(Error, Debug, Clone)] -pub enum SafeProviderError { - #[error("RPC error: {0}")] - RpcError(Arc>), - - #[error("Request timeout after {0:?}")] - Timeout(Duration), - - #[error("Block not found: {0}")] - BlockNotFound(BlockNumberOrTag), - #[error("All retry attempts exhausted")] - RetryExhausted, -} - -impl From> for SafeProviderError { - fn from(error: RpcError) -> Self { - SafeProviderError::RpcError(Arc::new(error)) - } -} #[derive(Clone)] pub struct SafeProvider { @@ -110,13 +91,13 @@ impl SafeProvider { pub async fn get_block_by_number( &self, number: BlockNumberOrTag, - ) -> Result, SafeProviderError> { + ) -> Result, RpcError> { let provider = self.provider.clone(); self.retry_with_timeout(|| async { provider.get_block_by_number(number).await }).await } #[allow(clippy::missing_errors_doc)] - pub async fn get_block_number(&self) -> Result { + pub async fn get_block_number(&self) -> Result> { let provider = self.provider.clone(); self.retry_with_timeout(|| async { provider.get_block_number().await }).await } @@ -125,33 +106,25 @@ impl SafeProvider { pub async fn get_block_by_hash( &self, hash: alloy::primitives::BlockHash, - ) -> Result, SafeProviderError> { + ) -> Result, RpcError> { let provider = self.provider.clone(); self.retry_with_timeout(|| async { provider.get_block_by_hash(hash).await }).await } #[allow(clippy::missing_errors_doc)] - pub async fn get_logs(&self, filter: &Filter) -> Result, SafeProviderError> { + pub async fn get_logs(&self, filter: &Filter) -> Result, RpcError> { let provider = self.provider.clone(); let filter = filter.clone(); self.retry_with_timeout(|| async { provider.get_logs(&filter).await }).await } #[allow(clippy::missing_errors_doc)] - async fn retry_with_timeout(&self, operation: F) -> Result + async fn retry_with_timeout(&self, operation: F) -> Result> where F: Fn() -> Fut, Fut: Future>>, { - let timeout_duration = self.timeout; - - let wrapped_operation = || async { - match timeout(timeout_duration, operation()).await { - Ok(Ok(result)) => Ok(result), - Ok(Err(e)) => Err(SafeProviderError::from(e)), - Err(_) => 
Err(SafeProviderError::Timeout(timeout_duration)), - } - }; + let wrapped_operation = || async { operation().await }; let retry_strategy = ExponentialBuilder::default() .with_max_times(self.max_retries) From 65527b8232150788e262558285bb8fddb4c067b4 Mon Sep 17 00:00:00 2001 From: Leo Date: Mon, 20 Oct 2025 21:46:50 +0900 Subject: [PATCH 09/47] feat: implement stream block remove import remove wrapped --- src/block_range_scanner.rs | 21 ++++++++------------- src/safe_provider.rs | 30 +++++++++++++++++++----------- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index d66555e4..f68f2536 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -81,7 +81,7 @@ use alloy::{ eips::BlockNumberOrTag, network::{BlockResponse, Network, primitives::HeaderResponse}, primitives::{B256, BlockNumber}, - providers::{Provider, RootProvider}, + providers::RootProvider, pubsub::Subscription, rpc::client::ClientBuilder, transports::{ @@ -487,13 +487,7 @@ impl Service { let range_start = (latest + 1).saturating_sub(block_confirmations); tokio::spawn(async move { - Self::stream_live_blocks( - range_start, - provider.inner().clone(), - sender, - block_confirmations, - ) - .await; + Self::stream_live_blocks(range_start, provider, sender, block_confirmations).await; }); Ok(()) @@ -577,7 +571,7 @@ impl Service { let sender = self.subscriber.clone().ok_or_else(|| BlockRangeScannerError::ServiceShutdown)?; - let provider = self.provider.inner().clone(); + let provider = self.provider.clone(); tokio::spawn(async move { Self::stream_live_blocks(start_block_num, provider, sender, block_confirmations) .await; @@ -597,7 +591,7 @@ impl Service { let (live_block_buffer_sender, live_block_buffer_receiver) = mpsc::channel::(MAX_BUFFERED_MESSAGES); - let provider = self.provider.inner().clone(); + let provider = self.provider.clone(); // The cutoff is the last block we have synced historically // Any block > cutoff will come from the live stream @@ -792,9 +786,9 @@ impl Service { Ok(()) } - async fn stream_live_blocks>( + async fn stream_live_blocks( mut range_start: BlockNumber, - provider: P, + provider: SafeProvider, sender: mpsc::Sender, block_confirmations: u64, ) { @@ -900,7 +894,7 @@ impl Service { } async fn get_block_subscription( - provider: &impl Provider, + provider: &SafeProvider, ) -> Result, BlockRangeScannerError> { let ws_stream = provider .subscribe_blocks() @@ -1128,6 +1122,7 @@ impl BlockRangeScannerClient { #[cfg(test)] mod tests { + use alloy::providers::Provider; use std::time::Duration; use tokio::time::timeout; diff --git a/src/safe_provider.rs b/src/safe_provider.rs index 46a29a3a..d3bcbc5f 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -32,19 +32,16 @@ use alloy::{ eips::BlockNumberOrTag, network::Network, providers::{Provider, RootProvider}, + pubsub::Subscription, rpc::types::{Filter, Log}, transports::{RpcError, TransportErrorKind}, }; use backon::{ExponentialBuilder, Retryable}; - - pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); pub const DEFAULT_MAX_RETRIES: usize = 5; pub const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(1); - - #[derive(Clone)] pub struct SafeProvider { provider: RootProvider, @@ -112,24 +109,35 @@ impl SafeProvider { } #[allow(clippy::missing_errors_doc)] - pub async fn get_logs(&self, filter: &Filter) -> Result, RpcError> { + pub async fn get_logs( + &self, + filter: &Filter, + ) -> Result, RpcError> { + let provider = self.provider.clone(); + 
self.retry_with_timeout(|| async { provider.get_logs(filter).await }).await + } + + #[allow(clippy::missing_errors_doc)] + pub async fn subscribe_blocks( + &self, + ) -> Result, RpcError> { let provider = self.provider.clone(); - let filter = filter.clone(); - self.retry_with_timeout(|| async { provider.get_logs(&filter).await }).await + self.retry_with_timeout(|| async { provider.subscribe_blocks().await }).await } #[allow(clippy::missing_errors_doc)] - async fn retry_with_timeout(&self, operation: F) -> Result> + async fn retry_with_timeout( + &self, + operation: F, + ) -> Result> where F: Fn() -> Fut, Fut: Future>>, { - let wrapped_operation = || async { operation().await }; - let retry_strategy = ExponentialBuilder::default() .with_max_times(self.max_retries) .with_min_delay(self.retry_interval); - wrapped_operation.retry(retry_strategy).sleep(tokio::time::sleep).await + operation.retry(retry_strategy).sleep(tokio::time::sleep).await } } From b5bf428de90df422877a87df5aefb8c3cf26c232 Mon Sep 17 00:00:00 2001 From: Leo Date: Mon, 20 Oct 2025 22:46:53 +0900 Subject: [PATCH 10/47] test: add basic testing to safe provider --- src/safe_provider.rs | 100 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 99 insertions(+), 1 deletion(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index d3bcbc5f..d0f57863 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -126,7 +126,7 @@ impl SafeProvider { } #[allow(clippy::missing_errors_doc)] - async fn retry_with_timeout( + pub(crate) async fn retry_with_timeout( &self, operation: F, ) -> Result> @@ -136,8 +136,106 @@ impl SafeProvider { { let retry_strategy = ExponentialBuilder::default() .with_max_times(self.max_retries) + .with_total_delay(Some(self.timeout)) .with_min_delay(self.retry_interval); operation.retry(retry_strategy).sleep(tokio::time::sleep).await } } + +#[cfg(test)] +mod tests { + use super::*; + use alloy::network::Ethereum; + use std::sync::{Arc, Mutex}; + + fn create_test_provider( + timeout: Duration, + max_retries: usize, + retry_interval: Duration, + ) -> SafeProvider { + SafeProvider { + provider: RootProvider::::new_http("http://localhost:8545".parse().unwrap()), + timeout, + max_retries, + retry_interval, + } + } + + #[tokio::test] + async fn test_retry_with_timeout_succeeds_on_first_attempt() { + let provider = + create_test_provider(Duration::from_millis(100), 3, Duration::from_millis(10)); + + let call_count = Arc::new(Mutex::new(0)); + let call_count_clone = call_count.clone(); + + let result = provider + .retry_with_timeout(move || { + let count = call_count_clone.clone(); + async move { + let mut c = count.lock().unwrap(); + *c += 1; + Ok(42) + } + }) + .await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 42); + assert_eq!(*call_count.lock().unwrap(), 1); + } + + #[tokio::test] + async fn test_retry_with_timeout_retries_on_error() { + let provider = + create_test_provider(Duration::from_millis(100), 3, Duration::from_millis(10)); + + let call_count = Arc::new(Mutex::new(0)); + let call_count_clone = call_count.clone(); + + let result = provider + .retry_with_timeout(move || { + let count = call_count_clone.clone(); + async move { + let mut c = count.lock().unwrap(); + *c += 1; + if *c < 3 { + Err(TransportErrorKind::custom_str("temporary error")) + } else { + Ok(42) + } + } + }) + .await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 42); + assert_eq!(*call_count.lock().unwrap(), 3); + } + + #[tokio::test] + async fn 
test_retry_with_timeout_fails_after_max_retries() { + let provider = + create_test_provider(Duration::from_millis(100), 2, Duration::from_millis(10)); + + let call_count = Arc::new(Mutex::new(0)); + let call_count_clone = call_count.clone(); + + let result = provider + .retry_with_timeout(move || { + let count = call_count_clone.clone(); + async move { + let mut c = count.lock().unwrap(); + *c += 1; + Err::>(TransportErrorKind::custom_str( + "permanent error", + )) + } + }) + .await; + + assert!(result.is_err()); + assert_eq!(*call_count.lock().unwrap(), 3); + } +} From 6241e8375390bfe4ab8d7d348312b3fe558c1d23 Mon Sep 17 00:00:00 2001 From: Leo Date: Mon, 20 Oct 2025 22:48:44 +0900 Subject: [PATCH 11/47] chore: delete other constants --- src/block_range_scanner.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index f68f2536..9c4589f3 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -109,10 +109,6 @@ pub const DEFAULT_RPC_TIMEOUT: Duration = Duration::from_secs(30); pub const DEFAULT_RPC_MAX_RETRIES: usize = 5; pub const DEFAULT_RPC_RETRY_INTERVAL: Duration = Duration::from_secs(1); -// // State sync aware retry settings -// const STATE_SYNC_RETRY_INTERVAL: Duration = Duration::from_secs(30); -// const STATE_SYNC_MAX_RETRIES: u64 = 12; - pub type BlockRangeMessage = ScannerMessage, BlockRangeScannerError>; impl From, BlockRangeScannerError>> for BlockRangeMessage { From 79eaaaef91e21b0d3b9e8e62dce575a99b458b81 Mon Sep 17 00:00:00 2001 From: Leo Date: Mon, 20 Oct 2025 22:51:02 +0900 Subject: [PATCH 12/47] fix: fmt --- src/event_scanner.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/event_scanner.rs b/src/event_scanner.rs index 9ae703ca..464c518a 100644 --- a/src/event_scanner.rs +++ b/src/event_scanner.rs @@ -81,7 +81,6 @@ pub enum EventScannerError { BlockRangeScanner(#[from] BlockRangeScannerError), #[error("Provider error: {0}")] Provider(Arc>), - } impl From> for EventScannerError { @@ -90,7 +89,6 @@ impl From> for EventScannerError { } } - impl From> for EventScannerMessage { fn from(e: RpcError) -> Self { EventScannerMessage::Error(e.into()) From 112c8600ee78218e34db9f73ca15eefe6be3c3bb Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 21 Oct 2025 21:56:30 +0900 Subject: [PATCH 13/47] feat: add logging to rpc calls --- src/safe_provider.rs | 38 +++++++++++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index d0f57863..4fde8150 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -37,6 +37,7 @@ use alloy::{ transports::{RpcError, TransportErrorKind}, }; use backon::{ExponentialBuilder, Retryable}; +use tracing::{debug, error}; pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); pub const DEFAULT_MAX_RETRIES: usize = 5; @@ -89,14 +90,25 @@ impl SafeProvider { &self, number: BlockNumberOrTag, ) -> Result, RpcError> { + debug!("SafeProvider eth_getBlockByNumber called with number: {:?}", number); let provider = self.provider.clone(); - self.retry_with_timeout(|| async { provider.get_block_by_number(number).await }).await + let result = + self.retry_with_timeout(|| async { provider.get_block_by_number(number).await }).await; + if let Err(e) = &result { + error!("SafeProvider eth_getByBlockNumber failed: {}", e); + } + result } #[allow(clippy::missing_errors_doc)] pub async fn get_block_number(&self) -> Result> { + debug!("SafeProvider eth_getBlockNumber called"); let provider = 
self.provider.clone(); - self.retry_with_timeout(|| async { provider.get_block_number().await }).await + let result = self.retry_with_timeout(|| async { provider.get_block_number().await }).await; + if let Err(e) = &result { + error!("SafeProvider eth_getBlockNumber failed: {}", e); + } + result } #[allow(clippy::missing_errors_doc)] @@ -104,8 +116,14 @@ impl SafeProvider { &self, hash: alloy::primitives::BlockHash, ) -> Result, RpcError> { + debug!("SafeProvider eth_getBlockByHash called with hash: {:?}", hash); let provider = self.provider.clone(); - self.retry_with_timeout(|| async { provider.get_block_by_hash(hash).await }).await + let result = + self.retry_with_timeout(|| async { provider.get_block_by_hash(hash).await }).await; + if let Err(e) = &result { + error!("SafeProvider eth_getBlockByHash failed: {}", e); + } + result } #[allow(clippy::missing_errors_doc)] @@ -113,16 +131,26 @@ impl SafeProvider { &self, filter: &Filter, ) -> Result, RpcError> { + debug!("eth_getLogs called with filter: {:?}", filter); let provider = self.provider.clone(); - self.retry_with_timeout(|| async { provider.get_logs(filter).await }).await + let result = self.retry_with_timeout(|| async { provider.get_logs(filter).await }).await; + if let Err(e) = &result { + error!("SafeProvider eth_getLogs failed: {}", e); + } + result } #[allow(clippy::missing_errors_doc)] pub async fn subscribe_blocks( &self, ) -> Result, RpcError> { + debug!("eth_subscribe called"); let provider = self.provider.clone(); - self.retry_with_timeout(|| async { provider.subscribe_blocks().await }).await + let result = self.retry_with_timeout(|| async { provider.subscribe_blocks().await }).await; + if let Err(e) = &result { + error!("SafeProvider eth_subscribe failed: {}", e); + } + result } #[allow(clippy::missing_errors_doc)] From 9fc65430b3744621aefa4cc47ad66f4caebdcd6f Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 21 Oct 2025 22:42:08 +0900 Subject: [PATCH 14/47] chore: add comments and rename timeout --- src/block_range_scanner.rs | 25 ++++++++--------- src/safe_provider.rs | 56 +++++++++++++++++++++++++++++--------- 2 files changed, 54 insertions(+), 27 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 9c4589f3..50368d66 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -73,7 +73,9 @@ use tokio::{ use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use crate::{ - safe_provider::{DEFAULT_MAX_RETRIES, DEFAULT_RETRY_INTERVAL, DEFAULT_TIMEOUT, SafeProvider}, + safe_provider::{ + DEFAULT_MAX_RETRIES, DEFAULT_MAX_TIMEOUT, DEFAULT_RETRY_INTERVAL, SafeProvider, + }, types::{ScannerMessage, ScannerStatus}, }; use alloy::{ @@ -104,11 +106,6 @@ pub const MAX_BUFFERED_MESSAGES: usize = 50000; // is considered final) pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 64; -// RPC retry and timeout settings -pub const DEFAULT_RPC_TIMEOUT: Duration = Duration::from_secs(30); -pub const DEFAULT_RPC_MAX_RETRIES: usize = 5; -pub const DEFAULT_RPC_RETRY_INTERVAL: Duration = Duration::from_secs(1); - pub type BlockRangeMessage = ScannerMessage, BlockRangeScannerError>; impl From, BlockRangeScannerError>> for BlockRangeMessage { @@ -237,7 +234,7 @@ pub struct BlockRangeScanner { blocks_read_per_epoch: usize, max_reorg_depth: u64, block_confirmations: u64, - timeout: Duration, + max_timeout: Duration, max_retries: usize, retry_interval: Duration, } @@ -255,7 +252,7 @@ impl BlockRangeScanner { blocks_read_per_epoch: DEFAULT_BLOCKS_READ_PER_EPOCH, max_reorg_depth: DEFAULT_REORG_REWIND_DEPTH, 
block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS, - timeout: DEFAULT_TIMEOUT, + max_timeout: DEFAULT_MAX_TIMEOUT, max_retries: DEFAULT_MAX_RETRIES, retry_interval: DEFAULT_RETRY_INTERVAL, } @@ -280,8 +277,8 @@ impl BlockRangeScanner { } #[must_use] - pub fn with_timeout(mut self, rpc_timeout: Duration) -> Self { - self.timeout = rpc_timeout; + pub fn with_max_timeout(mut self, rpc_timeout: Duration) -> Self { + self.max_timeout = rpc_timeout; self } @@ -334,7 +331,7 @@ impl BlockRangeScanner { provider: RootProvider, ) -> TransportResult> { let safe_provider = SafeProvider::new(provider) - .with_timeout(self.timeout) + .with_max_timeout(self.max_timeout) .with_max_retries(self.max_retries) .with_retry_interval(self.retry_interval); @@ -1141,9 +1138,9 @@ mod tests { fn mocked_provider(asserter: Asserter) -> SafeProvider { let root_provider = RootProvider::new(RpcClient::mocked(asserter)); SafeProvider::new(root_provider) - .with_timeout(DEFAULT_RPC_TIMEOUT) - .with_max_retries(DEFAULT_RPC_MAX_RETRIES) - .with_retry_interval(DEFAULT_RPC_RETRY_INTERVAL) + .with_max_timeout(DEFAULT_MAX_TIMEOUT) + .with_max_retries(DEFAULT_MAX_RETRIES) + .with_retry_interval(DEFAULT_RETRY_INTERVAL) } #[test] diff --git a/src/safe_provider.rs b/src/safe_provider.rs index 4fde8150..b4deaf22 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -39,32 +39,38 @@ use alloy::{ use backon::{ExponentialBuilder, Retryable}; use tracing::{debug, error}; -pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); +// RPC retry and timeout settings +/// Default timeout used by `SafeProvider` +pub const DEFAULT_MAX_TIMEOUT: Duration = Duration::from_secs(30); +/// Default maximum number of retry attempts. pub const DEFAULT_MAX_RETRIES: usize = 5; +/// Default base delay between retries. pub const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(1); +/// Provider wrapper adding retries and timeouts. #[derive(Clone)] pub struct SafeProvider { provider: RootProvider, - timeout: Duration, + max_timeout: Duration, max_retries: usize, retry_interval: Duration, } impl SafeProvider { + /// Create a new `SafeProvider` with default settings. #[must_use] pub fn new(provider: RootProvider) -> Self { Self { provider, - timeout: DEFAULT_TIMEOUT, + max_timeout: DEFAULT_MAX_TIMEOUT, max_retries: DEFAULT_MAX_RETRIES, retry_interval: DEFAULT_RETRY_INTERVAL, } } #[must_use] - pub fn with_timeout(mut self, timeout: Duration) -> Self { - self.timeout = timeout; + pub fn with_max_timeout(mut self, timeout: Duration) -> Self { + self.max_timeout = timeout; self } @@ -85,7 +91,11 @@ impl SafeProvider { &self.provider } - #[allow(clippy::missing_errors_doc)] + /// Fetch a block by number with retry and timeout. + /// + /// # Errors + /// Returns `RpcError` if the RPC call fails + /// after exhausting retries or times out. pub async fn get_block_by_number( &self, number: BlockNumberOrTag, @@ -100,7 +110,11 @@ impl SafeProvider { result } - #[allow(clippy::missing_errors_doc)] + /// Fetch the latest block number with retry and timeout. + /// + /// # Errors + /// Returns `RpcError` if the RPC call fails + /// after exhausting retries or times out. pub async fn get_block_number(&self) -> Result> { debug!("SafeProvider eth_getBlockNumber called"); let provider = self.provider.clone(); @@ -111,7 +125,11 @@ impl SafeProvider { result } - #[allow(clippy::missing_errors_doc)] + /// Fetch a block by hash with retry and timeout. + /// + /// # Errors + /// Returns `RpcError` if the RPC call fails + /// after exhausting retries or times out. 
pub async fn get_block_by_hash( &self, hash: alloy::primitives::BlockHash, @@ -126,7 +144,11 @@ impl SafeProvider { result } - #[allow(clippy::missing_errors_doc)] + /// Fetch logs for the given filter with retry and timeout. + /// + /// # Errors + /// Returns `RpcError` if the RPC call fails + /// after exhausting retries or times out. pub async fn get_logs( &self, filter: &Filter, @@ -140,7 +162,11 @@ impl SafeProvider { result } - #[allow(clippy::missing_errors_doc)] + /// Subscribe to new block headers with retry and timeout. + /// + /// # Errors + /// Returns `RpcError` if the subscription + /// cannot be established after retries or times out. pub async fn subscribe_blocks( &self, ) -> Result, RpcError> { @@ -153,7 +179,11 @@ impl SafeProvider { result } - #[allow(clippy::missing_errors_doc)] + /// Execute `operation` with exponential backoff and a total timeout. + /// + /// # Errors + /// Returns `RpcError` if all attempts fail or the + /// total delay exceeds the configured timeout. pub(crate) async fn retry_with_timeout( &self, operation: F, @@ -164,7 +194,7 @@ impl SafeProvider { { let retry_strategy = ExponentialBuilder::default() .with_max_times(self.max_retries) - .with_total_delay(Some(self.timeout)) + .with_total_delay(Some(self.max_timeout)) .with_min_delay(self.retry_interval); operation.retry(retry_strategy).sleep(tokio::time::sleep).await @@ -184,7 +214,7 @@ mod tests { ) -> SafeProvider { SafeProvider { provider: RootProvider::::new_http("http://localhost:8545".parse().unwrap()), - timeout, + max_timeout: timeout, max_retries, retry_interval, } From 8bd13b1f49a2973e3158d2f0e42a4545bfeb752c Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 21 Oct 2025 22:45:09 +0900 Subject: [PATCH 15/47] chore: doctest --- src/safe_provider.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index b4deaf22..d7b32f27 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -18,8 +18,9 @@ //! let provider = RootProvider::::new( //! ClientBuilder::default().ws(WsConnect::new("wss://localhost:8000")).await?, //! ); -//! let safe_provider = -//! SafeProvider::new(provider).with_timeout(Duration::from_secs(30)).with_max_retries(5); +//! let safe_provider = SafeProvider::new(provider) +//! .with_max_timeout(Duration::from_secs(30)) +//! .with_max_retries(5); //! //! let block = safe_provider.get_block_by_number(12345.into()).await?; //! 
Ok(()) From af7ce0159fdd15f3b4dffae876f04c77bbbf490d Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 21 Oct 2025 23:09:58 +0900 Subject: [PATCH 16/47] feat: add total timeout --- src/safe_provider.rs | 66 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 55 insertions(+), 11 deletions(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index d7b32f27..bbd8648a 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -103,8 +103,9 @@ impl SafeProvider { ) -> Result, RpcError> { debug!("SafeProvider eth_getBlockByNumber called with number: {:?}", number); let provider = self.provider.clone(); - let result = - self.retry_with_timeout(|| async { provider.get_block_by_number(number).await }).await; + let result = self + .retry_with_total_timeout(|| async { provider.get_block_by_number(number).await }) + .await; if let Err(e) = &result { error!("SafeProvider eth_getByBlockNumber failed: {}", e); } @@ -119,7 +120,8 @@ impl SafeProvider { pub async fn get_block_number(&self) -> Result> { debug!("SafeProvider eth_getBlockNumber called"); let provider = self.provider.clone(); - let result = self.retry_with_timeout(|| async { provider.get_block_number().await }).await; + let result = + self.retry_with_total_timeout(|| async { provider.get_block_number().await }).await; if let Err(e) = &result { error!("SafeProvider eth_getBlockNumber failed: {}", e); } @@ -137,8 +139,9 @@ impl SafeProvider { ) -> Result, RpcError> { debug!("SafeProvider eth_getBlockByHash called with hash: {:?}", hash); let provider = self.provider.clone(); - let result = - self.retry_with_timeout(|| async { provider.get_block_by_hash(hash).await }).await; + let result = self + .retry_with_total_timeout(|| async { provider.get_block_by_hash(hash).await }) + .await; if let Err(e) = &result { error!("SafeProvider eth_getBlockByHash failed: {}", e); } @@ -156,7 +159,8 @@ impl SafeProvider { ) -> Result, RpcError> { debug!("eth_getLogs called with filter: {:?}", filter); let provider = self.provider.clone(); - let result = self.retry_with_timeout(|| async { provider.get_logs(filter).await }).await; + let result = + self.retry_with_total_timeout(|| async { provider.get_logs(filter).await }).await; if let Err(e) = &result { error!("SafeProvider eth_getLogs failed: {}", e); } @@ -173,19 +177,20 @@ impl SafeProvider { ) -> Result, RpcError> { debug!("eth_subscribe called"); let provider = self.provider.clone(); - let result = self.retry_with_timeout(|| async { provider.subscribe_blocks().await }).await; + let result = + self.retry_with_total_timeout(|| async { provider.subscribe_blocks().await }).await; if let Err(e) = &result { error!("SafeProvider eth_subscribe failed: {}", e); } result } - /// Execute `operation` with exponential backoff and a total timeout. + /// Execute `operation` with exponential backoff respecting only the backoff budget. /// /// # Errors /// Returns `RpcError` if all attempts fail or the - /// total delay exceeds the configured timeout. - pub(crate) async fn retry_with_timeout( + /// cumulative backoff delay exceeds the configured budget. + async fn retry_with_timeout( &self, operation: F, ) -> Result> @@ -195,11 +200,34 @@ impl SafeProvider { { let retry_strategy = ExponentialBuilder::default() .with_max_times(self.max_retries) - .with_total_delay(Some(self.max_timeout)) .with_min_delay(self.retry_interval); operation.retry(retry_strategy).sleep(tokio::time::sleep).await } + + /// Execute `operation` with exponential backoff and a true total timeout. 
+ /// + /// Wraps the retry logic with `tokio::time::timeout(self.max_timeout, ...)` so + /// the entire operation (including time spent inside the RPC call) cannot exceed + /// `max_timeout`. + /// + /// # Errors + /// - Returns `RpcError` with message "total operation timeout exceeded" if + /// the overall timeout elapses. + /// - Propagates any `RpcError` from the underlying retries. + async fn retry_with_total_timeout( + &self, + operation: F, + ) -> Result> + where + F: Fn() -> Fut, + Fut: Future>>, + { + match tokio::time::timeout(self.max_timeout, self.retry_with_timeout(operation)).await { + Ok(res) => res, + Err(_) => Err(TransportErrorKind::custom_str("total operation timeout exceeded")), + } + } } #[cfg(test)] @@ -207,6 +235,7 @@ mod tests { use super::*; use alloy::network::Ethereum; use std::sync::{Arc, Mutex}; + use tokio::time::sleep; fn create_test_provider( timeout: Duration, @@ -297,4 +326,19 @@ mod tests { assert!(result.is_err()); assert_eq!(*call_count.lock().unwrap(), 3); } + + #[tokio::test] + async fn test_retry_with_timeout_respects_total_delay() { + let max_timeout = Duration::from_millis(50); + let provider = create_test_provider(max_timeout, 10, Duration::from_millis(1)); + + let result = provider + .retry_with_total_timeout(move || async move { + sleep(max_timeout + Duration::from_millis(10)).await; + Ok(42) + }) + .await; + + assert!(result.is_err()); + } } From 471f767795d50f7db47a5d3830f979562c5bb2ba Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 23 Oct 2025 20:15:25 +0900 Subject: [PATCH 17/47] ref: imporving tracing message --- src/safe_provider.rs | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index bbd8648a..9cc72e49 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -38,7 +38,7 @@ use alloy::{ transports::{RpcError, TransportErrorKind}, }; use backon::{ExponentialBuilder, Retryable}; -use tracing::{debug, error}; +use tracing::{error, info}; // RPC retry and timeout settings /// Default timeout used by `SafeProvider` @@ -87,11 +87,6 @@ impl SafeProvider { self } - #[must_use] - pub fn inner(&self) -> &RootProvider { - &self.provider - } - /// Fetch a block by number with retry and timeout. /// /// # Errors @@ -101,13 +96,13 @@ impl SafeProvider { &self, number: BlockNumberOrTag, ) -> Result, RpcError> { - debug!("SafeProvider eth_getBlockByNumber called with number: {:?}", number); + info!("eth_getBlockByNumber called"); let provider = self.provider.clone(); let result = self .retry_with_total_timeout(|| async { provider.get_block_by_number(number).await }) .await; if let Err(e) = &result { - error!("SafeProvider eth_getByBlockNumber failed: {}", e); + error!("eth_getByBlockNumber failed: {}", e); } result } @@ -118,12 +113,12 @@ impl SafeProvider { /// Returns `RpcError` if the RPC call fails /// after exhausting retries or times out. 
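Patch 16 above changes the semantics from a per-attempt/backoff budget to a true overall deadline: the whole retried call, including time spent inside each RPC attempt, sits under a single `tokio::time::timeout`. A self-contained sketch of that shape is below; the retry count, delays, and the `std::io::Error` stand-in are assumptions for illustration, while the crate itself works in terms of `RpcError<TransportErrorKind>`.

```rust
use std::time::Duration;

use backon::{ExponentialBuilder, Retryable};

// Illustrative stand-in for one RPC attempt.
async fn attempt() -> Result<u64, std::io::Error> {
    Ok(7)
}

// The outer deadline covers every retry *and* the time spent inside each
// attempt, unlike a cap applied to a single attempt.
async fn call_with_deadline() -> Result<u64, std::io::Error> {
    let backoff = ExponentialBuilder::default()
        .with_max_times(5)                       // assumed retry count
        .with_min_delay(Duration::from_secs(1)); // assumed base delay

    match tokio::time::timeout(
        Duration::from_secs(30), // assumed overall budget
        attempt.retry(backoff).sleep(tokio::time::sleep),
    )
    .await
    {
        Ok(inner) => inner, // success or the last attempt's own error
        Err(_) => Err(std::io::Error::new(
            std::io::ErrorKind::TimedOut,
            "total operation timeout exceeded",
        )),
    }
}
```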
pub async fn get_block_number(&self) -> Result> { - debug!("SafeProvider eth_getBlockNumber called"); + info!("eth_getBlockNumber called"); let provider = self.provider.clone(); let result = self.retry_with_total_timeout(|| async { provider.get_block_number().await }).await; if let Err(e) = &result { - error!("SafeProvider eth_getBlockNumber failed: {}", e); + error!("eth_getBlockNumber failed: {}", e); } result } @@ -137,13 +132,13 @@ impl SafeProvider { &self, hash: alloy::primitives::BlockHash, ) -> Result, RpcError> { - debug!("SafeProvider eth_getBlockByHash called with hash: {:?}", hash); + info!("eth_getBlockByHash called"); let provider = self.provider.clone(); let result = self .retry_with_total_timeout(|| async { provider.get_block_by_hash(hash).await }) .await; if let Err(e) = &result { - error!("SafeProvider eth_getBlockByHash failed: {}", e); + error!("eth_getBlockByHash failed: {}", e); } result } @@ -157,12 +152,12 @@ impl SafeProvider { &self, filter: &Filter, ) -> Result, RpcError> { - debug!("eth_getLogs called with filter: {:?}", filter); + info!("eth_getLogs called"); let provider = self.provider.clone(); let result = self.retry_with_total_timeout(|| async { provider.get_logs(filter).await }).await; if let Err(e) = &result { - error!("SafeProvider eth_getLogs failed: {}", e); + error!("eth_getLogs failed: {}", e); } result } @@ -175,12 +170,12 @@ impl SafeProvider { pub async fn subscribe_blocks( &self, ) -> Result, RpcError> { - debug!("eth_subscribe called"); + info!("eth_subscribe called"); let provider = self.provider.clone(); let result = self.retry_with_total_timeout(|| async { provider.subscribe_blocks().await }).await; if let Err(e) = &result { - error!("SafeProvider eth_subscribe failed: {}", e); + error!("eth_subscribe failed: {}", e); } result } From 855e16746d177a164c3bc5c531966dd646c09bcf Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 23 Oct 2025 20:20:08 +0900 Subject: [PATCH 18/47] ref: collapse timeout fn to one --- src/safe_provider.rs | 71 ++++++++++++++++++++------------------------ 1 file changed, 33 insertions(+), 38 deletions(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index 9cc72e49..9eca83be 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -90,8 +90,9 @@ impl SafeProvider { /// Fetch a block by number with retry and timeout. /// /// # Errors - /// Returns `RpcError` if the RPC call fails - /// after exhausting retries or times out. + /// + /// Returns an error if RPC call fails repeatedly even + /// after exhausting retries or if the call times out. pub async fn get_block_by_number( &self, number: BlockNumberOrTag, @@ -110,8 +111,9 @@ impl SafeProvider { /// Fetch the latest block number with retry and timeout. /// /// # Errors - /// Returns `RpcError` if the RPC call fails - /// after exhausting retries or times out. + /// + /// Returns an error if RPC call fails repeatedly even + /// after exhausting retries or if the call times out. pub async fn get_block_number(&self) -> Result> { info!("eth_getBlockNumber called"); let provider = self.provider.clone(); @@ -126,8 +128,9 @@ impl SafeProvider { /// Fetch a block by hash with retry and timeout. /// /// # Errors - /// Returns `RpcError` if the RPC call fails - /// after exhausting retries or times out. + /// + /// Returns an error if RPC call fails repeatedly even + /// after exhausting retries or if the call times out. 
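The `info!`/`error!` lines now emitted around each RPC call only surface if the embedding application installs a `tracing` subscriber. Since the workspace already depends on `tracing-subscriber` with the `fmt` and `env-filter` features (patch 01), a caller could enable them roughly as sketched here; the filter string and function name are assumptions, not part of the series.

```rust
use tracing_subscriber::EnvFilter;

// Install a fmt subscriber so SafeProvider's `info!`/`error!` output is visible.
// RUST_LOG takes precedence when set; the fallback filter is illustrative.
fn init_tracing() {
    tracing_subscriber::fmt()
        .with_env_filter(
            EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| EnvFilter::new("event_scanner=info")),
        )
        .init();
}
```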
pub async fn get_block_by_hash( &self, hash: alloy::primitives::BlockHash, @@ -146,8 +149,9 @@ impl SafeProvider { /// Fetch logs for the given filter with retry and timeout. /// /// # Errors - /// Returns `RpcError` if the RPC call fails - /// after exhausting retries or times out. + /// + /// Returns an error if RPC call fails repeatedly even + /// after exhausting retries or if the call times out. pub async fn get_logs( &self, filter: &Filter, @@ -165,8 +169,9 @@ impl SafeProvider { /// Subscribe to new block headers with retry and timeout. /// /// # Errors - /// Returns `RpcError` if the subscription - /// cannot be established after retries or times out. + /// + /// Returns an error if RPC call fails repeatedly even + /// after exhausting retries or if the call times out. pub async fn subscribe_blocks( &self, ) -> Result, RpcError> { @@ -180,36 +185,17 @@ impl SafeProvider { result } - /// Execute `operation` with exponential backoff respecting only the backoff budget. - /// - /// # Errors - /// Returns `RpcError` if all attempts fail or the - /// cumulative backoff delay exceeds the configured budget. - async fn retry_with_timeout( - &self, - operation: F, - ) -> Result> - where - F: Fn() -> Fut, - Fut: Future>>, - { - let retry_strategy = ExponentialBuilder::default() - .with_max_times(self.max_retries) - .with_min_delay(self.retry_interval); - - operation.retry(retry_strategy).sleep(tokio::time::sleep).await - } - - /// Execute `operation` with exponential backoff and a true total timeout. + /// Execute `operation` with exponential backoff and a total timeout. /// /// Wraps the retry logic with `tokio::time::timeout(self.max_timeout, ...)` so /// the entire operation (including time spent inside the RPC call) cannot exceed /// `max_timeout`. /// /// # Errors - /// - Returns `RpcError` with message "total operation timeout exceeded" if - /// the overall timeout elapses. - /// - Propagates any `RpcError` from the underlying retries. + /// + /// - Returns [`RpcError`] with message "total operation timeout exceeded" + /// if the overall timeout elapses. + /// - Propagates any [`RpcError`] from the underlying retries. 
async fn retry_with_total_timeout( &self, operation: F, @@ -218,7 +204,16 @@ impl SafeProvider { F: Fn() -> Fut, Fut: Future>>, { - match tokio::time::timeout(self.max_timeout, self.retry_with_timeout(operation)).await { + let retry_strategy = ExponentialBuilder::default() + .with_max_times(self.max_retries) + .with_min_delay(self.retry_interval); + + match tokio::time::timeout( + self.max_timeout, + operation.retry(retry_strategy).sleep(tokio::time::sleep), + ) + .await + { Ok(res) => res, Err(_) => Err(TransportErrorKind::custom_str("total operation timeout exceeded")), } @@ -254,7 +249,7 @@ mod tests { let call_count_clone = call_count.clone(); let result = provider - .retry_with_timeout(move || { + .retry_with_total_timeout(move || { let count = call_count_clone.clone(); async move { let mut c = count.lock().unwrap(); @@ -278,7 +273,7 @@ mod tests { let call_count_clone = call_count.clone(); let result = provider - .retry_with_timeout(move || { + .retry_with_total_timeout(move || { let count = call_count_clone.clone(); async move { let mut c = count.lock().unwrap(); @@ -306,7 +301,7 @@ mod tests { let call_count_clone = call_count.clone(); let result = provider - .retry_with_timeout(move || { + .retry_with_total_timeout(move || { let count = call_count_clone.clone(); async move { let mut c = count.lock().unwrap(); From ff40e8aad6d41a420c2c4f25466682c0802c252c Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 23 Oct 2025 20:24:15 +0900 Subject: [PATCH 19/47] ref: better syntax --- src/safe_provider.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index 9eca83be..eb5241ce 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -116,9 +116,8 @@ impl SafeProvider { /// after exhausting retries or if the call times out. 
pub async fn get_block_number(&self) -> Result> { info!("eth_getBlockNumber called"); - let provider = self.provider.clone(); - let result = - self.retry_with_total_timeout(|| async { provider.get_block_number().await }).await; + let operation = || self.provider.get_block_number(); + let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!("eth_getBlockNumber failed: {}", e); } From 992853caf99d17f5ba0821c4c07f06a817ea4d16 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 23 Oct 2025 20:27:54 +0900 Subject: [PATCH 20/47] ref: remove with and address default nit --- src/block_range_scanner.rs | 9 +++------ src/safe_provider.rs | 6 +++--- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 50368d66..9c86fb93 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -331,9 +331,9 @@ impl BlockRangeScanner { provider: RootProvider, ) -> TransportResult> { let safe_provider = SafeProvider::new(provider) - .with_max_timeout(self.max_timeout) - .with_max_retries(self.max_retries) - .with_retry_interval(self.retry_interval); + .max_timeout(self.max_timeout) + .max_retries(self.max_retries) + .retry_interval(self.retry_interval); Ok(ConnectedBlockRangeScanner { provider: safe_provider, @@ -1138,9 +1138,6 @@ mod tests { fn mocked_provider(asserter: Asserter) -> SafeProvider { let root_provider = RootProvider::new(RpcClient::mocked(asserter)); SafeProvider::new(root_provider) - .with_max_timeout(DEFAULT_MAX_TIMEOUT) - .with_max_retries(DEFAULT_MAX_RETRIES) - .with_retry_interval(DEFAULT_RETRY_INTERVAL) } #[test] diff --git a/src/safe_provider.rs b/src/safe_provider.rs index eb5241ce..41e48b9a 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -70,19 +70,19 @@ impl SafeProvider { } #[must_use] - pub fn with_max_timeout(mut self, timeout: Duration) -> Self { + pub fn max_timeout(mut self, timeout: Duration) -> Self { self.max_timeout = timeout; self } #[must_use] - pub fn with_max_retries(mut self, max_retries: usize) -> Self { + pub fn max_retries(mut self, max_retries: usize) -> Self { self.max_retries = max_retries; self } #[must_use] - pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self { + pub fn retry_interval(mut self, retry_interval: Duration) -> Self { self.retry_interval = retry_interval; self } From ab35dc52aefe1add867b0a071874610b40c316f3 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 23 Oct 2025 20:31:09 +0900 Subject: [PATCH 21/47] fix: doctest --- src/safe_provider.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index 41e48b9a..9e4f2798 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -18,9 +18,8 @@ //! let provider = RootProvider::::new( //! ClientBuilder::default().ws(WsConnect::new("wss://localhost:8000")).await?, //! ); -//! let safe_provider = SafeProvider::new(provider) -//! .with_max_timeout(Duration::from_secs(30)) -//! .with_max_retries(5); +//! let safe_provider = +//! SafeProvider::new(provider).max_timeout(Duration::from_secs(30)).max_retries(5); //! //! let block = safe_provider.get_block_by_number(12345.into()).await?; //! 
Ok(()) From cdf9b9539ebc21de67dd15cab71222617e7d24f1 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 23 Oct 2025 22:39:49 +0900 Subject: [PATCH 22/47] Update src/safe_provider.rs Co-authored-by: Nenad --- src/safe_provider.rs | 33 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index 41e48b9a..b19195b7 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -226,41 +226,34 @@ mod tests { use std::sync::{Arc, Mutex}; use tokio::time::sleep; - fn create_test_provider( - timeout: Duration, + fn test_provider( + timeout: u64, max_retries: usize, - retry_interval: Duration, + retry_interval: u64, ) -> SafeProvider { SafeProvider { - provider: RootProvider::::new_http("http://localhost:8545".parse().unwrap()), - max_timeout: timeout, + provider: RootProvider::new_http("http://localhost:8545".parse().unwrap()), + max_timeout: Duration::from_millis(timeout), max_retries, - retry_interval, + retry_interval: Duration::from_millis(retry_interval), } } #[tokio::test] async fn test_retry_with_timeout_succeeds_on_first_attempt() { - let provider = - create_test_provider(Duration::from_millis(100), 3, Duration::from_millis(10)); + let provider = test_provider(100, 3, 10); - let call_count = Arc::new(Mutex::new(0)); - let call_count_clone = call_count.clone(); + let call_count = AtomicUsize::new(0); let result = provider - .retry_with_total_timeout(move || { - let count = call_count_clone.clone(); - async move { - let mut c = count.lock().unwrap(); - *c += 1; - Ok(42) - } + .retry_with_timeout(|| async { + call_count.fetch_add(1, Ordering::SeqCst); + Ok(42) }) .await; - assert!(result.is_ok()); - assert_eq!(result.unwrap(), 42); - assert_eq!(*call_count.lock().unwrap(), 1); + assert!(matches!(result, Ok(42))); + assert_eq!(call_count.load(Ordering::SeqCst), 1); } #[tokio::test] From 74cd3d7766ddc2c5673a3357d09d547eff4db9bd Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 23 Oct 2025 22:51:21 +0900 Subject: [PATCH 23/47] ref: use atomic usize --- src/safe_provider.rs | 56 +++++++++++++++++--------------------------- 1 file changed, 22 insertions(+), 34 deletions(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index cfe3a258..00483d63 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -222,7 +222,7 @@ impl SafeProvider { mod tests { use super::*; use alloy::network::Ethereum; - use std::sync::{Arc, Mutex}; + use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::time::sleep; fn test_provider( @@ -245,7 +245,7 @@ mod tests { let call_count = AtomicUsize::new(0); let result = provider - .retry_with_timeout(|| async { + .retry_with_total_timeout(|| async { call_count.fetch_add(1, Ordering::SeqCst); Ok(42) }) @@ -257,65 +257,53 @@ mod tests { #[tokio::test] async fn test_retry_with_timeout_retries_on_error() { - let provider = - create_test_provider(Duration::from_millis(100), 3, Duration::from_millis(10)); + let provider = test_provider(100, 3, 10); - let call_count = Arc::new(Mutex::new(0)); - let call_count_clone = call_count.clone(); + let call_count = AtomicUsize::new(0); let result = provider - .retry_with_total_timeout(move || { - let count = call_count_clone.clone(); - async move { - let mut c = count.lock().unwrap(); - *c += 1; - if *c < 3 { - Err(TransportErrorKind::custom_str("temporary error")) - } else { - Ok(42) - } + .retry_with_total_timeout(|| async { + call_count.fetch_add(1, Ordering::SeqCst); + if call_count.load(Ordering::SeqCst) < 3 { + 
Err(TransportErrorKind::custom_str("temporary error")) + } else { + Ok(42) } }) .await; assert!(result.is_ok()); assert_eq!(result.unwrap(), 42); - assert_eq!(*call_count.lock().unwrap(), 3); + assert_eq!(call_count.load(Ordering::SeqCst), 3); } #[tokio::test] async fn test_retry_with_timeout_fails_after_max_retries() { - let provider = - create_test_provider(Duration::from_millis(100), 2, Duration::from_millis(10)); + let provider = test_provider(100, 2, 10); - let call_count = Arc::new(Mutex::new(0)); - let call_count_clone = call_count.clone(); + let call_count = AtomicUsize::new(0); let result = provider - .retry_with_total_timeout(move || { - let count = call_count_clone.clone(); - async move { - let mut c = count.lock().unwrap(); - *c += 1; - Err::>(TransportErrorKind::custom_str( - "permanent error", - )) - } + .retry_with_total_timeout(|| async { + call_count.fetch_add(1, Ordering::SeqCst); + Err::>(TransportErrorKind::custom_str( + "permanent error", + )) }) .await; assert!(result.is_err()); - assert_eq!(*call_count.lock().unwrap(), 3); + assert_eq!(call_count.load(Ordering::SeqCst), 3); } #[tokio::test] async fn test_retry_with_timeout_respects_total_delay() { - let max_timeout = Duration::from_millis(50); - let provider = create_test_provider(max_timeout, 10, Duration::from_millis(1)); + let max_timeout = 50; + let provider = test_provider(max_timeout, 10, 1); let result = provider .retry_with_total_timeout(move || async move { - sleep(max_timeout + Duration::from_millis(10)).await; + sleep(Duration::from_millis(max_timeout + 10)).await; Ok(42) }) .await; From 41cc7339bb83ede727cded2958792a9f14ecf834 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 23 Oct 2025 22:57:45 +0900 Subject: [PATCH 24/47] ref: update test to match for error --- src/safe_provider.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index 00483d63..3b48bb19 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -292,7 +292,8 @@ mod tests { }) .await; - assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(err.to_string().contains("permanent error"),); assert_eq!(call_count.load(Ordering::SeqCst), 3); } @@ -308,6 +309,7 @@ mod tests { }) .await; - assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(err.to_string().contains("total operation timeout exceeded"),); } } From 4af01ab4a03b79879b8f178e9755c7b7b742a6e7 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 23 Oct 2025 23:08:30 +0900 Subject: [PATCH 25/47] ref: update doc --- src/safe_provider.rs | 72 +++++++++++++++++++++----------------------- 1 file changed, 35 insertions(+), 37 deletions(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index 3b48bb19..83605185 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -1,31 +1,3 @@ -//! Safe provider wrapper with built-in retry and timeout mechanisms. -//! -//! This module provides a wrapper around Alloy providers that automatically -//! handles retries, timeouts, and error logging for RPC calls. -//! -//! # Example -//! -//! ```rust,no_run -//! use alloy::{ -//! network::Ethereum, -//! providers::{RootProvider, WsConnect}, -//! rpc::client::ClientBuilder, -//! }; -//! use event_scanner::safe_provider::SafeProvider; -//! use std::time::Duration; -//! -//! async fn example() -> Result<(), Box> { -//! let provider = RootProvider::::new( -//! ClientBuilder::default().ws(WsConnect::new("wss://localhost:8000")).await?, -//! ); -//! let safe_provider = -//! 
SafeProvider::new(provider).max_timeout(Duration::from_secs(30)).max_retries(5); -//! -//! let block = safe_provider.get_block_by_number(12345.into()).await?; -//! Ok(()) -//! } -//! ``` - use std::{future::Future, time::Duration}; use alloy::{ @@ -39,15 +11,33 @@ use alloy::{ use backon::{ExponentialBuilder, Retryable}; use tracing::{error, info}; -// RPC retry and timeout settings -/// Default timeout used by `SafeProvider` -pub const DEFAULT_MAX_TIMEOUT: Duration = Duration::from_secs(30); -/// Default maximum number of retry attempts. -pub const DEFAULT_MAX_RETRIES: usize = 5; -/// Default base delay between retries. -pub const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(1); - -/// Provider wrapper adding retries and timeouts. +/// Safe provider wrapper with built-in retry and timeout mechanisms. +/// +/// This wrapper around Alloy providers automatically handles retries, +/// timeouts, and error logging for RPC calls. +/// +/// # Example +/// +/// ```rust,no_run +/// # use alloy::{ +/// # network::Ethereum, +/// # providers::{RootProvider, WsConnect}, +/// # rpc::client::ClientBuilder, +/// # }; +/// # use event_scanner::safe_provider::SafeProvider; +/// # use std::time::Duration; +/// +/// async fn create_safe_provider() -> Result<(), Box> { +/// let provider = RootProvider::::new( +/// ClientBuilder::default().ws(WsConnect::new("wss://localhost:8000")).await?, +/// ); +/// let safe_provider = +/// SafeProvider::new(provider).max_timeout(Duration::from_secs(30)).max_retries(5); +/// +/// let block = safe_provider.get_block_by_number(12345.into()).await?; +/// Ok(()) +/// } +/// ``` #[derive(Clone)] pub struct SafeProvider { provider: RootProvider, @@ -56,6 +46,14 @@ pub struct SafeProvider { retry_interval: Duration, } +// RPC retry and timeout settings +/// Default timeout used by `SafeProvider` +pub const DEFAULT_MAX_TIMEOUT: Duration = Duration::from_secs(30); +/// Default maximum number of retry attempts. +pub const DEFAULT_MAX_RETRIES: usize = 5; +/// Default base delay between retries. +pub const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(1); + impl SafeProvider { /// Create a new `SafeProvider` with default settings. 
#[must_use] From 3329c3847ce8f1d143bb9c856437dec644e3537e Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 23 Oct 2025 23:21:33 +0900 Subject: [PATCH 26/47] fix: merge errors with connect methods --- src/block_range_scanner.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index f6e99b87..1fe78f56 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -82,6 +82,7 @@ use alloy::{ eips::BlockNumberOrTag, network::{BlockResponse, Network, primitives::HeaderResponse}, primitives::{B256, BlockNumber}, + providers::RootProvider, pubsub::Subscription, rpc::client::ClientBuilder, transports::{ @@ -173,7 +174,7 @@ impl BlockRangeScanner { ws_url: Url, ) -> TransportResult> { let provider = - SafeProvider::::new(ClientBuilder::default().ws(WsConnect::new(ws_url)).await?); + RootProvider::::new(ClientBuilder::default().ws(WsConnect::new(ws_url)).await?); Ok(self.connect(provider)) } @@ -186,7 +187,7 @@ impl BlockRangeScanner { self, ipc_path: String, ) -> Result, RpcError> { - let provider = SafeProvider::::new(ClientBuilder::default().ipc(ipc_path.into()).await?); + let provider = RootProvider::::new(ClientBuilder::default().ipc(ipc_path.into()).await?); Ok(self.connect(provider)) } @@ -196,8 +197,15 @@ impl BlockRangeScanner { /// /// Returns an error if the connection fails #[must_use] - pub fn connect(self, provider: SafeProvider) -> ConnectedBlockRangeScanner { - ConnectedBlockRangeScanner { provider, max_block_range: self.max_block_range } + pub fn connect(self, provider: RootProvider) -> ConnectedBlockRangeScanner { + let safe_provider = SafeProvider::new(provider) + .max_timeout(self.max_timeout) + .max_retries(self.max_retries) + .retry_interval(self.retry_interval); + ConnectedBlockRangeScanner { + provider: safe_provider, + max_block_range: self.max_block_range, + } } } From 586b03e6f21804b4387d4388b77e3bc8e187a54a Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 23 Oct 2025 23:22:33 +0900 Subject: [PATCH 27/47] fix: root --> safe provider --- src/event_scanner/modes/common.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/event_scanner/modes/common.rs b/src/event_scanner/modes/common.rs index a841f4fb..b31c4ded 100644 --- a/src/event_scanner/modes/common.rs +++ b/src/event_scanner/modes/common.rs @@ -3,10 +3,10 @@ use std::ops::RangeInclusive; use crate::{ block_range_scanner::{MAX_BUFFERED_MESSAGES, Message as BlockRangeMessage}, event_scanner::{filter::EventFilter, listener::EventListener, message::Message}, + safe_provider::SafeProvider, }; use alloy::{ network::Network, - providers::{Provider, RootProvider}, rpc::types::{Filter, Log}, transports::{RpcError, TransportErrorKind}, }; @@ -25,7 +25,7 @@ pub enum ConsumerMode { pub async fn handle_stream( mut stream: ReceiverStream, - provider: &RootProvider, + provider: &SafeProvider, listeners: &[EventListener], mode: ConsumerMode, ) { @@ -42,7 +42,7 @@ pub async fn handle_stream( } pub fn spawn_log_consumers( - provider: &RootProvider, + provider: &SafeProvider, listeners: &[EventListener], range_tx: &Sender, mode: ConsumerMode, @@ -129,7 +129,7 @@ async fn get_logs( range: RangeInclusive, event_filter: &EventFilter, log_filter: &Filter, - provider: &RootProvider, + provider: &SafeProvider, ) -> Result, RpcError> { let log_filter = log_filter.clone().from_block(*range.start()).to_block(*range.end()); From 8a080e45d091a3a598e58126019debda4722b5cf Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 23 Oct 
2025 23:28:23 +0900 Subject: [PATCH 28/47] ref: tracing update --- src/lib.rs | 2 +- src/safe_provider.rs | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5034a7e1..28165f66 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ pub mod block_range_scanner; pub mod error; pub mod event_scanner; -pub mod safe_provider; +mod safe_provider; #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; pub mod types; diff --git a/src/safe_provider.rs b/src/safe_provider.rs index 83605185..539b75c4 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -100,7 +100,7 @@ impl SafeProvider { .retry_with_total_timeout(|| async { provider.get_block_by_number(number).await }) .await; if let Err(e) = &result { - error!("eth_getByBlockNumber failed: {}", e); + error!(error = %e, "eth_getByBlockNumber failed"); } result } @@ -116,7 +116,7 @@ impl SafeProvider { let operation = || self.provider.get_block_number(); let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { - error!("eth_getBlockNumber failed: {}", e); + error!(error = %e, "eth_getBlockNumber failed"); } result } @@ -137,7 +137,7 @@ impl SafeProvider { .retry_with_total_timeout(|| async { provider.get_block_by_hash(hash).await }) .await; if let Err(e) = &result { - error!("eth_getBlockByHash failed: {}", e); + error!(error = %e, "eth_getBlockByHash failed"); } result } @@ -157,7 +157,7 @@ impl SafeProvider { let result = self.retry_with_total_timeout(|| async { provider.get_logs(filter).await }).await; if let Err(e) = &result { - error!("eth_getLogs failed: {}", e); + error!(error = %e, "eth_getLogs failed"); } result } @@ -176,7 +176,7 @@ impl SafeProvider { let result = self.retry_with_total_timeout(|| async { provider.subscribe_blocks().await }).await; if let Err(e) = &result { - error!("eth_subscribe failed: {}", e); + error!(error = %e, "eth_subscribe failed"); } result } From 982ea96c67b06376e082a515bc2ecc53b6d9137a Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 23 Oct 2025 23:29:37 +0900 Subject: [PATCH 29/47] ref: remove doc --- src/safe_provider.rs | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index 539b75c4..4b310395 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -15,29 +15,6 @@ use tracing::{error, info}; /// /// This wrapper around Alloy providers automatically handles retries, /// timeouts, and error logging for RPC calls. 
-/// -/// # Example -/// -/// ```rust,no_run -/// # use alloy::{ -/// # network::Ethereum, -/// # providers::{RootProvider, WsConnect}, -/// # rpc::client::ClientBuilder, -/// # }; -/// # use event_scanner::safe_provider::SafeProvider; -/// # use std::time::Duration; -/// -/// async fn create_safe_provider() -> Result<(), Box> { -/// let provider = RootProvider::::new( -/// ClientBuilder::default().ws(WsConnect::new("wss://localhost:8000")).await?, -/// ); -/// let safe_provider = -/// SafeProvider::new(provider).max_timeout(Duration::from_secs(30)).max_retries(5); -/// -/// let block = safe_provider.get_block_by_number(12345.into()).await?; -/// Ok(()) -/// } -/// ``` #[derive(Clone)] pub struct SafeProvider { provider: RootProvider, From 49f06bc0cac45ad8dc8804382859a63b06f56e92 Mon Sep 17 00:00:00 2001 From: Leo Date: Mon, 27 Oct 2025 19:01:17 +0900 Subject: [PATCH 30/47] ref: avoid clone provider when possible --- src/safe_provider.rs | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index 4b310395..d454c32c 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -72,10 +72,8 @@ impl SafeProvider { number: BlockNumberOrTag, ) -> Result, RpcError> { info!("eth_getBlockByNumber called"); - let provider = self.provider.clone(); - let result = self - .retry_with_total_timeout(|| async { provider.get_block_by_number(number).await }) - .await; + let operation = async || self.provider.get_block_by_number(number).await; + let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getByBlockNumber failed"); } @@ -109,10 +107,8 @@ impl SafeProvider { hash: alloy::primitives::BlockHash, ) -> Result, RpcError> { info!("eth_getBlockByHash called"); - let provider = self.provider.clone(); - let result = self - .retry_with_total_timeout(|| async { provider.get_block_by_hash(hash).await }) - .await; + let operation = async || self.provider.get_block_by_hash(hash).await; + let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getBlockByHash failed"); } @@ -130,9 +126,8 @@ impl SafeProvider { filter: &Filter, ) -> Result, RpcError> { info!("eth_getLogs called"); - let provider = self.provider.clone(); - let result = - self.retry_with_total_timeout(|| async { provider.get_logs(filter).await }).await; + let operation = || self.provider.get_logs(filter); + let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getLogs failed"); } From 0546feb8c6b1d32a401215974341ab3d33d800b6 Mon Sep 17 00:00:00 2001 From: Leo Date: Mon, 27 Oct 2025 23:18:25 +0900 Subject: [PATCH 31/47] ref: update test --- src/safe_provider.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index d454c32c..d1b1d869 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -217,12 +217,11 @@ mod tests { let result = provider .retry_with_total_timeout(|| async { call_count.fetch_add(1, Ordering::SeqCst); - Ok(42) + Ok(call_count.load(Ordering::SeqCst)) }) .await; - assert!(matches!(result, Ok(42))); - assert_eq!(call_count.load(Ordering::SeqCst), 1); + assert!(matches!(result, Ok(1))); } #[tokio::test] @@ -237,14 +236,12 @@ mod tests { if call_count.load(Ordering::SeqCst) < 3 { Err(TransportErrorKind::custom_str("temporary error")) } else { - Ok(42) + Ok(call_count.load(Ordering::SeqCst)) } }) .await; - 
assert!(result.is_ok()); - assert_eq!(result.unwrap(), 42); - assert_eq!(call_count.load(Ordering::SeqCst), 3); + assert!(matches!(result, Ok(3))); } #[tokio::test] From 01a9cf117697bc480d0be598fb4c9e0ec8eaf5eb Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 28 Oct 2025 00:08:25 +0900 Subject: [PATCH 32/47] feat: add custom error for safe provider --- src/error.rs | 18 +++++++- src/event_scanner/message.rs | 9 +++- src/event_scanner/modes/common.rs | 5 +-- src/safe_provider.rs | 74 ++++++++++++++++++++----------- 4 files changed, 76 insertions(+), 30 deletions(-) diff --git a/src/error.rs b/src/error.rs index 20801b90..7eb6b887 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,7 +7,7 @@ use alloy::{ }; use thiserror::Error; -use crate::block_range_scanner::Message; +use crate::{block_range_scanner::Message, safe_provider::SafeProviderError}; #[derive(Error, Debug, Clone)] pub enum ScannerError { @@ -42,6 +42,22 @@ pub enum ScannerError { #[error("Block not found, block number: {0}")] BlockNotFound(BlockNumberOrTag), + + #[error("Operation timed out")] + Timeout, + + #[error("Retry failed after {0} tries")] + RetryFail(usize), +} + +impl From for ScannerError { + fn from(error: SafeProviderError) -> ScannerError { + match error { + SafeProviderError::RpcError(err) => ScannerError::RpcError(err), + SafeProviderError::Timeout => ScannerError::Timeout, + SafeProviderError::RetryFail(num) => ScannerError::RetryFail(num), + } + } } impl From, ScannerError>> for Message { diff --git a/src/event_scanner/message.rs b/src/event_scanner/message.rs index 5f916388..1a2480f0 100644 --- a/src/event_scanner/message.rs +++ b/src/event_scanner/message.rs @@ -1,6 +1,6 @@ use alloy::{rpc::types::Log, sol_types::SolEvent}; -use crate::{ScannerError, ScannerMessage}; +use crate::{ScannerError, ScannerMessage, safe_provider::SafeProviderError}; pub type Message = ScannerMessage, ScannerError>; @@ -10,6 +10,13 @@ impl From> for Message { } } +impl From for Message { + fn from(error: SafeProviderError) -> Message { + let scanner_error: ScannerError = error.into(); + scanner_error.into() + } +} + impl PartialEq> for Message { fn eq(&self, other: &Vec) -> bool { self.eq(&other.as_slice()) diff --git a/src/event_scanner/modes/common.rs b/src/event_scanner/modes/common.rs index b31c4ded..62cc0ca8 100644 --- a/src/event_scanner/modes/common.rs +++ b/src/event_scanner/modes/common.rs @@ -3,12 +3,11 @@ use std::ops::RangeInclusive; use crate::{ block_range_scanner::{MAX_BUFFERED_MESSAGES, Message as BlockRangeMessage}, event_scanner::{filter::EventFilter, listener::EventListener, message::Message}, - safe_provider::SafeProvider, + safe_provider::{SafeProvider, SafeProviderError}, }; use alloy::{ network::Network, rpc::types::{Filter, Log}, - transports::{RpcError, TransportErrorKind}, }; use tokio::sync::{ broadcast::{self, Sender, error::RecvError}, @@ -130,7 +129,7 @@ async fn get_logs( event_filter: &EventFilter, log_filter: &Filter, provider: &SafeProvider, -) -> Result, RpcError> { +) -> Result, SafeProviderError> { let log_filter = log_filter.clone().from_block(*range.start()).to_block(*range.end()); match provider.get_logs(&log_filter).await { diff --git a/src/safe_provider.rs b/src/safe_provider.rs index d1b1d869..5563986d 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -1,4 +1,4 @@ -use std::{future::Future, time::Duration}; +use std::{future::Future, sync::Arc, time::Duration}; use alloy::{ eips::BlockNumberOrTag, @@ -9,8 +9,25 @@ use alloy::{ transports::{RpcError, TransportErrorKind}, }; use 
backon::{ExponentialBuilder, Retryable}; +use thiserror::Error; use tracing::{error, info}; +#[derive(Error, Debug, Clone)] +pub enum SafeProviderError { + #[error("RPC error: {0}")] + RpcError(Arc>), + #[error("Operation timed out")] + Timeout, + #[error("Retry failed after {0} tries")] + RetryFail(usize), +} + +impl From> for SafeProviderError { + fn from(err: RpcError) -> Self { + SafeProviderError::RpcError(Arc::new(err)) + } +} + /// Safe provider wrapper with built-in retry and timeout mechanisms. /// /// This wrapper around Alloy providers automatically handles retries, @@ -70,9 +87,11 @@ impl SafeProvider { pub async fn get_block_by_number( &self, number: BlockNumberOrTag, - ) -> Result, RpcError> { + ) -> Result, SafeProviderError> { info!("eth_getBlockByNumber called"); - let operation = async || self.provider.get_block_by_number(number).await; + let operation = async || { + self.provider.get_block_by_number(number).await.map_err(SafeProviderError::from) + }; let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getByBlockNumber failed"); @@ -86,9 +105,10 @@ impl SafeProvider { /// /// Returns an error if RPC call fails repeatedly even /// after exhausting retries or if the call times out. - pub async fn get_block_number(&self) -> Result> { + pub async fn get_block_number(&self) -> Result { info!("eth_getBlockNumber called"); - let operation = || self.provider.get_block_number(); + let operation = + async || self.provider.get_block_number().await.map_err(SafeProviderError::from); let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getBlockNumber failed"); @@ -105,9 +125,10 @@ impl SafeProvider { pub async fn get_block_by_hash( &self, hash: alloy::primitives::BlockHash, - ) -> Result, RpcError> { + ) -> Result, SafeProviderError> { info!("eth_getBlockByHash called"); - let operation = async || self.provider.get_block_by_hash(hash).await; + let operation = + async || self.provider.get_block_by_hash(hash).await.map_err(SafeProviderError::from); let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getBlockByHash failed"); @@ -121,12 +142,10 @@ impl SafeProvider { /// /// Returns an error if RPC call fails repeatedly even /// after exhausting retries or if the call times out. - pub async fn get_logs( - &self, - filter: &Filter, - ) -> Result, RpcError> { + pub async fn get_logs(&self, filter: &Filter) -> Result, SafeProviderError> { info!("eth_getLogs called"); - let operation = || self.provider.get_logs(filter); + let operation = + async || self.provider.get_logs(filter).await.map_err(SafeProviderError::from); let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getLogs failed"); @@ -142,11 +161,14 @@ impl SafeProvider { /// after exhausting retries or if the call times out. 
pub async fn subscribe_blocks( &self, - ) -> Result, RpcError> { + ) -> Result, SafeProviderError> { info!("eth_subscribe called"); let provider = self.provider.clone(); - let result = - self.retry_with_total_timeout(|| async { provider.subscribe_blocks().await }).await; + let result = self + .retry_with_total_timeout(|| async { + provider.subscribe_blocks().await.map_err(SafeProviderError::from) + }) + .await; if let Err(e) = &result { error!(error = %e, "eth_subscribe failed"); } @@ -167,10 +189,10 @@ impl SafeProvider { async fn retry_with_total_timeout( &self, operation: F, - ) -> Result> + ) -> Result where F: Fn() -> Fut, - Fut: Future>>, + Fut: Future>, { let retry_strategy = ExponentialBuilder::default() .with_max_times(self.max_retries) @@ -182,8 +204,9 @@ impl SafeProvider { ) .await { - Ok(res) => res, - Err(_) => Err(TransportErrorKind::custom_str("total operation timeout exceeded")), + Ok(Ok(res)) => Ok(res), + Ok(Err(_)) => Err(SafeProviderError::RetryFail(self.max_retries + 1)), + Err(_) => Err(SafeProviderError::Timeout), } } } @@ -234,7 +257,9 @@ mod tests { .retry_with_total_timeout(|| async { call_count.fetch_add(1, Ordering::SeqCst); if call_count.load(Ordering::SeqCst) < 3 { - Err(TransportErrorKind::custom_str("temporary error")) + Err(SafeProviderError::RpcError(Arc::new(TransportErrorKind::custom_str( + "temp error", + )))) } else { Ok(call_count.load(Ordering::SeqCst)) } @@ -253,14 +278,13 @@ mod tests { let result = provider .retry_with_total_timeout(|| async { call_count.fetch_add(1, Ordering::SeqCst); - Err::>(TransportErrorKind::custom_str( - "permanent error", - )) + // permanent error + Err::(SafeProviderError::Timeout) }) .await; let err = result.unwrap_err(); - assert!(err.to_string().contains("permanent error"),); + assert!(matches!(err, SafeProviderError::RetryFail(3))); assert_eq!(call_count.load(Ordering::SeqCst), 3); } @@ -277,6 +301,6 @@ mod tests { .await; let err = result.unwrap_err(); - assert!(err.to_string().contains("total operation timeout exceeded"),); + assert!(matches!(err, SafeProviderError::Timeout)); } } From a82b12fa817d6f6e5012ae9e1ddfbe43373341ea Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 28 Oct 2025 00:11:02 +0900 Subject: [PATCH 33/47] ref: remove moves --- src/safe_provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/safe_provider.rs b/src/safe_provider.rs index 5563986d..91547e85 100644 --- a/src/safe_provider.rs +++ b/src/safe_provider.rs @@ -294,7 +294,7 @@ mod tests { let provider = test_provider(max_timeout, 10, 1); let result = provider - .retry_with_total_timeout(move || async move { + .retry_with_total_timeout(|| async { sleep(Duration::from_millis(max_timeout + 10)).await; Ok(42) }) From 1ee07ca6ece43f4f9cd584c9550758c062a4a4ee Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 28 Oct 2025 17:11:20 +0900 Subject: [PATCH 34/47] ref: rename safe to robust provider --- src/block_range_scanner.rs | 26 +++++----- src/error.rs | 12 ++--- src/event_scanner/message.rs | 6 +-- src/event_scanner/modes/common.rs | 10 ++-- src/lib.rs | 2 +- src/{safe_provider.rs => robust_provider.rs} | 54 ++++++++++---------- 6 files changed, 55 insertions(+), 55 deletions(-) rename src/{safe_provider.rs => robust_provider.rs} (85%) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 1fe78f56..a690a4a2 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -72,8 +72,8 @@ use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use crate::{ error::ScannerError, - safe_provider::{ - 
DEFAULT_MAX_RETRIES, DEFAULT_MAX_TIMEOUT, DEFAULT_RETRY_INTERVAL, SafeProvider, + robust_provider::{ + DEFAULT_MAX_RETRIES, DEFAULT_MAX_TIMEOUT, DEFAULT_RETRY_INTERVAL, RobustProvider, }, types::{ScannerMessage, ScannerStatus}, }; @@ -198,26 +198,26 @@ impl BlockRangeScanner { /// Returns an error if the connection fails #[must_use] pub fn connect(self, provider: RootProvider) -> ConnectedBlockRangeScanner { - let safe_provider = SafeProvider::new(provider) + let robust_provider = RobustProvider::new(provider) .max_timeout(self.max_timeout) .max_retries(self.max_retries) .retry_interval(self.retry_interval); ConnectedBlockRangeScanner { - provider: safe_provider, + provider: robust_provider, max_block_range: self.max_block_range, } } } pub struct ConnectedBlockRangeScanner { - provider: SafeProvider, + provider: RobustProvider, max_block_range: u64, } impl ConnectedBlockRangeScanner { - /// Returns the `SafeProvider` + /// Returns the `RobustProvider` #[must_use] - pub fn provider(&self) -> &SafeProvider { + pub fn provider(&self) -> &RobustProvider { &self.provider } @@ -269,7 +269,7 @@ pub enum Command { } struct Service { - provider: SafeProvider, + provider: RobustProvider, max_block_range: u64, subscriber: Option>, websocket_connected: bool, @@ -280,7 +280,7 @@ struct Service { } impl Service { - pub fn new(provider: SafeProvider, max_block_range: u64) -> (Self, mpsc::Sender) { + pub fn new(provider: RobustProvider, max_block_range: u64) -> (Self, mpsc::Sender) { let (cmd_tx, cmd_rx) = mpsc::channel(100); let service = Self { @@ -678,7 +678,7 @@ impl Service { async fn stream_live_blocks( mut range_start: BlockNumber, - provider: SafeProvider, + provider: RobustProvider, sender: mpsc::Sender, block_confirmations: u64, max_block_range: u64, @@ -783,7 +783,7 @@ impl Service { } async fn get_block_subscription( - provider: &SafeProvider, + provider: &RobustProvider, ) -> Result, ScannerError> { let ws_stream = provider .subscribe_blocks() @@ -1018,9 +1018,9 @@ mod tests { use tokio::sync::mpsc; use tokio_stream::StreamExt; - fn mocked_provider(asserter: Asserter) -> SafeProvider { + fn mocked_provider(asserter: Asserter) -> RobustProvider { let root_provider = RootProvider::new(RpcClient::mocked(asserter)); - SafeProvider::new(root_provider) + RobustProvider::new(root_provider) } #[test] diff --git a/src/error.rs b/src/error.rs index 7eb6b887..4ea30a15 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,7 +7,7 @@ use alloy::{ }; use thiserror::Error; -use crate::{block_range_scanner::Message, safe_provider::SafeProviderError}; +use crate::{block_range_scanner::Message, robust_provider::RobustProviderError}; #[derive(Error, Debug, Clone)] pub enum ScannerError { @@ -50,12 +50,12 @@ pub enum ScannerError { RetryFail(usize), } -impl From for ScannerError { - fn from(error: SafeProviderError) -> ScannerError { +impl From for ScannerError { + fn from(error: RobustProviderError) -> ScannerError { match error { - SafeProviderError::RpcError(err) => ScannerError::RpcError(err), - SafeProviderError::Timeout => ScannerError::Timeout, - SafeProviderError::RetryFail(num) => ScannerError::RetryFail(num), + RobustProviderError::RpcError(err) => ScannerError::RpcError(err), + RobustProviderError::Timeout => ScannerError::Timeout, + RobustProviderError::RetryFail(num) => ScannerError::RetryFail(num), } } } diff --git a/src/event_scanner/message.rs b/src/event_scanner/message.rs index 1a2480f0..33da6c61 100644 --- a/src/event_scanner/message.rs +++ b/src/event_scanner/message.rs @@ -1,6 +1,6 @@ use 
alloy::{rpc::types::Log, sol_types::SolEvent}; -use crate::{ScannerError, ScannerMessage, safe_provider::SafeProviderError}; +use crate::{ScannerError, ScannerMessage, robust_provider::RobustProviderError}; pub type Message = ScannerMessage, ScannerError>; @@ -10,8 +10,8 @@ impl From> for Message { } } -impl From for Message { - fn from(error: SafeProviderError) -> Message { +impl From for Message { + fn from(error: RobustProviderError) -> Message { let scanner_error: ScannerError = error.into(); scanner_error.into() } diff --git a/src/event_scanner/modes/common.rs b/src/event_scanner/modes/common.rs index 62cc0ca8..59748dce 100644 --- a/src/event_scanner/modes/common.rs +++ b/src/event_scanner/modes/common.rs @@ -3,7 +3,7 @@ use std::ops::RangeInclusive; use crate::{ block_range_scanner::{MAX_BUFFERED_MESSAGES, Message as BlockRangeMessage}, event_scanner::{filter::EventFilter, listener::EventListener, message::Message}, - safe_provider::{SafeProvider, SafeProviderError}, + robust_provider::{RobustProvider, RobustProviderError}, }; use alloy::{ network::Network, @@ -24,7 +24,7 @@ pub enum ConsumerMode { pub async fn handle_stream( mut stream: ReceiverStream, - provider: &SafeProvider, + provider: &RobustProvider, listeners: &[EventListener], mode: ConsumerMode, ) { @@ -41,7 +41,7 @@ pub async fn handle_stream( } pub fn spawn_log_consumers( - provider: &SafeProvider, + provider: &RobustProvider, listeners: &[EventListener], range_tx: &Sender, mode: ConsumerMode, @@ -128,8 +128,8 @@ async fn get_logs( range: RangeInclusive, event_filter: &EventFilter, log_filter: &Filter, - provider: &SafeProvider, -) -> Result, SafeProviderError> { + provider: &RobustProvider, +) -> Result, RobustProviderError> { let log_filter = log_filter.clone().from_block(*range.start()).to_block(*range.end()); match provider.get_logs(&log_filter).await { diff --git a/src/lib.rs b/src/lib.rs index 28165f66..69d93152 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ pub mod block_range_scanner; pub mod error; pub mod event_scanner; -mod safe_provider; +mod robust_provider; #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; pub mod types; diff --git a/src/safe_provider.rs b/src/robust_provider.rs similarity index 85% rename from src/safe_provider.rs rename to src/robust_provider.rs index 91547e85..1b8aab78 100644 --- a/src/safe_provider.rs +++ b/src/robust_provider.rs @@ -13,7 +13,7 @@ use thiserror::Error; use tracing::{error, info}; #[derive(Error, Debug, Clone)] -pub enum SafeProviderError { +pub enum RobustProviderError { #[error("RPC error: {0}")] RpcError(Arc>), #[error("Operation timed out")] @@ -22,9 +22,9 @@ pub enum SafeProviderError { RetryFail(usize), } -impl From> for SafeProviderError { +impl From> for RobustProviderError { fn from(err: RpcError) -> Self { - SafeProviderError::RpcError(Arc::new(err)) + RobustProviderError::RpcError(Arc::new(err)) } } @@ -33,7 +33,7 @@ impl From> for SafeProviderError { /// This wrapper around Alloy providers automatically handles retries, /// timeouts, and error logging for RPC calls. #[derive(Clone)] -pub struct SafeProvider { +pub struct RobustProvider { provider: RootProvider, max_timeout: Duration, max_retries: usize, @@ -41,15 +41,15 @@ pub struct SafeProvider { } // RPC retry and timeout settings -/// Default timeout used by `SafeProvider` +/// Default timeout used by `RobustProvider` pub const DEFAULT_MAX_TIMEOUT: Duration = Duration::from_secs(30); /// Default maximum number of retry attempts. 
pub const DEFAULT_MAX_RETRIES: usize = 5; /// Default base delay between retries. pub const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(1); -impl SafeProvider { - /// Create a new `SafeProvider` with default settings. +impl RobustProvider { + /// Create a new `RobustProvider` with default settings. #[must_use] pub fn new(provider: RootProvider) -> Self { Self { @@ -87,10 +87,10 @@ impl SafeProvider { pub async fn get_block_by_number( &self, number: BlockNumberOrTag, - ) -> Result, SafeProviderError> { + ) -> Result, RobustProviderError> { info!("eth_getBlockByNumber called"); let operation = async || { - self.provider.get_block_by_number(number).await.map_err(SafeProviderError::from) + self.provider.get_block_by_number(number).await.map_err(RobustProviderError::from) }; let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { @@ -105,10 +105,10 @@ impl SafeProvider { /// /// Returns an error if RPC call fails repeatedly even /// after exhausting retries or if the call times out. - pub async fn get_block_number(&self) -> Result { + pub async fn get_block_number(&self) -> Result { info!("eth_getBlockNumber called"); let operation = - async || self.provider.get_block_number().await.map_err(SafeProviderError::from); + async || self.provider.get_block_number().await.map_err(RobustProviderError::from); let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getBlockNumber failed"); @@ -125,10 +125,10 @@ impl SafeProvider { pub async fn get_block_by_hash( &self, hash: alloy::primitives::BlockHash, - ) -> Result, SafeProviderError> { + ) -> Result, RobustProviderError> { info!("eth_getBlockByHash called"); let operation = - async || self.provider.get_block_by_hash(hash).await.map_err(SafeProviderError::from); + async || self.provider.get_block_by_hash(hash).await.map_err(RobustProviderError::from); let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getBlockByHash failed"); @@ -142,10 +142,10 @@ impl SafeProvider { /// /// Returns an error if RPC call fails repeatedly even /// after exhausting retries or if the call times out. - pub async fn get_logs(&self, filter: &Filter) -> Result, SafeProviderError> { + pub async fn get_logs(&self, filter: &Filter) -> Result, RobustProviderError> { info!("eth_getLogs called"); let operation = - async || self.provider.get_logs(filter).await.map_err(SafeProviderError::from); + async || self.provider.get_logs(filter).await.map_err(RobustProviderError::from); let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getLogs failed"); @@ -161,12 +161,12 @@ impl SafeProvider { /// after exhausting retries or if the call times out. 
pub async fn subscribe_blocks( &self, - ) -> Result, SafeProviderError> { + ) -> Result, RobustProviderError> { info!("eth_subscribe called"); let provider = self.provider.clone(); let result = self .retry_with_total_timeout(|| async { - provider.subscribe_blocks().await.map_err(SafeProviderError::from) + provider.subscribe_blocks().await.map_err(RobustProviderError::from) }) .await; if let Err(e) = &result { @@ -189,10 +189,10 @@ impl SafeProvider { async fn retry_with_total_timeout( &self, operation: F, - ) -> Result + ) -> Result where F: Fn() -> Fut, - Fut: Future>, + Fut: Future>, { let retry_strategy = ExponentialBuilder::default() .with_max_times(self.max_retries) @@ -205,8 +205,8 @@ impl SafeProvider { .await { Ok(Ok(res)) => Ok(res), - Ok(Err(_)) => Err(SafeProviderError::RetryFail(self.max_retries + 1)), - Err(_) => Err(SafeProviderError::Timeout), + Ok(Err(_)) => Err(RobustProviderError::RetryFail(self.max_retries + 1)), + Err(_) => Err(RobustProviderError::Timeout), } } } @@ -222,8 +222,8 @@ mod tests { timeout: u64, max_retries: usize, retry_interval: u64, - ) -> SafeProvider { - SafeProvider { + ) -> RobustProvider { + RobustProvider { provider: RootProvider::new_http("http://localhost:8545".parse().unwrap()), max_timeout: Duration::from_millis(timeout), max_retries, @@ -257,7 +257,7 @@ mod tests { .retry_with_total_timeout(|| async { call_count.fetch_add(1, Ordering::SeqCst); if call_count.load(Ordering::SeqCst) < 3 { - Err(SafeProviderError::RpcError(Arc::new(TransportErrorKind::custom_str( + Err(RobustProviderError::RpcError(Arc::new(TransportErrorKind::custom_str( "temp error", )))) } else { @@ -279,12 +279,12 @@ mod tests { .retry_with_total_timeout(|| async { call_count.fetch_add(1, Ordering::SeqCst); // permanent error - Err::(SafeProviderError::Timeout) + Err::(RobustProviderError::Timeout) }) .await; let err = result.unwrap_err(); - assert!(matches!(err, SafeProviderError::RetryFail(3))); + assert!(matches!(err, RobustProviderError::RetryFail(3))); assert_eq!(call_count.load(Ordering::SeqCst), 3); } @@ -301,6 +301,6 @@ mod tests { .await; let err = result.unwrap_err(); - assert!(matches!(err, SafeProviderError::Timeout)); + assert!(matches!(err, RobustProviderError::Timeout)); } } From f2b6d47f238ce9e5a34716cc9f5e2876b80d27f4 Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 28 Oct 2025 17:13:13 +0900 Subject: [PATCH 35/47] ref: comment --- src/robust_provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/robust_provider.rs b/src/robust_provider.rs index 1b8aab78..ef3890df 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -28,7 +28,7 @@ impl From> for RobustProviderError { } } -/// Safe provider wrapper with built-in retry and timeout mechanisms. +/// Provider wrapper with built-in retry and timeout mechanisms. /// /// This wrapper around Alloy providers automatically handles retries, /// timeouts, and error logging for RPC calls. 
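With the rename in place, here is a minimal crate-internal usage sketch of `RobustProvider` as it stands after this patch. The endpoint URL and durations are placeholders, and since PATCH 28 made the module private, external callers reach the wrapper only through `BlockRangeScanner::connect`; this sketch mirrors the doc example removed in PATCH 29, with the new names.

```rust
use std::time::Duration;

use alloy::{
    network::Ethereum,
    providers::{RootProvider, WsConnect},
    rpc::client::ClientBuilder,
};

use crate::robust_provider::RobustProvider;

// Crate-internal sketch only: `robust_provider` is a private module, so external
// code configures these knobs on `BlockRangeScanner` and calls `connect` instead.
async fn example() -> Result<(), Box<dyn std::error::Error>> {
    let provider = RootProvider::<Ethereum>::new(
        ClientBuilder::default().ws(WsConnect::new("wss://localhost:8000")).await?,
    );

    // Builder-style configuration from PATCH 20: `max_timeout`, `max_retries`, `retry_interval`.
    let robust = RobustProvider::new(provider)
        .max_timeout(Duration::from_secs(30))
        .max_retries(5)
        .retry_interval(Duration::from_secs(1));

    // At this point in the series `get_block_by_number` still returns an `Option`;
    // PATCH 36 below changes it to return the block directly.
    let _block = robust.get_block_by_number(12345.into()).await?;
    Ok(())
}
```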
From f622b21385b41817f97e8f7e9c031c1bb17b1fad Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 28 Oct 2025 18:42:43 +0900 Subject: [PATCH 36/47] feat: move block not found to provider --- src/block_range_scanner.rs | 45 ++++++++++---------------------------- src/error.rs | 15 ++----------- src/robust_provider.rs | 10 +++++++-- 3 files changed, 22 insertions(+), 48 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index a690a4a2..23642b5f 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -65,8 +65,8 @@ use std::{cmp::Ordering, ops::RangeInclusive, time::Duration}; use tokio::{ - join, sync::{mpsc, oneshot}, + try_join, }; use tokio_stream::{StreamExt, wrappers::ReceiverStream}; @@ -398,10 +398,8 @@ impl Service { self.provider.get_block_by_number(end_height) )?; - let start_block_num = - start_block.ok_or_else(|| ScannerError::BlockNotFound(start_height))?.header().number(); - let end_block_num = - end_block.ok_or_else(|| ScannerError::BlockNotFound(end_height))?.header().number(); + let start_block_num = start_block.header().number(); + let end_block_num = end_block.header().number(); let (start_block_num, end_block_num) = match start_block_num.cmp(&end_block_num) { Ordering::Greater => (end_block_num, start_block_num), @@ -435,12 +433,8 @@ impl Service { self.provider.get_block_by_number(BlockNumberOrTag::Latest) )?; - let start_block_num = - start_block.ok_or_else(|| ScannerError::BlockNotFound(start_height))?.header().number(); - let latest_block = latest_block - .ok_or_else(|| ScannerError::BlockNotFound(BlockNumberOrTag::Latest))? - .header() - .number(); + let start_block_num = start_block.header().number(); + let latest_block = latest_block.header().number(); let confirmed_tip_num = latest_block.saturating_sub(block_confirmations); @@ -536,13 +530,10 @@ impl Service { start_height: BlockNumberOrTag, end_height: BlockNumberOrTag, ) -> Result<(), ScannerError> { - let (start_block, end_block) = join!( + let (start_block, end_block) = try_join!( self.provider.get_block_by_number(start_height), self.provider.get_block_by_number(end_height), - ); - - let start_block = start_block?.ok_or(ScannerError::BlockNotFound(start_height))?; - let end_block = end_block?.ok_or(ScannerError::BlockNotFound(end_height))?; + )?; // normalize block range let (from, to) = match start_block.header().number().cmp(&end_block.header().number()) { @@ -606,13 +597,7 @@ impl Service { // restart rewind batch_from = from; // store the updated end block hash - tip_hash = self - .provider - .get_block_by_number(from.into()) - .await? 
- .expect("Chain should have the same height post-reorg") - .header() - .hash(); + tip_hash = self.provider.get_block_by_number(from.into()).await?.header().hash(); } else { // SAFETY: `batch_to` is always greater than `to`, so `batch_to - 1` is always // a valid unsigned integer @@ -785,11 +770,7 @@ impl Service { async fn get_block_subscription( provider: &RobustProvider, ) -> Result, ScannerError> { - let ws_stream = provider - .subscribe_blocks() - .await - .map_err(|_| ScannerError::WebSocketConnectionFailed(1))?; - + let ws_stream = provider.subscribe_blocks().await?; Ok(ws_stream) } @@ -1656,13 +1637,11 @@ mod tests { let (tx, mut rx) = mpsc::channel(1); service.subscriber = Some(tx); - service - .send_to_subscriber(Message::Error(ScannerError::WebSocketConnectionFailed(4))) - .await; + service.send_to_subscriber(Message::Error(ScannerError::BlockNotFound(4.into()))).await; match rx.recv().await.expect("subscriber should stay open") { - Message::Error(ScannerError::WebSocketConnectionFailed(attempts)) => { - assert_eq!(attempts, 4); + Message::Error(ScannerError::BlockNotFound(attempts)) => { + assert_eq!(attempts, 4.into()); } other => panic!("unexpected message: {other:?}"), } diff --git a/src/error.rs b/src/error.rs index 4ea30a15..d168548a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -3,7 +3,7 @@ use std::{ops::RangeInclusive, sync::Arc}; use alloy::{ eips::BlockNumberOrTag, primitives::BlockNumber, - transports::{RpcError, TransportErrorKind, http::reqwest}, + transports::{RpcError, TransportErrorKind}, }; use thiserror::Error; @@ -11,9 +11,6 @@ use crate::{block_range_scanner::Message, robust_provider::RobustProviderError}; #[derive(Error, Debug, Clone)] pub enum ScannerError { - #[error("HTTP request failed: {0}")] - HttpError(Arc), - // #[error("WebSocket error: {0}")] // WebSocketError(#[from] tokio_tungstenite::tungstenite::Error), #[error("Serialization error: {0}")] @@ -37,9 +34,6 @@ pub enum ScannerError { #[error("Historical sync failed: {0}")] HistoricalSyncError(String), - #[error("WebSocket connection failed after {0} attempts")] - WebSocketConnectionFailed(usize), - #[error("Block not found, block number: {0}")] BlockNotFound(BlockNumberOrTag), @@ -56,6 +50,7 @@ impl From for ScannerError { RobustProviderError::RpcError(err) => ScannerError::RpcError(err), RobustProviderError::Timeout => ScannerError::Timeout, RobustProviderError::RetryFail(num) => ScannerError::RetryFail(num), + RobustProviderError::BlockNotFound(block) => ScannerError::BlockNotFound(block), } } } @@ -69,12 +64,6 @@ impl From, ScannerError>> for Message { } } -impl From for ScannerError { - fn from(error: reqwest::Error) -> Self { - ScannerError::HttpError(Arc::new(error)) - } -} - impl From for ScannerError { fn from(error: serde_json::Error) -> Self { ScannerError::SerializationError(Arc::new(error)) diff --git a/src/robust_provider.rs b/src/robust_provider.rs index ef3890df..ab66d4a7 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -20,6 +20,8 @@ pub enum RobustProviderError { Timeout, #[error("Retry failed after {0} tries")] RetryFail(usize), + #[error("Block not found, block number: {0}")] + BlockNotFound(BlockNumberOrTag), } impl From> for RobustProviderError { @@ -87,7 +89,7 @@ impl RobustProvider { pub async fn get_block_by_number( &self, number: BlockNumberOrTag, - ) -> Result, RobustProviderError> { + ) -> Result { info!("eth_getBlockByNumber called"); let operation = async || { self.provider.get_block_by_number(number).await.map_err(RobustProviderError::from) @@ 
-96,7 +98,11 @@ impl RobustProvider { if let Err(e) = &result { error!(error = %e, "eth_getByBlockNumber failed"); } - result + + match result? { + Some(block) => Ok(block), + None => Err(RobustProviderError::BlockNotFound(number)), + } } /// Fetch the latest block number with retry and timeout. From 058a3edcb5934b1cb2a04167a90ba25229272e8e Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 28 Oct 2025 18:58:06 +0900 Subject: [PATCH 37/47] fix: doc test --- src/block_range_scanner.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 23642b5f..517880cc 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -41,11 +41,6 @@ //! error!("Received error from subscription: {e}"); //! match e { //! ScannerError::ServiceShutdown => break, -//! ScannerError::WebSocketConnectionFailed(_) => { -//! error!( -//! "WebSocket connection failed, continuing to listen for reconnection" -//! ); -//! } //! _ => { //! error!("Non-fatal error, continuing: {e}"); //! } From 14ed0e4dda0f7a9c378968255f661848e437f4aa Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 28 Oct 2025 19:05:34 +0900 Subject: [PATCH 38/47] ref: use matches --- src/block_range_scanner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 517880cc..db0d7f59 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -1635,8 +1635,8 @@ mod tests { service.send_to_subscriber(Message::Error(ScannerError::BlockNotFound(4.into()))).await; match rx.recv().await.expect("subscriber should stay open") { - Message::Error(ScannerError::BlockNotFound(attempts)) => { - assert_eq!(attempts, 4.into()); + Message::Error(err) => { + assert!(matches!(err, ScannerError::BlockNotFound(BlockNumberOrTag::Number(4)))); } other => panic!("unexpected message: {other:?}"), } From a531b7dd25af51bbe4dd56e9ec07122d7b98358d Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 28 Oct 2025 19:13:30 +0900 Subject: [PATCH 39/47] ref: refactor ok or else --- src/robust_provider.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/robust_provider.rs b/src/robust_provider.rs index ab66d4a7..2cf05eec 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -99,10 +99,7 @@ impl RobustProvider { error!(error = %e, "eth_getByBlockNumber failed"); } - match result? { - Some(block) => Ok(block), - None => Err(RobustProviderError::BlockNotFound(number)), - } + result?.ok_or_else(|| RobustProviderError::BlockNotFound(number)) } /// Fetch the latest block number with retry and timeout. 
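With `BlockNotFound` handled inside the provider (patches 36 through 39), call sites no longer unwrap an `Option` themselves. A hedged sketch of the resulting caller-side shape follows; the helper name and signature are illustrative, and only the provider call and the error conversion come from the patches above.

```rust
use alloy::{
    eips::BlockNumberOrTag,
    network::{BlockResponse, Network, primitives::HeaderResponse},
};

use crate::{error::ScannerError, robust_provider::RobustProvider};

// Illustrative helper: highest block number considered confirmed.
async fn confirmed_tip<N: Network>(
    provider: &RobustProvider<N>,
    block_confirmations: u64,
) -> Result<u64, ScannerError> {
    // `get_block_by_number` now returns the block directly; a missing block surfaces as
    // `RobustProviderError::BlockNotFound`, which the `From` impl in error.rs maps to
    // `ScannerError::BlockNotFound`, so the old `ok_or_else(...)` at the call site is gone.
    let latest = provider.get_block_by_number(BlockNumberOrTag::Latest).await?;
    Ok(latest.header().number().saturating_sub(block_confirmations))
}
```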
From 435dee3fcef737a20a037185b55922a1fea5e716 Mon Sep 17 00:00:00 2001 From: Nenad Date: Tue, 28 Oct 2025 11:14:20 +0100 Subject: [PATCH 40/47] fix: Retry updates (#141) Co-authored-by: Leo --- src/error.rs | 9 ++-- src/event_scanner/message.rs | 2 +- src/event_scanner/modes/common.rs | 2 +- src/robust_provider.rs | 83 ++++++++++++------------------- 4 files changed, 37 insertions(+), 59 deletions(-) diff --git a/src/error.rs b/src/error.rs index d168548a..cd4b4926 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,7 +7,7 @@ use alloy::{ }; use thiserror::Error; -use crate::{block_range_scanner::Message, robust_provider::RobustProviderError}; +use crate::{block_range_scanner::Message, robust_provider::Error as RobustProviderError}; #[derive(Error, Debug, Clone)] pub enum ScannerError { @@ -40,16 +40,15 @@ pub enum ScannerError { #[error("Operation timed out")] Timeout, - #[error("Retry failed after {0} tries")] - RetryFail(usize), + #[error("RPC call failed after exhausting all retry attempts: {0}")] + RetryFailure(Arc>), } impl From for ScannerError { fn from(error: RobustProviderError) -> ScannerError { match error { - RobustProviderError::RpcError(err) => ScannerError::RpcError(err), RobustProviderError::Timeout => ScannerError::Timeout, - RobustProviderError::RetryFail(num) => ScannerError::RetryFail(num), + RobustProviderError::RetryFailure(err) => ScannerError::RetryFailure(err), RobustProviderError::BlockNotFound(block) => ScannerError::BlockNotFound(block), } } diff --git a/src/event_scanner/message.rs b/src/event_scanner/message.rs index 33da6c61..ebd1081a 100644 --- a/src/event_scanner/message.rs +++ b/src/event_scanner/message.rs @@ -1,6 +1,6 @@ use alloy::{rpc::types::Log, sol_types::SolEvent}; -use crate::{ScannerError, ScannerMessage, robust_provider::RobustProviderError}; +use crate::{ScannerError, ScannerMessage, robust_provider::Error as RobustProviderError}; pub type Message = ScannerMessage, ScannerError>; diff --git a/src/event_scanner/modes/common.rs b/src/event_scanner/modes/common.rs index 59748dce..ef93e17e 100644 --- a/src/event_scanner/modes/common.rs +++ b/src/event_scanner/modes/common.rs @@ -3,7 +3,7 @@ use std::ops::RangeInclusive; use crate::{ block_range_scanner::{MAX_BUFFERED_MESSAGES, Message as BlockRangeMessage}, event_scanner::{filter::EventFilter, listener::EventListener, message::Message}, - robust_provider::{RobustProvider, RobustProviderError}, + robust_provider::{Error as RobustProviderError, RobustProvider}, }; use alloy::{ network::Network, diff --git a/src/robust_provider.rs b/src/robust_provider.rs index 2cf05eec..872c7ba6 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -13,20 +13,18 @@ use thiserror::Error; use tracing::{error, info}; #[derive(Error, Debug, Clone)] -pub enum RobustProviderError { - #[error("RPC error: {0}")] - RpcError(Arc>), +pub enum Error { #[error("Operation timed out")] Timeout, - #[error("Retry failed after {0} tries")] - RetryFail(usize), + #[error("RPC call failed after exhausting all retry attempts: {0}")] + RetryFailure(Arc>), #[error("Block not found, block number: {0}")] BlockNotFound(BlockNumberOrTag), } -impl From> for RobustProviderError { +impl From> for Error { fn from(err: RpcError) -> Self { - RobustProviderError::RpcError(Arc::new(err)) + Error::RetryFailure(Arc::new(err)) } } @@ -89,11 +87,9 @@ impl RobustProvider { pub async fn get_block_by_number( &self, number: BlockNumberOrTag, - ) -> Result { + ) -> Result { info!("eth_getBlockByNumber called"); - let operation = async || { - 
self.provider.get_block_by_number(number).await.map_err(RobustProviderError::from) - }; + let operation = async || self.provider.get_block_by_number(number).await; let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getByBlockNumber failed"); @@ -108,10 +104,9 @@ impl RobustProvider { /// /// Returns an error if RPC call fails repeatedly even /// after exhausting retries or if the call times out. - pub async fn get_block_number(&self) -> Result { + pub async fn get_block_number(&self) -> Result { info!("eth_getBlockNumber called"); - let operation = - async || self.provider.get_block_number().await.map_err(RobustProviderError::from); + let operation = async || self.provider.get_block_number().await; let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getBlockNumber failed"); @@ -128,10 +123,9 @@ impl RobustProvider { pub async fn get_block_by_hash( &self, hash: alloy::primitives::BlockHash, - ) -> Result, RobustProviderError> { + ) -> Result, Error> { info!("eth_getBlockByHash called"); - let operation = - async || self.provider.get_block_by_hash(hash).await.map_err(RobustProviderError::from); + let operation = async || self.provider.get_block_by_hash(hash).await; let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getBlockByHash failed"); @@ -145,10 +139,9 @@ impl RobustProvider { /// /// Returns an error if RPC call fails repeatedly even /// after exhausting retries or if the call times out. - pub async fn get_logs(&self, filter: &Filter) -> Result, RobustProviderError> { + pub async fn get_logs(&self, filter: &Filter) -> Result, Error> { info!("eth_getLogs called"); - let operation = - async || self.provider.get_logs(filter).await.map_err(RobustProviderError::from); + let operation = async || self.provider.get_logs(filter).await; let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getLogs failed"); @@ -162,16 +155,10 @@ impl RobustProvider { /// /// Returns an error if RPC call fails repeatedly even /// after exhausting retries or if the call times out. - pub async fn subscribe_blocks( - &self, - ) -> Result, RobustProviderError> { + pub async fn subscribe_blocks(&self) -> Result, Error> { info!("eth_subscribe called"); - let provider = self.provider.clone(); - let result = self - .retry_with_total_timeout(|| async { - provider.subscribe_blocks().await.map_err(RobustProviderError::from) - }) - .await; + let operation = async || self.provider.subscribe_blocks().await; + let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_subscribe failed"); } @@ -189,13 +176,10 @@ impl RobustProvider { /// - Returns [`RpcError`] with message "total operation timeout exceeded" /// if the overall timeout elapses. /// - Propagates any [`RpcError`] from the underlying retries. 
- async fn retry_with_total_timeout( - &self, - operation: F, - ) -> Result + async fn retry_with_total_timeout(&self, operation: F) -> Result where F: Fn() -> Fut, - Fut: Future>, + Fut: Future>>, { let retry_strategy = ExponentialBuilder::default() .with_max_times(self.max_retries) @@ -207,9 +191,8 @@ impl RobustProvider { ) .await { - Ok(Ok(res)) => Ok(res), - Ok(Err(_)) => Err(RobustProviderError::RetryFail(self.max_retries + 1)), - Err(_) => Err(RobustProviderError::Timeout), + Ok(res) => res.map_err(Error::from), + Err(_) => Err(Error::Timeout), } } } @@ -243,7 +226,8 @@ mod tests { let result = provider .retry_with_total_timeout(|| async { call_count.fetch_add(1, Ordering::SeqCst); - Ok(call_count.load(Ordering::SeqCst)) + let count = call_count.load(Ordering::SeqCst); + Ok(count) }) .await; @@ -259,12 +243,10 @@ mod tests { let result = provider .retry_with_total_timeout(|| async { call_count.fetch_add(1, Ordering::SeqCst); - if call_count.load(Ordering::SeqCst) < 3 { - Err(RobustProviderError::RpcError(Arc::new(TransportErrorKind::custom_str( - "temp error", - )))) - } else { - Ok(call_count.load(Ordering::SeqCst)) + let count = call_count.load(Ordering::SeqCst); + match count { + 3 => Ok(count), + _ => Err(TransportErrorKind::BackendGone.into()), } }) .await; @@ -278,21 +260,19 @@ mod tests { let call_count = AtomicUsize::new(0); - let result = provider + let result: Result<(), Error> = provider .retry_with_total_timeout(|| async { call_count.fetch_add(1, Ordering::SeqCst); - // permanent error - Err::(RobustProviderError::Timeout) + Err(TransportErrorKind::BackendGone.into()) }) .await; - let err = result.unwrap_err(); - assert!(matches!(err, RobustProviderError::RetryFail(3))); + assert!(matches!(result, Err(Error::RetryFailure(_)))); assert_eq!(call_count.load(Ordering::SeqCst), 3); } #[tokio::test] - async fn test_retry_with_timeout_respects_total_delay() { + async fn test_retry_with_timeout_respects_max_timeout() { let max_timeout = 50; let provider = test_provider(max_timeout, 10, 1); @@ -303,7 +283,6 @@ mod tests { }) .await; - let err = result.unwrap_err(); - assert!(matches!(err, RobustProviderError::Timeout)); + assert!(matches!(result, Err(Error::Timeout))); } } From 617629e6194655ccac6da54eff44e16735a8301a Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 28 Oct 2025 19:15:58 +0900 Subject: [PATCH 41/47] fix: rename error --- src/robust_provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/robust_provider.rs b/src/robust_provider.rs index 872c7ba6..3e90f4c4 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -95,7 +95,7 @@ impl RobustProvider { error!(error = %e, "eth_getByBlockNumber failed"); } - result?.ok_or_else(|| RobustProviderError::BlockNotFound(number)) + result?.ok_or_else(|| Error::BlockNotFound(number)) } /// Fetch the latest block number with retry and timeout. 
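With the reworked helper, the operation closure returns the raw transport error and `retry_with_total_timeout` itself classifies the outcome: the success value, `RetryFailure` once backon exhausts its attempts, or `Timeout` when the overall deadline elapses before the retry loop finishes. Roughly that shape, as a runnable sketch with a placeholder `flaky_call` and a `String` standing in for the transport error (assumes backon and tokio with default features):

```rust
use std::time::Duration;

use backon::{ExponentialBuilder, Retryable};
use tokio::time::timeout;

#[derive(Debug)]
enum Error {
    Timeout,
    RetryFailure(String),
}

// Placeholder for a provider RPC call that keeps failing.
async fn flaky_call() -> Result<u64, String> {
    Err("backend gone".to_string())
}

async fn retry_with_total_timeout() -> Result<u64, Error> {
    let retry_strategy = ExponentialBuilder::default()
        .with_max_times(3)
        .with_min_delay(Duration::from_millis(50));

    // The outer timeout bounds the whole retry loop, not a single attempt;
    // once retries are exhausted the last error is reported as RetryFailure.
    match timeout(Duration::from_secs(2), flaky_call.retry(retry_strategy)).await {
        Ok(result) => result.map_err(Error::RetryFailure),
        Err(_) => Err(Error::Timeout),
    }
}

#[tokio::main]
async fn main() {
    println!("{:?}", retry_with_total_timeout().await);
}
```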
From f2b2281a88c032b2c79c4e092177c542e5ef96a7 Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 28 Oct 2025 20:49:12 +0900 Subject: [PATCH 42/47] feat: unwrap block in robust provider --- src/block_range_scanner.rs | 17 +++++++++-------- src/error.rs | 6 +++--- src/robust_provider.rs | 13 +++++++------ 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index db0d7f59..3a8a1e53 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -606,12 +606,7 @@ impl Service { } async fn reorg_detected(&self, hash_to_check: B256) -> Result { - Ok(self - .provider - .get_block_by_hash(hash_to_check) - .await - .map_err(ScannerError::from)? - .is_none()) + Ok(self.provider.get_block_by_hash(hash_to_check).await.is_err()) } async fn stream_historical_blocks( @@ -978,7 +973,10 @@ impl BlockRangeScannerClient { #[cfg(test)] mod tests { - use alloy::providers::{Provider, RootProvider}; + use alloy::{ + eips::BlockId, + providers::{Provider, RootProvider}, + }; use std::time::Duration; use tokio::time::timeout; @@ -1636,7 +1634,10 @@ mod tests { match rx.recv().await.expect("subscriber should stay open") { Message::Error(err) => { - assert!(matches!(err, ScannerError::BlockNotFound(BlockNumberOrTag::Number(4)))); + assert!(matches!( + err, + ScannerError::BlockNotFound(BlockId::Number(BlockNumberOrTag::Number(4))) + )); } other => panic!("unexpected message: {other:?}"), } diff --git a/src/error.rs b/src/error.rs index cd4b4926..d6465a3f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,7 +1,7 @@ use std::{ops::RangeInclusive, sync::Arc}; use alloy::{ - eips::BlockNumberOrTag, + eips::BlockId, primitives::BlockNumber, transports::{RpcError, TransportErrorKind}, }; @@ -34,8 +34,8 @@ pub enum ScannerError { #[error("Historical sync failed: {0}")] HistoricalSyncError(String), - #[error("Block not found, block number: {0}")] - BlockNotFound(BlockNumberOrTag), + #[error("Block not found, Block Id: {0}")] + BlockNotFound(BlockId), #[error("Operation timed out")] Timeout, diff --git a/src/robust_provider.rs b/src/robust_provider.rs index 3e90f4c4..f66158d4 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -1,7 +1,7 @@ use std::{future::Future, sync::Arc, time::Duration}; use alloy::{ - eips::BlockNumberOrTag, + eips::{BlockId, BlockNumberOrTag}, network::Network, providers::{Provider, RootProvider}, pubsub::Subscription, @@ -18,8 +18,8 @@ pub enum Error { Timeout, #[error("RPC call failed after exhausting all retry attempts: {0}")] RetryFailure(Arc>), - #[error("Block not found, block number: {0}")] - BlockNotFound(BlockNumberOrTag), + #[error("Block not found, Block Id: {0}")] + BlockNotFound(BlockId), } impl From> for Error { @@ -95,7 +95,7 @@ impl RobustProvider { error!(error = %e, "eth_getByBlockNumber failed"); } - result?.ok_or_else(|| Error::BlockNotFound(number)) + result?.ok_or_else(|| Error::BlockNotFound(number.into())) } /// Fetch the latest block number with retry and timeout. 
@@ -123,14 +123,15 @@ impl RobustProvider { pub async fn get_block_by_hash( &self, hash: alloy::primitives::BlockHash, - ) -> Result, Error> { + ) -> Result { info!("eth_getBlockByHash called"); let operation = async || self.provider.get_block_by_hash(hash).await; let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getBlockByHash failed"); } - result + + result?.ok_or_else(|| Error::BlockNotFound(hash.into())) } /// Fetch logs for the given filter with retry and timeout. From c9f140dcf92e16e55c2c2bc0dfc9aa17665894b8 Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 28 Oct 2025 22:00:57 +0900 Subject: [PATCH 43/47] fix: more merge errors --- src/block_range_scanner.rs | 55 +++++++++++++++++-------------- src/error.rs | 12 +++---- src/event_scanner/modes/common.rs | 1 + 3 files changed, 36 insertions(+), 32 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 95597066..f106458a 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -68,6 +68,10 @@ use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use crate::{ ScannerMessage, error::ScannerError, + robust_provider::{ + DEFAULT_MAX_RETRIES, DEFAULT_MAX_TIMEOUT, DEFAULT_RETRY_INTERVAL, + Error as RobustProviderError, RobustProvider, + }, types::{ScannerStatus, TryStream}, }; use alloy::{ @@ -108,6 +112,12 @@ impl PartialEq> for Message { } } +impl From for Message { + fn from(error: RobustProviderError) -> Self { + Message::Error(error.into()) + } +} + impl From> for Message { fn from(error: RpcError) -> Self { Message::Error(error.into()) @@ -513,7 +523,7 @@ impl Service { let max_block_range = self.max_block_range; let provider = self.provider.clone(); - let (start_block, end_block) = join!( + let (start_block, end_block) = try_join!( self.provider.get_block_by_number(start_height), self.provider.get_block_by_number(end_height), )?; @@ -543,7 +553,7 @@ impl Service { to: N::BlockResponse, max_block_range: u64, sender: &mpsc::Sender, - provider: &RootProvider, + provider: &RobustProvider, ) { let mut batch_count = 0; @@ -595,13 +605,11 @@ impl Service { batch_from = from; // store the updated end block hash tip_hash = match provider.get_block_by_number(from.into()).await { - Ok(block) => block - .unwrap_or_else(|| { - panic!("Block with number '{from}' should exist post-reorg") - }) - .header() - .hash(), + Ok(block) => block.header().hash(), Err(e) => { + if matches!(e, RobustProviderError::BlockNotFound(_)) { + panic!("Block with number '{from}' should exist post-reorg"); + } error!(error = %e, "Terminal RPC call error, shutting down"); _ = sender.try_stream(e); return; @@ -762,10 +770,10 @@ impl Service { } async fn reorg_detected( - provider: &RootProvider, + provider: &RobustProvider, hash_to_check: B256, ) -> Result> { - Ok(provider.get_block_by_hash(hash_to_check).await?.is_none()) + Ok(provider.get_block_by_hash(hash_to_check).await.is_err()) } pub struct BlockRangeScannerClient { @@ -914,6 +922,7 @@ mod tests { use super::*; use crate::{assert_closed, assert_empty, assert_next}; use alloy::{ + eips::BlockId, network::Ethereum, providers::{ProviderBuilder, ext::AnvilApi}, rpc::types::anvil::ReorgOptions, @@ -1366,21 +1375,16 @@ mod tests { #[tokio::test] async fn try_send_forwards_errors_to_subscribers() { - let (tx, mut rx) = mpsc::channel(1); + let (tx, mut rx) = mpsc::channel::(1); - service.send_to_subscriber(Message::Error(ScannerError::BlockNotFound(4.into()))).await; + _ = 
tx.try_stream(ScannerError::BlockNotFound(4.into())).await; - match rx.recv().await.expect("subscriber should stay open") { - Message::Error(err) => { - assert!(matches!( - err, - ScannerError::BlockNotFound(BlockId::Number(BlockNumberOrTag::Number(4))) - )); - } - other => panic!("unexpected message: {other:?}"), - } - - Ok(()) + assert!(matches!( + rx.recv().await, + Some(ScannerMessage::Error(ScannerError::BlockNotFound(BlockId::Number( + BlockNumberOrTag::Number(4) + )))) + )); } #[tokio::test] @@ -1575,7 +1579,10 @@ mod tests { let stream = client.rewind(0, 999).await; - assert!(matches!(stream, Err(ScannerError::BlockNotFound(BlockNumberOrTag::Number(999))))); + assert!(matches!( + stream, + Err(ScannerError::BlockNotFound(BlockId::Number(BlockNumberOrTag::Number(999)))) + )); Ok(()) } diff --git a/src/error.rs b/src/error.rs index d64a524e..cc67dc13 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,11 +1,13 @@ use std::sync::Arc; use alloy::{ - eips::BlockNumberOrTag, - transports::{RpcError, TransportErrorKind, http::reqwest}, + eips::BlockId, + transports::{RpcError, TransportErrorKind}, }; use thiserror::Error; +use crate::robust_provider::Error as RobustProviderError; + #[derive(Error, Debug, Clone)] pub enum ScannerError { // #[error("WebSocket error: {0}")] @@ -48,12 +50,6 @@ impl From for ScannerError { } } -impl From for ScannerError { - fn from(error: reqwest::Error) -> Self { - ScannerError::HttpError(Arc::new(error)) - } -} - impl From for ScannerError { fn from(error: serde_json::Error) -> Self { ScannerError::SerializationError(Arc::new(error)) diff --git a/src/event_scanner/modes/common.rs b/src/event_scanner/modes/common.rs index 4757c54f..205c2f9f 100644 --- a/src/event_scanner/modes/common.rs +++ b/src/event_scanner/modes/common.rs @@ -3,6 +3,7 @@ use std::ops::RangeInclusive; use crate::{ block_range_scanner::{MAX_BUFFERED_MESSAGES, Message as BlockRangeMessage}, event_scanner::{filter::EventFilter, listener::EventListener}, + robust_provider::{Error as RobustProviderError, RobustProvider}, types::TryStream, }; use alloy::{ From 8eff737e23d1c45b2db4188b85728c923b16120e Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 28 Oct 2025 23:08:50 +0900 Subject: [PATCH 44/47] Update src/block_range_scanner.rs Co-authored-by: Nenad --- src/block_range_scanner.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index f106458a..8ed72d04 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -606,10 +606,10 @@ impl Service { // store the updated end block hash tip_hash = match provider.get_block_by_number(from.into()).await { Ok(block) => block.header().hash(), + Err(RobustProviderError::BlockNotFound(_) => { + panic!("Block with number '{from}' should exist post-reorg"); + } Err(e) => { - if matches!(e, RobustProviderError::BlockNotFound(_)) { - panic!("Block with number '{from}' should exist post-reorg"); - } error!(error = %e, "Terminal RPC call error, shutting down"); _ = sender.try_stream(e); return; From b34dbbe1b9536df916f74978bff3fa1a0dac0b2a Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 28 Oct 2025 23:11:52 +0900 Subject: [PATCH 45/47] fix: brackets --- src/block_range_scanner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 8ed72d04..81b1e987 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -606,7 +606,7 @@ impl Service { // store the updated end block hash tip_hash 
= match provider.get_block_by_number(from.into()).await { Ok(block) => block.header().hash(), - Err(RobustProviderError::BlockNotFound(_) => { + Err(RobustProviderError::BlockNotFound(_)) => { panic!("Block with number '{from}' should exist post-reorg"); } Err(e) => { From 82878f6d59d6e450d374b0bab43ecbdb5e982903 Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 28 Oct 2025 23:18:53 +0900 Subject: [PATCH 46/47] fix: only return true on reorg detected if err is block not found --- src/block_range_scanner.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 81b1e987..44f25db1 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -772,8 +772,12 @@ impl Service { async fn reorg_detected( provider: &RobustProvider, hash_to_check: B256, -) -> Result> { - Ok(provider.get_block_by_hash(hash_to_check).await.is_err()) +) -> Result { + match provider.get_block_by_hash(hash_to_check).await { + Ok(_) => Ok(false), + Err(RobustProviderError::BlockNotFound(_)) => Ok(true), + Err(e) => Err(e.into()), + } } pub struct BlockRangeScannerClient { From 63cefb9db3a9ac37504a3f7120527c66f89397d6 Mon Sep 17 00:00:00 2001 From: Leo Date: Wed, 29 Oct 2025 23:26:23 +0900 Subject: [PATCH 47/47] feat: merge changes --- Cargo.lock | 24 +++++++++++++++++++ Cargo.toml | 1 + src/block_range_scanner.rs | 15 +++--------- src/event_scanner/scanner/common.rs | 2 +- src/event_scanner/scanner/sync/from_latest.rs | 9 ++----- 5 files changed, 31 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6e7f81ec..346f80e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1084,6 +1084,17 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "backon" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" +dependencies = [ + "fastrand", + "gloo-timers", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.75" @@ -1667,6 +1678,7 @@ dependencies = [ "alloy-node-bindings", "anyhow", "async-trait", + "backon", "chrono", "serde", "serde_json", @@ -1917,6 +1929,18 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "group" version = "0.13.0" diff --git a/Cargo.toml b/Cargo.toml index 388fc462..71f1bf85 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,6 +68,7 @@ chrono.workspace = true alloy-node-bindings.workspace = true tokio-stream.workspace = true tracing.workspace = true +backon.workspace = true [dev-dependencies] tracing-subscriber.workspace = true diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index dd4c7a99..e1064152 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -424,23 +424,14 @@ impl Service { let get_start_block = async || -> Result { let block = match start_height { BlockNumberOrTag::Number(num) => num, - block_tag => provider - .get_block_by_number(block_tag) - .await? 
- .ok_or_else(|| ScannerError::BlockNotFound(block_tag))? - .header() - .number(), + block_tag => provider.get_block_by_number(block_tag).await?.header().number(), }; Ok(block) }; let get_latest_block = async || -> Result { - let block = provider - .get_block_by_number(BlockNumberOrTag::Latest) - .await? - .ok_or_else(|| ScannerError::BlockNotFound(BlockNumberOrTag::Latest))? - .header() - .number(); + let block = + provider.get_block_by_number(BlockNumberOrTag::Latest).await?.header().number(); Ok(block) }; diff --git a/src/event_scanner/scanner/common.rs b/src/event_scanner/scanner/common.rs index 5dd10a65..6bae3d9f 100644 --- a/src/event_scanner/scanner/common.rs +++ b/src/event_scanner/scanner/common.rs @@ -47,7 +47,7 @@ pub enum ConsumerMode { /// Assumes it is running in a separate tokio task, so as to be non-blocking. pub async fn handle_stream + Unpin>( mut stream: S, - provider: &RootProvider, + provider: &RobustProvider, listeners: &[EventListener], mode: ConsumerMode, ) { diff --git a/src/event_scanner/scanner/sync/from_latest.rs b/src/event_scanner/scanner/sync/from_latest.rs index 91626c18..e266b699 100644 --- a/src/event_scanner/scanner/sync/from_latest.rs +++ b/src/event_scanner/scanner/sync/from_latest.rs @@ -2,7 +2,6 @@ use alloy::{ consensus::BlockHeader, eips::BlockNumberOrTag, network::{BlockResponse, Network}, - providers::Provider, }; use tokio::sync::mpsc; @@ -57,12 +56,8 @@ impl EventScanner { // This is used to determine the starting point for the rewind stream and the live // stream. We do this before starting the streams to avoid a race condition // where the latest block changes while we're setting up the streams. - let latest_block = provider - .get_block_by_number(BlockNumberOrTag::Latest) - .await? - .ok_or(ScannerError::BlockNotFound(BlockNumberOrTag::Latest))? - .header() - .number(); + let latest_block = + provider.get_block_by_number(BlockNumberOrTag::Latest).await?.header().number(); // Setup rewind and live streams to run in parallel. let rewind_stream = client.rewind(BlockNumberOrTag::Earliest, latest_block).await?;
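After the final merge, call sites no longer unwrap an `Option`: `get_block_by_number` and `get_block_by_hash` either yield the block or fail with `BlockNotFound`, and the reorg check simply interprets that variant as evidence of a reorg. A minimal sketch of that calling convention, with placeholder types in place of the Alloy network generics and a hypothetical `get_block_by_hash`:

```rust
type BlockHash = [u8; 32];

#[derive(Debug)]
enum Error {
    BlockNotFound(BlockHash),
    Timeout,
}

// Hypothetical provider call: the real RobustProvider returns the block
// itself and reports a missing block through Error::BlockNotFound.
async fn get_block_by_hash(hash: BlockHash) -> Result<u64, Error> {
    Err(Error::BlockNotFound(hash))
}

// Mirrors the post-merge reorg_detected: a missing block for a previously
// seen hash means the chain reorganised; any other error is propagated.
async fn reorg_detected(hash_to_check: BlockHash) -> Result<bool, Error> {
    match get_block_by_hash(hash_to_check).await {
        Ok(_) => Ok(false),
        Err(Error::BlockNotFound(_)) => Ok(true),
        Err(e) => Err(e),
    }
}

#[tokio::main]
async fn main() {
    assert!(reorg_detected([0u8; 32]).await.unwrap());
}
```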