Skip to content

Commit e87a951

Browse files
committed
more caching
1 parent fb04761 commit e87a951

File tree

6 files changed

+81
-52
lines changed

6 files changed

+81
-52
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coding/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ pub trait Scheme: Debug + Clone + Send + Sync + 'static {
121121
///
122122
/// This allows excluding [Scheme::ReShard]s which are invalid, and shouldn't
123123
/// be considered as progress towards meeting the minimum number of shards.
124-
type CheckedShard;
124+
type CheckedShard: Clone + Send;
125125
type Error: std::fmt::Debug;
126126

127127
/// Encode a piece of data, returning a commitment, along with shards, and proofs.

consensus/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ commonware-runtime = { workspace = true }
3030
commonware-storage = { workspace = true, features = ["std"] }
3131
prometheus-client = { workspace = true }
3232
governor = { workspace = true }
33+
rayon = { workspace = true }
3334
rand = { workspace = true }
3435
rand_distr = { workspace = true }
3536
tracing = { workspace = true }

consensus/src/marshal/actor.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,17 @@ where
461461
}
462462
}
463463
}
464+
Message::Notarize { notarization } => {
465+
let commitment = notarization.proposal.payload;
466+
if !shards.has_block(&commitment) {
467+
let start = Instant::now();
468+
let _ = shards.try_reconstruct(commitment).await;
469+
let elapsed = start.elapsed();
470+
tracing::info!(?elapsed, "Attempted reconstruction");
471+
}
472+
}
464473
Message::Notarization { notarization } => {
474+
tracing::warn!(?notarization, "Received notarization");
465475
let round = notarization.round();
466476
let commitment = notarization.proposal.payload;
467477

consensus/src/marshal/ingress/coding/mailbox.rs

Lines changed: 62 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,10 @@ where
6565
/// These blocks are evicted by marshal after they are delivered to the application.
6666
reconstructed_blocks: BTreeMap<CodingCommitment, CodedBlock<B, S>>,
6767

68-
block_broadcast_duration: Gauge,
69-
shard_broadcast_duration: Gauge,
68+
/// Transient per-commitment caches for progressive block reconstruction; an entry is dropped once its block has been reconstructed.
69+
checking_data: BTreeMap<CodingCommitment, S::CheckingData>,
70+
checked_shards: BTreeMap<CodingCommitment, BTreeMap<usize, S::CheckedShard>>,
71+
7072
erasure_decode_duration: Gauge,
7173
}
7274

@@ -82,20 +84,6 @@ where
8284
mailbox: buffered::Mailbox<P, Shard<S, H>>,
8385
block_codec_cfg: B::Cfg,
8486
) -> Self {
85-
let block_broadcast_duration = Gauge::default();
86-
context.register(
87-
"block_broadcast_duration",
88-
"Duration of proposer broadcast in milliseconds",
89-
block_broadcast_duration.clone(),
90-
);
91-
92-
let shard_broadcast_duration = Gauge::default();
93-
context.register(
94-
"shard_broadcast_duration",
95-
"Duration of individual shard broadcast in milliseconds",
96-
shard_broadcast_duration.clone(),
97-
);
98-
9987
let erasure_decode_duration = Gauge::default();
10088
context.register(
10189
"erasure_decode_duration",
@@ -108,8 +96,8 @@ where
10896
block_codec_cfg,
10997
block_subscriptions: BTreeMap::new(),
11098
reconstructed_blocks: BTreeMap::new(),
111-
block_broadcast_duration,
112-
shard_broadcast_duration,
99+
checking_data: BTreeMap::new(),
100+
checked_shards: BTreeMap::new(),
113101
erasure_decode_duration,
114102
}
115103
}
@@ -126,15 +114,12 @@ where
126114
"number of participants must equal number of shards"
127115
);
128116

129-
let start = Instant::now();
130117
for (index, peer) in participants.into_iter().enumerate() {
131118
let message = block
132119
.shard(index)
133120
.expect("peer index impossibly out of bounds");
134121
let _peers = self.mailbox.broadcast(Recipients::One(peer), message).await;
135122
}
136-
self.block_broadcast_duration
137-
.set(start.elapsed().as_millis() as i64);
138123
}
139124

140125
/// Broadcasts a local [Shard] of a block to all peers, if the [Shard] is present
@@ -172,10 +157,7 @@ where
172157
// Broadcast the weak shard to all peers for reconstruction.
173158
let reshard = Shard::new(commitment, index, DistributionShard::Weak(reshard));
174159

175-
let start = Instant::now();
176160
let _peers = self.mailbox.broadcast(Recipients::All, reshard).await;
177-
self.shard_broadcast_duration
178-
.set(start.elapsed().as_millis() as i64);
179161

