Skip to content

Conversation

@0xNeshi
Copy link
Collaborator

@0xNeshi 0xNeshi commented Oct 20, 2025

Resolves #113
Resolves #72

@0xNeshi 0xNeshi self-assigned this Oct 20, 2025
@0xNeshi 0xNeshi marked this pull request as ready for review October 27, 2025 14:07
Comment on lines -7 to +8
"examples/sync_scanning"
"examples/sync_from_latest_scanning",
"examples/sync_from_block_scanning",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Align names


See integration tests under `tests/live_mode`, `tests/historic_mode`, and `tests/historic_to_live` for concrete examples.

### Scanning Latest Events
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to the appropriate builder fn's doc comment

Comment on lines +388 to -399
let get_start_block = async || -> Result<BlockNumber, ScannerError> {
let block = match start_height {
BlockNumberOrTag::Number(num) => num,
block_tag => provider
.get_block_by_number(block_tag)
.await?
.ok_or_else(|| ScannerError::BlockNotFound(block_tag))?
.header()
.number(),
};
Ok(block)
};

let get_latest_block = async || -> Result<BlockNumber, ScannerError> {
let block = provider
.get_block_by_number(BlockNumberOrTag::Latest)
.await?
.ok_or_else(|| ScannerError::BlockNotFound(BlockNumberOrTag::Latest))?
.header()
.number();
Ok(block)
};

// Step 1:
// Fetches the starting block and end block for historical sync in parallel
let (start_block, latest_block) = tokio::try_join!(
self.provider.get_block_by_number(start_height),
self.provider.get_block_by_number(BlockNumberOrTag::Latest)
)?;
let (start_block, latest_block) = tokio::try_join!(get_start_block(), get_latest_block())?;

let start_block_num =
start_block.ok_or_else(|| ScannerError::BlockNotFound(start_height))?.header().number();
let latest_block = latest_block
.ok_or_else(|| ScannerError::BlockNotFound(BlockNumberOrTag::Latest))?
.header()
.number();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the big changes in this PR is the introduction of support for starting the stream from a future block, which is needed for implementing the PR's main feature (sync from latest). For more info, see How it works doc section.

Because the start_block logic changed a bit to accommodate this new requirement, I refactored the implementation to make it a bit easier to reason about.

Open to suggestions though

Comment on lines -810 to +827
pub async fn stream_historical<N: Into<BlockNumberOrTag>>(
pub async fn stream_historical(
&self,
start_height: N,
end_height: N,
start_height: impl Into<BlockNumberOrTag>,
end_height: impl Into<BlockNumberOrTag>,
Copy link
Collaborator Author

@0xNeshi 0xNeshi Oct 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separating param types enables previously impossible invocations, like:

stream_historical(0, BlockNumberOrTag::Latest)

Comment on lines 24 to 29
pub async fn handle_stream<N: Network>(
mut stream: ReceiverStream<BlockRangeMessage>,
// Note: assumes it is running in a separate tokio task, so as to be non-blocking.
pub async fn handle_stream<N: Network, S: Stream<Item = BlockRangeMessage> + Unpin>(
mut stream: S,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enables passing both ReceiverStream and Chain<ReceiverStream<...

},
};

pub struct SyncFromBlockEventScannerBuilder {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Except the builder name, nothing changed, just moved to the new file

LeoPatOZ
LeoPatOZ previously approved these changes Oct 29, 2025
Co-authored-by: Leo <leonard.paturel@openzeppelin.com>
@0xNeshi 0xNeshi merged commit ecd4f7e into main Oct 29, 2025
13 checks passed
@0xNeshi 0xNeshi deleted the latest-then-live branch October 29, 2025 09:50
@github-actions github-actions bot locked and limited conversation to collaborators Oct 29, 2025
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat: Add 'Get Latest X Events and Switch to Live Streaming' Method Close Parallel Event Logging Broadcast Channel

3 participants