Skip to content

Commit b5abca8

Browse files
committed
api cleanup - broadcast blocks directly
1 parent dcae530 commit b5abca8

File tree

4 files changed

+399
-398
lines changed

4 files changed

+399
-398
lines changed

consensus/src/marshal/actor.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ where
260260
pub fn start<R>(
261261
mut self,
262262
application: impl Reporter<Activity = B>,
263-
shards: ShardLayer<P, CodedBlock<B, H>, H>,
263+
shards: ShardLayer<P, B, H>,
264264
resolver: (mpsc::Receiver<handler::Message<CodedBlock<B, H>>>, R),
265265
) -> Handle<()>
266266
where
@@ -273,7 +273,7 @@ where
273273
async fn run<R>(
274274
mut self,
275275
application: impl Reporter<Activity = B>,
276-
mut shard_layer: ShardLayer<P, CodedBlock<B, H>, H>,
276+
mut shard_layer: ShardLayer<P, B, H>,
277277
(mut resolver_rx, mut resolver): (mpsc::Receiver<handler::Message<CodedBlock<B, H>>>, R),
278278
) where
279279
R: Resolver<Key = handler::Request<CodedBlock<B, H>>>,
@@ -383,12 +383,12 @@ where
383383
}
384384
}
385385
}
386-
Message::Broadcast { coding_commitment, config, chunks } => {
387-
shard_layer.broadcast_chunks(coding_commitment, config, chunks).await;
386+
Message::Broadcast { block, participants } => {
387+
shard_layer.broadcast_shards(block, participants).await;
388388
}
389389
Message::VerifyShard { commitment, index, response } => {
390390
// Check for chunk locally
391-
if let Some(shard) = shard_layer.get_chunk(commitment, index).await {
391+
if let Some(shard) = shard_layer.get_shard(commitment, index).await {
392392
let _ = response.send(shard.verify(index, &commitment));
393393
continue;
394394
}
@@ -399,7 +399,7 @@ where
399399
}
400400
Entry::Vacant(entry) => {
401401
let (tx, rx) = oneshot::channel();
402-
shard_layer.subscribe_chunk(commitment, index, tx).await;
402+
shard_layer.subscribe_shard(commitment, index, tx).await;
403403
let aborter = chunk_waiters.push(async move {
404404
let shard = rx.await.expect("shard subscriber closed");
405405
let valid = shard.verify(index, &commitment);
@@ -790,7 +790,7 @@ where
790790
/// Looks for a block anywhere in local storage.
791791
async fn find_block(
792792
&mut self,
793-
shards: &mut ShardLayer<P, CodedBlock<B, H>, H>,
793+
shards: &mut ShardLayer<P, B, H>,
794794
commitment: B::Commitment,
795795
) -> Option<CodedBlock<B, H>> {
796796
// Check shard layer.

consensus/src/marshal/ingress/coding.rs

Lines changed: 52 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
1010
use crate::Block;
1111
use commonware_broadcast::{buffered, Broadcaster};
12-
use commonware_codec::{EncodeSize, Error as CodecError, FixedSize, Read, ReadExt, Write};
12+
use commonware_codec::{Decode, EncodeSize, Error as CodecError, FixedSize, Read, ReadExt, Write};
1313
use commonware_coding::reed_solomon::{self, decode, Chunk, Error as ReedSolomonError};
1414
use commonware_cryptography::{Committable, Digestible, Hasher, PublicKey};
1515
use commonware_p2p::Recipients;
@@ -47,14 +47,14 @@ where
4747
B: Block<Digest = H::Digest, Commitment = H::Digest>,
4848
H: Hasher,
4949
{
50-
/// Inner [`buffered::Mailbox`] for broadcasting and receiving erasure coded chunks.
51-
mailbox: buffered::Mailbox<P, Shard<B, H>>,
50+
/// Inner [`buffered::Mailbox`] for broadcasting and receiving erasure coded shards.
51+
mailbox: buffered::Mailbox<P, Shard<CodedBlock<B, H>, H>>,
5252

5353
/// [`Read`] configuration for the block type.
5454
block_codec_cfg: B::Cfg,
5555

5656
/// Open subscriptions for blocks by commitment.
57-
block_subscriptions: BTreeMap<B::Commitment, BlockSubscription<B>>,
57+
block_subscriptions: BTreeMap<B::Commitment, BlockSubscription<CodedBlock<B, H>>>,
5858
}
5959

6060
impl<P, B, H> ShardLayer<P, B, H>
@@ -64,7 +64,10 @@ where
6464
H: Hasher,
6565
{
6666
/// Create a new [ShardLayer] with the given buffered mailbox.
67-
pub fn new(mailbox: buffered::Mailbox<P, Shard<B, H>>, block_codec_cfg: B::Cfg) -> Self {
67+
pub fn new(
68+
mailbox: buffered::Mailbox<P, Shard<CodedBlock<B, H>, H>>,
69+
block_codec_cfg: B::Cfg,
70+
) -> Self {
6871
Self {
6972
mailbox,
7073
block_codec_cfg,
@@ -73,14 +76,9 @@ where
7376
}
7477

7578
/// Broadcasts [Shard]s of a [Block] to a pre-determined set of peers.
76-
pub async fn broadcast_chunks(
77-
&mut self,
78-
coding_commitment: B::Commitment,
79-
config: (u16, u16),
80-
chunks: Vec<(P, Chunk<H>)>,
81-
) {
82-
for (peer, chunk) in chunks {
83-
let message = Shard::new(coding_commitment, config, chunk);
79+
pub async fn broadcast_shards(&mut self, block: CodedBlock<B, H>, participants: Vec<P>) {
80+
for (i, peer) in participants.into_iter().enumerate() {
81+
let message = block.shard(i as u16).expect("invalid shard index");
8482
let _peers = self.mailbox.broadcast(Recipients::One(peer), message).await;
8583
}
8684
}
@@ -113,14 +111,14 @@ where
113111
pub async fn try_reconstruct(
114112
&mut self,
115113
commitment: B::Commitment,
116-
) -> Result<Option<B>, ReconstructionError> {
117-
let available_chunks = self.mailbox.get(None, commitment, None).await;
114+
) -> Result<Option<CodedBlock<B, H>>, ReconstructionError> {
115+
let available_shards = self.mailbox.get(None, commitment, None).await;
118116

119-
let Some((total, min)) = available_chunks.first().map(|c| c.config) else {
120-
// No chunks available.
117+
let Some((total, min)) = available_shards.first().map(|c| c.config) else {
118+
// No shards available.
121119
return Ok(None);
122120
};
123-
let coded_chunks = available_chunks
121+
let coded_shards = available_shards
124122
.iter()
125123
.cloned()
126124
.map(|c| c.chunk)
@@ -129,23 +127,24 @@ where
129127
// TODO: Make sure min is all valid chunks
130128
// TODO: If we do encounter a block that's invalid, block the peer.
131129

132-
if coded_chunks.len() < min as usize {
133-
// Not enough chunks to recover the block yet.
130+
if coded_shards.len() < min as usize {
131+
// Not enough shards to recover the block yet.
134132
debug!(
135133
%commitment,
136-
have = coded_chunks.len(),
134+
have = coded_shards.len(),
137135
need = min,
138-
"not enough chunks to reconstruct block",
136+
"not enough shards to reconstruct block",
139137
);
140138
return Ok(None);
141139
}
142140

143-
// Attempt to recover the block from the available chunks. This process will also
144-
// check the chunks' inclusion within the commitment.
145-
let recovered = decode(total, min, &commitment, coded_chunks)?;
141+
// Attempt to recover the block from the available shards. This process will also
142+
// check the shards' inclusion within the commitment.
143+
let recovered = decode(total, min, &commitment, coded_shards)?;
146144

147145
// Attempt to decode the block from the recovered data.
148-
let block = B::decode_cfg(&mut recovered.as_slice(), &self.block_codec_cfg)?;
146+
let block =
147+
CodedBlock::<B, H>::decode_cfg(&mut recovered.as_slice(), &self.block_codec_cfg)?;
149148

150149
// Notify any subscribers that have been waiting for this block.
151150
if let Some(mut sub) = self.block_subscriptions.remove(&commitment) {
@@ -156,8 +155,8 @@ where
156155

157156
info!(
158157
%commitment,
159-
digest = %block.digest(),
160-
height = block.height(),
158+
digest = %block.inner().digest(),
159+
height = block.inner().height(),
161160
"successfully reconstructed block"
162161
);
163162

@@ -172,7 +171,7 @@ where
172171
pub async fn subscribe_block(
173172
&mut self,
174173
commitment: B::Commitment,
175-
responder: oneshot::Sender<B>,
174+
responder: oneshot::Sender<CodedBlock<B, H>>,
176175
) -> Result<(), ReconstructionError> {
177176
match self.block_subscriptions.entry(commitment) {
178177
Entry::Vacant(entry) => {
@@ -185,36 +184,37 @@ where
185184
}
186185
}
187186

188-
// Try to reconstruct the block immediately in case we already have enough chunks.
187+
// Try to reconstruct the block immediately in case we already have enough shards.
189188
self.try_reconstruct(commitment).await?;
190189

191190
Ok(())
192191
}
193192

194-
pub async fn get_chunk(
193+
/// Performs a best-effort retrieval of a shard by commitment and index. If the mailbox does
194+
/// not have the shard cached, `None` is returned.
195+
pub async fn get_shard(
195196
&mut self,
196197
commitment: B::Commitment,
197198
index: u16,
198-
) -> Option<Shard<B, H>> {
199+
) -> Option<Shard<CodedBlock<B, H>, H>> {
199200
let index_hash = shard_uuid::<B, H>(commitment, index);
200201
self.mailbox
201202
.get(None, commitment, Some(index_hash))
202203
.await
203-
.iter()
204+
.first()
204205
.cloned()
205-
.next()
206206
}
207207

208-
/// Subscribes to a chunk by commitment and index with an externally prepared responder.
208+
/// Subscribes to a shard by commitment and index with an externally prepared responder.
209209
///
210-
/// The responder will be sent the chunk when it is available; either instantly (if cached)
210+
/// The responder will be sent the shard when it is available; either instantly (if cached)
211211
/// or when it is received from the network. The request can be canceled by dropping the
212212
/// responder.
213-
pub async fn subscribe_chunk(
213+
pub async fn subscribe_shard(
214214
&mut self,
215215
commitment: B::Commitment,
216216
index: u16,
217-
responder: oneshot::Sender<Shard<B, H>>,
217+
responder: oneshot::Sender<Shard<CodedBlock<B, H>, H>>,
218218
) {
219219
let index_hash = shard_uuid::<B, H>(commitment, index);
220220
self.mailbox
@@ -225,10 +225,10 @@ where
225225

226226
/// A broadcastable, erasure coded [Chunk] of a [Block].
227227
///
228-
/// Each chunk is associated with a commitment to the full block's
229-
/// erasure coded data. This allows recipients to verify the integrity of the chunk
230-
/// (to varying degrees; For reed-solomon which is currently hard-coded, no guarantee
231-
/// of the chunk's correctness is possible without additional chunks.)
228+
/// Each shard is associated with a commitment to the full block's erasure coded data.
229+
/// This allows recipients to verify the integrity of the shard (to varying degrees; For
230+
/// reed-solomon which is currently hard-coded, no guarantee of the shard's correctness
231+
/// is possible without additional shard.)
232232
#[derive(Debug, Clone)]
233233
pub struct Shard<B, H>
234234
where
@@ -245,7 +245,7 @@ where
245245
B: Block<Digest = H::Digest, Commitment = H::Digest>,
246246
H: Hasher,
247247
{
248-
/// Create a new [Shard] from a block's hash, coding commitment, and a chunk
248+
/// Create a new [Shard] from a block's hash, coding commitment, and a [Chunk]
249249
/// of the coded block.
250250
///
251251
/// ## Panics
@@ -443,6 +443,15 @@ where
443443
pub fn chunks(&self) -> &[Chunk<H>] {
444444
self.chunks.as_slice()
445445
}
446+
447+
/// Returns a [Shard] at the given index, if the index is valid.
448+
pub fn shard(&self, index: u16) -> Option<Shard<CodedBlock<B, H>, H>> {
449+
Some(Shard::new(
450+
self.commitment,
451+
self.config,
452+
self.chunks.get(index as usize)?.clone(),
453+
))
454+
}
446455
}
447456

448457
impl<B, H> Write for CodedBlock<B, H>

consensus/src/marshal/ingress/mailbox.rs

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::{
2+
marshal::ingress::coding::CodedBlock,
23
threshold_simplex::types::{Activity, Finalization, Finalize, Notarization, Notarize},
34
types::Round,
45
Block, Reporter,
56
};
6-
use commonware_coding::reed_solomon::Chunk;
77
use commonware_cryptography::{bls12381::primitives::variant::Variant, Hasher, PublicKey};
88
use futures::{
99
channel::{mpsc, oneshot},
@@ -15,12 +15,13 @@ use tracing::error;
1515
///
1616
/// These messages are sent from the consensus engine and other parts of the
1717
/// system to drive the state of the marshal.
18-
pub(crate) enum Message<
18+
pub(crate) enum Message<V, B, P, H>
19+
where
1920
V: Variant,
2021
B: Block<Digest = H::Digest, Commitment = H::Digest>,
2122
P: PublicKey,
2223
H: Hasher,
23-
> {
24+
{
2425
// -------------------- Application Messages --------------------
2526
/// A request to retrieve a block by its commitment.
2627
Get {
@@ -41,12 +42,10 @@ pub(crate) enum Message<
4142
},
4243
/// A request to broadcast an erasure coded block to all peers.
4344
Broadcast {
44-
/// The coding commitment of the block.
45-
coding_commitment: B::Commitment,
46-
/// The erasure coding configuration.
47-
config: (u16, u16),
48-
/// The chunks and their corresponding participants.
49-
chunks: Vec<(P, Chunk<H>)>,
45+
/// The block to broadcast.
46+
block: CodedBlock<B, H>,
47+
/// The participants to share the block with.
48+
participants: Vec<P>,
5049
},
5150
/// A reqeuest to verify that a shard of a block at a given index is contained within
5251
/// the given commitment.
@@ -84,17 +83,22 @@ pub(crate) enum Message<
8483

8584
/// A mailbox for sending messages to the marshal [Actor](super::super::actor::Actor).
8685
#[derive(Clone)]
87-
pub struct Mailbox<
86+
pub struct Mailbox<V, B, P, H>
87+
where
8888
V: Variant,
8989
B: Block<Digest = H::Digest, Commitment = H::Digest>,
9090
P: PublicKey,
9191
H: Hasher,
92-
> {
92+
{
9393
sender: mpsc::Sender<Message<V, B, P, H>>,
9494
}
9595

96-
impl<V: Variant, B: Block<Digest = H::Digest, Commitment = H::Digest>, P: PublicKey, H: Hasher>
97-
Mailbox<V, B, P, H>
96+
impl<V, B, P, H> Mailbox<V, B, P, H>
97+
where
98+
V: Variant,
99+
B: Block<Digest = H::Digest, Commitment = H::Digest>,
100+
P: PublicKey,
101+
H: Hasher,
98102
{
99103
/// Creates a new mailbox.
100104
pub(crate) fn new(sender: mpsc::Sender<Message<V, B, P, H>>) -> Self {
@@ -149,19 +153,13 @@ impl<V: Variant, B: Block<Digest = H::Digest, Commitment = H::Digest>, P: Public
149153
rx
150154
}
151155

152-
/// Broadcast indicates that a block should be sent to all peers.
153-
pub async fn broadcast(
154-
&mut self,
155-
coding_commitment: B::Commitment,
156-
config: (u16, u16),
157-
chunks: Vec<(P, Chunk<H>)>,
158-
) {
156+
/// Broadcast indicates that an erasure coded block should be broadcasted to a set of participants.
157+
pub async fn broadcast(&mut self, block: CodedBlock<B, H>, participants: Vec<P>) {
159158
if self
160159
.sender
161160
.send(Message::Broadcast {
162-
coding_commitment,
163-
config,
164-
chunks,
161+
block,
162+
participants,
165163
})
166164
.await
167165
.is_err()
@@ -192,8 +190,12 @@ impl<V: Variant, B: Block<Digest = H::Digest, Commitment = H::Digest>, P: Public
192190
}
193191
}
194192

195-
impl<V: Variant, B: Block<Digest = H::Digest, Commitment = H::Digest>, P: PublicKey, H: Hasher>
196-
Reporter for Mailbox<V, B, P, H>
193+
impl<V, B, P, H> Reporter for Mailbox<V, B, P, H>
194+
where
195+
V: Variant,
196+
B: Block<Digest = H::Digest, Commitment = H::Digest>,
197+
P: PublicKey,
198+
H: Hasher,
197199
{
198200
type Activity = Activity<V, B::Commitment>;
199201

0 commit comments

Comments
 (0)