Skip to content

Commit 4e5b83d

Browse files
committed
[consensus/marshal] Add shard subscription support
1 parent 74223d9 commit 4e5b83d

File tree

3 files changed

+181
-59
lines changed

3 files changed

+181
-59
lines changed

consensus/src/marshal/actor.rs

Lines changed: 116 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,14 @@ struct BlockSubscription<B: Block> {
4646
_aborter: Aborter,
4747
}
4848

49+
/// A struct that holds multiple subscriptions for a shard's validity check.
50+
struct ShardValiditySubscription {
51+
/// The subscribers that are waiting for the chunk
52+
subscribers: Vec<oneshot::Sender<bool>>,
53+
/// Aborter that aborts the waiter future when dropped
54+
_aborter: Aborter,
55+
}
56+
4957
/// The [Actor] is responsible for receiving uncertified blocks from the broadcast mechanism,
5058
/// receiving notarizations and finalizations from consensus, and reconstructing a total order
5159
/// of blocks.
@@ -95,6 +103,8 @@ where
95103

96104
// Outstanding subscriptions for blocks
97105
block_subscriptions: BTreeMap<B::Commitment, BlockSubscription<B>>,
106+
// Outstanding subscriptions for shard validity checks
107+
shard_validity_subscriptions: BTreeMap<(B::Commitment, usize), ShardValiditySubscription>,
98108

99109
// ---------- Storage ----------
100110
// Prunable cache
@@ -235,6 +245,7 @@ where
235245
codec_config: config.codec_config,
236246
last_processed_round: Round::new(0, 0),
237247
block_subscriptions: BTreeMap::new(),
248+
shard_validity_subscriptions: BTreeMap::new(),
238249
cache,
239250
finalizations_by_height,
240251
finalized_blocks,
@@ -287,7 +298,8 @@ where
287298
.spawn(|_| finalizer.run());
288299

289300
// Create a local pool for waiter futures
290-
let mut waiters = AbortablePool::<(B::Commitment, B)>::default();
301+
let mut block_waiters = AbortablePool::<(B::Commitment, B)>::default();
302+
let mut shard_validity_waiters = AbortablePool::<((B::Commitment, usize), bool)>::default();
291303

292304
// Handle messages
293305
loop {
@@ -296,15 +308,25 @@ where
296308
bs.subscribers.retain(|tx| !tx.is_canceled());
297309
!bs.subscribers.is_empty()
298310
});
311+
self.shard_validity_subscriptions.retain(|_, cs| {
312+
cs.subscribers.retain(|tx| !tx.is_canceled());
313+
!cs.subscribers.is_empty()
314+
});
299315

