Skip to content

Commit 7b3194d

Browse files
simonwickyjstuczyn
authored andcommitted
calling for shutdown from the MixTrafficController
1 parent 0b81edf commit 7b3194d

File tree

3 files changed

+14
-19
lines changed

3 files changed

+14
-19
lines changed

common/client-core/src/client/base_client/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -801,7 +801,7 @@ where
801801
event_tx,
802802
);
803803

804-
let mix_tx = mix_traffic_controller.mix_rx();
804+
let mix_tx = mix_traffic_controller.mix_tx();
805805
let client_tx = mix_traffic_controller.client_tx();
806806

807807
shutdown_tracker.try_spawn_named(

common/client-core/src/client/mix_traffic/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ impl MixTrafficController {
8787
self.client_tx.clone()
8888
}
8989

90-
pub fn mix_rx(&self) -> BatchMixMessageSender {
90+
pub fn mix_tx(&self) -> BatchMixMessageSender {
9191
self.mix_tx.clone()
9292
}
9393

@@ -159,6 +159,11 @@ impl MixTrafficController {
159159
// Do we need to handle the embedded mixnet client case
160160
// separately?
161161
self.event_tx.send(MixnetClientEvent::Traffic(MixTrafficEvent::FailedSendingSphinx));
162+
// IMO it shouldn't be signalled from there but it is how it is
163+
// TODO : report the failure upwards and shutdown from upwards
164+
// Gateway is dead, we have to shut down currently
165+
error!("Signalling shutdown from the MixTrafficController");
166+
self.shutdown_token.cancel();
162167
break;
163168
}
164169
}

common/client-core/src/client/real_messages_control/real_traffic_stream.rs

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -598,20 +598,15 @@ where
598598
tracing::trace!("OutQueueControl: Received shutdown");
599599
break;
600600
}
601-
_ = status_timer.tick() => {
602-
self.log_status();
603-
}
604-
next_message = self.next() => if let Some(next_message) = next_message {
605-
// Check if mix_tx channel is closed BEFORE processing message
606-
if self.mix_tx.is_closed() {
607-
tracing::error!("OutQueueControl: mix_tx channel closed, stopping traffic stream");
601+
_ = status_timer.tick() => {
602+
self.log_status();
603+
}
604+
next_message = self.next() => if let Some(next_message) = next_message {
605+
self.on_message(next_message).await;
606+
} else {
607+
tracing::trace!("OutQueueControl: Stopping since channel closed");
608608
break;
609609
}
610-
self.on_message(next_message).await;
611-
} else {
612-
tracing::trace!("OutQueueControl: Stopping since channel closed");
613-
break;
614-
}
615610
}
616611
}
617612
}
@@ -626,11 +621,6 @@ where
626621
break;
627622
}
628623
next_message = self.next() => if let Some(next_message) = next_message {
629-
// Check if mix_tx channel is closed BEFORE processing message
630-
if self.mix_tx.is_closed() {
631-
tracing::error!("OutQueueControl: mix_tx channel closed, stopping traffic stream");
632-
break;
633-
}
634624
self.on_message(next_message).await;
635625
} else {
636626
tracing::trace!("OutQueueControl: Stopping since channel closed");

0 commit comments

Comments
 (0)