Skip to content

Commit dbaa491

Browse files
committed
use prunable::Archive for mappings
1 parent faf9db6 commit dbaa491

File tree

3 files changed

+143
-70
lines changed

3 files changed

+143
-70
lines changed

consensus/src/marshal/actor.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ impl<
246246
pub fn start<R>(
247247
mut self,
248248
application: impl Reporter<Activity = B>,
249-
shards: ShardLayer<P, B, H>,
249+
shards: ShardLayer<E, P, B, H>,
250250
resolver: (mpsc::Receiver<handler::Message<B>>, R),
251251
) -> Handle<()>
252252
where
@@ -259,7 +259,7 @@ impl<
259259
async fn run<R>(
260260
mut self,
261261
application: impl Reporter<Activity = B>,
262-
mut shard_layer: ShardLayer<P, B, H>,
262+
mut shard_layer: ShardLayer<E, P, B, H>,
263263
(mut resolver_rx, mut resolver): (mpsc::Receiver<handler::Message<B>>, R),
264264
) where
265265
R: Resolver<Key = handler::Request<B>>,
@@ -327,13 +327,13 @@ impl<
327327

328328
// Block on waiting for the block to be reconstructed; We cannot move forward without
329329
// having the block digest.
330-
while !shard_layer.has_digest(&commitment) {
330+
while !shard_layer.has_digest(&commitment).await {
331331
dbg!("spinning");
332332
shard_layer.try_reconstruct(commitment).await.expect("Reconstruction error not yet handled");
333333
}
334334

335335
// Request the block digest for the block corresponding to the coding commitment.
336-
let digest = shard_layer.get_digest(&commitment).unwrap();
336+
let digest = shard_layer.get_digest(&commitment).await.unwrap();
337337

338338
// Store notarization by view
339339
self.cache.put_notarization(round, digest, notarization.clone()).await;
@@ -362,13 +362,13 @@ impl<
362362

363363
// Block on waiting for the block to be reconstructed; We cannot move forward without
364364
// having the block digest.
365-
while !shard_layer.has_digest(&commitment) {
365+
while !shard_layer.has_digest(&commitment).await {
366366
dbg!("spinning");
367367
shard_layer.try_reconstruct(commitment).await.expect("Reconstruction error not yet handled");
368368
}
369369

370370
// Request the block digest for the block corresponding to the coding commitment.
371-
let digest = shard_layer.get_digest(&commitment).unwrap();
371+
let digest = shard_layer.get_digest(&commitment).await.unwrap();
372372

373373
self.cache.put_finalization(round, digest, finalization.clone()).await;
374374

@@ -465,6 +465,7 @@ impl<
465465

466466
// Prune archives
467467
self.cache.prune(prune_round).await;
468+
shard_layer.prune(height).await;
468469

469470
// Update the last processed round
470471
let round = finalization.round();
@@ -558,7 +559,7 @@ impl<
558559

559560
// Get block
560561
let commitment = notarization.proposal.payload;
561-
let digest = match shard_layer.get_digest(&commitment) {
562+
let digest = match shard_layer.get_digest(&commitment).await {
562563
Some(digest) => digest,
563564
None => {
564565
debug!(?commitment, "notarized block missing commitment mapping on request");
@@ -754,11 +755,15 @@ impl<
754755
/// Looks for a block anywhere in local storage.
755756
async fn find_block(
756757
&mut self,
757-
shards: &mut ShardLayer<P, B, H>,
758+
shards: &mut ShardLayer<E, P, B, H>,
758759
commitment: B::Commitment,
759760
) -> Option<B> {
760761
// Check shard layer.
761-
if let Some(block) = shards.get(commitment) {
762+
if let Some(block) = shards
763+
.block_by_digest(commitment)
764+
.await
765+
.expect("reconstruction error not yet handled")
766+
{
762767
return Some(block);
763768
}
764769
// Check verified / notarized blocks via cache manager.

consensus/src/marshal/ingress/coding.rs

Lines changed: 118 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,19 @@ use commonware_codec::{Encode, EncodeSize, Error as CodecError, Read, ReadExt, W
1313
use commonware_coding::reed_solomon::{decode, Chunk, Error as ReedSolomonError};
1414
use commonware_cryptography::{Committable, Digestible, Hasher, PublicKey};
1515
use commonware_p2p::Recipients;
16+
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Spawner, Storage};
17+
use commonware_storage::{
18+
archive::{prunable, Archive, Identifier},
19+
translator::TwoCap,
20+
};
1621
use futures::channel::oneshot;
17-
use std::{collections::HashMap, fmt::Debug, ops::Deref};
22+
use governor::clock::Clock as GClock;
23+
use rand::Rng;
24+
use std::{
25+
fmt::Debug,
26+
num::{NonZero, NonZeroUsize},
27+
ops::Deref,
28+
};
1829
use thiserror::Error;
1930
use tracing::{debug, info};
2031

@@ -30,11 +41,29 @@ pub enum ReconstructionError {
3041
Codec(#[from] CodecError),
3142
}
3243

44+
/// Storage configuration for the [ShardLayer].
45+
pub struct Config {
46+
/// Namespace prefix for the underlying storage partitions.
47+
pub partition_prefix: String,
48+
49+
/// Number of items per section in the [prunable::Archive].
50+
pub items_per_section: NonZero<u64>,
51+
52+
    /// Replay buffer size for the [prunable::Archive].
53+
pub replay_buffer: NonZeroUsize,
54+
55+
/// Write buffer size for the [prunable::Archive].
56+
pub write_buffer: NonZeroUsize,
57+
58+
/// Buffer pool for the [prunable::Archive].
59+
pub buffer_pool: PoolRef,
60+
}
61+
3362
/// A layer that handles receiving erasure coded [Block]s from the [Actor](super::super::actor::Actor),
3463
/// broadcasting them to peers, and reassembling them from received [Shard]s.
35-
#[derive(Clone)]
36-
pub struct ShardLayer<P, B, H>
64+
pub struct ShardLayer<E, P, B, H>
3765
where
66+
E: Rng + Spawner + Metrics + Clock + GClock + Storage,
3867
P: PublicKey,
3968
B: Block<Digest = H::Digest, Commitment = H::Digest>,
4069
H: Hasher,
@@ -45,30 +74,50 @@ where
4574
/// [`Read`] configuration for the block type.
4675
block_codec_cfg: B::Cfg,
4776

48-
/// A map of coding commitments to block digests.
49-
///
50-
/// TODO: This map has no durability nor pruning; just for testing / getting things working...
51-
commitment_map: HashMap<B::Commitment, B::Digest>,
77+
    /// Map of coding commitments -> block digests (keyed by commitment, as
    /// written via `commitment_map.put(height, commitment, digest)` below).
78+
commitment_map: prunable::Archive<TwoCap, E, H::Digest, H::Digest>,
5279

53-
/// A map of block digest to reconstructed blocks.
54-
///
55-
/// TODO: This map has no durability nor pruning; just for testing / getting things working...
56-
reconstruction_cache: HashMap<B::Digest, B>,
80+
    /// Map of block digests -> coding commitments (keyed by digest, as
    /// written via `digest_map.put(height, digest, commitment)` below).
81+
digest_map: prunable::Archive<TwoCap, E, H::Digest, H::Digest>,
5782
}
5883

59-
impl<P, B, H> ShardLayer<P, B, H>
84+
impl<E, P, B, H> ShardLayer<E, P, B, H>
6085
where
86+
E: Rng + Spawner + Metrics + Clock + GClock + Storage,
6187
P: PublicKey,
6288
B: Block<Digest = H::Digest, Commitment = H::Digest> + Debug,
6389
H: Hasher,
6490
{
6591
/// Create a new [ShardLayer] with the given buffered mailbox.
66-
pub fn new(mailbox: buffered::Mailbox<P, Shard<B, H>>, cfg: B::Cfg) -> Self {
92+
pub async fn init(
93+
context: E,
94+
cfg: Config,
95+
mailbox: buffered::Mailbox<P, Shard<B, H>>,
96+
block_codec_cfg: B::Cfg,
97+
) -> Self {
98+
let cfg = |name: &str| prunable::Config {
99+
partition: format!("shard-layer-{}-{name}-map", cfg.partition_prefix),
100+
translator: TwoCap,
101+
items_per_section: cfg.items_per_section,
102+
compression: None,
103+
codec_config: (),
104+
buffer_pool: cfg.buffer_pool.clone(),
105+
replay_buffer: cfg.replay_buffer,
106+
write_buffer: cfg.write_buffer,
107+
};
108+
let commitment_map =
109+
prunable::Archive::init(context.with_label("commitment-map"), cfg("commitment"))
110+
.await
111+
.unwrap_or_else(|_| panic!("Failed to initialize commitment archive"));
112+
let digest_map = prunable::Archive::init(context.with_label("digest-map"), cfg("digest"))
113+
.await
114+
.unwrap_or_else(|_| panic!("Failed to initialize digest archive"));
115+
67116
Self {
68117
mailbox,
69-
block_codec_cfg: cfg,
70-
commitment_map: HashMap::new(),
71-
reconstruction_cache: HashMap::new(),
118+
block_codec_cfg,
119+
commitment_map,
120+
digest_map,
72121
}
73122
}
74123

@@ -81,7 +130,7 @@ where
81130
) {
82131
for (peer, chunk) in chunks {
83132
let message = Shard::new(coding_commitment, config, chunk);
84-
let _peers = self.broadcast(Recipients::One(peer), message).await;
133+
let _peers = self.mailbox.broadcast(Recipients::One(peer), message).await;
85134
}
86135
}
87136

@@ -92,25 +141,10 @@ where
92141
let available_shards = self.mailbox.get(None, commitment, None).await;
93142

94143
for shard in available_shards {
95-
let _peers = self.broadcast(Recipients::All, shard).await;
144+
let _peers = self.mailbox.broadcast(Recipients::All, shard).await;
96145
}
97146
}
98147

99-
/// Attempts to fetch a cached reconstructed [Block] by its digest.
100-
pub fn get(&mut self, digest: B::Digest) -> Option<B> {
101-
self.reconstruction_cache.get(&digest).cloned()
102-
}
103-
104-
/// Checks if the shard layer has the digest corresponding to a given coding commitment.
105-
pub fn has_digest(&self, commitment: &B::Commitment) -> bool {
106-
self.commitment_map.contains_key(commitment)
107-
}
108-
109-
/// Gets the digest corresponding to a given coding commitment, if known.
110-
pub fn get_digest(&self, commitment: &B::Commitment) -> Option<B::Digest> {
111-
self.commitment_map.get(commitment).copied()
112-
}
113-
114148
/// Attempts to retrieve and reconstruct a [Block] by its coding commitment from a set of [Shard]s
115149
/// received from peers.
116150
///
@@ -121,12 +155,12 @@ where
121155
pub async fn try_reconstruct(
122156
&mut self,
123157
commitment: B::Commitment,
124-
) -> Result<(), ReconstructionError> {
158+
) -> Result<Option<B>, ReconstructionError> {
125159
let available_chunks = self.mailbox.get(None, commitment, None).await;
126160

127161
let Some((total, min)) = available_chunks.first().map(|c| c.config) else {
128162
// No chunks available.
129-
return Ok(());
163+
return Ok(None);
130164
};
131165
let coded_chunks = available_chunks
132166
.iter()
@@ -142,7 +176,7 @@ where
142176
need = min,
143177
"not enough chunks to reconstruct block",
144178
);
145-
return Ok(());
179+
return Ok(None);
146180
}
147181

148182
// Attempt to recover the block from the available chunks. This process will also
@@ -152,13 +186,18 @@ where
152186
// Attempt to decode the block from the recovered data.
153187
let block = B::decode_cfg(&mut recovered.as_slice(), &self.block_codec_cfg)?;
154188

155-
// ---- DEBUG ----
189+
self.commitment_map
190+
.put(block.height(), commitment, block.digest())
191+
.await
192+
.expect("failed to put commitment");
193+
self.digest_map
194+
.put(block.height(), block.digest(), commitment)
195+
.await
196+
.expect("failed to put digest");
197+
156198
info!(%commitment, ?block, "successfully reconstructed block");
157-
self.commitment_map.insert(commitment, block.digest());
158-
self.reconstruction_cache.insert(block.digest(), block);
159-
// ----
160199

161-
Ok(())
200+
Ok(Some(block))
162201
}
163202

164203
/// Subscribes to a block by commitment with an externally prepared responder.
@@ -173,25 +212,46 @@ where
173212
) -> Result<(), ReconstructionError> {
174213
todo!("Subscribe to all chunks, reconstruct block when enough are available.");
175214
}
176-
}
177-
178-
impl<P, B, H> Broadcaster for ShardLayer<P, B, H>
179-
where
180-
P: PublicKey,
181-
B: Block<Digest = H::Digest, Commitment = H::Digest>,
182-
H: Hasher,
183-
{
184-
type Recipients = Recipients<P>;
185-
type Message = Shard<B, H>;
186-
type Response = Vec<P>;
187215

188-
async fn broadcast(
216+
/// Attempts to fetch a reconstructed [Block] by its digest.
217+
pub async fn block_by_digest(
189218
&mut self,
190-
recipients: Self::Recipients,
191-
message: Self::Message,
192-
) -> oneshot::Receiver<Self::Response> {
193-
// Direct broadcasts of individual chunks to the underlying mailbox.
194-
self.mailbox.broadcast(recipients, message).await
219+
digest: B::Digest,
220+
) -> Result<Option<B>, ReconstructionError> {
221+
match self
222+
.digest_map
223+
.get(Identifier::Key(&digest))
224+
.await
225+
.expect("failed to get digest")
226+
{
227+
Some(commitment) => self.try_reconstruct(commitment).await,
228+
None => Ok(None),
229+
}
230+
}
231+
232+
/// Checks if the shard layer has the digest corresponding to a given coding commitment.
233+
pub async fn has_digest(&self, commitment: &B::Commitment) -> bool {
234+
self.commitment_map
235+
.has(Identifier::Key(commitment))
236+
.await
237+
.expect("failed to get commitment")
238+
}
239+
240+
/// Gets the digest corresponding to a given coding commitment, if known.
241+
pub async fn get_digest(&self, commitment: &B::Commitment) -> Option<B::Digest> {
242+
self.commitment_map
243+
.get(Identifier::Key(commitment))
244+
.await
245+
.expect("failed to get commitment")
246+
}
247+
248+
/// Prunes old entries from the internal maps.
249+
pub async fn prune(&mut self, up_to: u64) {
250+
futures::try_join!(
251+
self.commitment_map.prune(up_to),
252+
self.digest_map.prune(up_to)
253+
)
254+
.expect("failed to prune maps");
195255
}
196256
}
197257

consensus/src/marshal/mod.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ mod tests {
7676
resolver::p2p as resolver,
7777
};
7878
use crate::{
79-
marshal::ingress::coding::ShardLayer,
79+
marshal::ingress::coding::{self, ShardLayer},
8080
threshold_simplex::types::{
8181
finalize_namespace, notarize_namespace, seed_namespace, Activity, Finalization,
8282
Finalize, Notarization, Notarize, Proposal,
@@ -113,7 +113,7 @@ mod tests {
113113
use rand::{seq::SliceRandom, Rng};
114114
use std::{
115115
collections::BTreeMap,
116-
num::{NonZeroU32, NonZeroUsize},
116+
num::{NonZero, NonZeroU32, NonZeroUsize},
117117
time::Duration,
118118
};
119119

@@ -200,10 +200,18 @@ mod tests {
200200
codec_config: (),
201201
};
202202
let (broadcast_engine, buffer) = buffered::Engine::new(context.clone(), broadcast_config);
203-
let shards = ShardLayer::new(buffer, ());
204203
let network = oracle.register(secret.public_key(), 2).await.unwrap();
205204
broadcast_engine.start(network);
206205

206+
let shard_config = coding::Config {
207+
partition_prefix: "shards".to_string(),
208+
items_per_section: NonZero::new(100).unwrap(),
209+
replay_buffer: NonZero::new(100).unwrap(),
210+
write_buffer: NonZero::new(100).unwrap(),
211+
buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
212+
};
213+
let shards = ShardLayer::init(context.with_label("shard"), shard_config, buffer, ()).await;
214+
207215
let (actor, mailbox) = actor::Actor::init(context.clone(), config).await;
208216
let application = Application::<B>::default();
209217

0 commit comments

Comments
 (0)