Skip to content

Commit e44f142

Browse files
committed
support subscriptions
1 parent 0f674fe commit e44f142

File tree

4 files changed

+176
-19
lines changed

4 files changed

+176
-19
lines changed

consensus/src/marshal/actor.rs

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::{
1515
Block, Reporter,
1616
};
1717
use commonware_codec::{Decode, Encode};
18+
use commonware_coding::reed_solomon::Chunk;
1819
use commonware_cryptography::{bls12381::primitives::variant::Variant, Hasher, PublicKey};
1920
use commonware_macros::select;
2021
use commonware_resolver::Resolver;
@@ -37,9 +38,17 @@ use tracing::{debug, info, warn};
3738

3839
/// A struct that holds multiple subscriptions for a block.
3940
struct BlockSubscription<B: Block> {
40-
// The subscribers that are waiting for the block
41+
/// The subscribers that are waiting for the block
4142
subscribers: Vec<oneshot::Sender<B>>,
42-
// Aborter that aborts the waiter future when dropped
43+
/// Aborter that aborts the waiter future when dropped
44+
_aborter: Aborter,
45+
}
46+
47+
/// A struct that holds multiple subscriptions for a chunk.
48+
struct ChunkSubscription<H: Hasher> {
49+
/// The subscribers that are waiting for the chunk
50+
subscribers: Vec<oneshot::Sender<Chunk<H>>>,
51+
/// Aborter that aborts the waiter future when dropped
4352
_aborter: Aborter,
4453
}
4554

@@ -92,6 +101,8 @@ where
92101

93102
// Outstanding subscriptions for blocks
94103
block_subscriptions: BTreeMap<B::Commitment, BlockSubscription<B>>,
104+
// Outstanding subscriptions for chunks
105+
chunk_subscriptions: BTreeMap<B::Commitment, ChunkSubscription<H>>,
95106

96107
// ---------- Storage ----------
97108
// Prunable cache
@@ -232,6 +243,7 @@ where
232243
codec_config: config.codec_config,
233244
last_processed_round: Round::new(0, 0),
234245
block_subscriptions: BTreeMap::new(),
246+
chunk_subscriptions: BTreeMap::new(),
235247
cache,
236248
finalizations_by_height,
237249
finalized_blocks,
@@ -282,7 +294,8 @@ where
282294
.spawn(|_| finalizer.run());
283295

284296
// Create a local pool for waiter futures
285-
let mut waiters = AbortablePool::<(B::Commitment, B)>::default();
297+
let mut block_waiters = AbortablePool::<(B::Commitment, B)>::default();
298+
let mut chunk_waiters = AbortablePool::<((B::Commitment, u16), Chunk<H>)>::default();
286299

