diff --git a/crates/core/src/node/op_state_manager.rs b/crates/core/src/node/op_state_manager.rs index 541a71c27..4018a6141 100644 --- a/crates/core/src/node/op_state_manager.rs +++ b/crates/core/src/node/op_state_manager.rs @@ -331,6 +331,27 @@ impl OpManager { } } +async fn notify_transaction_timeout( + event_loop_notifier: &EventLoopNotificationsSender, + tx: Transaction, +) -> bool { + match event_loop_notifier + .notifications_sender + .send(Either::Right(NodeEvent::TransactionTimedOut(tx))) + .await + { + Ok(()) => true, + Err(err) => { + tracing::warn!( + tx = %tx, + error = ?err, + "Failed to notify event loop about timed out transaction; receiver likely dropped" + ); + false + } + } +} + async fn garbage_cleanup_task( mut new_transactions: tokio::sync::mpsc::Receiver, ops: Arc, @@ -376,7 +397,7 @@ async fn garbage_cleanup_task( ops.under_progress.remove(&tx); ops.completed.remove(&tx); tracing::debug!("Transaction timed out: {tx}"); - event_loop_notifier.notifications_sender.send(Either::Right(NodeEvent::TransactionTimedOut(tx))).await.unwrap(); + notify_transaction_timeout(&event_loop_notifier, tx).await; live_tx_tracker.remove_finished_transaction(tx); } } @@ -405,7 +426,7 @@ async fn garbage_cleanup_task( }; if removed { tracing::debug!("Transaction timed out: {tx}"); - event_loop_notifier.notifications_sender.send(Either::Right(NodeEvent::TransactionTimedOut(tx))).await.unwrap(); + notify_transaction_timeout(&event_loop_notifier, tx).await; live_tx_tracker.remove_finished_transaction(tx); } } @@ -413,3 +434,55 @@ async fn garbage_cleanup_task( } } } + +#[cfg(test)] +mod tests { + use super::super::network_bridge::event_loop_notification_channel; + use super::*; + use crate::node::network_bridge::EventLoopNotificationsReceiver; + use either::Either; + use tokio::time::{timeout, Duration}; + + #[tokio::test] + async fn notify_timeout_succeeds_when_receiver_alive() { + let (receiver, notifier) = event_loop_notification_channel(); + let EventLoopNotificationsReceiver { + mut notifications_receiver, + .. + } = receiver; + + let tx = Transaction::ttl_transaction(); + + let delivered = notify_transaction_timeout(¬ifier, tx).await; + assert!( + delivered, + "notification should be delivered while receiver is alive" + ); + + let received = timeout(Duration::from_millis(100), notifications_receiver.recv()) + .await + .expect("timed out waiting for notification") + .expect("notification channel closed"); + + match received { + Either::Right(NodeEvent::TransactionTimedOut(observed)) => { + assert_eq!(observed, tx, "unexpected transaction in notification"); + } + other => panic!("unexpected notification: {other:?}"), + } + } + + #[tokio::test] + async fn notify_timeout_handles_dropped_receiver() { + let (receiver, notifier) = event_loop_notification_channel(); + drop(receiver); + + let tx = Transaction::ttl_transaction(); + + let delivered = notify_transaction_timeout(¬ifier, tx).await; + assert!( + !delivered, + "notification delivery should fail once receiver is dropped" + ); + } +}