180162
debug!(%commitment, index, "broadcasted local shard to all peers");
181163
} else {
@@ -211,24 +193,32 @@ where
211193
// NOTE: Byzantine peers may send us strong shards as well, but we don't care about those;
212194
// `Scheme::reshard` verifies the shard against the commitment, and if it doesn't check out,
213195
// it will be ignored.
214-
let Some(checking_data) = shards.iter().find_map(|s| {
215-
if let DistributionShard::Strong(shard) = s.deref() {
216-
S::reshard(
217-
&config,
218-
&commitment.inner(),
219-
s.index() as u16,
220-
shard.clone(),
221-
)
222-
.map(|(checking_data, _, _)| checking_data)
223-
.ok()
224-
} else {
225-
None
196+
let checking_data = match self.checking_data.entry(commitment) {
197+
Entry::Vacant(entry) => {
198+
let Some(checking_data) = shards.iter().find_map(|s| {
199+
if let DistributionShard::Strong(shard) = s.deref() {
200+
S::reshard(
201+
&config,
202+
&commitment.inner(),
203+
s.index() as u16,
204+
shard.clone(),
205+
)
206+
.map(|(checking_data, _, _)| checking_data)
207+
.ok()
208+
} else {
209+
None
210+
}
211+
}) else {
212+
debug!(%commitment, "No strong shards present to form checking data");
213+
return Ok(None);
214+
};
215+
entry.insert(checking_data.clone());
216+
checking_data
226217
}
227-
}) else {
228-
debug!(%commitment, "No strong shards present to form checking data");
229-
return Ok(None);
218+
Entry::Occupied(entry) => entry.get().clone(),
230219
};
231220

221+
let cached_checked_shards = self.checked_shards.entry(commitment).or_default();
232222
let checked_shards = shards
233223
.into_iter()
234224
.filter_map(|s| {
@@ -238,18 +228,33 @@ where
238228
// Any strong shards, at this point, were sent from the proposer.
239229
// We use the reshard interface to produce our checked shard rather
240230
// than taking two hops.
241-
S::reshard(&config, &commitment.inner(), index, shard)
242-
.map(|(_, checked, _)| checked)
243-
.ok()
231+
match cached_checked_shards.entry(index as usize) {
232+
Entry::Vacant(entry) => {
233+
let (_, checked, _) =
234+
S::reshard(&config, &commitment.inner(), index, shard).ok()?;
235+
entry.insert(checked.clone());
236+
Some(checked)
237+
}
238+
Entry::Occupied(entry) => Some(entry.get().clone()),
239+
}
240+
}
241+
DistributionShard::Weak(re_shard) => {
242+
match cached_checked_shards.entry(index as usize) {
243+
Entry::Vacant(entry) => {
244+
let checked = S::check(
245+
&config,
246+
&commitment.inner(),
247+
&checking_data,
248+
index,
249+
re_shard,
250+
)
251+
.ok()?;
252+
entry.insert(checked.clone());
253+
Some(checked)
254+
}
255+
Entry::Occupied(entry) => Some(entry.get().clone()),
256+
}
244257
}
245-
DistributionShard::Weak(re_shard) => S::check(
246-
&config,
247-
&commitment.inner(),
248-
&checking_data,
249-
index,
250-
re_shard,
251-
)
252-
.ok(),
253258
}
254259
})
255260
.collect::<Vec<_>>();
@@ -282,6 +287,8 @@ where
282287
);
283288

284289
self.reconstructed_blocks.insert(commitment, block.clone());
290+
self.checking_data.remove(&commitment);
291+
self.checked_shards.remove(&commitment);
285292

286293
// Notify any subscribers that have been waiting for this block to be reconstructed
287294
if let Some(mut sub) = self.block_subscriptions.remove(&commitment) {
@@ -363,6 +370,11 @@ where
363370
pub fn evict_block(&mut self, commitment: &CodingCommitment) {
364371
self.reconstructed_blocks.remove(commitment);
365372
}
373+
374+
/// Checks if a reconstructed block is present in the local cache.
375+
pub fn has_block(&self, commitment: &CodingCommitment) -> bool {
376+
self.reconstructed_blocks.contains_key(commitment)
377+
}
366378
}
367379

368380
#[cfg(test)]

consensus/src/marshal/ingress/mailbox.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
marshal::ingress::coding::types::CodedBlock,
3-
threshold_simplex::types::{Activity, Finalization, Notarization},
3+
threshold_simplex::types::{Activity, Finalization, Notarization, Notarize},
44
types::Round,
55
Block, Reporter,
66
};
@@ -100,6 +100,10 @@ pub(crate) enum Message<V: Variant, B: Block, S: Scheme, P: PublicKey> {
100100
},
101101

102102
// -------------------- Consensus Engine Messages --------------------
103+
/// A notarize vote from the consensus engine.
104+
Notarize {
105+
notarization: Notarize<V, B::Commitment>,
106+
},
103107
/// A notarization from the consensus engine.
104108
Notarization {
105109
/// The notarization.
@@ -254,6 +258,7 @@ impl<V: Variant, B: Block, S: Scheme, P: PublicKey> Reporter for Mailbox<V, B, S
254258

255259
async fn report(&mut self, activity: Self::Activity) {
256260
let message = match activity {
261+
Activity::Notarize(notarization) => Message::Notarize { notarization },
257262
Activity::Notarization(notarization) => Message::Notarization { notarization },
258263
Activity::Finalization(finalization) => Message::Finalization { finalization },
259264
_ => {

0 commit comments

Comments
 (0)