Skip to content

Commit fb04761

Browse files
committed
Add metrics for shard broadcast + decoding
1 parent c03e181 commit fb04761

File tree

3 files changed

+100
-8
lines changed

3 files changed

+100
-8
lines changed

consensus/src/marshal/ingress/coding/application.rs

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ use crate::{
1010
use commonware_coding::{Config as CodingConfig, Scheme};
1111
use commonware_cryptography::{bls12381::primitives::variant::Variant, Committable, PublicKey};
1212
use commonware_runtime::{Clock, Metrics, Spawner};
13-
use futures::channel::oneshot;
13+
use futures::{channel::oneshot, lock::Mutex};
14+
use prometheus_client::metrics::gauge::Gauge;
1415
use rand::Rng;
15-
use std::sync::{Arc, Mutex};
16+
use std::{sync::Arc, time::Instant};
1617
use tracing::{debug, info, warn};
1718

1819
/// An [Application] adapter that handles erasure coding and shard verification for consensus.
@@ -34,6 +35,10 @@ where
3435
identity: P,
3536
supervisor: Z,
3637
last_built: Arc<Mutex<Option<(View, CodedBlock<B, S>)>>>,
38+
39+
parent_fetch_duration: Gauge,
40+
build_duration: Gauge,
41+
erasure_code_duration: Gauge,
3742
}
3843

3944
impl<E, A, V, B, S, P, Z> CodingAdapter<E, A, V, B, S, P, Z>
@@ -53,13 +58,38 @@ where
5358
identity: P,
5459
supervisor: Z,
5560
) -> Self {
61+
let parent_fetch_duration = Gauge::default();
62+
context.register(
63+
"parent_fetch_duration",
64+
"Time taken to fetch a parent block from marshal to build on top of, in milliseconds",
65+
parent_fetch_duration.clone(),
66+
);
67+
68+
let build_duration = Gauge::default();
69+
context.register(
70+
"build_duration",
71+
"Time taken for the application to build a new block, in milliseconds",
72+
build_duration.clone(),
73+
);
74+
75+
let erasure_code_duration = Gauge::default();
76+
context.register(
77+
"erasure_code_duration",
78+
"Time taken to erasure code a built block, in milliseconds",
79+
erasure_code_duration.clone(),
80+
);
81+
5682
Self {
5783
context,
5884
application,
5985
marshal,
6086
identity,
6187
supervisor,
6288
last_built: Arc::new(Mutex::new(None)),
89+
90+
parent_fetch_duration,
91+
build_duration,
92+
erasure_code_duration,
6393
}
6494
}
6595
}
@@ -100,10 +130,16 @@ where
100130
let n_participants = participants.len() as u16;
101131
let coding_config = coding_config_for_participants(n_participants);
102132