287300
// Handle messages
288301
loop {
@@ -295,7 +308,7 @@ where
295308
// Select messages
296309
select! {
297310
// Handle waiter completions first
298-
result = waiters.next_completed() => {
311+
result = block_waiters.next_completed() => {
299312
let Ok((commitment, block)) = result else {
300313
continue; // Aborted future
301314
};
@@ -364,6 +377,24 @@ where
364377
let result = self.find_block(&mut shard_layer, commitment).await;
365378
let _ = response.send(result);
366379
}
380+
Message::SubscribeChunk { commitment, index, response } => {
381+
match self.chunk_subscriptions.entry(commitment) {
382+
Entry::Occupied(mut entry) => {
383+
entry.get_mut().subscribers.push(response);
384+
}
385+
Entry::Vacant(entry) => {
386+
let (tx, rx) = oneshot::channel();
387+
shard_layer.subscribe_chunk(commitment, index, tx).await;
388+
let aborter = chunk_waiters.push(async move {
389+
((commitment, index), rx.await.expect("shard subscriber closed"))
390+
});
391+
entry.insert(ChunkSubscription {
392+
subscribers: vec![response],
393+
_aborter: aborter,
394+
});
395+
}
396+
}
397+
}
367398
Message::Subscribe { round, commitment, response } => {
368399
// Check for block locally
369400
if let Some(block) = self.find_block(&mut shard_layer, commitment).await {
@@ -399,8 +430,8 @@ where
399430
}
400431
Entry::Vacant(entry) => {
401432
let (tx, rx) = oneshot::channel();
402-
shard_layer.subscribe_prepared(commitment, tx).await.unwrap();
403-
let aborter = waiters.push(async move {
433+
shard_layer.subscribe_block(commitment, tx).await.expect("Reconstruction error not yet handled");
434+
let aborter = block_waiters.push(async move {
404435
(commitment, rx.await.expect("shard subscriber closed"))
405436
});
406437
entry.insert(BlockSubscription {

consensus/src/marshal/ingress/coding.rs

Lines changed: 88 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use futures::channel::oneshot;
2222
use governor::clock::Clock as GClock;
2323
use rand::Rng;
2424
use std::{
25+
collections::{btree_map::Entry, BTreeMap},
2526
fmt::Debug,
2627
num::{NonZero, NonZeroUsize},
2728
ops::Deref,
@@ -59,6 +60,16 @@ pub struct Config {
5960
pub buffer_pool: PoolRef,
6061
}
6162

63+
/// A subscription for a block by its commitment.
64+
struct BlockSubscription<B: Block> {
65+
subscribers: Vec<oneshot::Sender<B>>,
66+
}
67+
68+
/// A subscription for a chunk by its commitment and index.
69+
struct ChunkSubscription<H: Hasher> {
70+
subscribers: Vec<oneshot::Sender<Chunk<H>>>,
71+
}
72+
6273
/// A layer that handles receiving erasure coded [Block]s from the [Actor](super::super::actor::Actor),
6374
/// broadcasting them to peers, and reassembling them from received [Shard]s.
6475
pub struct ShardLayer<E, P, B, H>
@@ -76,6 +87,12 @@ where
7687

7788
/// Map of block digests -> coding commitments.
7889
digest_map: prunable::Archive<TwoCap, E, H::Digest, H::Digest>,
90+
91+
/// Open subscriptions for blocks by commitment.
92+
block_subscriptions: BTreeMap<B::Commitment, BlockSubscription<B>>,
93+
94+
/// Open subscriptions for chunks by commitment and index.
95+
chunk_subscriptions: BTreeMap<(B::Commitment, u16), ChunkSubscription<H>>,
7996
}
8097

8198
impl<E, P, B, H> ShardLayer<E, P, B, H>
@@ -110,6 +127,8 @@ where
110127
mailbox,
111128
block_codec_cfg,
112129
digest_map,
130+
block_subscriptions: BTreeMap::new(),
131+
chunk_subscriptions: BTreeMap::new(),
113132
}
114133
}
115134

@@ -167,6 +186,16 @@ where
167186
.map(|c| c.chunk)
168187
.collect::<Vec<_>>();
169188

189+
// Attempt to resolve any open subscriptions for the chunks we have.
190+
for chunk in coded_chunks.iter() {
191+
if let Some(subs) = self.chunk_subscriptions.get_mut(&(commitment, chunk.index)) {
192+
for sub in subs.subscribers.drain(..) {
193+
let _ = sub.send(chunk.clone());
194+
}
195+
self.chunk_subscriptions.remove(&(commitment, chunk.index));
196+
}
197+
}
198+
170199
if coded_chunks.len() < min as usize {
171200
// Not enough chunks to recover the block yet.
172201
debug!(
@@ -193,6 +222,14 @@ where
193222
.await;
194223
}
195224

225+
// Attempt to resolve any open subscriptions for this block.
226+
if let Some(subs) = self.block_subscriptions.get_mut(&commitment) {
227+
for sub in subs.subscribers.drain(..) {
228+
let _ = sub.send(block.clone());
229+
}
230+
self.block_subscriptions.remove(&commitment);
231+
}
232+
196233
info!(
197234
%commitment,
198235
digest = %block.digest(),
@@ -205,15 +242,59 @@ where
205242

206243
/// Subscribes to a block by commitment with an externally prepared responder.
207244
///
208-
/// The responder will be sent the first message for a commitment when it is available; either
209-
/// instantly (if cached) or when it is received from the network. The request can be canceled
210-
/// by dropping the responder.
211-
pub async fn subscribe_prepared(
245+
/// The responder will be sent the block when it is available; either instantly (if cached)
246+
/// or when it is received from the network. The request can be canceled by dropping the
247+
/// responder.
248+
pub async fn subscribe_block(
212249
&mut self,
213-
_commitment: B::Commitment,
214-
_responder: oneshot::Sender<B>,
250+
commitment: B::Commitment,
251+
responder: oneshot::Sender<B>,
215252
) -> Result<(), ReconstructionError> {
216-
todo!("Create subscription");
253+
match self.block_subscriptions.entry(commitment) {
254+
Entry::Vacant(entry) => {
255+
entry.insert(BlockSubscription {
256+
subscribers: vec![responder],
257+
});
258+
}
259+
Entry::Occupied(mut entry) => {
260+
entry.get_mut().subscribers.push(responder);
261+
}
262+
}
263+
264+
// Try to reconstruct the block immediately in case we already have enough chunks.
265+
self.try_reconstruct(commitment).await?;
266+
267+
Ok(())
268+
}
269+
270+
/// Subscribes to a chunk by commitment and index with an externally prepared responder.
271+
///
272+
/// The responder will be sent the chunk when it is available; either instantly (if cached)
273+
/// or when it is received from the network. The request can be canceled by dropping the
274+
/// responder.
275+
pub async fn subscribe_chunk(
276+
&mut self,
277+
commitment: B::Commitment,
278+
index: u16,
279+
responder: oneshot::Sender<Chunk<H>>,
280+
) {
281+
let available_chunks = self.mailbox.get(None, commitment, None).await;
282+
283+
if let Some(shard) = available_chunks.iter().find(|s| s.chunk.index == index) {
284+
let _ = responder.send(shard.chunk.clone());
285+
return;
286+
}
287+
288+
match self.chunk_subscriptions.entry((commitment, index)) {
289+
Entry::Vacant(entry) => {
290+
entry.insert(ChunkSubscription {
291+
subscribers: vec![responder],
292+
});
293+
}
294+
Entry::Occupied(mut entry) => {
295+
entry.get_mut().subscribers.push(responder);
296+
}
297+
}
217298
}
218299

219300
/// Puts a coding commitment in the store, keyed by digest and block height.

consensus/src/marshal/ingress/mailbox.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,27 @@ pub(crate) enum Message<V: Variant, B: Block, P: PublicKey, H: Hasher> {
2424
/// A channel to send the retrieved block.
2525
response: oneshot::Sender<Option<B>>,
2626
},
27-
/// A request to retrieve a block by its digest.
27+
/// A request to retrieve a block by its commitment.
28+
///
29+
/// TODO: For a cleaner API, we should have this be retrieve-by-digest.
2830
Subscribe {
2931
/// The view in which the block was notarized. This is an optimization
3032
/// to help locate the block.
3133
round: Option<Round>,
32-
/// The digest of the block to retrieve.
34+
/// The coding commitment of the block to retrieve.
3335
commitment: B::Commitment,
3436
/// A channel to send the retrieved block.
3537
response: oneshot::Sender<B>,
3638
},
39+
/// A request to retrieve an erasure coded chunk by its commitment and index.
40+
SubscribeChunk {
41+
/// The commitment of the chunk to retrieve.
42+
commitment: B::Commitment,
43+
/// The index of the chunk to retrieve.
44+
index: u16,
45+
/// A channel to send the retrieved chunk.
46+
response: oneshot::Sender<Chunk<H>>,
47+
},
3748
/// A request to broadcast an erasure coded block to all peers.
3849
Broadcast {
3950
/// The coding commitment of the block.
@@ -134,6 +145,36 @@ impl<V: Variant, B: Block, P: PublicKey, H: Hasher> Mailbox<V, B, P, H> {
134145
rx
135146
}
136147

148+
/// Subscribe chunk is a request to receive an erasure coded chunk by its commitment and index.
149+
///
150+
/// If the chunk is found available locally, the chunk will be returned immediately.
151+
///
152+
/// If the chunk is not available locally, the request will be registered and the caller will
153+
/// be notified when the chunk is available. If the chunk is not part of a finalized block, it's
154+
/// possible that it may never become available.
155+
///
156+
/// The oneshot receiver should be dropped to cancel the subscription.
157+
pub async fn subscribe_chunk(
158+
&mut self,
159+
commitment: B::Commitment,
160+
index: u16,
161+
) -> oneshot::Receiver<Chunk<H>> {
162+
let (tx, rx) = oneshot::channel();
163+
if self
164+
.sender
165+
.send(Message::SubscribeChunk {
166+
commitment,
167+
index,
168+
response: tx,
169+
})
170+
.await
171+
.is_err()
172+
{
173+
error!("failed to send subscribe chunk message to actor: receiver dropped");
174+
}
175+
rx
176+
}
177+
137178
/// Broadcast indicates that a block should be sent to all peers.
138179
pub async fn broadcast(
139180
&mut self,

consensus/src/marshal/mod.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ mod tests {
377377
}
378378
}
379379

380-
#[test_traced("WARN")]
380+
#[test_traced("DEBUG")]
381381
fn test_finalize_bad_links() {
382382
for seed in 0..5 {
383383
let result1 = finalize(seed, UNRELIABLE_LINK);
@@ -468,6 +468,10 @@ mod tests {
468468
.await;
469469
}
470470

471+
// Wait for the block chunks to be delivered; Before making a notarization,
472+
// the chunks must be present.
473+
context.sleep(link.latency + link.jitter).await;
474+
471475
// Notarize block by the validator that broadcasted it
472476
let notarization = make_notarization(proposal.clone(), &shares, QUORUM);
473477
actor
@@ -623,9 +627,6 @@ mod tests {
623627
let (commitment1, config1, chunks1) = shard(&block1, &peers);
624628
let (commitment2, config2, chunks2) = shard(&block2, &peers);
625629

626-
actor.broadcast(commitment1, config1, chunks1).await;
627-
actor.broadcast(commitment2, config2, chunks2).await;
628-
629630
let sub1_rx = actor
630631
.subscribe(Some(Round::from((0, 1))), commitment1)
631632
.await;
@@ -636,6 +637,9 @@ mod tests {
636637
.subscribe(Some(Round::from((0, 1))), commitment1)
637638
.await;
638639

640+
actor.broadcast(commitment1, config1, chunks1).await;
641+
actor.broadcast(commitment2, config2, chunks2).await;
642+
639643
for (view, block) in [(1, block1.clone()), (2, block2.clone())] {
640644
let proposal = Proposal {
641645
round: Round::new(0, view),

0 commit comments

Comments
 (0)