Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ libp2p-floodsub = { version = "0.47.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.50.0", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.47.0", path = "protocols/identify" }
libp2p-identity = { version = "0.2.12" }
libp2p-kad = { version = "0.49.0", path = "protocols/kad" }
libp2p-kad = { version = "0.49.1", path = "protocols/kad" }
libp2p-mdns = { version = "0.48.0", path = "protocols/mdns" }
libp2p-memory-connection-limits = { version = "0.5.0", path = "misc/memory-connection-limits" }
libp2p-metrics = { version = "0.17.0", path = "misc/metrics" }
Expand Down Expand Up @@ -121,7 +121,8 @@ libp2p-yamux = { version = "0.47.0", path = "muxers/yamux" }
asynchronous-codec = { version = "0.7.0" }
env_logger = "0.11"
futures = "0.3.30"
futures-bounded = { version = "0.2.4" }
# TODO: replace with version = "0.3.1" once released upstream
futures-bounded = { git = "https://github.com/thomaseizinger/rust-futures-bounded", rev = "012803d343b5c604e65d3c238a8cd7a145616447", features = ["futures-timer"] }
futures-rustls = { version = "0.26.0", default-features = false }
getrandom = "0.2"
if-watch = "3.2.1"
Expand Down
4 changes: 2 additions & 2 deletions protocols/autonat/src/v2/client/handler/dial_back.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
};

