Skip to content

Commit 0f674fe

Browse files
committed
cont.d
1 parent 6f04cbe commit 0f674fe

File tree

3 files changed

+373
-333
lines changed

3 files changed

+373
-333
lines changed

consensus/src/marshal/actor.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -315,12 +315,9 @@ where
315315
self.cache_verified(round, block.commitment(), block).await;
316316
}
317317
Message::Notarize { notarization } => {
318-
// DEBUG - For initial testing, we just send out _all_ of our chunks for the commitment,
319-
// to side-step self-identification.
320-
//
321-
// We should just be sending out the chunk that was sent to us by the proposer for perf.
322318
let commitment = notarization.proposal.payload;
323-
shard_layer.try_broadcast_mine(commitment).await;
319+
let index = notarization.proposal_signature.index as u16;
320+
shard_layer.try_broadcast_shard(commitment, index).await;
324321
}
325322
Message::Notarization { notarization } => {
326323
let round = notarization.round();
@@ -339,12 +336,9 @@ where
339336
}
340337
}
341338
Message::Finalize { finalization } => {
342-
// DEBUG - For initial testing, we just send out _all_ of our chunks for the commitment,
343-
// to side-step self-identification.
344-
//
345-
// We should just be sending out the chunk that was sent to us by the proposer for perf.
346339
let commitment = finalization.proposal.payload;
347-
shard_layer.try_broadcast_mine(commitment).await;
340+
let index = finalization.proposal_signature.index as u16;
341+
shard_layer.try_broadcast_shard(commitment, index).await;
348342
}
349343
Message::Finalization { finalization } => {
350344
// Cache finalization by round
@@ -592,7 +586,13 @@ where
592586
};
593587

594588
// Persist the commitment.
595-
shard_layer.put_commitment(height, digest, commitment).await;
589+
//
590+
// This operation is unsafe at the moment; we trust our peer sent us the correct
591+
// commitment for the given digest. We should instead ask for the erasure coded
592+
// chunks and reproduce the commitment ourselves.
593+
unsafe {
594+
shard_layer.put_commitment(height, digest, commitment).await;
595+
}
596596

597597
// If we have the block, persist it and its finalization.
598598
if let Some(block) = self.find_block(&mut shard_layer, commitment).await {

consensus/src/marshal/ingress/coding.rs

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -126,14 +126,21 @@ where
126126
}
127127
}
128128

129-
/// Broadcasts the local [Shard] of a block to all peers.
130-
///
131-
/// TODO: This should only send out the shard that was assigned to the local validator.
132-
pub async fn try_broadcast_mine(&mut self, commitment: B::Commitment) {
133-
let available_shards = self.mailbox.get(None, commitment, None).await;
129+
/// Broadcasts a local [Shard] of a block to all peers, if the shard is present.
130+
pub async fn try_broadcast_shard(&mut self, commitment: B::Commitment, index: u16) {
131+
let shard = self
132+
.mailbox
133+
.get(None, commitment, None)
134+
.await
135+
.iter()
136+
.find(|c| c.chunk.index == index)
137+
.cloned();
134138

135-
for shard in available_shards {
139+
if let Some(shard) = shard {
140+
debug!(%commitment, index, "broadcasted local shard to all peers");
136141
let _peers = self.mailbox.broadcast(Recipients::All, shard).await;
142+
} else {
143+
debug!(%commitment, index, "no local shard to broadcast" );
137144
}
138145
}
139146

@@ -178,10 +185,20 @@ where
178185
// Attempt to decode the block from the recovered data.
179186
let block = B::decode_cfg(&mut recovered.as_slice(), &self.block_codec_cfg)?;
180187

181-
self.put_commitment(block.height(), block.digest(), commitment)
182-
.await;
188+
// Persist the digest -> commitment mapping for future lookups.
189+
//
190+
// SAFETY: We just verified the block's integrity by reconstructing it from the chunks.
191+
unsafe {
192+
self.put_commitment(block.height(), block.digest(), commitment)
193+
.await;
194+
}
183195

184-
info!(%commitment, ?block, "successfully reconstructed block");
196+
info!(
197+
%commitment,
198+
digest = %block.digest(),
199+
height = block.height(),
200+
"successfully reconstructed block"
201+
);
185202

186203
Ok(Some(block))
187204
}
@@ -196,11 +213,16 @@ where
196213
_commitment: B::Commitment,
197214
_responder: oneshot::Sender<B>,
198215
) -> Result<(), ReconstructionError> {
199-
todo!("Subscribe to all chunks, reconstruct block when enough are available.");
216+
todo!("Create subscription");
200217
}
201218

202219
/// Puts a coding commitment in the store, keyed by digest and block height.
203-
pub async fn put_commitment(
220+
///
221+
/// # Safety
222+
///
223+
/// Callers of this function must ensure that the provided commitment is correct for the
224+
/// block with the given height and digest.
225+
pub async unsafe fn put_commitment(
204226
&mut self,
205227
height: u64,
206228
digest: B::Digest,

0 commit comments

Comments
 (0)