Skip to content

Commit 4637881

Browse files
committed
Hide CodedBlock
1 parent f41e2cd commit 4637881

File tree

3 files changed

+370
-356
lines changed

3 files changed

+370
-356
lines changed

consensus/src/marshal/actor.rs

Lines changed: 40 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,16 @@ use super::{
99
},
1010
};
1111
use crate::{
12-
marshal::ingress::coding::ShardLayer,
12+
marshal::ingress::coding::{CodedBlock, ShardLayer},
1313
threshold_simplex::types::{Finalization, Notarization},
1414
types::Round,
1515
Block, Reporter,
1616
};
1717
use commonware_codec::{Decode, Encode};
1818
use commonware_coding::reed_solomon::Chunk;
19-
use commonware_cryptography::{bls12381::primitives::variant::Variant, Hasher, PublicKey};
19+
use commonware_cryptography::{
20+
bls12381::primitives::variant::Variant, Committable, Hasher, PublicKey,
21+
};
2022
use commonware_macros::select;
2123
use commonware_resolver::Resolver;
2224
use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
@@ -66,7 +68,7 @@ struct ChunkSubscription<H: Hasher> {
6668
/// behind.
6769
pub struct Actor<B, E, V, P, H>
6870
where
69-
B: Block,
71+
B: Block<Digest = H::Digest, Commitment = H::Digest>,
7072
E: Rng + Spawner + Metrics + Clock + GClock + Storage,
7173
V: Variant,
7274
P: PublicKey,
@@ -106,11 +108,11 @@ where
106108

107109
// ---------- Storage ----------
108110
// Prunable cache
109-
cache: cache::Manager<E, B, V>,
111+
cache: cache::Manager<E, CodedBlock<B, H>, V>,
110112
// Finalizations stored by height
111113
finalizations_by_height: immutable::Archive<E, B::Commitment, Finalization<V, B::Commitment>>,
112114
// Finalized blocks stored by height
113-
finalized_blocks: immutable::Archive<E, B::Commitment, B>,
115+
finalized_blocks: immutable::Archive<E, B::Commitment, CodedBlock<B, H>>,
114116

115117
// ---------- Metrics ----------
116118
// Latest height metric
@@ -259,11 +261,11 @@ where
259261
pub fn start<R>(
260262
mut self,
261263
application: impl Reporter<Activity = B>,
262-
shards: ShardLayer<P, B, H>,
263-
resolver: (mpsc::Receiver<handler::Message<B>>, R),
264+
shards: ShardLayer<P, CodedBlock<B, H>, H>,
265+
resolver: (mpsc::Receiver<handler::Message<CodedBlock<B, H>>>, R),
264266
) -> Handle<()>
265267
where
266-
R: Resolver<Key = handler::Request<B>>,
268+
R: Resolver<Key = handler::Request<CodedBlock<B, H>>>,
267269
{
268270
self.context.spawn_ref()(self.run(application, shards, resolver))
269271
}
@@ -272,10 +274,10 @@ where
272274
async fn run<R>(
273275
mut self,
274276
application: impl Reporter<Activity = B>,
275-
mut shard_layer: ShardLayer<P, B, H>,
276-
(mut resolver_rx, mut resolver): (mpsc::Receiver<handler::Message<B>>, R),
277+
mut shard_layer: ShardLayer<P, CodedBlock<B, H>, H>,
278+
(mut resolver_rx, mut resolver): (mpsc::Receiver<handler::Message<CodedBlock<B, H>>>, R),
277279
) where
278-
R: Resolver<Key = handler::Request<B>>,
280+
R: Resolver<Key = handler::Request<CodedBlock<B, H>>>,
279281
{
280282
// Process all finalized blocks in order (fetching any that are missing)
281283
let (mut notifier_tx, notifier_rx) = mpsc::channel::<()>(1);
@@ -294,7 +296,7 @@ where
294296
.spawn(|_| finalizer.run());
295297

296298
// Create a local pool for waiter futures
297-
let mut block_waiters = AbortablePool::<(B::Commitment, B)>::default();
299+
let mut block_waiters = AbortablePool::<(B::Commitment, CodedBlock<B, H>)>::default();
298300
let mut chunk_waiters = AbortablePool::<((B::Commitment, u16), Chunk<H>)>::default();
299301

300302
// Handle messages
@@ -346,7 +348,7 @@ where
346348
self.cache_block(round, commitment, block).await;
347349
} else {
348350
debug!(?round, "notarized block missing");
349-
resolver.fetch(Request::<B>::Notarized { round }).await;
351+
resolver.fetch(Request::<CodedBlock<B, H>>::Notarized { round }).await;
350352
}
351353
}
352354
Message::Finalize { finalization } => {
@@ -370,13 +372,13 @@ where
370372
} else {
371373
// Otherwise, fetch the block from the network.
372374
debug!(?round, ?commitment, "finalized block missing");
373-
resolver.fetch(Request::<B>::Block(commitment)).await;
375+
resolver.fetch(Request::<CodedBlock<B, H>>::Block(commitment)).await;
374376
}
375377
}
376378
Message::Get { commitment, response } => {
377379
// Check for block locally
378380
let result = self.find_block(&mut shard_layer, commitment).await;
379-
let _ = response.send(result);
381+
let _ = response.send(result.map(CodedBlock::take_inner));
380382
}
381383
Message::SubscribeChunk { commitment, index, response } => {
382384
match self.chunk_subscriptions.entry(commitment) {
@@ -399,7 +401,7 @@ where
399401
Message::Subscribe { round, commitment, response } => {
400402
// Check for block locally
401403
if let Some(block) = self.find_block(&mut shard_layer, commitment).await {
402-
let _ = response.send(block);
404+
let _ = response.send(block.take_inner());
403405
continue;
404406
}
405407

@@ -420,7 +422,7 @@ where
420422
// If this is a valid view, this request should be fine to keep open
421423
// until resolution or pruning (even if the oneshot is canceled).
422424
debug!(?round, ?commitment, "requested block missing");
423-
resolver.fetch(Request::<B>::Notarized { round }).await;
425+
resolver.fetch(Request::<CodedBlock<B, H>>::Notarized { round }).await;
424426
}
425427

426428
// Register subscriber
@@ -454,15 +456,15 @@ where
454456
Orchestration::Get { height, result } => {
455457
// Check if in blocks
456458
let block = self.get_finalized_block(height).await;
457-
result.send(block).unwrap_or_else(|_| warn!(?height, "Failed to send block to orchestrator"));
459+
result.send(block.map(CodedBlock::take_inner)).unwrap_or_else(|_| warn!(?height, "Failed to send block to orchestrator"));
458460
}
459461
Orchestration::Processed { height, commitment } => {
460462
// Update metrics
461463
self.processed_height.set(height as i64);
462464

463465
// Cancel any outstanding requests (by height and by commitment)
464-
resolver.cancel(Request::<B>::Block(commitment)).await;
465-
resolver.retain(Request::<B>::Finalized { height }.predicate()).await;
466+
resolver.cancel(Request::<CodedBlock<B, H>>::Block(commitment)).await;
467+
resolver.retain(Request::<CodedBlock<B, H>>::Finalized { height }.predicate()).await;
466468

467469
// If finalization exists, prune the archives
468470
if let Some(finalization) = self.get_finalization_by_height(height).await {
@@ -478,7 +480,7 @@ where
478480
self.last_processed_round = round;
479481

480482
// Cancel useless requests
481-
resolver.retain(Request::<B>::Notarized { round }.predicate()).await;
483+
resolver.retain(Request::<CodedBlock<B, H>>::Notarized { round }.predicate()).await;
482484
}
483485
}
484486
Orchestration::Repair { height } => {
@@ -505,7 +507,7 @@ where
505507
cursor = block;
506508
} else {
507509
// Request the next missing block commitment
508-
resolver.fetch(Request::<B>::Block(commitment)).await;
510+
resolver.fetch(Request::<CodedBlock<B, H>>::Block(commitment)).await;
509511
break;
510512
}
511513
}
@@ -518,7 +520,7 @@ where
518520
let gap_end = std::cmp::min(cursor.height(), gap_start.saturating_add(self.max_repair));
519521
debug!(gap_start, gap_end, "requesting any finalized blocks");
520522
for height in gap_start..gap_end {
521-
resolver.fetch(Request::<B>::Finalized { height }).await;
523+
resolver.fetch(Request::<CodedBlock<B, H>>::Finalized { height }).await;
522524
}
523525
}
524526
}
@@ -577,7 +579,7 @@ where
577579
match key {
578580
Request::Block(commitment) => {
579581
// Parse block
580-
let Ok(block) = B::decode_cfg(value.as_ref(), &self.codec_config) else {
582+
let Ok(block) = CodedBlock::<B, H>::decode_cfg(value.as_ref(), &self.codec_config) else {
581583
let _ = response.send(false);
582584
continue;
583585
};
@@ -597,7 +599,7 @@ where
597599
},
598600
Request::Finalized { height } => {
599601
// Parse finalization
600-
let Ok((finalization, block)) = <(Finalization<V, B::Commitment>, B)>::decode_cfg(value, &((), self.codec_config.clone())) else {
602+
let Ok((finalization, block)) = <(Finalization<V, B::Commitment>, CodedBlock<B, H>)>::decode_cfg(value, &((), self.codec_config.clone())) else {
601603
let _ = response.send(false);
602604
continue;
603605
};
@@ -618,7 +620,7 @@ where
618620
},
619621
Request::Notarized { round } => {
620622
// Parse notarization
621-
let Ok((notarization, block)) = <(Notarization<V, B::Commitment>, B)>::decode_cfg(value, &((), self.codec_config.clone())) else {
623+
let Ok((notarization, block)) = <(Notarization<V, B::Commitment>, CodedBlock<B, H>)>::decode_cfg(value, &((), self.codec_config.clone())) else {
622624
let _ = response.send(false);
623625
continue;
624626
};
@@ -663,26 +665,31 @@ where
663665
// -------------------- Waiters --------------------
664666

665667
/// Notify any subscribers for the given commitment with the provided block.
666-
async fn notify_subscribers(&mut self, commitment: B::Commitment, block: &B) {
668+
async fn notify_subscribers(&mut self, commitment: B::Commitment, block: &CodedBlock<B, H>) {
667669
if let Some(mut bs) = self.block_subscriptions.remove(&commitment) {
668670
for subscriber in bs.subscribers.drain(..) {
669-
let _ = subscriber.send(block.clone());
671+
let _ = subscriber.send(block.clone().take_inner());
670672
}
671673
}
672674
}
673675

674676
// -------------------- Prunable Storage --------------------
675677

676678
/// Add a notarized block to the prunable archive.
677-
async fn cache_block(&mut self, round: Round, commitment: B::Commitment, block: B) {
679+
async fn cache_block(
680+
&mut self,
681+
round: Round,
682+
commitment: B::Commitment,
683+
block: CodedBlock<B, H>,
684+
) {
678685
self.notify_subscribers(commitment, &block).await;
679686
self.cache.put_block(round, commitment, block).await;
680687
}
681688

682689
// -------------------- Immutable Storage --------------------
683690

684691
/// Get a finalized block from the immutable archive.
685-
async fn get_finalized_block(&self, height: u64) -> Option<B> {
692+
async fn get_finalized_block(&self, height: u64) -> Option<CodedBlock<B, H>> {
686693
match self.finalized_blocks.get(Identifier::Index(height)).await {
687694
Ok(block) => block,
688695
Err(e) => panic!("failed to get block: {e}"),
@@ -712,7 +719,7 @@ where
712719
&mut self,
713720
height: u64,
714721
commitment: B::Commitment,
715-
block: B,
722+
block: CodedBlock<B, H>,
716723
finalization: Option<Finalization<V, B::Commitment>>,
717724
notifier: &mut mpsc::Sender<()>,
718725
) {
@@ -749,9 +756,9 @@ where
749756
/// Looks for a block anywhere in local storage.
750757
async fn find_block(
751758
&mut self,
752-
shards: &mut ShardLayer<P, B, H>,
759+
shards: &mut ShardLayer<P, CodedBlock<B, H>, H>,
753760
commitment: B::Commitment,
754-
) -> Option<B> {
761+
) -> Option<CodedBlock<B, H>> {
755762
// Check shard layer.
756763
if let Some(block) = shards
757764
.try_reconstruct(commitment)

consensus/src/marshal/ingress/coding.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ where
6868
impl<P, B, H> ShardLayer<P, B, H>
6969
where
7070
P: PublicKey,
71-
B: Block<Digest = H::Digest, Commitment = H::Digest> + Debug,
71+
B: Block<Digest = H::Digest, Commitment = H::Digest>,
7272
H: Hasher,
7373
{
7474
/// Create a new [ShardLayer] with the given buffered mailbox.
@@ -323,7 +323,7 @@ where
323323
B: Block<Digest = H::Digest, Commitment = H::Digest>,
324324
H: Hasher,
325325
{
326-
type Digest = B::Digest;
326+
type Digest = H::Digest;
327327

328328
fn digest(&self) -> Self::Digest {
329329
// NOTE: This is a lil weird; only doing this to namespace the shard within the buffered mailbox, such that
@@ -443,6 +443,11 @@ where
443443
pub fn inner(&self) -> &B {
444444
&self.inner
445445
}
446+
447+
/// Takes the inner [Block] out of the [CodedBlock].
448+
pub fn take_inner(self) -> B {
449+
self.inner
450+
}
446451
}
447452

448453
impl<B, H> Write for CodedBlock<B, H>
@@ -506,7 +511,7 @@ where
506511
B: Block<Digest = H::Digest, Commitment = H::Digest>,
507512
H: Hasher,
508513
{
509-
type Commitment = B::Commitment;
514+
type Commitment = H::Digest;
510515

511516
fn commitment(&self) -> Self::Commitment {
512517
self.commitment

0 commit comments

Comments
 (0)