-
Notifications
You must be signed in to change notification settings - Fork 3
feat: Add 'Get Latest X Events and Switch to Live Streaming' Method #133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This reverts commit 528bf86.
| "examples/sync_scanning" | ||
| "examples/sync_from_latest_scanning", | ||
| "examples/sync_from_block_scanning", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Align names
|
|
||
| See integration tests under `tests/live_mode`, `tests/historic_mode`, and `tests/historic_to_live` for concrete examples. | ||
|
|
||
| ### Scanning Latest Events |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to the appropriate builder fn's doc comment
| let get_start_block = async || -> Result<BlockNumber, ScannerError> { | ||
| let block = match start_height { | ||
| BlockNumberOrTag::Number(num) => num, | ||
| block_tag => provider | ||
| .get_block_by_number(block_tag) | ||
| .await? | ||
| .ok_or_else(|| ScannerError::BlockNotFound(block_tag))? | ||
| .header() | ||
| .number(), | ||
| }; | ||
| Ok(block) | ||
| }; | ||
|
|
||
| let get_latest_block = async || -> Result<BlockNumber, ScannerError> { | ||
| let block = provider | ||
| .get_block_by_number(BlockNumberOrTag::Latest) | ||
| .await? | ||
| .ok_or_else(|| ScannerError::BlockNotFound(BlockNumberOrTag::Latest))? | ||
| .header() | ||
| .number(); | ||
| Ok(block) | ||
| }; | ||
|
|
||
| // Step 1: | ||
| // Fetches the starting block and end block for historical sync in parallel | ||
| let (start_block, latest_block) = tokio::try_join!( | ||
| self.provider.get_block_by_number(start_height), | ||
| self.provider.get_block_by_number(BlockNumberOrTag::Latest) | ||
| )?; | ||
| let (start_block, latest_block) = tokio::try_join!(get_start_block(), get_latest_block())?; | ||
|
|
||
| let start_block_num = | ||
| start_block.ok_or_else(|| ScannerError::BlockNotFound(start_height))?.header().number(); | ||
| let latest_block = latest_block | ||
| .ok_or_else(|| ScannerError::BlockNotFound(BlockNumberOrTag::Latest))? | ||
| .header() | ||
| .number(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One of the big changes in this PR is the introduction of support for starting the stream from a future block, which is needed for implementing the PR's main feature (sync from latest). For more info, see How it works doc section.
Because the start_block logic changed a bit to accommodate this new requirement, I refactored the implementation to make it a bit easier to reason about.
Open to suggestions though
| pub async fn stream_historical<N: Into<BlockNumberOrTag>>( | ||
| pub async fn stream_historical( | ||
| &self, | ||
| start_height: N, | ||
| end_height: N, | ||
| start_height: impl Into<BlockNumberOrTag>, | ||
| end_height: impl Into<BlockNumberOrTag>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Separating param types enables previously impossible invocations, like:
stream_historical(0, BlockNumberOrTag::Latest)| pub async fn handle_stream<N: Network>( | ||
| mut stream: ReceiverStream<BlockRangeMessage>, | ||
| // Note: assumes it is running in a separate tokio task, so as to be non-blocking. | ||
| pub async fn handle_stream<N: Network, S: Stream<Item = BlockRangeMessage> + Unpin>( | ||
| mut stream: S, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Enables passing both ReceiverStream and Chain<ReceiverStream<...
| }, | ||
| }; | ||
|
|
||
| pub struct SyncFromBlockEventScannerBuilder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Except the builder name, nothing changed, just moved to the new file
Co-authored-by: Leo <leonard.paturel@openzeppelin.com>
Resolves #113
Resolves #72