@@ -35,7 +35,7 @@ use libp2p_swarm::{
3535 ConnectionHandler , ConnectionHandlerEvent , Stream , StreamUpgradeError , SubstreamProtocol ,
3636 SupportedProtocols ,
3737} ;
38- use std:: collections:: VecDeque ;
38+ use std:: collections:: { HashMap , VecDeque } ;
3939use std:: task:: Waker ;
4040use std:: time:: Duration ;
4141use std:: { error, fmt, io, marker:: PhantomData , pin:: Pin , task:: Context , task:: Poll } ;
@@ -98,6 +98,7 @@ struct ProtocolStatus {
9898}
9999
100100/// State of an active inbound substream.
101+ #[ derive( Debug ) ]
101102enum InboundSubstreamState {
102103 /// Waiting for a request from the remote.
103104 WaitingMessage {
@@ -173,6 +174,23 @@ impl InboundSubstreamState {
173174 InboundSubstreamState :: Poisoned { .. } => unreachable ! ( ) ,
174175 }
175176 }
177+
178+ /// Returns the `KadInStreamSink` associated with this state, or `None` if the state has no substream.
179+ fn sink ( & self ) -> Option < & KadInStreamSink < Stream > > {
180+ match self {
181+ InboundSubstreamState :: WaitingMessage { substream, .. }
182+ | InboundSubstreamState :: WaitingBehaviour ( _, substream, _)
183+ | InboundSubstreamState :: PendingSend ( _, substream, _)
184+ | InboundSubstreamState :: PendingFlush ( _, substream)
185+ | InboundSubstreamState :: Closing ( substream) => Some ( substream) ,
186+ InboundSubstreamState :: Cancelled | InboundSubstreamState :: Poisoned { .. } => None ,
187+ }
188+ }
189+
190+ /// Returns the `Stream` associated with this state, or `None` if the state has no substream.
191+ fn stream ( & self ) -> Option < & Stream > {
192+ self . sink ( ) . map ( |v| & * * v)
193+ }
176194}
177195
178196/// Event produced by the Kademlia handler.
@@ -524,15 +542,23 @@ impl Handler {
524542 InboundSubstreamState :: WaitingMessage { first: false , .. }
525543 )
526544 } ) {
545+ let prev_state = format ! ( "{s:?}" ) ;
546+
527547 * s = InboundSubstreamState :: Cancelled ;
548+
528549 tracing:: debug!(
529550 peer=?self . remote_peer_id,
551+ ?prev_state,
552+ ?MAX_NUM_STREAMS ,
553+ active_streams=?self . debug_inbound_substreams( ) ,
530554 "New inbound substream to peer exceeds inbound substream limit. \
531555 Removed older substream waiting to be reused."
532- )
556+ ) ;
533557 } else {
534558 tracing:: warn!(
535559 peer=?self . remote_peer_id,
560+ ?MAX_NUM_STREAMS ,
561+ active_streams=?self . debug_inbound_substreams( ) ,
536562 "New inbound substream to peer exceeds inbound substream limit. \
537563 No older substream waiting to be reused. Dropping new substream."
538564 ) ;
@@ -550,6 +576,30 @@ impl Handler {
550576 } ) ;
551577 }
552578
579+ /// Returns a summary of the protocols of the inbound substreams.
580+ fn debug_inbound_substreams ( & self ) -> String {
581+ use libp2p_core:: muxing:: AsyncReadWrite ;
582+
583+ let mut protocols = HashMap :: < _ , usize > :: new ( ) ;
584+ let mut stream_types = HashMap :: < _ , usize > :: new ( ) ;
585+
586+ for substream in & self . inbound_substreams {
587+ let state = substream. stream ( ) . map ( |s| & s. stream . state ) ;
588+ let protocol = state. and_then ( |s| s. protocol ( ) ) ;
589+ let stream_type = state. and_then ( |s| s. inner ( ) ) . map ( |i| i. type_name ( ) ) ;
590+
591+ if let Some ( protocol) = protocol {
592+ * protocols. entry ( protocol) . or_default ( ) += 1 ;
593+ }
594+
595+ if let Some ( stream_type) = stream_type {
596+ * stream_types. entry ( stream_type) . or_default ( ) += 1 ;
597+ }
598+ }
599+
600+ format ! ( "protocols: {protocols:?}, stream types: {stream_types:?}" )
601+ }
602+
553603 /// Takes the given [`KadRequestMsg`] and composes it into an outbound request-response protocol handshake using a [`oneshot::channel`].
554604 fn queue_new_stream ( & mut self , id : QueryId , msg : KadRequestMsg ) {
555605 let ( sender, receiver) = oneshot:: channel ( ) ;
0 commit comments