Skip to content

Commit ad6e255

Browse files
committed
fix shard sub
1 parent 4637881 commit ad6e255

File tree

3 files changed

+68
-46
lines changed

3 files changed

+68
-46
lines changed

consensus/src/marshal/actor.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use super::{
99
},
1010
};
1111
use crate::{
12-
marshal::ingress::coding::{CodedBlock, ShardLayer},
12+
marshal::ingress::coding::{CodedBlock, Shard, ShardLayer},
1313
threshold_simplex::types::{Finalization, Notarization},
1414
types::Round,
1515
Block, Reporter,
@@ -47,9 +47,13 @@ struct BlockSubscription<B: Block> {
4747
}
4848

4949
/// A struct that holds multiple subscriptions for a chunk.
50-
struct ChunkSubscription<H: Hasher> {
50+
struct ChunkSubscription<B, H>
51+
where
52+
B: Block<Digest = H::Digest, Commitment = H::Digest>,
53+
H: Hasher,
54+
{
5155
/// The subscribers that are waiting for the chunk
52-
subscribers: Vec<oneshot::Sender<Chunk<H>>>,
56+
subscribers: Vec<oneshot::Sender<Shard<CodedBlock<B, H>, H>>>,
5357
/// Aborter that aborts the waiter future when dropped
5458
_aborter: Aborter,
5559
}
@@ -104,7 +108,7 @@ where
104108
// Outstanding subscriptions for blocks
105109
block_subscriptions: BTreeMap<B::Commitment, BlockSubscription<B>>,
106110
// Outstanding subscriptions for chunks
107-
chunk_subscriptions: BTreeMap<B::Commitment, ChunkSubscription<H>>,
111+
chunk_subscriptions: BTreeMap<B::Commitment, ChunkSubscription<B, H>>,
108112

109113
// ---------- Storage ----------
110114
// Prunable cache
@@ -297,7 +301,8 @@ where
297301

298302
// Create a local pool for waiter futures
299303
let mut block_waiters = AbortablePool::<(B::Commitment, CodedBlock<B, H>)>::default();
300-
let mut chunk_waiters = AbortablePool::<((B::Commitment, u16), Chunk<H>)>::default();
304+
let mut chunk_waiters =
305+
AbortablePool::<((B::Commitment, u16), Shard<CodedBlock<B, H>, H>)>::default();
301306

302307
// Handle messages
303308
loop {
@@ -381,6 +386,12 @@ where
381386
let _ = response.send(result.map(CodedBlock::take_inner));
382387
}
383388
Message::SubscribeChunk { commitment, index, response } => {
389+
// Check for chunk locally
390+
if let Some(shard) = shard_layer.get_chunk(commitment, index).await {
391+
let _ = response.send(shard);
392+
continue;
393+
}
394+
384395
match self.chunk_subscriptions.entry(commitment) {
385396
Entry::Occupied(mut entry) => {
386397
entry.get_mut().subscribers.push(response);

consensus/src/marshal/ingress/coding.rs

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
1010
use crate::Block;
1111
use commonware_broadcast::{buffered, Broadcaster};
12-
use commonware_codec::{Encode, EncodeSize, Error as CodecError, Read, ReadExt, Write};
12+
use commonware_codec::{EncodeSize, Error as CodecError, FixedSize, Read, ReadExt, Write};
1313
use commonware_coding::reed_solomon::{self, decode, Chunk, Error as ReedSolomonError};
1414
use commonware_cryptography::{Committable, Digestible, Hasher, PublicKey};
1515
use commonware_p2p::Recipients;
@@ -60,9 +60,6 @@ where
6060

6161
/// Open subscriptions for blocks by commitment.
6262
block_subscriptions: BTreeMap<B::Commitment, BlockSubscription<B>>,
63-
64-
/// Open subscriptions for chunks by commitment and index.
65-
chunk_subscriptions: BTreeMap<(B::Commitment, u16), ChunkSubscription<H>>,
6663
}
6764

6865
impl<P, B, H> ShardLayer<P, B, H>
@@ -77,7 +74,6 @@ where
7774
mailbox,
7875
block_codec_cfg,
7976
block_subscriptions: BTreeMap::new(),
80-
chunk_subscriptions: BTreeMap::new(),
8177
}
8278
}
8379

@@ -135,15 +131,6 @@ where
135131
.map(|c| c.chunk)
136132
.collect::<Vec<_>>();
137133

138-
// Attempt to resolve any open subscriptions for the chunks we have.
139-
for chunk in coded_chunks.iter() {
140-
if let Some(mut subs) = self.chunk_subscriptions.remove(&(commitment, chunk.index)) {
141-
for sub in subs.subscribers.drain(..) {
142-
let _ = sub.send(chunk.clone());
143-
}
144-
}
145-
}
146-
147134
if coded_chunks.len() < min as usize {
148135
// Not enough chunks to recover the block yet.
149136
debug!(
@@ -206,6 +193,23 @@ where
206193
Ok(())
207194
}
208195

196+
pub async fn get_chunk(
197+
&mut self,
198+
commitment: B::Commitment,
199+
index: u16,
200+
) -> Option<Shard<B, H>> {
201+
let mut buf = vec![0u8; <H::Digest as FixedSize>::SIZE + 2];
202+
buf[..<H::Digest as FixedSize>::SIZE].copy_from_slice(commitment.as_ref());
203+
buf[<H::Digest as FixedSize>::SIZE..].copy_from_slice(index.to_le_bytes().as_ref());
204+
let index_hash = H::hash(buf.as_ref());
205+
self.mailbox
206+
.get(None, commitment, Some(index_hash))
207+
.await
208+
.iter()
209+
.cloned()
210+
.next()
211+
}
212+
209213
/// Subscribes to a chunk by commitment and index with an externally prepared responder.
210214
///
211215
/// The responder will be sent the chunk when it is available; either instantly (if cached)
@@ -215,25 +219,15 @@ where
215219
&mut self,
216220
commitment: B::Commitment,
217221
index: u16,
218-
responder: oneshot::Sender<Chunk<H>>,
222+
responder: oneshot::Sender<Shard<B, H>>,
219223
) {
220-
let available_chunks = self.mailbox.get(None, commitment, None).await;
221-
222-
if let Some(shard) = available_chunks.iter().find(|s| s.chunk.index == index) {
223-
let _ = responder.send(shard.chunk.clone());
224-
return;
225-
}
226-
227-
match self.chunk_subscriptions.entry((commitment, index)) {
228-
Entry::Vacant(entry) => {
229-
entry.insert(ChunkSubscription {
230-
subscribers: vec![responder],
231-
});
232-
}
233-
Entry::Occupied(mut entry) => {
234-
entry.get_mut().subscribers.push(responder);
235-
}
236-
}
224+
let mut buf = vec![0u8; <H::Digest as FixedSize>::SIZE + 2];
225+
buf[..<H::Digest as FixedSize>::SIZE].copy_from_slice(commitment.as_ref());
226+
buf[<H::Digest as FixedSize>::SIZE..].copy_from_slice(index.to_le_bytes().as_ref());
227+
let index_hash = H::hash(buf.as_ref());
228+
self.mailbox
229+
.subscribe_prepared(None, commitment, Some(index_hash), responder)
230+
.await;
237231
}
238232
}
239233

@@ -326,9 +320,11 @@ where
326320
type Digest = H::Digest;
327321

328322
fn digest(&self) -> Self::Digest {
329-
// NOTE: This is a lil weird; only doing this to namespace the shard within the buffered mailbox, such that
330-
// shards from separate validators can be enqueued without replacing each other.
331-
H::hash(self.chunk.encode().as_ref())
323+
let mut buf = vec![0u8; <H::Digest as FixedSize>::SIZE + 2];
324+
buf[..<H::Digest as FixedSize>::SIZE].copy_from_slice(self.commitment.as_ref());
325+
buf[<H::Digest as FixedSize>::SIZE..]
326+
.copy_from_slice(self.chunk.index.to_le_bytes().as_ref());
327+
H::hash(buf.as_ref())
332328
}
333329
}
334330

