@@ -339,101 +339,75 @@ where
339339 let change_set = self
340340 . relay_client
341341 . rpc ( )
342- . query_storage ( keys. clone ( ) , start, Some ( latest_finalized_hash) )
343- . await ?;
344-
345- let mut change_set_join_set: JoinSet < Result < Option < _ > , anyhow:: Error > > = JoinSet :: new ( ) ;
346- let mut parachain_headers_with_proof = BTreeMap :: < H256 , ParachainHeaderProofs > :: default ( ) ;
347- log:: debug!( target: "hyperspace" , "Got {} authority set changes" , change_set. len( ) ) ;
348-
349- fn clone_storage_change_sets < T : light_client_common:: config:: Config + Send + Sync > (
350- changes : & [ StorageChangeSet < T :: Hash > ] ,
351- ) -> Vec < StorageChangeSet < T :: Hash > > {
352- changes
353- . iter ( )
354- . map ( |change| StorageChangeSet {
355- block : change. block . clone ( ) ,
356- changes : change. changes . clone ( ) ,
357- } )
358- . collect ( )
359- }
342+ . query_storage ( keys. clone ( ) , latest_finalized_hash, Some ( latest_finalized_hash) )
343+ . await ?
344+ . pop ( )
345+ . unwrap ( ) ;
346+
347+ log:: debug!( target: "hyperspace" , "Got {} authority set changes" , change_set. changes. len( ) ) ;
348+
349+ let change = StorageChangeSet {
350+ block : change_set. block . clone ( ) ,
351+ changes : change_set. changes . clone ( ) ,
352+ } ;
360353 let latest_para_height = Arc :: new ( AtomicU32 :: new ( 0u32 ) ) ;
361- for changes in change_set. chunks ( PROCESS_CHANGES_SET_BATCH_SIZE ) {
362- for change in clone_storage_change_sets :: < T > ( changes) {
363- let header_numbers = header_numbers. clone ( ) ;
364- let keys = vec ! [ para_storage_key. clone( ) ] ;
365- let client = self . clone ( ) ;
366- let to = self . rpc_call_delay . as_millis ( ) ;
367- let duration1 = Duration :: from_millis ( rand:: thread_rng ( ) . gen_range ( 1 ..to) as u64 ) ;
368- let latest_para_height = latest_para_height. clone ( ) ;
369- change_set_join_set. spawn ( async move {
370- sleep ( duration1) . await ;
371- let header = client
372- . relay_client
373- . rpc ( )
374- . header ( Some ( change. block ) )
375- . await ?
376- . ok_or_else ( || anyhow ! ( "block not found {:?}" , change. block) ) ?;
377-
378- let parachain_header_bytes = {
379- let key = T :: Storage :: paras_heads ( client. para_id ) ;
380- let data = client
381- . relay_client
382- . storage ( )
383- . at ( header. hash ( ) )
384- . fetch ( & key)
385- . await ?
386- . expect ( "Header exists in its own changeset; qed" ) ;
387- <T :: Storage as RuntimeStorage >:: HeadData :: from_inner ( data)
388- } ;
389-
390- let para_header: T :: Header =
391- Decode :: decode ( & mut parachain_header_bytes. as_ref ( ) ) ?;
392- let para_block_number = para_header. number ( ) ;
393- // skip genesis header or any unknown headers
394- if para_block_number == Zero :: zero ( ) ||
395- !header_numbers. contains ( & para_block_number)
396- {
397- return Ok ( None )
398- }
399-
400- let state_proof = client
401- . relay_client
402- . rpc ( )
403- . read_proof ( keys. iter ( ) . map ( AsRef :: as_ref) , Some ( header. hash ( ) ) )
404- . await ?
405- . proof
406- . into_iter ( )
407- . map ( |p| p. 0 )
408- . collect ( ) ;
409-
410- let TimeStampExtWithProof { ext : extrinsic, proof : extrinsic_proof } =
411- fetch_timestamp_extrinsic_with_proof (
412- & client. para_client ,
413- Some ( para_header. hash ( ) ) ,
414- )
415- . await
416- . map_err ( |err| anyhow ! ( "Error fetching timestamp with proof: {err:?}" ) ) ?;
417- let proofs = ParachainHeaderProofs { state_proof, extrinsic, extrinsic_proof } ;
418- latest_para_height. fetch_max ( u32:: from ( para_block_number) , Ordering :: SeqCst ) ;
419- Ok ( Some ( ( H256 :: from ( header. hash ( ) ) , proofs) ) )
420- } ) ;
421- }
422354
423- while let Some ( res) = change_set_join_set. join_next ( ) . await {
424- if let Some ( ( hash, proofs) ) = res?? {
425- parachain_headers_with_proof. insert ( hash, proofs) ;
426- }
427- }
355+ let header_numbers = header_numbers. clone ( ) ;
356+ let keys = vec ! [ para_storage_key. clone( ) ] ;
357+ let client = self . clone ( ) ;
358+ let to = self . rpc_call_delay . as_millis ( ) ;
359+ let duration1 = Duration :: from_millis ( rand:: thread_rng ( ) . gen_range ( 1 ..to) as u64 ) ;
360+ let latest_para_height = latest_para_height. clone ( ) ;
361+ let header = client
362+ . relay_client
363+ . rpc ( )
364+ . header ( Some ( change. block ) )
365+ . await ?
366+ . ok_or_else ( || anyhow ! ( "block not found {:?}" , change. block) ) ?;
367+
368+ let parachain_header_bytes = {
369+ let key = T :: Storage :: paras_heads ( client. para_id ) ;
370+ let data = client
371+ . relay_client
372+ . storage ( )
373+ . at ( header. hash ( ) )
374+ . fetch ( & key)
375+ . await ?
376+ . expect ( "Header exists in its own changeset; qed" ) ;
377+ <T :: Storage as RuntimeStorage >:: HeadData :: from_inner ( data)
378+ } ;
379+
380+ let para_header: T :: Header = Decode :: decode ( & mut parachain_header_bytes. as_ref ( ) ) ?;
381+ let para_block_number = para_header. number ( ) ;
382+ // skip genesis header or any unknown headers
383+ if para_block_number == Zero :: zero ( ) || !header_numbers. contains ( & para_block_number) {
384+ return Err ( anyhow ! ( "genesis header or unknown header" ) )
428385 }
429386
387+ let state_proof = client
388+ . relay_client
389+ . rpc ( )
390+ . read_proof ( keys. iter ( ) . map ( AsRef :: as_ref) , Some ( header. hash ( ) ) )
391+ . await ?
392+ . proof
393+ . into_iter ( )
394+ . map ( |p| p. 0 )
395+ . collect ( ) ;
396+
397+ let TimeStampExtWithProof { ext : extrinsic, proof : extrinsic_proof } =
398+ fetch_timestamp_extrinsic_with_proof ( & client. para_client , Some ( para_header. hash ( ) ) )
399+ . await
400+ . map_err ( |err| anyhow ! ( "Error fetching timestamp with proof: {err:?}" ) ) ?;
401+ let proofs = ParachainHeaderProofs { state_proof, extrinsic, extrinsic_proof } ;
402+ latest_para_height. fetch_max ( u32:: from ( para_block_number) , Ordering :: SeqCst ) ;
403+
430404 unknown_headers. sort_by_key ( |header| header. number ( ) ) ;
431405 // overwrite unknown headers
432406 finality_proof. unknown_headers = unknown_headers;
433407
434408 Ok ( ParachainHeadersWithFinalityProof {
435409 finality_proof,
436- parachain_headers : parachain_headers_with_proof ,
410+ parachain_header : ( H256 :: from ( header . hash ( ) ) , proofs ) ,
437411 latest_para_height : latest_para_height. load ( Ordering :: SeqCst ) ,
438412 } )
439413 }
0 commit comments