Skip to content

Commit 75280eb

Browse files
committed
add drop guard to client tasks
1 parent 3633f02 commit 75280eb

File tree

12 files changed

+40
-13
lines changed

12 files changed

+40
-13
lines changed

common/client-core/src/client/base_client/mod.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -337,8 +337,14 @@ where
337337
debug_config.cover_traffic,
338338
stats_tx,
339339
);
340-
shutdown_tracker
341-
.try_spawn_named_with_shutdown(async move { stream.run().await }, "CoverTrafficStream");
340+
let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard();
341+
shutdown_tracker.try_spawn_named_with_shutdown(
342+
async move {
343+
let _ = drop_guard;
344+
stream.run().await
345+
},
346+
"CoverTrafficStream",
347+
);
342348
}
343349

344350
#[allow(clippy::too_many_arguments)]
@@ -415,8 +421,10 @@ where
415421
"AcknowledgementController::RetransmissionRequestListener",
416422
);
417423

424+
let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard();
418425
shutdown_tracker.try_spawn_named_with_shutdown(
419426
async move {
427+
let _ = drop_guard;
420428
sent_notification_listener.run().await;
421429
},
422430
"AcknowledgementController::SentNotificationListener",
@@ -427,8 +435,6 @@ where
427435
async move { ack_action_controller.run(shutdown_token).await },
428436
"AcknowledgementController::ActionController",
429437
);
430-
431-
// .start(packet_type);
432438
}
433439

434440
// buffer controlling all messages fetched from provider
@@ -701,8 +707,12 @@ where
701707
// don't spawn the refresher if we don't want to be refreshing the topology.
702708
// only use the initial values obtained
703709
info!("Starting topology refresher...");
710+
let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard();
704711
shutdown_tracker.try_spawn_named_with_shutdown(
705-
async move { topology_refresher.run().await },
712+
async move {
713+
let _ = drop_guard;
714+
topology_refresher.run().await
715+
},
706716
"TopologyRefresher",
707717
);
708718
}
@@ -917,7 +927,7 @@ where
917927
self.user_agent.clone(),
918928
generate_client_stats_id(*self_address.identity()),
919929
input_sender.clone(),
920-
&shutdown_tracker.clone(),
930+
&shutdown_tracker,
921931
);
922932

923933
// needs to be started as the first thing to block if required waiting for the gateway
@@ -927,7 +937,7 @@ where
927937
shared_topology_accessor.clone(),
928938
self_address.gateway(),
929939
self.wait_for_gateway,
930-
&shutdown_tracker.clone(),
940+
&shutdown_tracker,
931941
)
932942
.await?;
933943

@@ -947,15 +957,15 @@ where
947957
stats_reporter.clone(),
948958
#[cfg(unix)]
949959
self.connection_fd_callback,
950-
&shutdown_tracker.clone(),
960+
&shutdown_tracker,
951961
)
952962
.await?;
953963
let gateway_ws_fd = gateway_transceiver.ws_fd();
954964

955965
let reply_storage = Self::setup_persistent_reply_storage(
956966
reply_storage_backend,
957967
key_rotation_config,
958-
&shutdown_tracker.clone(),
968+
&shutdown_tracker,
959969
)
960970
.await?;
961971

@@ -966,7 +976,7 @@ where
966976
reply_storage.key_storage(),
967977
reply_controller_sender.clone(),
968978
stats_reporter.clone(),
969-
&shutdown_tracker.clone(),
979+
&shutdown_tracker,
970980
);
971981

972982
// The message_sender is the transmitter for any component generating sphinx packets
@@ -975,7 +985,7 @@ where
975985
// The MixTrafficController then sends the actual traffic
976986

977987
let (message_sender, client_request_sender) =
978-
Self::start_mix_traffic_controller(gateway_transceiver, &shutdown_tracker.clone());
988+
Self::start_mix_traffic_controller(gateway_transceiver, &shutdown_tracker);
979989

980990
// Channels that the websocket listener can use to signal downstream to the real traffic
981991
// controller that connections are closed.
@@ -1004,7 +1014,7 @@ where
10041014
shared_lane_queue_lengths.clone(),
10051015
client_connection_rx,
10061016
stats_reporter.clone(),
1007-
&shutdown_tracker.clone(),
1017+
&shutdown_tracker,
10081018
);
10091019

