Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 25 additions & 17 deletions common/client-core/src/client/base_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,14 @@ where
debug_config.cover_traffic,
stats_tx,
);
shutdown_tracker
.try_spawn_named_with_shutdown(async move { stream.run().await }, "CoverTrafficStream");
let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard();
shutdown_tracker.try_spawn_named_with_shutdown(
async move {
let _ = drop_guard;
stream.run().await
},
"CoverTrafficStream",
);
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -419,8 +425,10 @@ where
"AcknowledgementController::RetransmissionRequestListener",
);

let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard();
shutdown_tracker.try_spawn_named_with_shutdown(
async move {
let _ = drop_guard;
sent_notification_listener.run().await;
},
"AcknowledgementController::SentNotificationListener",
Expand All @@ -431,8 +439,6 @@ where
async move { ack_action_controller.run(shutdown_token).await },
"AcknowledgementController::ActionController",
);

// .start(packet_type);
}

// buffer controlling all messages fetched from provider
Expand Down Expand Up @@ -705,8 +711,12 @@ where
// don't spawn the refresher if we don't want to be refreshing the topology.
// only use the initial values obtained
info!("Starting topology refresher...");
let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard();
shutdown_tracker.try_spawn_named_with_shutdown(
async move { topology_refresher.run().await },
async move {
let _ = drop_guard;
topology_refresher.run().await
},
"TopologyRefresher",
);
}
Expand Down Expand Up @@ -892,7 +902,7 @@ where
// Create a shutdown tracker for this client - either as a child of provided tracker
// or get one from the registry
let shutdown_tracker = match self.shutdown {
Some(parent_tracker) => parent_tracker.child_tracker(),
Some(parent_tracker) => parent_tracker.clone(),
None => nym_task::get_sdk_shutdown_tracker()?,
};

Expand Down Expand Up @@ -926,7 +936,7 @@ where
self.user_agent.clone(),
generate_client_stats_id(*self_address.identity()),
input_sender.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
);

// needs to be started as the first thing to block if required waiting for the gateway
Expand All @@ -936,7 +946,7 @@ where
shared_topology_accessor.clone(),
self_address.gateway(),
self.wait_for_gateway,
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
)
.await?;

Expand All @@ -956,15 +966,15 @@ where
stats_reporter.clone(),
#[cfg(unix)]
self.connection_fd_callback,
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
)
.await?;
let gateway_ws_fd = gateway_transceiver.ws_fd();

let reply_storage = Self::setup_persistent_reply_storage(
reply_storage_backend,
key_rotation_config,
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
)
.await?;

Expand All @@ -975,18 +985,16 @@ where
reply_storage.key_storage(),
reply_controller_sender.clone(),
stats_reporter.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
);

// The message_sender is the transmitter for any component generating sphinx packets
// that are to be sent to the mixnet. They are used by cover traffic stream and real
// traffic stream.
// The MixTrafficController then sends the actual traffic

let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
gateway_transceiver,
&shutdown_tracker.child_tracker(),
);
let (message_sender, client_request_sender) =
Self::start_mix_traffic_controller(gateway_transceiver, &shutdown_tracker);

// Channels that the websocket listener can use to signal downstream to the real traffic
// controller that connections are closed.
Expand Down Expand Up @@ -1015,7 +1023,7 @@ where
shared_lane_queue_lengths.clone(),
client_connection_rx,
stats_reporter.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
);

if !self
Expand All @@ -1031,7 +1039,7 @@ where
shared_topology_accessor.clone(),
message_sender,
stats_reporter.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
);
}

Expand Down
1 change: 1 addition & 0 deletions common/client-core/src/client/mix_traffic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ impl MixTrafficController {

pub async fn run(&mut self) {
debug!("Started MixTrafficController with graceful shutdown support");
let _drop_guard = self.shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl AcknowledgementListener {
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started AcknowledgementListener with graceful shutdown support");

let _drop_guard = shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ impl ActionController {
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started ActionController with graceful shutdown support");

let _drop_guard = shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ where
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started InputMessageListener with graceful shutdown support");

let _drop_guard = shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ where
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started RetransmissionRequestListener with graceful shutdown support");

let _drop_guard = shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ where

// avoid borrow on self
let shutdown_token = self.shutdown_token.clone();
let _drop_guard = shutdown_token.clone().drop_guard();
#[cfg(not(target_arch = "wasm32"))]
{
let mut status_timer = tokio::time::interval(Duration::from_secs(5));
Expand Down
4 changes: 4 additions & 0 deletions common/client-core/src/client/received_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,8 @@ impl<R: MessageReceiver> RequestReceiver<R> {

pub(crate) async fn run(&mut self) {
debug!("Started RequestReceiver with graceful shutdown support");

let _drop_guard = self.shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
Expand Down Expand Up @@ -540,6 +542,8 @@ impl<R: MessageReceiver> FragmentedMessageReceiver<R> {

pub(crate) async fn run(&mut self) -> Result<(), MessageRecoveryError> {
debug!("Started FragmentedMessageReceiver with graceful shutdown support");

let _drop_guard = self.shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ where
let polling_rate = self.config.key_rotation.epoch_duration / 8;
let mut invalidation_inspection = new_interval_stream(polling_rate);

let _drop_guard = shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
Expand Down
2 changes: 2 additions & 0 deletions common/client-core/src/client/statistics_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ impl StatisticsControl {
let mut snapshot_interval =
gloo_timers::future::IntervalStream::new(SNAPSHOT_INTERVAL.as_millis() as u32);

let _drop_guard = shutdown_token.clone().drop_guard();

loop {
tokio::select! {
biased;
Expand Down
1 change: 1 addition & 0 deletions common/client-core/surb-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ where

debug!("Started PersistentReplyStorage");
if let Err(err) = self.backend.start_storage_session().await {
shutdown.cancel();
error!("failed to start the storage session - {err}");
return;
}
Expand Down
3 changes: 3 additions & 0 deletions common/client-libs/gateway-client/src/socket_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ impl PartiallyDelegatedRouter {

async fn run(mut self, mut split_stream: SplitStream<WsConn>, shutdown_token: ShutdownToken) {
let mut chunked_stream = (&mut split_stream).ready_chunks(8);

let drop_guard = shutdown_token.clone().drop_guard();
let ret: Result<_, GatewayClientError> = loop {
tokio::select! {
biased;
Expand All @@ -101,6 +103,7 @@ impl PartiallyDelegatedRouter {
// received request to stop the task and return the stream
_ = &mut self.stream_return_requester => {
log::debug!("received request to return the split ws stream");
drop_guard.disarm();
break Ok(())
}
socket_msgs = chunked_stream.next() => {
Expand Down
2 changes: 1 addition & 1 deletion sdk/rust/nym-sdk/src/mixnet/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ where
client_output,
client_state.clone(),
nym_address,
started_client.shutdown_handle.child_tracker(),
started_client.shutdown_handle.clone(),
packet_type,
);

Expand Down
7 changes: 7 additions & 0 deletions sdk/rust/nym-sdk/src/mixnet/native_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::RwLockReadGuard;
use tokio_util::sync::CancellationToken;
use tokio_util::sync::WaitForCancellationFutureOwned;

/// Client connected to the Nym mixnet.
pub struct MixnetClient {
Expand Down Expand Up @@ -273,6 +274,12 @@ impl MixnetClient {
}
}
}

pub fn cancelled(&self) -> WaitForCancellationFutureOwned {
self.shutdown_handle
.clone_shutdown_token()
.cancelled_owned()
}
}

#[derive(Clone)]
Expand Down
Loading