133+
// Metrics
134+
let parent_fetch_duration = self.parent_fetch_duration.clone();
135+
let build_duration = self.build_duration.clone();
136+
let erasure_code_duration = self.erasure_code_duration.clone();
137+
103138
let (tx, rx) = oneshot::channel();
104139
self.context
105140
.with_label("propose")
106141
.spawn(move |r_ctx| async move {
142+
let start = Instant::now();
107143
let parent_block = if parent_commitment == genesis.commitment() {
108144
genesis
109145
} else {
@@ -122,16 +158,25 @@ where
122158
return;
123159
}
124160
};
161+
parent_fetch_duration.set(start.elapsed().as_millis() as i64);
125162

163+
let start = Instant::now();
126164
let built_block = application
127165
.build(r_ctx.with_label("build"), parent_commitment, parent_block)
128166
.await;
167+
build_duration.set(start.elapsed().as_millis() as i64);
168+
169+
let start = Instant::now();
129170
let coded_block = CodedBlock::new(built_block, coding_config);
171+
erasure_code_duration.set(start.elapsed().as_millis() as i64);
172+
130173
let commitment = coded_block.commitment();
131174

132175
// Update the latest built block.
133-
let mut lock = last_built.lock().expect("failed to lock last_built mutex");
134-
*lock = Some((context.view(), coded_block));
176+
{
177+
let mut lock = last_built.lock().await;
178+
*lock = Some((context.view(), coded_block));
179+
}
135180

136181
let result = tx.send(commitment);
137182
info!(
@@ -197,7 +242,7 @@ where
197242
type Digest = B::Commitment;
198243

199244
async fn broadcast(&mut self, _commitment: Self::Digest) {
200-
let Some((round, block)) = self.last_built.lock().unwrap().clone() else {
245+
let Some((round, block)) = self.last_built.lock().await.clone() else {
201246
warn!("missing block to broadcast");
202247
return;
203248
};

consensus/src/marshal/ingress/coding/mailbox.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,13 @@ use commonware_codec::{Decode, Error as CodecError};
1010
use commonware_coding::Scheme;
1111
use commonware_cryptography::{Hasher, PublicKey};
1212
use commonware_p2p::Recipients;
13+
use commonware_runtime::Metrics;
1314
use futures::channel::oneshot;
15+
use prometheus_client::metrics::gauge::Gauge;
1416
use std::{
1517
collections::{btree_map::Entry, BTreeMap},
1618
ops::Deref,
19+
time::Instant,
1720
};
1821
use thiserror::Error;
1922
use tracing::debug;
@@ -61,6 +64,10 @@ where
6164
///
6265
/// These blocks are evicted by marshal after they are delivered to the application.
6366
reconstructed_blocks: BTreeMap<CodingCommitment, CodedBlock<B, S>>,
67+
68+
block_broadcast_duration: Gauge,
69+
shard_broadcast_duration: Gauge,
70+
erasure_decode_duration: Gauge,
6471
}
6572

6673
impl<S, H, B, P> ShardMailbox<S, H, B, P>
@@ -70,12 +77,40 @@ where
7077
B: Block<Commitment = CodingCommitment>,
7178
P: PublicKey,
7279
{
73-
pub fn new(mailbox: buffered::Mailbox<P, Shard<S, H>>, block_codec_cfg: B::Cfg) -> Self {
80+
pub fn new(
81+
context: impl Metrics,
82+
mailbox: buffered::Mailbox<P, Shard<S, H>>,
83+
block_codec_cfg: B::Cfg,
84+
) -> Self {
85+
let block_broadcast_duration = Gauge::default();
86+
context.register(
87+
"block_broadcast_duration",
88+
"Duration of proposer broadcast in milliseconds",
89+
block_broadcast_duration.clone(),
90+
);
91+
92+
let shard_broadcast_duration = Gauge::default();
93+
context.register(
94+
"shard_broadcast_duration",
95+
"Duration of individual shard broadcast in milliseconds",
96+
shard_broadcast_duration.clone(),
97+
);
98+
99+
let erasure_decode_duration = Gauge::default();
100+
context.register(
101+
"erasure_decode_duration",
102+
"Duration of erasure decoding in milliseconds",
103+
erasure_decode_duration.clone(),
104+
);
105+
74106
Self {
75107
mailbox,
76108
block_codec_cfg,
77109
block_subscriptions: BTreeMap::new(),
78110
reconstructed_blocks: BTreeMap::new(),
111+
block_broadcast_duration,
112+
shard_broadcast_duration,
113+
erasure_decode_duration,
79114
}
80115
}
81116

@@ -91,12 +126,15 @@ where
91126
"number of participants must equal number of shards"
92127
);
93128

129+
let start = Instant::now();
94130
for (index, peer) in participants.into_iter().enumerate() {
95131
let message = block
96132
.shard(index)
97133
.expect("peer index impossibly out of bounds");
98134
let _peers = self.mailbox.broadcast(Recipients::One(peer), message).await;
99135
}
136+
self.block_broadcast_duration
137+
.set(start.elapsed().as_millis() as i64);
100138
}
101139

102140
/// Broadcasts a local [Shard] of a block to all peers, if the [Shard] is present
@@ -133,7 +171,11 @@ where
133171

134172
// Broadcast the weak shard to all peers for reconstruction.
135173
let reshard = Shard::new(commitment, index, DistributionShard::Weak(reshard));
174+
175+
let start = Instant::now();
136176
let _peers = self.mailbox.broadcast(Recipients::All, reshard).await;
177+
self.shard_broadcast_duration
178+
.set(start.elapsed().as_millis() as i64);
137179

138180
debug!(%commitment, index, "broadcasted local shard to all peers");
139181
} else {
@@ -218,13 +260,16 @@ where
218260
}
219261

220262
// Attempt to reconstruct the encoded blob
263+
let start = Instant::now();
221264
let decoded = S::decode(
222265
&config,
223266
&commitment.inner(),
224267
checking_data.clone(),
225268
checked_shards.as_slice(),
226269
)
227270
.map_err(ReconstructionError::CodingRecovery)?;
271+
self.erasure_decode_duration
272+
.set(start.elapsed().as_millis() as i64);
228273

229274
// Attempt to decode the block from the encoded blob
230275
let block = CodedBlock::<B, S>::decode_cfg(decoded.as_slice(), &self.block_codec_cfg)?;
@@ -418,7 +463,8 @@ mod test {
418463
PublicKey,
419464
Shard<ReedSolomon<Sha256>, Sha256>,
420465
>::new(context.clone(), config);
421-
let shard_mailbox = SMailbox::new(engine_mailbox, ());
466+
let shard_mailbox =
467+
SMailbox::new(context.with_label("shard_mailbox"), engine_mailbox, ());
422468
mailboxes.insert(peer.clone(), shard_mailbox);
423469

424470
engine.start(network);

consensus/src/marshal/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,8 @@ mod tests {
206206
let network = oracle.register(secret.public_key(), 2).await.unwrap();
207207
broadcast_engine.start(network);
208208

209-
let shard_mailbox = ShardMailbox::<_, H, _, _>::new(buffer, ());
209+
let shard_mailbox =
210+
ShardMailbox::<_, H, _, _>::new(context.with_label("shard_mailbox"), buffer, ());
210211

211212
let (actor, mailbox) = actor::Actor::init(context.clone(), config).await;
212213
let application = Application::<B>::default();

0 commit comments

Comments
 (0)