use futures::channel::oneshot;
use futures_bounded::StreamSet;
use futures_bounded::{Delay, StreamSet};
use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade};
use libp2p_swarm::{
handler::{ConnectionEvent, FullyNegotiatedInbound, ListenUpgradeError},
Expand All @@ -22,7 +22,7 @@ pub struct Handler {
impl Handler {
pub(crate) fn new() -> Self {
Self {
inbound: StreamSet::new(Duration::from_secs(5), 2),
inbound: StreamSet::new(|| Delay::futures_timer(Duration::from_secs(5)), 2),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions protocols/autonat/src/v2/client/handler/dial_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
};

use futures::{channel::oneshot, AsyncWrite};
use futures_bounded::FuturesMap;
use futures_bounded::{Delay, FuturesMap};
use libp2p_core::{
upgrade::{DeniedUpgrade, ReadyUpgrade},
Multiaddr,
Expand Down Expand Up @@ -91,7 +91,7 @@ impl Handler {
pub(crate) fn new() -> Self {
Self {
queued_events: VecDeque::new(),
outbound: FuturesMap::new(Duration::from_secs(10), 10),
outbound: FuturesMap::new(|| Delay::futures_timer(Duration::from_secs(10)), 10),
queued_streams: VecDeque::default(),
}
}
Expand Down
4 changes: 2 additions & 2 deletions protocols/autonat/src/v2/server/handler/dial_back.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
};

use futures::{AsyncRead, AsyncWrite};
use futures_bounded::FuturesSet;
use futures_bounded::{Delay, FuturesSet};
use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade};
use libp2p_swarm::{
handler::{ConnectionEvent, DialUpgradeError, FullyNegotiatedOutbound},
Expand All @@ -33,7 +33,7 @@ impl Handler {
Self {
pending_nonce: Some(cmd),
requested_substream_nonce: None,
outbound: FuturesSet::new(Duration::from_secs(10), 5),
outbound: FuturesSet::new(|| Delay::futures_timer(Duration::from_secs(10)), 5),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions protocols/autonat/src/v2/server/handler/dial_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::{
channel::{mpsc, oneshot},
AsyncRead, AsyncWrite, SinkExt, StreamExt,
};
use futures_bounded::FuturesSet;
use futures_bounded::{Delay, FuturesSet};
use libp2p_core::{
upgrade::{DeniedUpgrade, ReadyUpgrade},
Multiaddr,
Expand Down Expand Up @@ -64,7 +64,7 @@ where
observed_multiaddr,
dial_back_cmd_sender,
dial_back_cmd_receiver,
inbound: FuturesSet::new(Duration::from_secs(10), 10),
inbound: FuturesSet::new(|| Delay::futures_timer(Duration::from_secs(10)), 10),
rng,
}
}
Expand Down
11 changes: 9 additions & 2 deletions protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::{

use either::Either;
use futures::future;
use futures_bounded::Delay;
use libp2p_core::{
multiaddr::Multiaddr,
upgrade::{DeniedUpgrade, ReadyUpgrade},
Expand Down Expand Up @@ -87,8 +88,14 @@ impl Handler {
Self {
endpoint,
queued_events: Default::default(),
inbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1),
outbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1),
inbound_stream: futures_bounded::FuturesSet::new(
|| Delay::futures_timer(Duration::from_secs(10)),
1,
),
outbound_stream: futures_bounded::FuturesSet::new(
|| Delay::futures_timer(Duration::from_secs(10)),
1,
),
holepunch_candidates,
attempts: 0,
}
Expand Down
2 changes: 1 addition & 1 deletion protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl Handler {
remote_peer_id,
events: SmallVec::new(),
active_streams: futures_bounded::FuturesSet::new(
STREAM_TIMEOUT,
|| futures_bounded::Delay::futures_timer(STREAM_TIMEOUT),
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
trigger_next_identify: Delay::new(Duration::ZERO),
Expand Down
5 changes: 5 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.49.1

- Enforce an inbound substream timeout in the kad substream handler.
See [PR 6009](https://github.com/libp2p/rust-libp2p/pull/6009).

## 0.49.0

- Remove no longer constructed GetRecordError::QuorumFailed.
Expand Down
2 changes: 1 addition & 1 deletion protocols/kad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-kad"
edition.workspace = true
rust-version = { workspace = true }
description = "Kademlia protocol for libp2p"
version = "0.49.0"
version = "0.49.1"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
93 changes: 55 additions & 38 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use std::{
};

use either::Either;
use futures::{channel::oneshot, prelude::*, stream::SelectAll};
use futures::{channel::oneshot, prelude::*};
use futures_bounded::{Delay, StreamSet};
use libp2p_core::{upgrade, ConnectedPoint};
use libp2p_identity::PeerId;
use libp2p_swarm::{
Expand Down Expand Up @@ -77,7 +78,8 @@ pub struct Handler {
pending_messages: VecDeque<(KadRequestMsg, QueryId)>,

/// List of active inbound substreams with the state they are in.
inbound_substreams: SelectAll<InboundSubstreamState>,
/// The streams are typed `InboundSubstreamState`, but the set uses the item type.
inbound_substreams: StreamSet<ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent>>,

/// The connected endpoint of the connection that the handler
/// is associated with.
Expand Down Expand Up @@ -119,8 +121,6 @@ enum InboundSubstreamState {
PendingFlush(UniqueConnecId, KadInStreamSink<Stream>),
/// The substream is being closed.
Closing(KadInStreamSink<Stream>),
/// The substream was cancelled in favor of a new one.
Cancelled,

Poisoned {
phantom: PhantomData<QueryId>,
Expand Down Expand Up @@ -173,9 +173,6 @@ impl InboundSubstreamState {
| InboundSubstreamState::Closing(substream) => {
*self = InboundSubstreamState::Closing(substream);
}
InboundSubstreamState::Cancelled => {
*self = InboundSubstreamState::Cancelled;
}
InboundSubstreamState::Poisoned { .. } => unreachable!(),
}
}
Expand Down Expand Up @@ -461,9 +458,12 @@ impl Handler {
endpoint,
remote_peer_id,
next_connec_unique_id: UniqueConnecId(0),
inbound_substreams: Default::default(),
inbound_substreams: StreamSet::new(
move || Delay::futures_timer(substreams_timeout),
MAX_NUM_STREAMS,
),
outbound_substreams: futures_bounded::FuturesTupleSet::new(
substreams_timeout,
move || Delay::futures_timer(substreams_timeout),
MAX_NUM_STREAMS,
),
pending_streams: Default::default(),
Expand Down Expand Up @@ -518,38 +518,45 @@ impl Handler {
});
}

if self.inbound_substreams.len() == MAX_NUM_STREAMS {
if let Some(s) = self.inbound_substreams.iter_mut().find(|s| {
matches!(
s,
// An inbound substream waiting to be reused.
InboundSubstreamState::WaitingMessage { first: false, .. }
)
}) {
*s = InboundSubstreamState::Cancelled;
let connec_unique_id = self.next_connec_unique_id;
self.next_connec_unique_id.0 += 1;
let new_substream = InboundSubstreamState::WaitingMessage {
first: true,
connection_id: connec_unique_id,
substream: protocol,
};

if self.inbound_substreams.len() >= MAX_NUM_STREAMS {
if let Some(s) = self
.inbound_substreams
.iter_mut_of_type::<InboundSubstreamState>()
.find(|s| {
matches!(
**s,
// An inbound substream waiting to be reused.
InboundSubstreamState::WaitingMessage { first: false, .. }
)
})
{
*s.get_mut() = new_substream;
tracing::debug!(
peer=?self.remote_peer_id,
"New inbound substream to peer exceeds inbound substream limit. \
Removed older substream waiting to be reused."
Replacing older substream that was waiting to be reused."
)
} else {
tracing::warn!(
peer=?self.remote_peer_id,
"New inbound substream to peer exceeds inbound substream limit. \
No older substream waiting to be reused. Dropping new substream."
);
return;
}
} else {
self.inbound_substreams
.try_push(new_substream)
.map_err(|_| ())
.expect("Just checked that stream set is not full; qed");
}

let connec_unique_id = self.next_connec_unique_id;
self.next_connec_unique_id.0 += 1;
self.inbound_substreams
.push(InboundSubstreamState::WaitingMessage {
first: true,
connection_id: connec_unique_id,
substream: protocol,
});
}

/// Takes the given [`KadRequestMsg`] and composes it into an outbound request-response protocol
Expand Down Expand Up @@ -616,15 +623,15 @@ impl ConnectionHandler for Handler {
HandlerIn::Reset(request_id) => {
if let Some(state) = self
.inbound_substreams
.iter_mut()
.find(|state| match state {
.iter_mut_of_type::<InboundSubstreamState>()
.find(|state| match **state {
InboundSubstreamState::WaitingBehaviour(conn_id, _, _) => {
conn_id == &request_id.connec_unique_id
conn_id == request_id.connec_unique_id
}
_ => false,
})
{
state.close();
state.get_mut().close();
}
}
HandlerIn::FindNodeReq { key, query_id } => {
Expand Down Expand Up @@ -763,8 +770,16 @@ impl ConnectionHandler for Handler {
Poll::Pending => {}
}

if let Poll::Ready(Some(event)) = self.inbound_substreams.poll_next_unpin(cx) {
return Poll::Ready(event);
if let Poll::Ready(Some(event_result)) = self.inbound_substreams.poll_next_unpin(cx) {
match event_result {
Ok(event) => return Poll::Ready(event),
Err(_stream_set_timeout) => {
tracing::trace!(
"Inbound substream timed out waiting for peer, send, or close"
);
continue;
}
}
}
Comment on lines -766 to 783
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure what to do with an inbound substream timeout.
We could:

  • explicitly close the substream
  • return some synthetic inbound timeout event
  • something else???

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current solution to log the event is fine IMO.


if self.outbound_substreams.len() < MAX_NUM_STREAMS {
Expand Down Expand Up @@ -848,8 +863,11 @@ fn compute_new_protocol_status(

impl Handler {
fn answer_pending_request(&mut self, request_id: RequestId, mut msg: KadResponseMsg) {
for state in self.inbound_substreams.iter_mut() {
match state.try_answer_with(request_id, msg) {
for state in self
.inbound_substreams
.iter_mut_of_type::<InboundSubstreamState>()
{
match state.get_mut().try_answer_with(request_id, msg) {
Ok(()) => return,
Err(m) => {
msg = m;
Expand Down Expand Up @@ -1006,7 +1024,6 @@ impl futures::Stream for InboundSubstreamState {
}
},
InboundSubstreamState::Poisoned { .. } => unreachable!(),
InboundSubstreamState::Cancelled => return Poll::Ready(None),
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion protocols/perf/src/server/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::{
};

use futures::FutureExt;
use futures_bounded::Delay;
use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade};
use libp2p_swarm::{
handler::{
Expand All @@ -49,7 +50,7 @@ impl Handler {
pub fn new() -> Self {
Self {
inbound: futures_bounded::FuturesSet::new(
crate::RUN_TIMEOUT,
|| Delay::futures_timer(crate::RUN_TIMEOUT),
crate::MAX_PARALLEL_RUNS_PER_CONNECTION,
),
}
Expand Down
4 changes: 2 additions & 2 deletions protocols/relay/src/behaviour/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,11 @@ impl Handler {
pub fn new(config: Config, endpoint: ConnectedPoint) -> Handler {
Handler {
inbound_workers: futures_bounded::FuturesSet::new(
STREAM_TIMEOUT,
|| futures_bounded::Delay::futures_timer(STREAM_TIMEOUT),
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
outbound_workers: futures_bounded::FuturesMap::new(
STREAM_TIMEOUT,
|| futures_bounded::Delay::futures_timer(STREAM_TIMEOUT),
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
endpoint,
Expand Down
8 changes: 4 additions & 4 deletions protocols/relay/src/priv_client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,19 @@ impl Handler {
queued_events: Default::default(),
pending_streams: Default::default(),
inflight_reserve_requests: futures_bounded::FuturesTupleSet::new(
STREAM_TIMEOUT,
|| futures_bounded::Delay::futures_timer(STREAM_TIMEOUT),
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
inflight_inbound_circuit_requests: futures_bounded::FuturesSet::new(
STREAM_TIMEOUT,
|| futures_bounded::Delay::futures_timer(STREAM_TIMEOUT),
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
inflight_outbound_connect_requests: futures_bounded::FuturesTupleSet::new(
STREAM_TIMEOUT,
|| futures_bounded::Delay::futures_timer(STREAM_TIMEOUT),
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
inflight_outbound_circuit_deny_requests: futures_bounded::FuturesSet::new(
DENYING_CIRCUIT_TIMEOUT,
|| futures_bounded::Delay::futures_timer(DENYING_CIRCUIT_TIMEOUT),
MAX_NUMBER_DENYING_CIRCUIT,
),
reservation: Reservation::None,
Expand Down
Loading