10101020
if !self
@@ -1020,7 +1030,7 @@ where
10201030
shared_topology_accessor.clone(),
10211031
message_sender,
10221032
stats_reporter.clone(),
1023-
&shutdown_tracker.clone(),
1033+
&shutdown_tracker,
10241034
);
10251035
}
10261036

common/client-core/src/client/mix_traffic/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ impl MixTrafficController {
138138

139139
pub async fn run(&mut self) {
140140
debug!("Started MixTrafficController with graceful shutdown support");
141+
let _drop_guard = self.shutdown_token.clone().drop_guard();
141142
loop {
142143
tokio::select! {
143144
biased;

common/client-core/src/client/real_messages_control/acknowledgement_control/acknowledgement_listener.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ impl AcknowledgementListener {
8080
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
8181
debug!("Started AcknowledgementListener with graceful shutdown support");
8282

83+
let _drop_guard = shutdown_token.clone().drop_guard();
8384
loop {
8485
tokio::select! {
8586
biased;

common/client-core/src/client/real_messages_control/acknowledgement_control/action_controller.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ impl ActionController {
245245
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
246246
debug!("Started ActionController with graceful shutdown support");
247247

248+
let _drop_guard = shutdown_token.clone().drop_guard();
248249
loop {
249250
tokio::select! {
250251
biased;

common/client-core/src/client/real_messages_control/acknowledgement_control/input_message_listener.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ where
216216
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
217217
debug!("Started InputMessageListener with graceful shutdown support");
218218

219+
let _drop_guard = shutdown_token.clone().drop_guard();
219220
loop {
220221
tokio::select! {
221222
biased;

common/client-core/src/client/real_messages_control/acknowledgement_control/retransmission_request_listener.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ where
167167
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
168168
debug!("Started RetransmissionRequestListener with graceful shutdown support");
169169

170+
let _drop_guard = shutdown_token.clone().drop_guard();
170171
loop {
171172
tokio::select! {
172173
biased;

common/client-core/src/client/real_messages_control/real_traffic_stream.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,7 @@ where
579579

580580
// avoid borrow on self
581581
let shutdown_token = self.shutdown_token.clone();
582+
let _drop_guard = shutdown_token.clone().drop_guard();
582583
#[cfg(not(target_arch = "wasm32"))]
583584
{
584585
let mut status_timer = tokio::time::interval(Duration::from_secs(5));

common/client-core/src/client/received_buffer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,8 @@ impl<R: MessageReceiver> RequestReceiver<R> {
497497

498498
pub(crate) async fn run(&mut self) {
499499
debug!("Started RequestReceiver with graceful shutdown support");
500+
501+
let _drop_guard = self.shutdown_token.clone().drop_guard();
500502
loop {
501503
tokio::select! {
502504
biased;
@@ -540,6 +542,8 @@ impl<R: MessageReceiver> FragmentedMessageReceiver<R> {
540542

541543
pub(crate) async fn run(&mut self) -> Result<(), MessageRecoveryError> {
542544
debug!("Started FragmentedMessageReceiver with graceful shutdown support");
545+
546+
let _drop_guard = self.shutdown_token.clone().drop_guard();
543547
loop {
544548
tokio::select! {
545549
biased;

common/client-core/src/client/replies/reply_controller/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ where
152152
let polling_rate = self.config.key_rotation.epoch_duration / 8;
153153
let mut invalidation_inspection = new_interval_stream(polling_rate);
154154

155+
let _drop_guard = shutdown_token.clone().drop_guard();
155156
loop {
156157
tokio::select! {
157158
biased;

common/client-core/src/client/statistics_control.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ impl StatisticsControl {
119119
let mut snapshot_interval =
120120
gloo_timers::future::IntervalStream::new(SNAPSHOT_INTERVAL.as_millis() as u32);
121121

122+
let _drop_guard = shutdown_token.clone().drop_guard();
123+
122124
loop {
123125
tokio::select! {
124126
biased;

0 commit comments

Comments
 (0)