Skip to content

Commit 3a1bf45

Browse files
committed
refactor: Make shard layer an actor
Reduces pressure on the marshal control loop
1 parent 4d5b7f4 commit 3a1bf45

File tree

13 files changed

+1945
-1810
lines changed

coding/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -116,13 +116,13 @@ pub trait Scheme: Debug + Clone + Send + Sync + 'static {
116116
/// the data.
117117
type ReShard: Clone + Eq + Codec + Send + Sync + 'static;
118118
/// Data which can assist in checking shards.
119-
type CheckingData: Clone + Send;
119+
type CheckingData: Clone + Send + Sync;
120120
/// A shard that has been checked for inclusion in the commitment.
121121
///
122122
/// This allows excluding [Scheme::ReShard]s which are invalid, and shouldn't
123123
/// be considered as progress towards meeting the minimum number of shards.
124124
type CheckedShard: Clone + Send;
125-
type Error: std::fmt::Debug;
125+
type Error: std::fmt::Debug + Send;
126126

127127
/// Encode a piece of data, returning a commitment, along with shards, and proofs.
128128
///

consensus/src/marshal/actor.rs

Lines changed: 20 additions & 108 deletions
Original file line number | Diff line number | Diff line change
@@ -9,19 +9,17 @@ use super::{
99
},
1010
};
1111
use crate::{
12-
marshal::ingress::{
13-
coding::{mailbox::ShardMailbox, types::CodedBlock},
14-
mailbox::Identifier as BlockID,
12+
marshal::{
13+
coding::{self, CodedBlock},
14+
ingress::mailbox::Identifier as BlockID,
1515
},
1616
threshold_simplex::types::{Finalization, Notarization},
1717
types::{CodingCommitment, Round},
1818
Block, Reporter,
1919
};
2020
use commonware_codec::{Decode, Encode};
2121
use commonware_coding::Scheme;
22-
use commonware_cryptography::{
23-
bls12381::primitives::variant::Variant, Committable, Hasher, PublicKey,
24-
};
22+
use commonware_cryptography::{bls12381::primitives::variant::Variant, Committable, PublicKey};
2523
use commonware_macros::select;
2624
use commonware_resolver::Resolver;
2725
use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
@@ -49,14 +47,6 @@ struct BlockSubscription<B: Block> {
4947
_aborter: Aborter,
5048
}
5149

52-
/// A struct that holds multiple subscriptions for a shard's validity check.
53-
struct ShardValiditySubscription {
54-
/// The subscribers that are waiting for the chunk
55-
subscribers: Vec<oneshot::Sender<bool>>,
56-
/// Aborter that aborts the waiter future when dropped
57-
_aborter: Aborter,
58-
}
59-
6050
/// The [Actor] is responsible for receiving uncertified blocks from the broadcast mechanism,
6151
/// receiving notarizations and finalizations from consensus, and reconstructing a total order
6252
/// of blocks.
@@ -69,20 +59,19 @@ struct ShardValiditySubscription {
6959
/// finalization for a block that is ahead of its current view, it will request the missing blocks
7060
/// from its peers. This ensures that the actor can catch up to the rest of the network if it falls
7161
/// behind.
72-
pub struct Actor<B, E, V, S, P>
62+
pub struct Actor<B, E, V, S>
7363
where
7464
B: Block<Commitment = CodingCommitment>,
7565
E: Rng + Spawner + Metrics + Clock + GClock + Storage,
7666
V: Variant,
7767
S: Scheme,
78-
P: PublicKey,
7968
{
8069
// ---------- Context ----------
8170
context: E,
8271

8372
// ---------- Message Passing ----------
8473
// Mailbox
85-
mailbox: mpsc::Receiver<Message<V, B, S, P>>,
74+
mailbox: mpsc::Receiver<Message<V, B>>,
8675

8776
// ---------- Configuration ----------
8877
// Identity
@@ -105,8 +94,6 @@ where
10594
last_processed_round: Round,
10695
// Outstanding subscriptions for blocks
10796
block_subscriptions: BTreeMap<B::Commitment, BlockSubscription<B>>,
108-
// Outstanding subscriptions for shard validity checks
109-
shard_validity_subscriptions: BTreeMap<(B::Commitment, usize), ShardValiditySubscription>,
11097

11198
// ---------- Storage ----------
11299
// Prunable cache
@@ -123,16 +110,15 @@ where
123110
processed_height: Gauge,
124111
}
125112

126-
impl<B, E, V, S, P> Actor<B, E, V, S, P>
113+
impl<B, E, V, S> Actor<B, E, V, S>
127114
where
128115
B: Block<Commitment = CodingCommitment>,
129116
E: Rng + Spawner + Metrics + Clock + GClock + Storage,
130117
V: Variant,
131118
S: Scheme,
132-
P: PublicKey,
133119
{
134120
/// Create a new application actor.
135-
pub async fn init(context: E, config: Config<V, B>) -> (Self, Mailbox<V, B, S, P>) {
121+
pub async fn init(context: E, config: Config<V, B>) -> (Self, Mailbox<V, B>) {
136122
// Initialize cache
137123
let prunable_config = cache::Config {
138124
partition_prefix: format!("{}-cache", config.partition_prefix.clone()),
@@ -248,7 +234,6 @@ where
248234
partition_prefix: config.partition_prefix,
249235
last_processed_round: Round::new(0, 0),
250236
block_subscriptions: BTreeMap::new(),
251-
shard_validity_subscriptions: BTreeMap::new(),
252237
cache,
253238
finalizations_by_height,
254239
finalized_blocks,
@@ -260,28 +245,28 @@ where
260245
}
261246

262247
/// Start the actor.
263-
pub fn start<R, H>(
248+
pub fn start<R, P>(
264249
mut self,
265250
application: impl Reporter<Activity = B>,
266-
buffer: ShardMailbox<S, H, B, P>,
251+
buffer: coding::Mailbox<V, B, S, P>,
267252
resolver: (mpsc::Receiver<handler::Message<CodedBlock<B, S>>>, R),
268253
) -> Handle<()>
269254
where
270255
R: Resolver<Key = handler::Request<CodedBlock<B, S>>>,
271-
H: Hasher,
256+
P: PublicKey,
272257
{
273258
self.context.spawn_ref()(self.run(application, buffer, resolver))
274259
}
275260

276261
/// Run the application actor.
277-
async fn run<R, H>(
262+
async fn run<R, P>(
278263
mut self,
279264
application: impl Reporter<Activity = B>,
280-
mut shards: ShardMailbox<S, H, B, P>,
265+
mut shards: coding::Mailbox<V, B, S, P>,
281266
(mut resolver_rx, mut resolver): (mpsc::Receiver<handler::Message<CodedBlock<B, S>>>, R),
282267
) where
283268
R: Resolver<Key = handler::Request<CodedBlock<B, S>>>,
284-
H: Hasher,
269+
P: PublicKey,
285270
{
286271
// Process all finalized blocks in order (fetching any that are missing)
287272
let (mut notifier_tx, notifier_rx) = mpsc::channel::<()>(1);
@@ -300,7 +285,6 @@ where
300285

301286
// Create a local pool for waiter futures
302287
let mut block_waiters = AbortablePool::<(B::Commitment, B)>::default();
303-
let mut shard_validity_waiters = AbortablePool::<((B::Commitment, usize), bool)>::default();
304288

305289
// Handle messages
306290
loop {
@@ -309,10 +293,6 @@ where
309293
bs.subscribers.retain(|tx| !tx.is_canceled());
310294
!bs.subscribers.is_empty()
311295
});
312-
self.shard_validity_subscriptions.retain(|_, cs| {
313-
cs.subscribers.retain(|tx| !tx.is_canceled());
314-
!cs.subscribers.is_empty()
315-
});
316296

317297
// Select messages
318298
select! {
@@ -323,16 +303,6 @@ where
323303
};
324304
self.notify_block_subscribers(commitment, &block).await;
325305
},
326-
result = shard_validity_waiters.next_completed() => {
327-
let Ok(((commitment, index), valid)) = result else {
328-
continue; // Aborted future
329-
};
330-
331-
self.notify_shard_validity_subscribers(commitment, index, valid).await;
332-
if valid {
333-
shards.try_broadcast_shard(commitment, index).await;
334-
}
335-
},
336306
// Handle consensus before finalizer or backfiller
337307
mailbox_message = self.mailbox.next() => {
338308
let Some(message) = mailbox_message else {
@@ -382,9 +352,6 @@ where
382352
}
383353
}
384354
}
385-
Message::Broadcast { block, peers } => {
386-
shards.broadcast_shards(block, peers).await;
387-
}
388355
Message::Subscribe { round, commitment, response } => {
389356
// Check for block locally
390357
if let Some(block) = self.find_block(&mut shards, commitment).await {
@@ -419,8 +386,7 @@ where
419386
entry.get_mut().subscribers.push(response);
420387
}
421388
Entry::Vacant(entry) => {
422-
let (tx, rx) = oneshot::channel();
423-
shards.subscribe_block(commitment, tx).await.expect("Reconstruction error not yet handled");
389+
let rx = shards.subscribe_block(commitment).await;
424390
let aborter = block_waiters.push(async move {
425391
(commitment, rx.await.expect("buffer subscriber closed").into_inner())
426392
});
@@ -431,45 +397,6 @@ where
431397
}
432398
}
433399
}
434-
Message::VerifyShard { commitment, index, response } => {
435-
// Check for shard locally
436-
if let Some(shard) = shards.get_shard(commitment, index).await {
437-
let valid = shard.verify();
438-
let _ = response.send(valid);
439-
if valid {
440-
shards.try_broadcast_shard(commitment, index).await;
441-
}
442-
continue;
443-
}
444-
445-
match self.shard_validity_subscriptions.entry((commitment, index)) {
446-
Entry::Occupied(mut entry) => {
447-
entry.get_mut().subscribers.push(response);
448-
}
449-
Entry::Vacant(entry) => {
450-
let (tx, rx) = oneshot::channel();
451-
shards.subscribe_shard(commitment, index, tx).await;
452-
let aborter = shard_validity_waiters.push(async move {
453-
let shard = rx.await.expect("shard subscriber closed");
454-
let valid = shard.verify();
455-
((commitment, index), valid)
456-
});
457-
entry.insert(ShardValiditySubscription {
458-
subscribers: vec![response],
459-
_aborter: aborter,
460-
});
461-
}
462-
}
463-
}
464-
Message::Notarize { notarization } => {
465-
let commitment = notarization.proposal.payload;
466-
if !shards.has_block(&commitment) {
467-
let start = Instant::now();
468-
let _ = shards.try_reconstruct(commitment).await;
469-
let elapsed = start.elapsed();
470-
tracing::info!(?elapsed, "Attempted reconstruction");
471-
}
472-
}
473400
Message::Notarization { notarization } => {
474401
tracing::warn!(?notarization, "Received notarization");
475402
let round = notarization.round();
@@ -535,7 +462,7 @@ where
535462

536463
// Prune archives
537464
self.cache.prune(prune_round).await;
538-
shards.evict_block(&commitment);
465+
shards.finalized(commitment).await;
539466

540467
// Update the last processed round
541468
let round = finalization.round();
@@ -735,23 +662,6 @@ where
735662
}
736663
}
737664

738-
// Notify any subscribers waiting for shard validity.
739-
async fn notify_shard_validity_subscribers(
740-
&mut self,
741-
commitment: B::Commitment,
742-
index: usize,
743-
valid: bool,
744-
) {
745-
if let Some(mut cs) = self
746-
.shard_validity_subscriptions
747-
.remove(&(commitment, index))
748-
{
749-
for subscriber in cs.subscribers.drain(..) {
750-
let _ = subscriber.send(valid);
751-
}
752-
}
753-
}
754-
755665
// -------------------- Prunable Storage --------------------
756666

757667
/// Add a notarized block to the prunable archive.
@@ -856,15 +766,17 @@ where
856766
// -------------------- Mixed Storage --------------------
857767

858768
/// Looks for a block anywhere in local storage.
859-
async fn find_block<H: Hasher>(
769+
async fn find_block<P: PublicKey>(
860770
&mut self,
861-
buffer: &mut ShardMailbox<S, H, B, P>,
771+
buffer: &mut coding::Mailbox<V, B, S, P>,
862772
commitment: B::Commitment,
863773
) -> Option<CodedBlock<B, S>> {
864774
// Check shard mailbox.
865775
if let Some(block) = buffer
866776
.try_reconstruct(commitment)
867777
.await
778+
.await
779+
.expect("mailbox closed")
868780
.expect("reconstruction error not yet handled")
869781
{
870782
return Some(block);

0 commit comments

Comments
 (0)