Skip to content

Conversation

@0xNeshi
Copy link
Collaborator

@0xNeshi 0xNeshi commented Oct 23, 2025

Towards #73
Closes #120
Closes #118

  • avoids forcing lib users to manually wrap scanners in tokio tasks
  • better error bubbling DevEx
  • fixes flaky tests <- used to fail due to race conditions; with this PR, scanner logic becomes more deterministic

Additional changes:

  • remove redundant code
  • add additional macro helpers for stream assertions
  • test refactors <- use macro helpers

@0xNeshi 0xNeshi self-assigned this Oct 23, 2025
LeoPatOZ
LeoPatOZ previously approved these changes Oct 27, 2025
Copy link
Collaborator

@LeoPatOZ LeoPatOZ left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to merge (once conflicts are resolved ofc)

Comment on lines -227 to -232
Unsubscribe {
response: oneshot::Sender<Result<(), ScannerError>>,
},
Shutdown {
response: oneshot::Sender<Result<(), ScannerError>>,
},
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant, we'll add back if and when necessary


info!(start_block = start_block_num, end_block = end_block_num, "Syncing historical data");

let sender = self.subscriber.take().ok_or_else(|| ScannerError::ServiceShutdown)?;
Copy link
Collaborator Author

@0xNeshi 0xNeshi Oct 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No more need to manually drop the sender (with .take() call), as this is done implicitly with sender being a locally scoped parameter.

Comment on lines -454 to +444
let live_subscription_task = tokio::spawn(async move {
tokio::spawn(async move {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to manually stop live task, as stream_historical_blocks no longer returns errors

Comment on lines 289 to 292
Command::StreamLive { sender, block_confirmations, response } => {
self.ensure_no_subscriber()?;
info!("Starting live stream");
self.subscriber = Some(sender);
let result = self.handle_live(block_confirmations).await;
let result = self.handle_live(block_confirmations, sender).await;
let _ = response.send(result);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is redundant to:

  • ensure no subscribers exist
  • set option field self.subscriber
  • immediately after .take() it to ensure the subscriber channel is closed after scanner is done.
  • pass "taken" subscriber into tokio tasks

Much more straightforward to pass the subscriber directly.

Comment on lines -1352 to +1147
assert_next!(stream, None);
assert_closed!(stream);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

intention is clearer

}

#[tokio::test]
async fn mixed_ranges_are_handled_correctly() -> anyhow::Result<()> {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrealistic case, and anyway handled by other related tests

}

#[tokio::test]
async fn cutoff_at_zero_handles_all_ranges() -> anyhow::Result<()> {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implicitly handled by buffered_messages_after_cutoff_are_all_passed

Comment on lines -31 to -32
#[error("Only one subscriber allowed at a time")]
MultipleSubscribers,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can no longer occur

Comment on lines +34 to +41
tokio::spawn(async move {
while let Some(message) = stream.next().await {
if let Err(err) = range_tx.send(message) {
error!(error = %err, "No receivers, stopping broadcast");
break;
}
}
}
});
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Block range broadcasting is no longer blocking

let mut stream = setup.stream.take(expected_event_count);

tokio::spawn(async move { scanner.start().await });
scanner.start().await?;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since scanner starts processes in other tokio tasks, initialization errors can now easily be returned immediately and there's no more need for this tokio spawn ritual

@0xNeshi 0xNeshi requested a review from LeoPatOZ October 27, 2025 12:41
.hash();
tip_hash = match provider.get_block_by_number(from.into()).await {
Ok(block) => {
block.expect("Chain should have the same height post-reorg").header().hash()
Copy link
Collaborator

@LeoPatOZ LeoPatOZ Oct 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we avoid string errors and have like unwrap_or / unwrap_or_else ScannerError ?

Copy link
Collaborator Author

@0xNeshi 0xNeshi Oct 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will change to unwrap_or_else to be able to reference from value.

The reason we do not return ScannerError and instead panic is because this is an invariant that must always hold true. I.e. one of our core assumptions is that a block number with value from will exist pre- and post-reorg. If this doesn't happen, our assumptions are terribly wrong and the scanner crashes, indicating we need to fix our assumptions (and the scanner).

let mut stream = setup.stream;

tokio::spawn(async move { scanner.start().await });
contract.increase().send().await?.watch().await?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the unravel ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's clearer and shorter (less LoC)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Goes for all the test should we not use the new assert_next and increase macro for all the tests? Currently it seems inconsistent

Copy link
Collaborator Author

@0xNeshi 0xNeshi Oct 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • assert_next: yes! didn't bother doing that in this PR, as it's too much work; only made the changes for: unreliable tests that I was debugging + new tests + tests where it was convenient to update. We'll make these updates either incrementally, or when we have nothing better to do (e.g. waiting on reviews, blocked otherwise etc.)
  • increase macro: in my other PR feat: Add 'Get Latest X Events and Switch to Live Streaming' Method #133 , I introduced a common fn increase, which we'll be able to use instead of the locally defined macros with the same name. The reason for a regular function instead of the macro is that fn is sufficient and is easier to debug

@0xNeshi 0xNeshi requested a review from LeoPatOZ October 28, 2025 10:07
Copy link
Collaborator

@LeoPatOZ LeoPatOZ left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice very clean!

@0xNeshi 0xNeshi merged commit 7a3ae85 into main Oct 28, 2025
13 checks passed
@0xNeshi 0xNeshi deleted the test-fixes branch October 28, 2025 12:02
@github-actions github-actions bot locked and limited conversation to collaborators Oct 28, 2025
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

fix: send_to_subscriber Should Stop Streaming on Channel Closed fix: Ensure Channel Senders Are Appropriately Dropped after Streaming is Done

3 participants