Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.

Commit 0ffd222

Browse files
committed
feat: switch tce-lib action to spawn tasks
Signed-off-by: Simon Paitrault <simon.paitrault@gmail.com>
1 parent 5b6ddb8 commit 0ffd222

File tree

3 files changed

+138
-116
lines changed

3 files changed

+138
-116
lines changed

crates/topos-tce/src/app_context/api.rs

Lines changed: 72 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::AppContext;
22
use std::collections::HashMap;
3+
use tokio::spawn;
34
use topos_core::uci::{Certificate, SubnetId};
45
use topos_metrics::CERTIFICATE_DELIVERY_LATENCY;
56
use topos_tce_api::RuntimeError;
@@ -20,79 +21,82 @@ impl AppContext {
2021
self.delivery_latency
2122
.insert(certificate.id, CERTIFICATE_DELIVERY_LATENCY.start_timer());
2223

23-
_ = match self
24-
.validator_store
25-
.insert_pending_certificate(&certificate)
26-
.await
27-
{
28-
Ok(Some(pending_id)) => {
29-
let certificate_id = certificate.id;
30-
debug!(
31-
"Certificate {} from subnet {} has been inserted into pending pool",
32-
certificate_id, certificate.source_subnet_id
33-
);
24+
let validator_store = self.validator_store.clone();
25+
let double_echo = self.tce_cli.get_double_echo_channel();
3426

35-
if self
36-
.tce_cli
37-
.get_double_echo_channel()
38-
.send(DoubleEchoCommand::Broadcast {
39-
need_gossip: true,
40-
cert: *certificate,
41-
pending_id,
42-
})
43-
.await
44-
.is_err()
45-
{
46-
error!(
47-
"Unable to send DoubleEchoCommand::Broadcast command to double \
48-
echo for {}",
49-
certificate_id
27+
spawn(async move {
28+
_ = match validator_store
29+
.insert_pending_certificate(&certificate)
30+
.await
31+
{
32+
Ok(Some(pending_id)) => {
33+
let certificate_id = certificate.id;
34+
debug!(
35+
"Certificate {} from subnet {} has been inserted into pending pool",
36+
certificate_id, certificate.source_subnet_id
5037
);
5138

52-
sender.send(Err(RuntimeError::CommunicationError(
53-
"Unable to send DoubleEchoCommand::Broadcast command to double \
54-
echo"
55-
.to_string(),
56-
)))
57-
} else {
58-
sender.send(Ok(PendingResult::InPending(pending_id)))
39+
if double_echo
40+
.send(DoubleEchoCommand::Broadcast {
41+
need_gossip: true,
42+
cert: *certificate,
43+
pending_id,
44+
})
45+
.await
46+
.is_err()
47+
{
48+
error!(
49+
"Unable to send DoubleEchoCommand::Broadcast command to \
50+
double echo for {}",
51+
certificate_id
52+
);
53+
54+
sender.send(Err(RuntimeError::CommunicationError(
55+
"Unable to send DoubleEchoCommand::Broadcast command to \
56+
double echo"
57+
.to_string(),
58+
)))
59+
} else {
60+
sender.send(Ok(PendingResult::InPending(pending_id)))
61+
}
5962
}
60-
}
61-
Ok(None) => {
62-
debug!(
63-
"Certificate {} from subnet {} has been inserted into precedence pool \
64-
waiting for {}",
65-
certificate.id, certificate.source_subnet_id, certificate.prev_id
66-
);
67-
sender.send(Ok(PendingResult::AwaitPrecedence))
68-
}
69-
Err(StorageError::InternalStorage(
70-
InternalStorageError::CertificateAlreadyPending,
71-
)) => {
72-
debug!(
73-
"Certificate {} has already been added to the pending pool, skipping",
74-
certificate.id
75-
);
76-
sender.send(Ok(PendingResult::AlreadyPending))
77-
}
78-
Err(StorageError::InternalStorage(
79-
InternalStorageError::CertificateAlreadyExists,
80-
)) => {
81-
debug!(
82-
"Certificate {} has already been delivered, skipping",
83-
certificate.id
84-
);
85-
sender.send(Ok(PendingResult::AlreadyDelivered))
86-
}
87-
Err(error) => {
88-
error!(
89-
"Unable to insert pending certificate {}: {}",
90-
certificate.id, error
91-
);
63+
Ok(None) => {
64+
debug!(
65+
"Certificate {} from subnet {} has been inserted into precedence \
66+
pool waiting for {}",
67+
certificate.id, certificate.source_subnet_id, certificate.prev_id
68+
);
69+
sender.send(Ok(PendingResult::AwaitPrecedence))
70+
}
71+
Err(StorageError::InternalStorage(
72+
InternalStorageError::CertificateAlreadyPending,
73+
)) => {
74+
debug!(
75+
"Certificate {} has already been added to the pending pool, \
76+
skipping",
77+
certificate.id
78+
);
79+
sender.send(Ok(PendingResult::AlreadyPending))
80+
}
81+
Err(StorageError::InternalStorage(
82+
InternalStorageError::CertificateAlreadyExists,
83+
)) => {
84+
debug!(
85+
"Certificate {} has already been delivered, skipping",
86+
certificate.id
87+
);
88+
sender.send(Ok(PendingResult::AlreadyDelivered))
89+
}
90+
Err(error) => {
91+
error!(
92+
"Unable to insert pending certificate {}: {}",
93+
certificate.id, error
94+
);
9295

93-
sender.send(Err(error.into()))
94-
}
95-
};
96+
sender.send(Err(error.into()))
97+
}
98+
};
99+
});
96100
}
97101

98102
ApiEvent::GetSourceHead { subnet_id, sender } => {

crates/topos-tce/src/app_context/protocol.rs

Lines changed: 46 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use tokio::spawn;
12
use topos_core::api::grpc::tce::v1::{double_echo_request, DoubleEchoRequest, Echo, Gossip, Ready};
23
use topos_tce_broadcast::event::ProtocolEvents;
34
use tracing::{error, info, warn};
@@ -14,65 +15,68 @@ impl AppContext {
1415
ProtocolEvents::Gossip { cert } => {
1516
let cert_id = cert.id;
1617

17-
let request = DoubleEchoRequest {
18-
request: Some(double_echo_request::Request::Gossip(Gossip {
19-
certificate: Some(cert.into()),
20-
})),
21-
};
18+
let network_client = self.network_client.clone();
19+
spawn(async move {
20+
let request = DoubleEchoRequest {
21+
request: Some(double_echo_request::Request::Gossip(Gossip {
22+
certificate: Some(cert.into()),
23+
})),
24+
};
2225

23-
info!("Sending Gossip for certificate {}", cert_id);
24-
if let Err(e) = self
25-
.network_client
26-
.publish(topos_p2p::TOPOS_GOSSIP, request)
27-
.await
28-
{
29-
error!("Unable to send Gossip: {e}");
30-
}
26+
info!("Sending Gossip for certificate {}", cert_id);
27+
if let Err(e) = network_client
28+
.publish(topos_p2p::TOPOS_GOSSIP, request)
29+
.await
30+
{
31+
error!("Unable to send Gossip: {e}");
32+
}
33+
});
3134
}
3235

3336
ProtocolEvents::Echo {
3437
certificate_id,
3538
signature,
3639
validator_id,
3740
} if self.is_validator => {
38-
// Send echo message
39-
let request = DoubleEchoRequest {
40-
request: Some(double_echo_request::Request::Echo(Echo {
41-
certificate_id: Some(certificate_id.into()),
42-
signature: Some(signature.into()),
43-
validator_id: Some(validator_id.into()),
44-
})),
45-
};
41+
let network_client = self.network_client.clone();
42+
spawn(async move {
43+
// Send echo message
44+
let request = DoubleEchoRequest {
45+
request: Some(double_echo_request::Request::Echo(Echo {
46+
certificate_id: Some(certificate_id.into()),
47+
signature: Some(signature.into()),
48+
validator_id: Some(validator_id.into()),
49+
})),
50+
};
4651

47-
if let Err(e) = self
48-
.network_client
49-
.publish(topos_p2p::TOPOS_ECHO, request)
50-
.await
51-
{
52-
error!("Unable to send Echo: {e}");
53-
}
52+
if let Err(e) = network_client.publish(topos_p2p::TOPOS_ECHO, request).await {
53+
error!("Unable to send Echo: {e}");
54+
}
55+
});
5456
}
5557

5658
ProtocolEvents::Ready {
5759
certificate_id,
5860
signature,
5961
validator_id,
6062
} if self.is_validator => {
61-
let request = DoubleEchoRequest {
62-
request: Some(double_echo_request::Request::Ready(Ready {
63-
certificate_id: Some(certificate_id.into()),
64-
signature: Some(signature.into()),
65-
validator_id: Some(validator_id.into()),
66-
})),
67-
};
63+
let network_client = self.network_client.clone();
64+
spawn(async move {
65+
let request = DoubleEchoRequest {
66+
request: Some(double_echo_request::Request::Ready(Ready {
67+
certificate_id: Some(certificate_id.into()),
68+
signature: Some(signature.into()),
69+
validator_id: Some(validator_id.into()),
70+
})),
71+
};
6872

69-
if let Err(e) = self
70-
.network_client
71-
.publish(topos_p2p::TOPOS_READY, request)
72-
.await
73-
{
74-
error!("Unable to send Ready: {e}");
75-
}
73+
if let Err(e) = network_client
74+
.publish(topos_p2p::TOPOS_READY, request)
75+
.await
76+
{
77+
error!("Unable to send Ready: {e}");
78+
}
79+
});
7680
}
7781
ProtocolEvents::BroadcastFailed { certificate_id } => {
7882
warn!("Broadcast failed for certificate {certificate_id}")

crates/topos-tce/src/tests/mod.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use libp2p::PeerId;
22
use rstest::{fixture, rstest};
3-
use std::{collections::HashSet, future::IntoFuture, sync::Arc};
3+
use std::{collections::HashSet, future::IntoFuture, sync::Arc, time::Duration};
44
use tokio_stream::Stream;
55
use topos_tce_api::RuntimeEvent;
66
use topos_tce_broadcast::event::ProtocolEvents;
77
use topos_tce_gatekeeper::Gatekeeper;
88

9+
use test_log::test;
910
use tokio::sync::{broadcast, mpsc};
1011
use topos_crypto::messages::MessageSigner;
1112
use topos_p2p::{utils::GrpcOverP2P, NetworkClient};
@@ -24,7 +25,8 @@ mod api;
2425
mod network;
2526

2627
#[rstest]
27-
#[tokio::test]
28+
#[test(tokio::test)]
29+
#[timeout(Duration::from_secs(1))]
2830
async fn non_validator_publish_gossip(
2931
#[future] setup_test: (
3032
AppContext,
@@ -33,21 +35,25 @@ async fn non_validator_publish_gossip(
3335
),
3436
) {
3537
let (mut context, mut p2p_receiver, _) = setup_test.await;
38+
3639
let certificates = create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], 1);
40+
3741
context
3842
.on_protocol_event(ProtocolEvents::Gossip {
3943
cert: certificates[0].certificate.clone(),
4044
})
4145
.await;
4246

47+
let x = p2p_receiver.recv().await;
4348
assert!(matches!(
44-
p2p_receiver.try_recv(),
45-
Ok(topos_p2p::Command::Gossip { topic, .. }) if topic == "topos_gossip"
49+
x,
50+
Some(topos_p2p::Command::Gossip { topic, .. }) if topic == "topos_gossip"
4651
));
4752
}
4853

4954
#[rstest]
50-
#[tokio::test]
55+
#[test(tokio::test)]
56+
#[timeout(Duration::from_secs(1))]
5157
async fn non_validator_do_not_publish_echo(
5258
#[future] setup_test: (
5359
AppContext,
@@ -64,11 +70,16 @@ async fn non_validator_do_not_publish_echo(
6470
})
6571
.await;
6672

67-
assert!(p2p_receiver.try_recv().is_err(),);
73+
assert!(
74+
tokio::time::timeout(Duration::from_millis(10), p2p_receiver.recv())
75+
.await
76+
.is_err()
77+
);
6878
}
6979

7080
#[rstest]
7181
#[tokio::test]
82+
#[timeout(Duration::from_secs(1))]
7283
async fn non_validator_do_not_publish_ready(
7384
#[future] setup_test: (
7485
AppContext,
@@ -77,6 +88,7 @@ async fn non_validator_do_not_publish_ready(
7788
),
7889
) {
7990
let (mut context, mut p2p_receiver, message_signer) = setup_test.await;
91+
8092
context
8193
.on_protocol_event(ProtocolEvents::Ready {
8294
certificate_id: CERTIFICATE_ID_1,
@@ -98,6 +110,7 @@ pub async fn setup_test(
98110
Arc<MessageSigner>,
99111
) {
100112
let validator_store = create_validator_store.await;
113+
101114
let is_validator = false;
102115
let message_signer = Arc::new(MessageSigner::new(&[5u8; 32]).unwrap());
103116
let validator_id = message_signer.public_address.into();
@@ -128,6 +141,7 @@ pub async fn setup_test(
128141
};
129142

130143
let (api_context, _api_stream) = create_public_api.await;
144+
131145
let api_client = api_context.client;
132146

133147
let (gatekeeper_client, _) = Gatekeeper::builder().into_future().await.unwrap();

0 commit comments

Comments
 (0)