consensus/src/marshal/ingress/mailbox.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::{
2+
marshal::ingress::coding::{CodedBlock, Shard},
23
threshold_simplex::types::{Activity, Finalization, Finalize, Notarization, Notarize},
34
types::Round,
45
Block, Reporter,
@@ -15,7 +16,12 @@ use tracing::error;
1516
///
1617
/// These messages are sent from the consensus engine and other parts of the
1718
/// system to drive the state of the marshal.
18-
pub(crate) enum Message<V: Variant, B: Block, P: PublicKey, H: Hasher> {
19+
pub(crate) enum Message<
20+
V: Variant,
21+
B: Block<Digest = H::Digest, Commitment = H::Digest>,
22+
P: PublicKey,
23+
H: Hasher,
24+
> {
1925
// -------------------- Application Messages --------------------
2026
/// A request to retrieve a block by its commitment.
2127
Get {
@@ -41,7 +47,7 @@ pub(crate) enum Message<V: Variant, B: Block, P: PublicKey, H: Hasher> {
4147
/// The index of the chunk to retrieve.
4248
index: u16,
4349
/// A channel to send the retrieved chunk.
44-
response: oneshot::Sender<Chunk<H>>,
50+
response: oneshot::Sender<Shard<CodedBlock<B, H>, H>>,
4551
},
4652
/// A request to broadcast an erasure coded block to all peers.
4753
Broadcast {
@@ -78,11 +84,18 @@ pub(crate) enum Message<V: Variant, B: Block, P: PublicKey, H: Hasher> {
7884

7985
/// A mailbox for sending messages to the marshal [Actor](super::super::actor::Actor).
8086
#[derive(Clone)]
81-
pub struct Mailbox<V: Variant, B: Block, P: PublicKey, H: Hasher> {
87+
pub struct Mailbox<
88+
V: Variant,
89+
B: Block<Digest = H::Digest, Commitment = H::Digest>,
90+
P: PublicKey,
91+
H: Hasher,
92+
> {
8293
sender: mpsc::Sender<Message<V, B, P, H>>,
8394
}
8495

85-
impl<V: Variant, B: Block, P: PublicKey, H: Hasher> Mailbox<V, B, P, H> {
96+
impl<V: Variant, B: Block<Digest = H::Digest, Commitment = H::Digest>, P: PublicKey, H: Hasher>
97+
Mailbox<V, B, P, H>
98+
{
8699
/// Creates a new mailbox.
87100
pub(crate) fn new(sender: mpsc::Sender<Message<V, B, P, H>>) -> Self {
88101
Self { sender }
@@ -149,7 +162,7 @@ impl<V: Variant, B: Block, P: PublicKey, H: Hasher> Mailbox<V, B, P, H> {
149162
&mut self,
150163
commitment: B::Commitment,
151164
index: u16,
152-
) -> oneshot::Receiver<Chunk<H>> {
165+
) -> oneshot::Receiver<Shard<CodedBlock<B, H>, H>> {
153166
let (tx, rx) = oneshot::channel();
154167
if self
155168
.sender
@@ -188,7 +201,9 @@ impl<V: Variant, B: Block, P: PublicKey, H: Hasher> Mailbox<V, B, P, H> {
188201
}
189202
}
190203

191-
impl<V: Variant, B: Block, P: PublicKey, H: Hasher> Reporter for Mailbox<V, B, P, H> {
204+
impl<V: Variant, B: Block<Digest = H::Digest, Commitment = H::Digest>, P: PublicKey, H: Hasher>
205+
Reporter for Mailbox<V, B, P, H>
206+
{
192207
type Activity = Activity<V, B::Commitment>;
193208

194209
async fn report(&mut self, activity: Self::Activity) {

0 commit comments

Comments
 (0)