300316
// Select messages
301317
select! {
302318
// Handle waiter completions first
303-
result = waiters.next_completed() => {
319+
result = block_waiters.next_completed() => {
304320
let Ok((commitment, block)) = result else {
305321
continue; // Aborted future
306322
};
307-
self.notify_subscribers(commitment, &block).await;
323+
self.notify_block_subscribers(commitment, &block).await;
324+
},
325+
result = shard_validity_waiters.next_completed() => {
326+
let Ok(((commitment, index), valid)) = result else {
327+
continue; // Aborted future
328+
};
329+
self.notify_shard_validity_subscribers(commitment, index, valid).await;
308330
},
309331
// Handle consensus before finalizer or backfiller
310332
mailbox_message = self.mailbox.next() => {
@@ -313,53 +335,14 @@ where
313335
return;
314336
};
315337
match message {
316-
Message::Broadcast { block, peers } => {
317-
shards.broadcast_shards(block, peers).await;
318-
}
319-
Message::Notarize { notarization_vote } => {
320-
let commitment = notarization_vote.proposal.payload;
321-
let index = notarization_vote.proposal_signature.index as usize;
322-
shards.try_broadcast_shard(commitment, index).await;
323-
}
324-
Message::Notarization { notarization } => {
325-
let round = notarization.round();
326-
let commitment = notarization.proposal.payload;
327-
328-
// Store notarization by round
329-
self.cache.put_notarization(round, commitment, notarization.clone()).await;
330-
331-
// Search for block locally, otherwise fetch it remotely
332-
if let Some(block) = self.find_block(&mut shards, commitment).await {
333-
// If found, persist the block
334-
self.cache_block(round, commitment, block).await;
335-
} else {
336-
debug!(?round, "notarized block missing");
337-
resolver.fetch(Request::<CodedBlock<B, S>>::Notarized { round }).await;
338-
}
339-
}
340-
Message::Finalization { finalization } => {
341-
// Cache finalization by round
342-
let round = finalization.round();
343-
let commitment = finalization.proposal.payload;
344-
self.cache.put_finalization(round, commitment, finalization.clone()).await;
345-
346-
// Search for block locally, otherwise fetch it remotely
347-
if let Some(block) = self.find_block(&mut shards, commitment).await {
348-
// If found, persist the block
349-
let height = block.height();
350-
self.finalize(height, commitment, block, Some(finalization), &mut notifier_tx).await;
351-
debug!(?round, height, "finalized block stored");
352-
} else {
353-
// Otherwise, fetch the block from the network.
354-
debug!(?round, ?commitment, "finalized block missing");
355-
resolver.fetch(Request::<CodedBlock<B, S>>::Block(commitment)).await;
356-
}
357-
}
358338
Message::Get { commitment, response } => {
359339
// Check for block locally
360340
let result = self.find_block(&mut shards, commitment).await;
361341
let _ = response.send(result.map(CodedBlock::into_inner));
362342
}
343+
Message::Broadcast { block, peers } => {
344+
shards.broadcast_shards(block, peers).await;
345+
}
363346
Message::Subscribe { round, commitment, response } => {
364347
// Check for block locally
365348
if let Some(block) = self.find_block(&mut shards, commitment).await {
@@ -396,7 +379,7 @@ where
396379
Entry::Vacant(entry) => {
397380
let (tx, rx) = oneshot::channel();
398381
shards.subscribe_block(commitment, tx).await.expect("Reconstruction error not yet handled");
399-
let aborter = waiters.push(async move {
382+
let aborter = block_waiters.push(async move {
400383
(commitment, rx.await.expect("buffer subscriber closed").into_inner())
401384
});
402385
entry.insert(BlockSubscription {
@@ -406,6 +389,71 @@ where
406389
}
407390
}
408391
}
392+
Message::VerifyShard { commitment, index, response } => {
393+
// Check for shard locally
394+
if let Some(shard) = shards.get_shard(commitment, index).await {
395+
let _ = response.send(shard.verify());
396+
continue;
397+
}
398+
399+
match self.shard_validity_subscriptions.entry((commitment, index)) {
400+
Entry::Occupied(mut entry) => {
401+
entry.get_mut().subscribers.push(response);
402+
}
403+
Entry::Vacant(entry) => {
404+
let (tx, rx) = oneshot::channel();
405+
shards.subscribe_shard(commitment, index, tx).await;
406+
let aborter = shard_validity_waiters.push(async move {
407+
let shard = rx.await.expect("shard subscriber closed");
408+
let valid = shard.verify();
409+
((commitment, index), valid)
410+
});
411+
entry.insert(ShardValiditySubscription {
412+
subscribers: vec![response],
413+
_aborter: aborter,
414+
});
415+
}
416+
}
417+
}
418+
Message::Notarize { notarization_vote } => {
419+
let commitment = notarization_vote.proposal.payload;
420+
let index = notarization_vote.proposal_signature.index as usize;
421+
shards.try_broadcast_shard(commitment, index).await;
422+
}
423+
Message::Notarization { notarization } => {
424+
let round = notarization.round();
425+
let commitment = notarization.proposal.payload;
426+
427+
// Store notarization by round
428+
self.cache.put_notarization(round, commitment, notarization.clone()).await;
429+
430+
// Search for block locally, otherwise fetch it remotely
431+
if let Some(block) = self.find_block(&mut shards, commitment).await {
432+
// If found, persist the block
433+
self.cache_block(round, commitment, block).await;
434+
} else {
435+
debug!(?round, "notarized block missing");
436+
resolver.fetch(Request::<CodedBlock<B, S>>::Notarized { round }).await;
437+
}
438+
}
439+
Message::Finalization { finalization } => {
440+
// Cache finalization by round
441+
let round = finalization.round();
442+
let commitment = finalization.proposal.payload;
443+
self.cache.put_finalization(round, commitment, finalization.clone()).await;
444+
445+
// Search for block locally, otherwise fetch it remotely
446+
if let Some(block) = self.find_block(&mut shards, commitment).await {
447+
// If found, persist the block
448+
let height = block.height();
449+
self.finalize(height, commitment, block, Some(finalization), &mut notifier_tx).await;
450+
debug!(?round, height, "finalized block stored");
451+
} else {
452+
// Otherwise, fetch the block from the network.
453+
debug!(?round, ?commitment, "finalized block missing");
454+
resolver.fetch(Request::<CodedBlock<B, S>>::Block(commitment)).await;
455+
}
456+
}
409457
}
410458
},
411459
// Handle finalizer messages next
@@ -627,14 +675,31 @@ where
627675
// -------------------- Waiters --------------------
628676

629677
/// Notify any subscribers for the given commitment with the provided block.
630-
async fn notify_subscribers(&mut self, commitment: B::Commitment, block: &B) {
678+
async fn notify_block_subscribers(&mut self, commitment: B::Commitment, block: &B) {
631679
if let Some(mut bs) = self.block_subscriptions.remove(&commitment) {
632680
for subscriber in bs.subscribers.drain(..) {
633681
let _ = subscriber.send(block.clone());
634682
}
635683
}
636684
}
637685

686+
// Notify any subscribers waiting for shard validity.
687+
async fn notify_shard_validity_subscribers(
688+
&mut self,
689+
commitment: B::Commitment,
690+
index: usize,
691+
valid: bool,
692+
) {
693+
if let Some(mut cs) = self
694+
.shard_validity_subscriptions
695+
.remove(&(commitment, index))
696+
{
697+
for subscriber in cs.subscribers.drain(..) {
698+
let _ = subscriber.send(valid);
699+
}
700+
}
701+
}
702+
638703
// -------------------- Prunable Storage --------------------
639704

640705
/// Add a notarized block to the prunable archive.
@@ -644,7 +709,8 @@ where
644709
commitment: B::Commitment,
645710
block: CodedBlock<B, S>,
646711
) {
647-
self.notify_subscribers(commitment, block.inner()).await;
712+
self.notify_block_subscribers(commitment, block.inner())
713+
.await;
648714
self.cache.put_block(round, commitment, block).await;
649715
}
650716

@@ -685,7 +751,8 @@ where
685751
finalization: Option<Finalization<V, B::Commitment>>,
686752
notifier: &mut mpsc::Sender<()>,
687753
) {
688-
self.notify_subscribers(commitment, block.inner()).await;
754+
self.notify_block_subscribers(commitment, block.inner())
755+
.await;
689756

690757
// In parallel, update the finalized blocks and finalizations archives
691758
if let Err(e) = try_join!(

consensus/src/marshal/ingress/coding/types.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,23 @@ impl<S: Scheme, H: Hasher> Shard<S, H> {
136136
self.inner
137137
}
138138

139+
/// Verifies that this shard is valid for the given commitment and index.
140+
///
141+
/// NOTE: If the inner shard is a weak shard, this will always return false, as weak shards
142+
/// cannot be verified in isolation.
143+
pub fn verify(&self) -> bool {
144+
match self.inner {
145+
DistributionShard::Strong(ref shard) => S::reshard(
146+
&self.config,
147+
&self.commitment,
148+
self.index as u16,
149+
shard.clone(),
150+
)
151+
.is_ok(),
152+
DistributionShard::Weak(_) => false,
153+
}
154+
}
155+
139156
/// Returns the UUID of a shard with the given commitment and index.
140157
pub fn uuid(commitment: S::Commitment, index: usize) -> H::Digest {
141158
let mut buf = vec![0u8; S::Commitment::SIZE + u32::SIZE];

consensus/src/marshal/ingress/mailbox.rs

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@ pub(crate) enum Message<V: Variant, B: Block, S: Scheme, P: PublicKey> {
2525
/// A channel to send the retrieved block.
2626
response: oneshot::Sender<Option<B>>,
2727
},
28+
/// A request to broadcast a block to all peers.
29+
Broadcast {
30+
/// The erasure coded block to broadcast.
31+
block: CodedBlock<B, S>,
32+
/// The peers to broadcast the shards to.
33+
peers: Vec<P>,
34+
},
2835
/// A request to retrieve a block by its digest.
2936
Subscribe {
3037
/// The view in which the block was notarized. This is an optimization
@@ -35,12 +42,14 @@ pub(crate) enum Message<V: Variant, B: Block, S: Scheme, P: PublicKey> {
3542
/// A channel to send the retrieved block.
3643
response: oneshot::Sender<B>,
3744
},
38-
/// A request to broadcast a block to all peers.
39-
Broadcast {
40-
/// The erasure coded block to broadcast.
41-
block: CodedBlock<B, S>,
42-
/// The peers to broadcast the shards to.
43-
peers: Vec<P>,
45+
/// A request to verify a a shard's inclusion within a commitment.
46+
VerifyShard {
47+
/// The commitment to verify against.
48+
commitment: B::Commitment,
49+
/// The index of the shard to verify.
50+
index: usize,
51+
/// The response channel to send the result to.
52+
response: oneshot::Sender<bool>,
4453
},
4554

4655
// -------------------- Consensus Engine Messages --------------------
@@ -91,6 +100,18 @@ impl<V: Variant, B: Block, S: Scheme, P: PublicKey> Mailbox<V, B, S, P> {
91100
rx
92101
}
93102

103+
/// Broadcast indicates that an erasure coded block should be sent to a given set of peers.
104+
pub async fn broadcast(&mut self, block: CodedBlock<B, S>, peers: Vec<P>) {
105+
if self
106+
.sender
107+
.send(Message::Broadcast { block, peers })
108+
.await
109+
.is_err()
110+
{
111+
error!("failed to send broadcast message to actor: receiver dropped");
112+
}
113+
}
114+
94115
/// Subscribe is a request to retrieve a block by its commitment.
95116
///
96117
/// If the block is found available locally, the block will be returned immediately.
@@ -121,16 +142,33 @@ impl<V: Variant, B: Block, S: Scheme, P: PublicKey> Mailbox<V, B, S, P> {
121142
rx
122143
}
123144

124-
/// Broadcast indicates that an erasure coded block should be sent to a given set of peers.
125-
pub async fn broadcast(&mut self, block: CodedBlock<B, S>, peers: Vec<P>) {
145+
/// Verify a shard's inclusion within a commitment.
146+
///
147+
/// If the shard is available locally, the result will be returned immediately.
148+
///
149+
/// If the shard is not available locally, the request will be registered and the caller will
150+
/// be notified when the shard is available.
151+
///
152+
/// The oneshot receiver should be dropped to cancel the request.
153+
pub async fn verify_shard(
154+
&mut self,
155+
commitment: B::Commitment,
156+
index: usize,
157+
) -> oneshot::Receiver<bool> {
158+
let (tx, rx) = oneshot::channel();
126159
if self
127160
.sender
128-
.send(Message::Broadcast { block, peers })
161+
.send(Message::VerifyShard {
162+
commitment,
163+
index,
164+
response: tx,
165+
})
129166
.await
130167
.is_err()
131168
{
132-
error!("failed to send broadcast message to actor: receiver dropped");
169+
error!("failed to send verify shard message to actor: receiver dropped");
133170
}
171+
rx
134172
}
135173
}
136174

0 commit comments

Comments
 (0)