Skip to content

Commit 6f04cbe

Browse files
committed
cleanup 🧹
1 parent 69094ff commit 6f04cbe

File tree

8 files changed

+293
-216
lines changed

8 files changed

+293
-216
lines changed

consensus/src/marshal/actor.rs

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,14 @@ struct BlockSubscription<B: Block> {
5555
/// finalization for a block that is ahead of its current view, it will request the missing blocks
5656
/// from its peers. This ensures that the actor can catch up to the rest of the network if it falls
5757
/// behind.
58-
pub struct Actor<
58+
pub struct Actor<B, E, V, P, H>
59+
where
5960
B: Block,
6061
E: Rng + Spawner + Metrics + Clock + GClock + Storage,
6162
V: Variant,
6263
P: PublicKey,
6364
H: Hasher,
64-
> {
65+
{
6566
// ---------- Context ----------
6667
context: E,
6768

@@ -107,13 +108,13 @@ pub struct Actor<
107108
processed_height: Gauge,
108109
}
109110

110-
impl<
111-
B: Block<Digest = H::Digest, Commitment = H::Digest> + std::fmt::Debug,
112-
E: Rng + Spawner + Metrics + Clock + GClock + Storage,
113-
V: Variant,
114-
P: PublicKey,
115-
H: Hasher,
116-
> Actor<B, E, V, P, H>
111+
impl<B, E, V, P, H> Actor<B, E, V, P, H>
112+
where
113+
B: Block<Digest = H::Digest, Commitment = H::Digest> + std::fmt::Debug,
114+
E: Rng + Spawner + Metrics + Clock + GClock + Storage,
115+
V: Variant,
116+
P: PublicKey,
117+
H: Hasher,
117118
{
118119
/// Create a new application actor.
119120
pub async fn init(context: E, config: Config<V, B>) -> (Self, Mailbox<V, B, P, H>) {
@@ -429,12 +430,13 @@ impl<
429430
let block = self.get_finalized_block(height).await;
430431
result.send(block).unwrap_or_else(|_| warn!(?height, "Failed to send block to orchestrator"));
431432
}
432-
Orchestration::Processed { height, digest } => {
433+
Orchestration::Processed { height, digest, commitment } => {
433434
// Update metrics
434435
self.processed_height.set(height as i64);
435436

436437
// Cancel any outstanding requests (by height and by digest)
437-
resolver.cancel(Request::<B>::Block(digest)).await;
438+
resolver.cancel(Request::<B>::Block(commitment)).await;
439+
resolver.cancel(Request::<B>::CodingCommitment { height, digest }).await;
438440
resolver.retain(Request::<B>::Finalized { height }.predicate()).await;
439441

440442
// If finalization exists, prune the archives
@@ -471,9 +473,9 @@ impl<
471473

472474
// Iterate backwards, repairing blocks as we go.
473475
while cursor.height() > height {
474-
let commitment = cursor.parent();
475-
let Some(commitment) = shard_layer.get_digest(&commitment).await else {
476-
dbg!("Missing block digest");
476+
let digest = cursor.parent();
477+
let Some(commitment) = shard_layer.get_commitment(&digest).await else {
478+
resolver.fetch(Request::<B>::CodingCommitment { digest, height: cursor.height().saturating_sub(1) }).await;
477479
break;
478480
};
479481

@@ -519,6 +521,14 @@ impl<
519521
};
520522
let _ = response.send(block.encode().into());
521523
}
524+
Request::CodingCommitment { digest, .. } => {
525+
// Check for coding commitment locally
526+
let Some(commitment) = shard_layer.get_commitment(&digest).await else {
527+
debug!(?digest, "coding commitment missing on request");
528+
continue;
529+
};
530+
let _ = response.send(commitment.encode().into());
531+
}
522532
Request::Finalized { height } => {
523533
// Get finalization
524534
let Some(finalization) = self.get_finalization_by_height(height).await else {
@@ -574,6 +584,24 @@ impl<
574584
debug!(?commitment, height, "received block");
575585
let _ = response.send(true);
576586
},
587+
Request::CodingCommitment { digest, height } => {
588+
// Parse block digest and height
589+
let Ok(commitment) = B::Commitment::decode_cfg(value.as_ref(), &()) else {
590+
let _ = response.send(false);
591+
continue;
592+
};
593+
594+
// Persist the commitment.
595+
shard_layer.put_commitment(height, digest, commitment).await;
596+
597+
// If we have the block, persist it and its finalization.
598+
if let Some(block) = self.find_block(&mut shard_layer, commitment).await {
599+
let finalization = self.cache.get_finalization_for(commitment).await;
600+
self.finalize(block.height(), commitment, block.clone(), finalization, &mut notifier_tx).await;
601+
}
602+
603+
let _ = response.send(true);
604+
},
577605
Request::Finalized { height } => {
578606
// Parse finalization
579607
let Ok((finalization, block)) = <(Finalization<V, B::Commitment>, B)>::decode_cfg(value, &((), self.codec_config.clone())) else {

consensus/src/marshal/envelope.rs

Lines changed: 0 additions & 177 deletions
This file was deleted.

consensus/src/marshal/finalizer.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ impl<B: Block, R: Spawner + Clock + Metrics + Storage, Z: Reporter<Activity = B>
8181
// height is processed by the application), it is possible that the application may
8282
// be asked to process a block it has already seen (which it can simply ignore).
8383
let commitment = block.commitment();
84+
let digest = block.digest();
8485
self.application.report(block).await;
8586

8687
// Record that we have processed up through this height.
@@ -91,7 +92,9 @@ impl<B: Block, R: Spawner + Clock + Metrics + Storage, Z: Reporter<Activity = B>
9192
}
9293

9394
// Notify the orchestrator that the block has been processed.
94-
self.orchestrator.processed(height, commitment).await;
95+
self.orchestrator
96+
.processed(height, digest, commitment)
97+
.await;
9598

9699
// Loop again without waiting for a notification (there may be more to process).
97100
continue;

0 commit comments

Comments
 (0)