Skip to content
Merged
Show file tree
Hide file tree
Changes from 50 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
5f92f5c
feat: add backon library
LeoPatOZ Oct 16, 2025
2ad6516
feat: add provider wrapper
LeoPatOZ Oct 16, 2025
41824f0
Merge branch 'main' into retry-logic
LeoPatOZ Oct 16, 2025
22c2448
feat: use internal
LeoPatOZ Oct 16, 2025
d738ab2
feat: add get logs to safe provider
LeoPatOZ Oct 16, 2025
d068c8e
chore: made var public
LeoPatOZ Oct 16, 2025
d1e12e5
feat: added safe provider conifgs to block range scanner
LeoPatOZ Oct 16, 2025
0e9aa3b
feat: use safe provider in event scanner
LeoPatOZ Oct 16, 2025
319940a
feat: undo safe provider errors
LeoPatOZ Oct 16, 2025
65527b8
feat: implement stream block
LeoPatOZ Oct 20, 2025
b5bf428
test: add basic testing to safe provider
LeoPatOZ Oct 20, 2025
6241e83
chore: delete other constants
LeoPatOZ Oct 20, 2025
e24a6f7
Merge branch 'main' into retry-logic
LeoPatOZ Oct 20, 2025
79eaaae
fix: fmt
LeoPatOZ Oct 20, 2025
112c860
feat: add logging to rpc calls
LeoPatOZ Oct 21, 2025
9fc6543
chore: add comments and rename timeout
LeoPatOZ Oct 21, 2025
8bd13b1
chore: doctest
LeoPatOZ Oct 21, 2025
af7ce01
feat: add total timeout
LeoPatOZ Oct 21, 2025
c1ff8b5
Merge branch 'main' into retry-logic
LeoPatOZ Oct 23, 2025
471f767
ref: imporving tracing message
LeoPatOZ Oct 23, 2025
855e167
ref: collapse timeout fn to one
LeoPatOZ Oct 23, 2025
ff40e8a
ref: better syntax
LeoPatOZ Oct 23, 2025
992853c
ref: remove with and address default nit
LeoPatOZ Oct 23, 2025
ab35dc5
fix: doctest
LeoPatOZ Oct 23, 2025
cdf9b95
Update src/safe_provider.rs
LeoPatOZ Oct 23, 2025
3c0a971
Merge remote-tracking branch 'refs/remotes/origin/retry-logic' into r…
LeoPatOZ Oct 23, 2025
74cd3d7
ref: use atomic usize
LeoPatOZ Oct 23, 2025
41cc733
ref: update test to match for error
LeoPatOZ Oct 23, 2025
4af01ab
ref: update doc
LeoPatOZ Oct 23, 2025
3e32b8e
Merge branch 'main' into retry-logic
LeoPatOZ Oct 23, 2025
3329c38
fix: merge errors with connect methods
LeoPatOZ Oct 23, 2025
586b03e
fix: root --> safe provider
LeoPatOZ Oct 23, 2025
8a080e4
ref: tracing update
LeoPatOZ Oct 23, 2025
982ea96
ref: remove doc
LeoPatOZ Oct 23, 2025
49f06bc
ref: avoid clone provider when possible
LeoPatOZ Oct 27, 2025
0546feb
ref: update test
LeoPatOZ Oct 27, 2025
01a9cf1
feat: add custom error for safe provider
LeoPatOZ Oct 27, 2025
a82b12f
ref: remove moves
LeoPatOZ Oct 27, 2025
6c995a2
Merge branch 'main' into retry-logic
LeoPatOZ Oct 27, 2025
1ee07ca
ref: rename safe to robust provider
LeoPatOZ Oct 28, 2025
f2b6d47
ref: comment
LeoPatOZ Oct 28, 2025
f622b21
feat: move block not found to provider
LeoPatOZ Oct 28, 2025
058a3ed
fix: doc test
LeoPatOZ Oct 28, 2025
14ed0e4
ref: use matches
LeoPatOZ Oct 28, 2025
a531b7d
ref: refactor ok or else
LeoPatOZ Oct 28, 2025
435dee3
fix: Retry updates (#141)
0xNeshi Oct 28, 2025
617629e
fix: rename error
LeoPatOZ Oct 28, 2025
f2b2281
feat: unwrap block in robust provider
LeoPatOZ Oct 28, 2025
ec32c31
Merge branch 'main' into retry-logic
LeoPatOZ Oct 28, 2025
c9f140d
fix: more merge errors
LeoPatOZ Oct 28, 2025
8eff737
Update src/block_range_scanner.rs
LeoPatOZ Oct 28, 2025
b34dbbe
fix: brackets
LeoPatOZ Oct 28, 2025
82878f6
fix: only return true on reorg detected if err is block not found
LeoPatOZ Oct 28, 2025
ac3d47f
Merge branch 'main' into retry-logic
LeoPatOZ Oct 29, 2025
63cefb9
feat: merge changes
LeoPatOZ Oct 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ tokio-stream = "0.1.17"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
hex = "0.4"
backon = "1.5.2"

[package]
name = "event-scanner"
Expand Down Expand Up @@ -66,6 +67,7 @@ alloy-node-bindings.workspace = true
tokio-stream.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
backon.workspace = true

[lints]
workspace = true
Expand Down
131 changes: 80 additions & 51 deletions src/block_range_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@
//! error!("Received error from subscription: {e}");
//! match e {
//! ScannerError::ServiceShutdown => break,
//! ScannerError::WebSocketConnectionFailed(_) => {
//! error!(
//! "WebSocket connection failed, continuing to listen for reconnection"
//! );
//! }
//! _ => {
//! error!("Non-fatal error, continuing: {e}");
//! }
Expand All @@ -63,24 +58,28 @@
//! }
//! ```

use std::{cmp::Ordering, ops::RangeInclusive};
use std::{cmp::Ordering, ops::RangeInclusive, time::Duration};
use tokio::{
join,
sync::{mpsc, oneshot},
try_join,
};
use tokio_stream::{StreamExt, wrappers::ReceiverStream};

use crate::{
ScannerMessage,
error::ScannerError,
robust_provider::{
DEFAULT_MAX_RETRIES, DEFAULT_MAX_TIMEOUT, DEFAULT_RETRY_INTERVAL,
Error as RobustProviderError, RobustProvider,
},
types::{ScannerStatus, TryStream},
};
use alloy::{
consensus::BlockHeader,
eips::BlockNumberOrTag,
network::{BlockResponse, Network, primitives::HeaderResponse},
primitives::{B256, BlockNumber},
providers::{Provider, RootProvider},
providers::RootProvider,
pubsub::Subscription,
rpc::client::ClientBuilder,
transports::{
Expand Down Expand Up @@ -113,6 +112,12 @@ impl PartialEq<RangeInclusive<BlockNumber>> for Message {
}
}

impl From<RobustProviderError> for Message {
fn from(error: RobustProviderError) -> Self {
Message::Error(error.into())
}
}

impl From<RpcError<TransportErrorKind>> for Message {
fn from(error: RpcError<TransportErrorKind>) -> Self {
Message::Error(error.into())
Expand All @@ -128,6 +133,9 @@ impl From<ScannerError> for Message {
#[derive(Clone, Copy)]
pub struct BlockRangeScanner {
pub max_block_range: u64,
pub max_timeout: Duration,
pub max_retries: usize,
pub retry_interval: Duration,
}

impl Default for BlockRangeScanner {
Expand All @@ -139,7 +147,12 @@ impl Default for BlockRangeScanner {
impl BlockRangeScanner {
#[must_use]
pub fn new() -> Self {
Self { max_block_range: DEFAULT_MAX_BLOCK_RANGE }
Self {
max_block_range: DEFAULT_MAX_BLOCK_RANGE,
max_timeout: DEFAULT_MAX_TIMEOUT,
max_retries: DEFAULT_MAX_RETRIES,
retry_interval: DEFAULT_RETRY_INTERVAL,
}
}

#[must_use]
Expand All @@ -148,6 +161,24 @@ impl BlockRangeScanner {
self
}

#[must_use]
pub fn with_max_timeout(mut self, rpc_timeout: Duration) -> Self {
self.max_timeout = rpc_timeout;
self
}

#[must_use]
pub fn with_max_retries(mut self, rpc_max_retries: usize) -> Self {
self.max_retries = rpc_max_retries;
self
}

#[must_use]
pub fn with_retry_interval(mut self, rpc_retry_interval: Duration) -> Self {
self.retry_interval = rpc_retry_interval;
self
}

/// Connects to the provider via WebSocket
///
/// # Errors
Expand Down Expand Up @@ -182,19 +213,26 @@ impl BlockRangeScanner {
/// Returns an error if the connection fails
#[must_use]
pub fn connect<N: Network>(self, provider: RootProvider<N>) -> ConnectedBlockRangeScanner<N> {
ConnectedBlockRangeScanner { provider, max_block_range: self.max_block_range }
let robust_provider = RobustProvider::new(provider)
.max_timeout(self.max_timeout)
.max_retries(self.max_retries)
.retry_interval(self.retry_interval);
ConnectedBlockRangeScanner {
provider: robust_provider,
max_block_range: self.max_block_range,
}
}
}

pub struct ConnectedBlockRangeScanner<N: Network> {
provider: RootProvider<N>,
provider: RobustProvider<N>,
max_block_range: u64,
}

impl<N: Network> ConnectedBlockRangeScanner<N> {
/// Returns the underlying Provider.
/// Returns the `RobustProvider`
#[must_use]
pub fn provider(&self) -> &RootProvider<N> {
pub fn provider(&self) -> &RobustProvider<N> {
&self.provider
}

Expand Down Expand Up @@ -240,15 +278,15 @@ pub enum Command {
}

struct Service<N: Network> {
provider: RootProvider<N>,
provider: RobustProvider<N>,
max_block_range: u64,
error_count: u64,
command_receiver: mpsc::Receiver<Command>,
shutdown: bool,
}

impl<N: Network> Service<N> {
pub fn new(provider: RootProvider<N>, max_block_range: u64) -> (Self, mpsc::Sender<Command>) {
pub fn new(provider: RobustProvider<N>, max_block_range: u64) -> (Self, mpsc::Sender<Command>) {
let (cmd_tx, cmd_rx) = mpsc::channel(100);

let service = Self {
Expand Down Expand Up @@ -351,10 +389,8 @@ impl<N: Network> Service<N> {
self.provider.get_block_by_number(end_height)
)?;

let start_block_num =
start_block.ok_or_else(|| ScannerError::BlockNotFound(start_height))?.header().number();
let end_block_num =
end_block.ok_or_else(|| ScannerError::BlockNotFound(end_height))?.header().number();
let start_block_num = start_block.header().number();
let end_block_num = end_block.header().number();

let (start_block_num, end_block_num) = match start_block_num.cmp(&end_block_num) {
Ordering::Greater => (end_block_num, start_block_num),
Expand Down Expand Up @@ -391,12 +427,8 @@ impl<N: Network> Service<N> {
self.provider.get_block_by_number(BlockNumberOrTag::Latest)
)?;

let start_block_num =
start_block.ok_or_else(|| ScannerError::BlockNotFound(start_height))?.header().number();
let latest_block = latest_block
.ok_or_else(|| ScannerError::BlockNotFound(BlockNumberOrTag::Latest))?
.header()
.number();
let start_block_num = start_block.header().number();
let latest_block = latest_block.header().number();

let confirmed_tip_num = latest_block.saturating_sub(block_confirmations);

Expand Down Expand Up @@ -491,13 +523,10 @@ impl<N: Network> Service<N> {
let max_block_range = self.max_block_range;
let provider = self.provider.clone();

let (start_block, end_block) = join!(
let (start_block, end_block) = try_join!(
self.provider.get_block_by_number(start_height),
self.provider.get_block_by_number(end_height),
);

let start_block = start_block?.ok_or(ScannerError::BlockNotFound(start_height))?;
let end_block = end_block?.ok_or(ScannerError::BlockNotFound(end_height))?;
)?;

// normalize block range
let (from, to) = match start_block.header().number().cmp(&end_block.header().number()) {
Expand All @@ -524,7 +553,7 @@ impl<N: Network> Service<N> {
to: N::BlockResponse,
max_block_range: u64,
sender: &mpsc::Sender<Message>,
provider: &RootProvider<N>,
provider: &RobustProvider<N>,
) {
let mut batch_count = 0;

Expand Down Expand Up @@ -576,13 +605,11 @@ impl<N: Network> Service<N> {
batch_from = from;
// store the updated end block hash
tip_hash = match provider.get_block_by_number(from.into()).await {
Ok(block) => block
.unwrap_or_else(|| {
panic!("Block with number '{from}' should exist post-reorg")
})
.header()
.hash(),
Ok(block) => block.header().hash(),
Err(e) => {
if matches!(e, RobustProviderError::BlockNotFound(_)) {
panic!("Block with number '{from}' should exist post-reorg");
}
error!(error = %e, "Terminal RPC call error, shutting down");
_ = sender.try_stream(e);
return;
Expand Down Expand Up @@ -636,9 +663,9 @@ impl<N: Network> Service<N> {
info!(batch_count = batch_count, "Historical sync completed");
}

async fn stream_live_blocks<P: Provider<N>>(
async fn stream_live_blocks(
mut range_start: BlockNumber,
provider: P,
provider: RobustProvider<N>,
sender: mpsc::Sender<Message>,
block_confirmations: u64,
max_block_range: u64,
Expand Down Expand Up @@ -735,22 +762,18 @@ impl<N: Network> Service<N> {
}

async fn get_block_subscription(
provider: &impl Provider<N>,
provider: &RobustProvider<N>,
) -> Result<Subscription<N::HeaderResponse>, ScannerError> {
let ws_stream = provider
.subscribe_blocks()
.await
.map_err(|_| ScannerError::WebSocketConnectionFailed(1))?;

let ws_stream = provider.subscribe_blocks().await?;
Ok(ws_stream)
}
}

async fn reorg_detected<N: Network>(
provider: &RootProvider<N>,
provider: &RobustProvider<N>,
hash_to_check: B256,
) -> Result<bool, RpcError<TransportErrorKind>> {
Ok(provider.get_block_by_hash(hash_to_check).await?.is_none())
Ok(provider.get_block_by_hash(hash_to_check).await.is_err())
}

pub struct BlockRangeScannerClient {
Expand Down Expand Up @@ -899,6 +922,7 @@ mod tests {
use super::*;
use crate::{assert_closed, assert_empty, assert_next};
use alloy::{
eips::BlockId,
network::Ethereum,
providers::{ProviderBuilder, ext::AnvilApi},
rpc::types::anvil::ReorgOptions,
Expand Down Expand Up @@ -1351,13 +1375,15 @@ mod tests {

#[tokio::test]
async fn try_send_forwards_errors_to_subscribers() {
let (tx, mut rx) = mpsc::channel(1);
let (tx, mut rx) = mpsc::channel::<Message>(1);

_ = tx.try_stream(ScannerError::WebSocketConnectionFailed(4)).await;
_ = tx.try_stream(ScannerError::BlockNotFound(4.into())).await;

assert!(matches!(
rx.recv().await,
Some(Message::Error(ScannerError::WebSocketConnectionFailed(4)))
Some(ScannerMessage::Error(ScannerError::BlockNotFound(BlockId::Number(
BlockNumberOrTag::Number(4)
))))
));
}

Expand Down Expand Up @@ -1553,7 +1579,10 @@ mod tests {

let stream = client.rewind(0, 999).await;

assert!(matches!(stream, Err(ScannerError::BlockNotFound(BlockNumberOrTag::Number(999)))));
assert!(matches!(
stream,
Err(ScannerError::BlockNotFound(BlockId::Number(BlockNumberOrTag::Number(999))))
));

Ok(())
}
Expand Down
Loading