@@ -27,8 +27,8 @@ use std::{
2727} ;
2828
2929use either:: Either ;
30- use futures:: { channel:: oneshot, prelude:: * , stream :: SelectAll } ;
31- use futures_bounded:: Delay ;
30+ use futures:: { channel:: oneshot, prelude:: * } ;
31+ use futures_bounded:: { Delay , StreamSet } ;
3232use libp2p_core:: { upgrade, ConnectedPoint } ;
3333use libp2p_identity:: PeerId ;
3434use libp2p_swarm:: {
@@ -78,7 +78,8 @@ pub struct Handler {
7878 pending_messages : VecDeque < ( KadRequestMsg , QueryId ) > ,
7979
8080 /// List of active inbound substreams with the state they are in.
81- inbound_substreams : SelectAll < InboundSubstreamState > ,
81+ /// The streams are typed `InboundSubstreamState`, but the set uses the item type.
82+ inbound_substreams : StreamSet < ConnectionHandlerEvent < ProtocolConfig , ( ) , HandlerEvent > > ,
8283
8384 /// The connected endpoint of the connection that the handler
8485 /// is associated with.
@@ -120,8 +121,6 @@ enum InboundSubstreamState {
120121 PendingFlush ( UniqueConnecId , KadInStreamSink < Stream > ) ,
121122 /// The substream is being closed.
122123 Closing ( KadInStreamSink < Stream > ) ,
123- /// The substream was cancelled in favor of a new one.
124- Cancelled ,
125124
126125 Poisoned {
127126 phantom : PhantomData < QueryId > ,
@@ -174,9 +173,6 @@ impl InboundSubstreamState {
174173 | InboundSubstreamState :: Closing ( substream) => {
175174 * self = InboundSubstreamState :: Closing ( substream) ;
176175 }
177- InboundSubstreamState :: Cancelled => {
178- * self = InboundSubstreamState :: Cancelled ;
179- }
180176 InboundSubstreamState :: Poisoned { .. } => unreachable ! ( ) ,
181177 }
182178 }
@@ -462,7 +458,10 @@ impl Handler {
462458 endpoint,
463459 remote_peer_id,
464460 next_connec_unique_id : UniqueConnecId ( 0 ) ,
465- inbound_substreams : Default :: default ( ) ,
461+ inbound_substreams : StreamSet :: new (
462+ move || Delay :: futures_timer ( substreams_timeout) ,
463+ MAX_NUM_STREAMS ,
464+ ) ,
466465 outbound_substreams : futures_bounded:: FuturesTupleSet :: new (
467466 move || Delay :: futures_timer ( substreams_timeout) ,
468467 MAX_NUM_STREAMS ,
@@ -519,38 +518,45 @@ impl Handler {
519518 } ) ;
520519 }
521520
522- if self . inbound_substreams . len ( ) == MAX_NUM_STREAMS {
523- if let Some ( s) = self . inbound_substreams . iter_mut ( ) . find ( |s| {
524- matches ! (
525- s,
526- // An inbound substream waiting to be reused.
527- InboundSubstreamState :: WaitingMessage { first: false , .. }
528- )
529- } ) {
530- * s = InboundSubstreamState :: Cancelled ;
521+ let connec_unique_id = self . next_connec_unique_id ;
522+ self . next_connec_unique_id . 0 += 1 ;
523+ let new_substream = InboundSubstreamState :: WaitingMessage {
524+ first : true ,
525+ connection_id : connec_unique_id,
526+ substream : protocol,
527+ } ;
528+
529+ if self . inbound_substreams . len ( ) >= MAX_NUM_STREAMS {
530+ if let Some ( s) = self
531+ . inbound_substreams
532+ . iter_mut_of_type :: < InboundSubstreamState > ( )
533+ . find ( |s| {
534+ matches ! (
535+ * * s,
536+ // An inbound substream waiting to be reused.
537+ InboundSubstreamState :: WaitingMessage { first: false , .. }
538+ )
539+ } )
540+ {
541+ * s. get_mut ( ) = new_substream;
531542 tracing:: debug!(
532543 peer=?self . remote_peer_id,
533544 "New inbound substream to peer exceeds inbound substream limit. \
534- Removed older substream waiting to be reused."
545+ Replacing older substream that was waiting to be reused."
535546 )
536547 } else {
537548 tracing:: warn!(
538549 peer=?self . remote_peer_id,
539550 "New inbound substream to peer exceeds inbound substream limit. \
540551 No older substream waiting to be reused. Dropping new substream."
541552 ) ;
542- return ;
543553 }
554+ } else {
555+ self . inbound_substreams
556+ . try_push ( new_substream)
557+ . map_err ( |_| ( ) )
558+ . expect ( "Just checked that stream set is not full; qed" ) ;
544559 }
545-
546- let connec_unique_id = self . next_connec_unique_id ;
547- self . next_connec_unique_id . 0 += 1 ;
548- self . inbound_substreams
549- . push ( InboundSubstreamState :: WaitingMessage {
550- first : true ,
551- connection_id : connec_unique_id,
552- substream : protocol,
553- } ) ;
554560 }
555561
556562 /// Takes the given [`KadRequestMsg`] and composes it into an outbound request-response protocol
@@ -617,15 +623,15 @@ impl ConnectionHandler for Handler {
617623 HandlerIn :: Reset ( request_id) => {
618624 if let Some ( state) = self
619625 . inbound_substreams
620- . iter_mut ( )
621- . find ( |state| match state {
626+ . iter_mut_of_type :: < InboundSubstreamState > ( )
627+ . find ( |state| match * * state {
622628 InboundSubstreamState :: WaitingBehaviour ( conn_id, _, _) => {
623- conn_id == & request_id. connec_unique_id
629+ conn_id == request_id. connec_unique_id
624630 }
625631 _ => false ,
626632 } )
627633 {
628- state. close ( ) ;
634+ state. get_mut ( ) . close ( ) ;
629635 }
630636 }
631637 HandlerIn :: FindNodeReq { key, query_id } => {
@@ -764,8 +770,16 @@ impl ConnectionHandler for Handler {
764770 Poll :: Pending => { }
765771 }
766772
767- if let Poll :: Ready ( Some ( event) ) = self . inbound_substreams . poll_next_unpin ( cx) {
768- return Poll :: Ready ( event) ;
773+ if let Poll :: Ready ( Some ( event_result) ) = self . inbound_substreams . poll_next_unpin ( cx) {
774+ match event_result {
775+ Ok ( event) => return Poll :: Ready ( event) ,
776+ Err ( _stream_set_timeout) => {
777+ tracing:: trace!(
778+ "Inbound substream timed out waiting for peer, send, or close"
779+ ) ;
780+ continue ;
781+ }
782+ }
769783 }
770784
771785 if self . outbound_substreams . len ( ) < MAX_NUM_STREAMS {
@@ -849,8 +863,11 @@ fn compute_new_protocol_status(
849863
850864impl Handler {
851865 fn answer_pending_request ( & mut self , request_id : RequestId , mut msg : KadResponseMsg ) {
852- for state in self . inbound_substreams . iter_mut ( ) {
853- match state. try_answer_with ( request_id, msg) {
866+ for state in self
867+ . inbound_substreams
868+ . iter_mut_of_type :: < InboundSubstreamState > ( )
869+ {
870+ match state. get_mut ( ) . try_answer_with ( request_id, msg) {
854871 Ok ( ( ) ) => return ,
855872 Err ( m) => {
856873 msg = m;
@@ -1007,7 +1024,6 @@ impl futures::Stream for InboundSubstreamState {
10071024 }
10081025 } ,
10091026 InboundSubstreamState :: Poisoned { .. } => unreachable ! ( ) ,
1010- InboundSubstreamState :: Cancelled => return Poll :: Ready ( None ) ,
10111027 }
10121028 }
10131029 }
0 commit comments