@@ -39,11 +39,6 @@ struct BlockSubscription<B: Block> {
3939 subscribers : Vec < oneshot:: Sender < B > > ,
4040}
4141
42- /// A subscription for a chunk by its commitment and index.
43- struct ChunkSubscription < H : Hasher > {
44- subscribers : Vec < oneshot:: Sender < Chunk < H > > > ,
45- }
46-
4742/// A layer that handles receiving erasure coded [Block]s from the [Actor](super::super::actor::Actor),
4843/// broadcasting them to peers, and reassembling them from received [Shard]s.
4944pub struct ShardLayer < P , B , H >
@@ -131,6 +126,9 @@ where
131126 . map ( |c| c. chunk )
132127 . collect :: < Vec < _ > > ( ) ;
133128
129+ // TODO: Make sure min is all valid chunks
130+ // TODO: If we do encounter a block that's invalid, block the peer.
131+
134132 if coded_chunks. len ( ) < min as usize {
135133 // Not enough chunks to recover the block yet.
136134 debug ! (
@@ -149,9 +147,9 @@ where
149147 // Attempt to decode the block from the recovered data.
150148 let block = B :: decode_cfg ( & mut recovered. as_slice ( ) , & self . block_codec_cfg ) ?;
151149
152- // Attempt to resolve any open subscriptions for this block.
153- if let Some ( mut subs ) = self . block_subscriptions . remove ( & commitment) {
154- for sub in subs . subscribers . drain ( ..) {
150+ // Notify any subscribers that have been waiting for this block.
151+ if let Some ( mut sub ) = self . block_subscriptions . remove ( & commitment) {
152+ for sub in sub . subscribers . drain ( ..) {
155153 let _ = sub. send ( block. clone ( ) ) ;
156154 }
157155 }
@@ -198,10 +196,7 @@ where
198196 commitment : B :: Commitment ,
199197 index : u16 ,
200198 ) -> Option < Shard < B , H > > {
201- let mut buf = vec ! [ 0u8 ; <H :: Digest as FixedSize >:: SIZE + 2 ] ;
202- buf[ ..<H :: Digest as FixedSize >:: SIZE ] . copy_from_slice ( commitment. as_ref ( ) ) ;
203- buf[ <H :: Digest as FixedSize >:: SIZE ..] . copy_from_slice ( index. to_le_bytes ( ) . as_ref ( ) ) ;
204- let index_hash = H :: hash ( buf. as_ref ( ) ) ;
199+ let index_hash = shard_uuid :: < B , H > ( commitment, index) ;
205200 self . mailbox
206201 . get ( None , commitment, Some ( index_hash) )
207202 . await
@@ -221,10 +216,7 @@ where
221216 index : u16 ,
222217 responder : oneshot:: Sender < Shard < B , H > > ,
223218 ) {
224- let mut buf = vec ! [ 0u8 ; <H :: Digest as FixedSize >:: SIZE + 2 ] ;
225- buf[ ..<H :: Digest as FixedSize >:: SIZE ] . copy_from_slice ( commitment. as_ref ( ) ) ;
226- buf[ <H :: Digest as FixedSize >:: SIZE ..] . copy_from_slice ( index. to_le_bytes ( ) . as_ref ( ) ) ;
227- let index_hash = H :: hash ( buf. as_ref ( ) ) ;
219+ let index_hash = shard_uuid :: < B , H > ( commitment, index) ;
228220 self . mailbox
229221 . subscribe_prepared ( None , commitment, Some ( index_hash) , responder)
230222 . await ;
@@ -320,11 +312,7 @@ where
320312 type Digest = H :: Digest ;
321313
322314 fn digest ( & self ) -> Self :: Digest {
323- let mut buf = vec ! [ 0u8 ; <H :: Digest as FixedSize >:: SIZE + 2 ] ;
324- buf[ ..<H :: Digest as FixedSize >:: SIZE ] . copy_from_slice ( self . commitment . as_ref ( ) ) ;
325- buf[ <H :: Digest as FixedSize >:: SIZE ..]
326- . copy_from_slice ( self . chunk . index . to_le_bytes ( ) . as_ref ( ) ) ;
327- H :: hash ( buf. as_ref ( ) )
315+ shard_uuid :: < B , H > ( self . commitment , self . chunk . index )
328316 }
329317}
330318
@@ -407,31 +395,32 @@ where
407395 config : ( u16 , u16 ) ,
408396 /// The erasure coding commitment.
409397 commitment : H :: Digest ,
398+ /// The coded chunks.
399+ chunks : Vec < Chunk < H > > ,
410400}
411401
412402impl < B , H > CodedBlock < B , H >
413403where
414404 B : Block < Digest = H :: Digest , Commitment = H :: Digest > ,
415405 H : Hasher ,
416406{
417- /// Erasure codes the block to create the commitment .
418- fn commit ( inner : & B , config : ( u16 , u16 ) ) -> H :: Digest {
407+ /// Erasure codes the block.
408+ fn encode ( inner : & B , config : ( u16 , u16 ) ) -> ( H :: Digest , Vec < Chunk < H > > ) {
419409 let mut buf = Vec :: with_capacity ( config. encode_size ( ) + inner. encode_size ( ) ) ;
420410 inner. write ( & mut buf) ;
421411 config. write ( & mut buf) ;
422412
423- let ( commitment, _) =
424- reed_solomon:: encode :: < H > ( config. 0 , config. 1 , buf) . expect ( "failed to commit to block" ) ;
425- commitment
413+ reed_solomon:: encode :: < H > ( config. 0 , config. 1 , buf) . expect ( "failed to commit to block" )
426414 }
427415
428416 /// Create a new [CodedBlock] from a [Block] and a configuration.
429417 pub fn new ( inner : B , config : ( u16 , u16 ) ) -> Self {
430- let commitment = Self :: commit ( & inner, config) ;
418+ let ( commitment, chunks ) = Self :: encode ( & inner, config) ;
431419 Self {
432420 inner,
433421 config,
434422 commitment,
423+ chunks,
435424 }
436425 }
437426
@@ -444,6 +433,16 @@ where
444433 pub fn take_inner ( self ) -> B {
445434 self . inner
446435 }
436+
437+ /// Returns the erasure coding configuration.
438+ pub fn config ( & self ) -> ( u16 , u16 ) {
439+ self . config
440+ }
441+
442+ /// Returns a reference to the coded chunks.
443+ pub fn chunks ( & self ) -> & [ Chunk < H > ] {
444+ self . chunks . as_slice ( )
445+ }
447446}
448447
449448impl < B , H > Write for CodedBlock < B , H >
@@ -470,12 +469,13 @@ where
470469 ) -> Result < Self , commonware_codec:: Error > {
471470 let inner = B :: read_cfg ( buf, cfg) ?;
472471 let config = <( u16 , u16 ) >:: read_cfg ( buf, & ( ( ) , ( ) ) ) ?;
473- let commitment = Self :: commit ( & inner, config) ;
472+ let ( commitment, chunks ) = Self :: encode ( & inner, config) ;
474473
475474 Ok ( Self {
476475 inner,
477476 config,
478477 commitment,
478+ chunks,
479479 } )
480480 }
481481}
@@ -547,6 +547,18 @@ where
547547{
548548}
549549
550+ /// Creates a unique identifier for a shard based on the block commitment and shard index.
551+ fn shard_uuid < B , H > ( commitment : B :: Commitment , index : u16 ) -> H :: Digest
552+ where
553+ B : Block < Digest = H :: Digest , Commitment = H :: Digest > ,
554+ H : Hasher ,
555+ {
556+ let mut buf = vec ! [ 0u8 ; H :: Digest :: SIZE + u16 :: SIZE ] ;
557+ buf[ ..H :: Digest :: SIZE ] . copy_from_slice ( commitment. as_ref ( ) ) ;
558+ buf[ H :: Digest :: SIZE ..] . copy_from_slice ( index. to_le_bytes ( ) . as_ref ( ) ) ;
559+ H :: hash ( buf. as_ref ( ) )
560+ }
561+
550562#[ cfg( test) ]
551563mod test {
552564 use super :: * ;
0 commit comments