Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
5f92f5c
feat: add backon library
LeoPatOZ Oct 16, 2025
2ad6516
feat: add provider wrapper
LeoPatOZ Oct 16, 2025
41824f0
Merge branch 'main' into retry-logic
LeoPatOZ Oct 16, 2025
22c2448
feat: use internal
LeoPatOZ Oct 16, 2025
d738ab2
feat: add get logs to safe provider
LeoPatOZ Oct 16, 2025
d068c8e
chore: made var public
LeoPatOZ Oct 16, 2025
d1e12e5
feat: added safe provider conifgs to block range scanner
LeoPatOZ Oct 16, 2025
0e9aa3b
feat: use safe provider in event scanner
LeoPatOZ Oct 16, 2025
319940a
feat: undo safe provider errors
LeoPatOZ Oct 16, 2025
65527b8
feat: implement stream block
LeoPatOZ Oct 20, 2025
b5bf428
test: add basic testing to safe provider
LeoPatOZ Oct 20, 2025
6241e83
chore: delete other constants
LeoPatOZ Oct 20, 2025
e24a6f7
Merge branch 'main' into retry-logic
LeoPatOZ Oct 20, 2025
79eaaae
fix: fmt
LeoPatOZ Oct 20, 2025
112c860
feat: add logging to rpc calls
LeoPatOZ Oct 21, 2025
9fc6543
chore: add comments and rename timeout
LeoPatOZ Oct 21, 2025
8bd13b1
chore: doctest
LeoPatOZ Oct 21, 2025
af7ce01
feat: add total timeout
LeoPatOZ Oct 21, 2025
c1ff8b5
Merge branch 'main' into retry-logic
LeoPatOZ Oct 23, 2025
471f767
ref: imporving tracing message
LeoPatOZ Oct 23, 2025
855e167
ref: collapse timeout fn to one
LeoPatOZ Oct 23, 2025
ff40e8a
ref: better syntax
LeoPatOZ Oct 23, 2025
992853c
ref: remove with and address default nit
LeoPatOZ Oct 23, 2025
ab35dc5
fix: doctest
LeoPatOZ Oct 23, 2025
cdf9b95
Update src/safe_provider.rs
LeoPatOZ Oct 23, 2025
3c0a971
Merge remote-tracking branch 'refs/remotes/origin/retry-logic' into r…
LeoPatOZ Oct 23, 2025
74cd3d7
ref: use atomic usize
LeoPatOZ Oct 23, 2025
41cc733
ref: update test to match for error
LeoPatOZ Oct 23, 2025
4af01ab
ref: update doc
LeoPatOZ Oct 23, 2025
3e32b8e
Merge branch 'main' into retry-logic
LeoPatOZ Oct 23, 2025
3329c38
fix: merge errors with connect methods
LeoPatOZ Oct 23, 2025
586b03e
fix: root --> safe provider
LeoPatOZ Oct 23, 2025
8a080e4
ref: tracing update
LeoPatOZ Oct 23, 2025
982ea96
ref: remove doc
LeoPatOZ Oct 23, 2025
49f06bc
ref: avoid clone provider when possible
LeoPatOZ Oct 27, 2025
0546feb
ref: update test
LeoPatOZ Oct 27, 2025
01a9cf1
feat: add custom error for safe provider
LeoPatOZ Oct 27, 2025
a82b12f
ref: remove moves
LeoPatOZ Oct 27, 2025
6c995a2
Merge branch 'main' into retry-logic
LeoPatOZ Oct 27, 2025
1ee07ca
ref: rename safe to robust provider
LeoPatOZ Oct 28, 2025
f2b6d47
ref: comment
LeoPatOZ Oct 28, 2025
f622b21
feat: move block not found to provider
LeoPatOZ Oct 28, 2025
058a3ed
fix: doc test
LeoPatOZ Oct 28, 2025
14ed0e4
ref: use matches
LeoPatOZ Oct 28, 2025
a531b7d
ref: refactor ok or else
LeoPatOZ Oct 28, 2025
435dee3
fix: Retry updates (#141)
0xNeshi Oct 28, 2025
617629e
fix: rename error
LeoPatOZ Oct 28, 2025
f2b2281
feat: unwrap block in robust provider
LeoPatOZ Oct 28, 2025
ec32c31
Merge branch 'main' into retry-logic
LeoPatOZ Oct 28, 2025
c9f140d
fix: more merge errors
LeoPatOZ Oct 28, 2025
8eff737
Update src/block_range_scanner.rs
LeoPatOZ Oct 28, 2025
b34dbbe
fix: brackets
LeoPatOZ Oct 28, 2025
82878f6
fix: only return true on reorg detected if err is block not found
LeoPatOZ Oct 28, 2025
ac3d47f
Merge branch 'main' into retry-logic
LeoPatOZ Oct 29, 2025
63cefb9
feat: merge changes
LeoPatOZ Oct 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ tokio-stream = "0.1.17"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
hex = "0.4"
backon = "1.5.2"

[package]
name = "event-scanner"
Expand Down Expand Up @@ -65,6 +66,7 @@ alloy-node-bindings.workspace = true
tokio-stream.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
backon.workspace = true

[lints]
workspace = true
Expand Down
84 changes: 61 additions & 23 deletions src/block_range_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,24 @@
//! }
//! ```

use std::{cmp::Ordering, ops::RangeInclusive, sync::Arc};
use std::{cmp::Ordering, ops::RangeInclusive, sync::Arc, time::Duration};

use tokio::{
join,
sync::{mpsc, oneshot},
};
use tokio_stream::{StreamExt, wrappers::ReceiverStream};

use crate::types::{ScannerMessage, ScannerStatus};
use crate::{
safe_provider::{DEFAULT_MAX_RETRIES, DEFAULT_RETRY_INTERVAL, DEFAULT_TIMEOUT, SafeProvider},
types::{ScannerMessage, ScannerStatus},
};
use alloy::{
consensus::BlockHeader,
eips::BlockNumberOrTag,
network::{BlockResponse, Network, primitives::HeaderResponse},
primitives::{B256, BlockNumber},
providers::{Provider, RootProvider},
providers::RootProvider,
pubsub::Subscription,
rpc::client::ClientBuilder,
transports::{
Expand All @@ -101,9 +104,10 @@ pub const MAX_BUFFERED_MESSAGES: usize = 50000;
// is considered final)
pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 64;

// // State sync aware retry settings
// const STATE_SYNC_RETRY_INTERVAL: Duration = Duration::from_secs(30);
// const STATE_SYNC_MAX_RETRIES: u64 = 12;
// RPC retry and timeout settings
pub const DEFAULT_RPC_TIMEOUT: Duration = Duration::from_secs(30);
pub const DEFAULT_RPC_MAX_RETRIES: usize = 5;
pub const DEFAULT_RPC_RETRY_INTERVAL: Duration = Duration::from_secs(1);

pub type BlockRangeMessage = ScannerMessage<RangeInclusive<BlockNumber>, BlockRangeScannerError>;

Expand Down Expand Up @@ -138,8 +142,11 @@ pub enum BlockRangeScannerError {
#[error("Serialization error: {0}")]
SerializationError(Arc<serde_json::Error>),

#[error("RPC error: {0}")]
RpcError(Arc<RpcError<TransportErrorKind>>),
#[error("Provider error: {0}")]
Provider(Arc<RpcError<TransportErrorKind>>),

#[error("Block not found, block number: {0}")]
BlockNotFound(BlockNumberOrTag),

#[error("Channel send error")]
ChannelError,
Expand All @@ -158,9 +165,6 @@ pub enum BlockRangeScannerError {

#[error("WebSocket connection failed after {0} attempts")]
WebSocketConnectionFailed(usize),

#[error("Block not found, block number: {0}")]
BlockNotFound(BlockNumberOrTag),
}

impl From<reqwest::Error> for BlockRangeScannerError {
Expand All @@ -177,7 +181,7 @@ impl From<serde_json::Error> for BlockRangeScannerError {

impl From<RpcError<TransportErrorKind>> for BlockRangeScannerError {
fn from(error: RpcError<TransportErrorKind>) -> Self {
BlockRangeScannerError::RpcError(Arc::new(error))
BlockRangeScannerError::Provider(Arc::new(error))
}
}

Expand Down Expand Up @@ -233,6 +237,9 @@ pub struct BlockRangeScanner {
blocks_read_per_epoch: usize,
max_reorg_depth: u64,
block_confirmations: u64,
timeout: Duration,
max_retries: usize,
retry_interval: Duration,
}

impl Default for BlockRangeScanner {
Expand All @@ -248,6 +255,9 @@ impl BlockRangeScanner {
blocks_read_per_epoch: DEFAULT_BLOCKS_READ_PER_EPOCH,
max_reorg_depth: DEFAULT_REORG_REWIND_DEPTH,
block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
timeout: DEFAULT_TIMEOUT,
max_retries: DEFAULT_MAX_RETRIES,
retry_interval: DEFAULT_RETRY_INTERVAL,
}
}

Expand All @@ -269,6 +279,24 @@ impl BlockRangeScanner {
self
}

#[must_use]
pub fn with_timeout(mut self, rpc_timeout: Duration) -> Self {
self.timeout = rpc_timeout;
self
}

#[must_use]
pub fn with_max_retries(mut self, rpc_max_retries: usize) -> Self {
self.max_retries = rpc_max_retries;
self
}

#[must_use]
pub fn with_retry_interval(mut self, rpc_retry_interval: Duration) -> Self {
self.retry_interval = rpc_retry_interval;
self
}

/// Connects to the provider via WebSocket
///
/// # Errors
Expand Down Expand Up @@ -305,8 +333,13 @@ impl BlockRangeScanner {
self,
provider: RootProvider<N>,
) -> TransportResult<ConnectedBlockRangeScanner<N>> {
let safe_provider = SafeProvider::new(provider)
.with_timeout(self.timeout)
.with_max_retries(self.max_retries)
.with_retry_interval(self.retry_interval);

Ok(ConnectedBlockRangeScanner {
provider,
provider: safe_provider,
config: Config {
blocks_read_per_epoch: self.blocks_read_per_epoch,
reorg_rewind_depth: self.max_reorg_depth,
Expand All @@ -317,14 +350,14 @@ impl BlockRangeScanner {
}

pub struct ConnectedBlockRangeScanner<N: Network> {
provider: RootProvider<N>,
provider: SafeProvider<N>,
config: Config,
}

impl<N: Network> ConnectedBlockRangeScanner<N> {
/// Returns the underlying Provider.
/// Returns the `SafeProvider`
#[must_use]
pub fn provider(&self) -> &RootProvider<N> {
pub fn provider(&self) -> &SafeProvider<N> {
&self.provider
}

Expand All @@ -344,7 +377,7 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {

struct Service<N: Network> {
config: Config,
provider: RootProvider<N>,
provider: SafeProvider<N>,
subscriber: Option<mpsc::Sender<BlockRangeMessage>>,
websocket_connected: bool,
processed_count: u64,
Expand All @@ -354,7 +387,7 @@ struct Service<N: Network> {
}

impl<N: Network> Service<N> {
pub fn new(config: Config, provider: RootProvider<N>) -> (Self, mpsc::Sender<Command>) {
pub fn new(config: Config, provider: SafeProvider<N>) -> (Self, mpsc::Sender<Command>) {
let (cmd_tx, cmd_rx) = mpsc::channel(100);

let service = Self {
Expand Down Expand Up @@ -749,9 +782,9 @@ impl<N: Network> Service<N> {
Ok(())
}

async fn stream_live_blocks<P: Provider<N>>(
async fn stream_live_blocks(
mut range_start: BlockNumber,
provider: P,
provider: SafeProvider<N>,
sender: mpsc::Sender<BlockRangeMessage>,
block_confirmations: u64,
) {
Expand Down Expand Up @@ -857,7 +890,7 @@ impl<N: Network> Service<N> {
}

async fn get_block_subscription(
provider: &impl Provider<N>,
provider: &SafeProvider<N>,
) -> Result<Subscription<N::HeaderResponse>, BlockRangeScannerError> {
let ws_stream = provider
.subscribe_blocks()
Expand Down Expand Up @@ -1085,6 +1118,7 @@ impl BlockRangeScannerClient {
#[cfg(test)]
mod tests {

use alloy::providers::Provider;
use std::time::Duration;
use tokio::time::timeout;

Expand All @@ -1104,8 +1138,12 @@ mod tests {
Config { blocks_read_per_epoch: 5, reorg_rewind_depth: 5, block_confirmations: 0 }
}

fn mocked_provider(asserter: Asserter) -> RootProvider<Ethereum> {
RootProvider::new(RpcClient::mocked(asserter))
fn mocked_provider(asserter: Asserter) -> SafeProvider<Ethereum> {
let root_provider = RootProvider::new(RpcClient::mocked(asserter));
SafeProvider::new(root_provider)
.with_timeout(DEFAULT_RPC_TIMEOUT)
.with_max_retries(DEFAULT_RPC_MAX_RETRIES)
.with_retry_interval(DEFAULT_RPC_RETRY_INTERVAL)
}

#[test]
Expand Down
15 changes: 11 additions & 4 deletions src/event_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use crate::{
},
event_filter::EventFilter,
event_listener::EventListener,
safe_provider::SafeProvider,
types::ScannerMessage,
};
use alloy::{
eips::BlockNumberOrTag,
network::Network,
providers::{Provider, RootProvider},
providers::RootProvider,
rpc::types::{Filter, Log},
sol_types::SolEvent,
transports::{RpcError, TransportErrorKind, http::reqwest::Url},
Expand Down Expand Up @@ -100,6 +101,12 @@ impl From<BlockRangeScannerError> for EventScannerMessage {
}
}

impl From<EventScannerError> for EventScannerMessage {
fn from(e: EventScannerError) -> Self {
EventScannerMessage::Error(e)
}
}

impl Default for EventScanner {
fn default() -> Self {
Self::new()
Expand Down Expand Up @@ -376,8 +383,8 @@ impl<N: Network> ConnectedEventScanner<N> {
range: RangeInclusive<u64>,
event_filter: &EventFilter,
log_filter: &Filter,
provider: &RootProvider<N>,
) -> Result<Vec<Log>, RpcError<TransportErrorKind>> {
provider: &SafeProvider<N>,
) -> Result<Vec<Log>, EventScannerError> {
let log_filter = log_filter.clone().from_block(*range.start()).to_block(*range.end());

match provider.get_logs(&log_filter).await {
Expand All @@ -403,7 +410,7 @@ impl<N: Network> ConnectedEventScanner<N> {
"failed to get logs for block range"
);

Err(e)
Err(e.into())
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod block_range_scanner;
pub mod event_filter;
pub mod event_listener;
pub mod event_scanner;
pub mod safe_provider;
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;
pub mod types;
Expand All @@ -12,3 +13,4 @@ pub use block_range_scanner::{
};
pub use event_filter::EventFilter;
pub use event_scanner::{EventScanner, EventScannerError, EventScannerMessage};
pub use safe_provider::SafeProvider;
Loading