From 5025c49a0e015d615b1af3b25d36ac5dc59a6f0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Stuczy=C5=84ski?= Date: Thu, 18 Sep 2025 10:38:27 +0100 Subject: [PATCH 1/3] using same hierarchy of trackers for client shutdown control --- .../client-core/src/client/base_client/mod.rs | 22 +++++++++---------- sdk/rust/nym-sdk/src/mixnet/client.rs | 2 +- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/common/client-core/src/client/base_client/mod.rs b/common/client-core/src/client/base_client/mod.rs index a093977bbb..0002da00b5 100644 --- a/common/client-core/src/client/base_client/mod.rs +++ b/common/client-core/src/client/base_client/mod.rs @@ -892,7 +892,7 @@ where // Create a shutdown tracker for this client - either as a child of provided tracker // or get one from the registry let shutdown_tracker = match self.shutdown { - Some(parent_tracker) => parent_tracker.child_tracker(), + Some(parent_tracker) => parent_tracker.clone(), None => nym_task::get_sdk_shutdown_tracker()?, }; @@ -926,7 +926,7 @@ where self.user_agent.clone(), generate_client_stats_id(*self_address.identity()), input_sender.clone(), - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ); // needs to be started as the first thing to block if required waiting for the gateway @@ -936,7 +936,7 @@ where shared_topology_accessor.clone(), self_address.gateway(), self.wait_for_gateway, - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ) .await?; @@ -956,7 +956,7 @@ where stats_reporter.clone(), #[cfg(unix)] self.connection_fd_callback, - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ) .await?; let gateway_ws_fd = gateway_transceiver.ws_fd(); @@ -964,7 +964,7 @@ where let reply_storage = Self::setup_persistent_reply_storage( reply_storage_backend, key_rotation_config, - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ) .await?; @@ -975,7 +975,7 @@ where reply_storage.key_storage(), reply_controller_sender.clone(), stats_reporter.clone(), - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ); // The message_sender is the transmitter for any component generating sphinx packets @@ -983,10 +983,8 @@ where // traffic stream. // The MixTrafficController then sends the actual traffic - let (message_sender, client_request_sender) = Self::start_mix_traffic_controller( - gateway_transceiver, - &shutdown_tracker.child_tracker(), - ); + let (message_sender, client_request_sender) = + Self::start_mix_traffic_controller(gateway_transceiver, &shutdown_tracker.clone()); // Channels that the websocket listener can use to signal downstream to the real traffic // controller that connections are closed. @@ -1015,7 +1013,7 @@ where shared_lane_queue_lengths.clone(), client_connection_rx, stats_reporter.clone(), - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ); if !self @@ -1031,7 +1029,7 @@ where shared_topology_accessor.clone(), message_sender, stats_reporter.clone(), - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ); } diff --git a/sdk/rust/nym-sdk/src/mixnet/client.rs b/sdk/rust/nym-sdk/src/mixnet/client.rs index 6783776efa..bd596fa849 100644 --- a/sdk/rust/nym-sdk/src/mixnet/client.rs +++ b/sdk/rust/nym-sdk/src/mixnet/client.rs @@ -759,7 +759,7 @@ where client_output, client_state.clone(), nym_address, - started_client.shutdown_handle.child_tracker(), + started_client.shutdown_handle.clone(), packet_type, ); From 7f4ef7f7724bf15ccf497db956a36b0ac8d63134 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Stuczy=C5=84ski?= Date: Thu, 18 Sep 2025 10:55:13 +0100 Subject: [PATCH 2/3] add drop guard to client tasks --- .../client-core/src/client/base_client/mod.rs | 36 ++++++++++++------- .../client-core/src/client/mix_traffic/mod.rs | 1 + .../acknowledgement_listener.rs | 1 + .../action_controller.rs | 1 + .../input_message_listener.rs | 1 + .../retransmission_request_listener.rs | 1 + .../real_traffic_stream.rs | 1 + .../client-core/src/client/received_buffer.rs | 4 +++ .../client/replies/reply_controller/mod.rs | 1 + .../src/client/statistics_control.rs | 2 ++ common/client-core/surb-storage/src/lib.rs | 1 + .../gateway-client/src/socket_state.rs | 3 ++ 12 files changed, 40 insertions(+), 13 deletions(-) diff --git a/common/client-core/src/client/base_client/mod.rs b/common/client-core/src/client/base_client/mod.rs index 0002da00b5..3508e305ae 100644 --- a/common/client-core/src/client/base_client/mod.rs +++ b/common/client-core/src/client/base_client/mod.rs @@ -341,8 +341,14 @@ where debug_config.cover_traffic, stats_tx, ); - shutdown_tracker - .try_spawn_named_with_shutdown(async move { stream.run().await }, "CoverTrafficStream"); + let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard(); + shutdown_tracker.try_spawn_named_with_shutdown( + async move { + let _ = drop_guard; + stream.run().await + }, + "CoverTrafficStream", + ); } #[allow(clippy::too_many_arguments)] @@ -419,8 +425,10 @@ where "AcknowledgementController::RetransmissionRequestListener", ); + let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard(); shutdown_tracker.try_spawn_named_with_shutdown( async move { + let _ = drop_guard; sent_notification_listener.run().await; }, "AcknowledgementController::SentNotificationListener", @@ -431,8 +439,6 @@ where async move { ack_action_controller.run(shutdown_token).await }, "AcknowledgementController::ActionController", ); - - // .start(packet_type); } // buffer controlling all messages fetched from provider @@ -705,8 +711,12 @@ where // don't spawn the refresher if we don't want to be refreshing the topology. // only use the initial values obtained info!("Starting topology refresher..."); + let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard(); shutdown_tracker.try_spawn_named_with_shutdown( - async move { topology_refresher.run().await }, + async move { + let _ = drop_guard; + topology_refresher.run().await + }, "TopologyRefresher", ); } @@ -926,7 +936,7 @@ where self.user_agent.clone(), generate_client_stats_id(*self_address.identity()), input_sender.clone(), - &shutdown_tracker.clone(), + &shutdown_tracker, ); // needs to be started as the first thing to block if required waiting for the gateway @@ -936,7 +946,7 @@ where shared_topology_accessor.clone(), self_address.gateway(), self.wait_for_gateway, - &shutdown_tracker.clone(), + &shutdown_tracker, ) .await?; @@ -956,7 +966,7 @@ where stats_reporter.clone(), #[cfg(unix)] self.connection_fd_callback, - &shutdown_tracker.clone(), + &shutdown_tracker, ) .await?; let gateway_ws_fd = gateway_transceiver.ws_fd(); @@ -964,7 +974,7 @@ where let reply_storage = Self::setup_persistent_reply_storage( reply_storage_backend, key_rotation_config, - &shutdown_tracker.clone(), + &shutdown_tracker, ) .await?; @@ -975,7 +985,7 @@ where reply_storage.key_storage(), reply_controller_sender.clone(), stats_reporter.clone(), - &shutdown_tracker.clone(), + &shutdown_tracker, ); // The message_sender is the transmitter for any component generating sphinx packets @@ -984,7 +994,7 @@ where // The MixTrafficController then sends the actual traffic let (message_sender, client_request_sender) = - Self::start_mix_traffic_controller(gateway_transceiver, &shutdown_tracker.clone()); + Self::start_mix_traffic_controller(gateway_transceiver, &shutdown_tracker); // Channels that the websocket listener can use to signal downstream to the real traffic // controller that connections are closed. @@ -1013,7 +1023,7 @@ where shared_lane_queue_lengths.clone(), client_connection_rx, stats_reporter.clone(), - &shutdown_tracker.clone(), + &shutdown_tracker, ); if !self @@ -1029,7 +1039,7 @@ where shared_topology_accessor.clone(), message_sender, stats_reporter.clone(), - &shutdown_tracker.clone(), + &shutdown_tracker, ); } diff --git a/common/client-core/src/client/mix_traffic/mod.rs b/common/client-core/src/client/mix_traffic/mod.rs index 37f88ea66a..57c802449f 100644 --- a/common/client-core/src/client/mix_traffic/mod.rs +++ b/common/client-core/src/client/mix_traffic/mod.rs @@ -138,6 +138,7 @@ impl MixTrafficController { pub async fn run(&mut self) { debug!("Started MixTrafficController with graceful shutdown support"); + let _drop_guard = self.shutdown_token.clone().drop_guard(); loop { tokio::select! { biased; diff --git a/common/client-core/src/client/real_messages_control/acknowledgement_control/acknowledgement_listener.rs b/common/client-core/src/client/real_messages_control/acknowledgement_control/acknowledgement_listener.rs index 2167718e57..665f478681 100644 --- a/common/client-core/src/client/real_messages_control/acknowledgement_control/acknowledgement_listener.rs +++ b/common/client-core/src/client/real_messages_control/acknowledgement_control/acknowledgement_listener.rs @@ -80,6 +80,7 @@ impl AcknowledgementListener { pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) { debug!("Started AcknowledgementListener with graceful shutdown support"); + let _drop_guard = shutdown_token.clone().drop_guard(); loop { tokio::select! { biased; diff --git a/common/client-core/src/client/real_messages_control/acknowledgement_control/action_controller.rs b/common/client-core/src/client/real_messages_control/acknowledgement_control/action_controller.rs index 6262a37e23..15e9c57398 100644 --- a/common/client-core/src/client/real_messages_control/acknowledgement_control/action_controller.rs +++ b/common/client-core/src/client/real_messages_control/acknowledgement_control/action_controller.rs @@ -245,6 +245,7 @@ impl ActionController { pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) { debug!("Started ActionController with graceful shutdown support"); + let _drop_guard = shutdown_token.clone().drop_guard(); loop { tokio::select! { biased; diff --git a/common/client-core/src/client/real_messages_control/acknowledgement_control/input_message_listener.rs b/common/client-core/src/client/real_messages_control/acknowledgement_control/input_message_listener.rs index 69ca92709f..2bf98fc45a 100644 --- a/common/client-core/src/client/real_messages_control/acknowledgement_control/input_message_listener.rs +++ b/common/client-core/src/client/real_messages_control/acknowledgement_control/input_message_listener.rs @@ -216,6 +216,7 @@ where pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) { debug!("Started InputMessageListener with graceful shutdown support"); + let _drop_guard = shutdown_token.clone().drop_guard(); loop { tokio::select! { biased; diff --git a/common/client-core/src/client/real_messages_control/acknowledgement_control/retransmission_request_listener.rs b/common/client-core/src/client/real_messages_control/acknowledgement_control/retransmission_request_listener.rs index 597bfecf56..0c087d558a 100644 --- a/common/client-core/src/client/real_messages_control/acknowledgement_control/retransmission_request_listener.rs +++ b/common/client-core/src/client/real_messages_control/acknowledgement_control/retransmission_request_listener.rs @@ -167,6 +167,7 @@ where pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) { debug!("Started RetransmissionRequestListener with graceful shutdown support"); + let _drop_guard = shutdown_token.clone().drop_guard(); loop { tokio::select! { biased; diff --git a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs index 07b3506c45..039f2f355d 100644 --- a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs +++ b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs @@ -585,6 +585,7 @@ where // avoid borrow on self let shutdown_token = self.shutdown_token.clone(); + let _drop_guard = shutdown_token.clone().drop_guard(); #[cfg(not(target_arch = "wasm32"))] { let mut status_timer = tokio::time::interval(Duration::from_secs(5)); diff --git a/common/client-core/src/client/received_buffer.rs b/common/client-core/src/client/received_buffer.rs index b31cccb419..c98ff1179c 100644 --- a/common/client-core/src/client/received_buffer.rs +++ b/common/client-core/src/client/received_buffer.rs @@ -497,6 +497,8 @@ impl RequestReceiver { pub(crate) async fn run(&mut self) { debug!("Started RequestReceiver with graceful shutdown support"); + + let _drop_guard = self.shutdown_token.clone().drop_guard(); loop { tokio::select! { biased; @@ -540,6 +542,8 @@ impl FragmentedMessageReceiver { pub(crate) async fn run(&mut self) -> Result<(), MessageRecoveryError> { debug!("Started FragmentedMessageReceiver with graceful shutdown support"); + + let _drop_guard = self.shutdown_token.clone().drop_guard(); loop { tokio::select! { biased; diff --git a/common/client-core/src/client/replies/reply_controller/mod.rs b/common/client-core/src/client/replies/reply_controller/mod.rs index 63ead1c0be..65bb9ea276 100644 --- a/common/client-core/src/client/replies/reply_controller/mod.rs +++ b/common/client-core/src/client/replies/reply_controller/mod.rs @@ -152,6 +152,7 @@ where let polling_rate = self.config.key_rotation.epoch_duration / 8; let mut invalidation_inspection = new_interval_stream(polling_rate); + let _drop_guard = shutdown_token.clone().drop_guard(); loop { tokio::select! { biased; diff --git a/common/client-core/src/client/statistics_control.rs b/common/client-core/src/client/statistics_control.rs index dcfbd2e19c..3778fa7f6e 100644 --- a/common/client-core/src/client/statistics_control.rs +++ b/common/client-core/src/client/statistics_control.rs @@ -119,6 +119,8 @@ impl StatisticsControl { let mut snapshot_interval = gloo_timers::future::IntervalStream::new(SNAPSHOT_INTERVAL.as_millis() as u32); + let _drop_guard = shutdown_token.clone().drop_guard(); + loop { tokio::select! { biased; diff --git a/common/client-core/surb-storage/src/lib.rs b/common/client-core/surb-storage/src/lib.rs index 079c213bbc..74cc87ad7c 100644 --- a/common/client-core/surb-storage/src/lib.rs +++ b/common/client-core/surb-storage/src/lib.rs @@ -46,6 +46,7 @@ where debug!("Started PersistentReplyStorage"); if let Err(err) = self.backend.start_storage_session().await { + shutdown.cancel(); error!("failed to start the storage session - {err}"); return; } diff --git a/common/client-libs/gateway-client/src/socket_state.rs b/common/client-libs/gateway-client/src/socket_state.rs index 4f3009e389..44f740c19f 100644 --- a/common/client-libs/gateway-client/src/socket_state.rs +++ b/common/client-libs/gateway-client/src/socket_state.rs @@ -89,6 +89,8 @@ impl PartiallyDelegatedRouter { async fn run(mut self, mut split_stream: SplitStream, shutdown_token: ShutdownToken) { let mut chunked_stream = (&mut split_stream).ready_chunks(8); + + let drop_guard = shutdown_token.clone().drop_guard(); let ret: Result<_, GatewayClientError> = loop { tokio::select! { biased; @@ -101,6 +103,7 @@ impl PartiallyDelegatedRouter { // received request to stop the task and return the stream _ = &mut self.stream_return_requester => { log::debug!("received request to return the split ws stream"); + drop_guard.disarm(); break Ok(()) } socket_msgs = chunked_stream.next() => { From 351acb7875cff4da0a4e169b3639439ebdf1aa51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Stuczy=C5=84ski?= Date: Thu, 18 Sep 2025 10:57:01 +0100 Subject: [PATCH 3/3] expose cancelled --- sdk/rust/nym-sdk/src/mixnet/native_client.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdk/rust/nym-sdk/src/mixnet/native_client.rs b/sdk/rust/nym-sdk/src/mixnet/native_client.rs index c9495680b2..9050ff45a1 100644 --- a/sdk/rust/nym-sdk/src/mixnet/native_client.rs +++ b/sdk/rust/nym-sdk/src/mixnet/native_client.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use tokio::sync::RwLockReadGuard; use tokio_util::sync::CancellationToken; +use tokio_util::sync::WaitForCancellationFutureOwned; /// Client connected to the Nym mixnet. pub struct MixnetClient { @@ -273,6 +274,12 @@ impl MixnetClient { } } } + + pub fn cancelled(&self) -> WaitForCancellationFutureOwned { + self.shutdown_handle + .clone_shutdown_token() + .cancelled_owned() + } } #[derive(Clone)]