-
Notifications
You must be signed in to change notification settings - Fork 3
feat: Run Scanner in a Separate Tokio Task #138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
LeoPatOZ
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Happy to merge (once conflicts are resolved ofc)
| Unsubscribe { | ||
| response: oneshot::Sender<Result<(), ScannerError>>, | ||
| }, | ||
| Shutdown { | ||
| response: oneshot::Sender<Result<(), ScannerError>>, | ||
| }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
redundant, we'll add back if and when necessary
|
|
||
| info!(start_block = start_block_num, end_block = end_block_num, "Syncing historical data"); | ||
|
|
||
| let sender = self.subscriber.take().ok_or_else(|| ScannerError::ServiceShutdown)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No more need to manually drop the sender (with .take() call), as this is done implicitly with sender being a locally scoped parameter.
| let live_subscription_task = tokio::spawn(async move { | ||
| tokio::spawn(async move { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to manually stop live task, as stream_historical_blocks no longer returns errors
| Command::StreamLive { sender, block_confirmations, response } => { | ||
| self.ensure_no_subscriber()?; | ||
| info!("Starting live stream"); | ||
| self.subscriber = Some(sender); | ||
| let result = self.handle_live(block_confirmations).await; | ||
| let result = self.handle_live(block_confirmations, sender).await; | ||
| let _ = response.send(result); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is redundant to:
- ensure no subscribers exist
- set option field
self.subscriber - immediately after
.take()it to ensure the subscriber channel is closed after scanner is done. - pass "taken" subscriber into tokio tasks
Much more straightforward to pass the subscriber directly.
| assert_next!(stream, None); | ||
| assert_closed!(stream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
intention is clearer
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn mixed_ranges_are_handled_correctly() -> anyhow::Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unrealistic case, and anyway handled by other related tests
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn cutoff_at_zero_handles_all_ranges() -> anyhow::Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implicitly handled by buffered_messages_after_cutoff_are_all_passed
| #[error("Only one subscriber allowed at a time")] | ||
| MultipleSubscribers, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can no longer occur
| tokio::spawn(async move { | ||
| while let Some(message) = stream.next().await { | ||
| if let Err(err) = range_tx.send(message) { | ||
| error!(error = %err, "No receivers, stopping broadcast"); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Block range broadcasting is no longer blocking
| let mut stream = setup.stream.take(expected_event_count); | ||
|
|
||
| tokio::spawn(async move { scanner.start().await }); | ||
| scanner.start().await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since scanner starts processes in other tokio tasks, initialization errors can now easily be returned immediately and there's no more need for this tokio spawn ritual
src/block_range_scanner.rs
Outdated
| .hash(); | ||
| tip_hash = match provider.get_block_by_number(from.into()).await { | ||
| Ok(block) => { | ||
| block.expect("Chain should have the same height post-reorg").header().hash() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we avoid string errors and have like unwrap_or / unwrap_or_else ScannerError ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will change to unwrap_or_else to be able to reference from value.
The reason we do not return ScannerError and instead panic is because this is an invariant that must always hold true. I.e. one of our core assumptions is that a block number with value from will exist pre- and post-reorg. If this doesn't happen, our assumptions are terribly wrong and the scanner crashes, indicating we need to fix our assumptions (and the scanner).
| let mut stream = setup.stream; | ||
|
|
||
| tokio::spawn(async move { scanner.start().await }); | ||
| contract.increase().send().await?.watch().await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why the unravel ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's clearer and shorter (less LoC)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Goes for all the test should we not use the new assert_next and increase macro for all the tests? Currently it seems inconsistent
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- assert_next: yes! didn't bother doing that in this PR, as it's too much work; only made the changes for: unreliable tests that I was debugging + new tests + tests where it was convenient to update. We'll make these updates either incrementally, or when we have nothing better to do (e.g. waiting on reviews, blocked otherwise etc.)
- increase macro: in my other PR feat: Add 'Get Latest X Events and Switch to Live Streaming' Method #133 , I introduced a common
fn increase, which we'll be able to use instead of the locally defined macros with the same name. The reason for a regular function instead of the macro is thatfnis sufficient and is easier to debug
LeoPatOZ
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice very clean!
Towards #73
Closes #120
Closes #118
Additional changes: