diff --git a/Cargo.lock b/Cargo.lock index c1e2bd967b0..7442687ac02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2760,7 +2760,7 @@ dependencies = [ [[package]] name = "libp2p-metrics" -version = "0.17.1" +version = "0.18.0" dependencies = [ "futures", "libp2p-core", @@ -2959,7 +2959,7 @@ dependencies = [ [[package]] name = "libp2p-relay" -version = "0.21.1" +version = "0.22.0" dependencies = [ "asynchronous-codec", "bytes", diff --git a/Cargo.toml b/Cargo.toml index e51faa159be..3fb6d17ba3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,7 +88,7 @@ libp2p-identity = { version = "0.2.12" } libp2p-kad = { version = "0.49.0", path = "protocols/kad" } libp2p-mdns = { version = "0.48.0", path = "protocols/mdns" } libp2p-memory-connection-limits = { version = "0.5.0", path = "misc/memory-connection-limits" } -libp2p-metrics = { version = "0.17.1", path = "misc/metrics" } +libp2p-metrics = { version = "0.18.0", path = "misc/metrics" } libp2p-mplex = { version = "0.43.1", path = "muxers/mplex" } libp2p-noise = { version = "0.46.1", path = "transports/noise" } libp2p-peer-store = { version = "0.1.0", path = "misc/peer-store" } @@ -97,7 +97,7 @@ libp2p-ping = { version = "0.47.0", path = "protocols/ping" } libp2p-plaintext = { version = "0.43.0", path = "transports/plaintext" } libp2p-pnet = { version = "0.26.0", path = "transports/pnet" } libp2p-quic = { version = "0.13.0", path = "transports/quic" } -libp2p-relay = { version = "0.21.1", path = "protocols/relay" } +libp2p-relay = { version = "0.22.0", path = "protocols/relay" } libp2p-rendezvous = { version = "0.17.0", path = "protocols/rendezvous" } libp2p-request-response = { version = "0.29.0", path = "protocols/request-response" } libp2p-server = { version = "0.12.7", path = "misc/server" } diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index bab690c4d98..b2979917456 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -1,8 +1,11 @@ -## 0.17.1 +## 0.18.0 - Fix panic in swarm metrics when `ConnectionClosed` events are received for connections that were established before metrics collection started. See [PR 6158](https://github.com/libp2p/rust-libp2p/pull/6158). +- Add `StatusChanged` as a relay metric. + See [PR 6154](https://github.com/libp2p/rust-libp2p/pull/6154). + ## 0.17.0 - Update `prometheus-client` to `0.23.0`. diff --git a/misc/metrics/Cargo.toml b/misc/metrics/Cargo.toml index 32be1e8fdac..76379e5dbb0 100644 --- a/misc/metrics/Cargo.toml +++ b/misc/metrics/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-metrics" edition.workspace = true rust-version = { workspace = true } description = "Metrics for libp2p" -version = "0.17.1" +version = "0.18.0" authors = ["Max Inden "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/misc/metrics/src/relay.rs b/misc/metrics/src/relay.rs index c6b3827743c..c5ee6ec1a65 100644 --- a/misc/metrics/src/relay.rs +++ b/misc/metrics/src/relay.rs @@ -62,6 +62,7 @@ enum EventType { CircuitReqAccepted, CircuitReqAcceptFailed, CircuitClosed, + StatusChanged, } impl From<&libp2p_relay::Event> for EventType { @@ -90,6 +91,7 @@ impl From<&libp2p_relay::Event> for EventType { #[allow(deprecated)] libp2p_relay::Event::CircuitReqAcceptFailed { .. } => EventType::CircuitReqAcceptFailed, libp2p_relay::Event::CircuitClosed { .. } => EventType::CircuitClosed, + libp2p_relay::Event::StatusChanged { .. } => EventType::StatusChanged, } } } diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index fde8a2a6807..e6033d5da5b 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -1,5 +1,9 @@ -## 0.21.1 +## 0.22.0 +- Automatically configure HOP protocol advertisement based on external addresses, with the ability to override this + functionality using `Behaviour::set_status` to explicitly set `Status::{Enable,Disable}` to enable or disable + protocol advertisement. + See [PR 6154](https://github.com/libp2p/rust-libp2p/pull/6154). - reduce allocations by replacing `get_or_insert` with `get_or_insert_with` See [PR 6136](https://github.com/libp2p/rust-libp2p/pull/6136) diff --git a/protocols/relay/Cargo.toml b/protocols/relay/Cargo.toml index 3871abbcf8a..bba481db11f 100644 --- a/protocols/relay/Cargo.toml +++ b/protocols/relay/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-relay" edition.workspace = true rust-version = { workspace = true } description = "Communications relaying for libp2p" -version = "0.21.1" +version = "0.22.0" authors = ["Parity Technologies ", "Max Inden "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/relay/src/behaviour.rs b/protocols/relay/src/behaviour.rs index fe6b08c876a..e9bf04adb88 100644 --- a/protocols/relay/src/behaviour.rs +++ b/protocols/relay/src/behaviour.rs @@ -23,10 +23,10 @@ pub(crate) mod handler; pub(crate) mod rate_limiter; use std::{ - collections::{hash_map, HashMap, HashSet, VecDeque}, + collections::{hash_map, HashMap, VecDeque}, num::NonZeroU32, ops::Add, - task::{Context, Poll}, + task::{Context, Poll, Waker}, time::Duration, }; @@ -35,6 +35,7 @@ use libp2p_core::{multiaddr::Protocol, transport::PortUse, ConnectedPoint, Endpo use libp2p_identity::PeerId; use libp2p_swarm::{ behaviour::{ConnectionClosed, FromSwarm}, + derive_prelude::ConnectionEstablished, dummy, ConnectionDenied, ConnectionId, ExternalAddresses, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; @@ -244,6 +245,11 @@ pub enum Event { dst_peer_id: PeerId, error: Option, }, + /// Status has been changed. + /// + /// This is triggered based on if the external address + /// has been added or removed. + StatusChanged { status: Status }, } /// [`NetworkBehaviour`] implementation of the relay server @@ -253,13 +259,40 @@ pub struct Behaviour { local_peer_id: PeerId, - reservations: HashMap>, + connections: HashMap>, circuits: CircuitsTracker, /// Queue of actions to return when polled. queued_actions: VecDeque>>, external_addresses: ExternalAddresses, + + status: Status, + + auto_status_change: bool, + + waker: Option, +} + +#[derive(PartialEq, Copy, Clone, Debug)] +pub enum Status { + /// Enables advertisement of the HOP protocol + Enable, + + /// Disables advertisement of the HOP protocol + Disable, +} + +#[derive(Debug, PartialEq, Eq)] +enum Reservation { + Active, + None, +} + +impl Reservation { + pub(crate) fn is_active(&self) -> bool { + *self == Reservation::Active + } } impl Behaviour { @@ -267,13 +300,105 @@ impl Behaviour { Self { config, local_peer_id, - reservations: Default::default(), + connections: Default::default(), circuits: Default::default(), queued_actions: Default::default(), external_addresses: Default::default(), + status: Status::Disable, + auto_status_change: true, + waker: None, } } + pub fn set_status(&mut self, status: Option) { + match status { + Some(status) => { + self.auto_status_change = false; + if self.status != status { + self.status = status; + self.reconfigure_relay_status(); + } + } + None => { + self.auto_status_change = true; + self.determine_relay_status_from_external_address(); + } + } + if let Some(waker) = self.waker.take() { + waker.wake(); + } + } + + fn reconfigure_relay_status(&mut self) { + if self.connections.is_empty() { + return; + } + + for (peer_id, connections) in self.connections.iter() { + self.queued_actions + .extend(connections.keys().map(|id| ToSwarm::NotifyHandler { + peer_id: *peer_id, + handler: NotifyHandler::One(*id), + event: Either::Left(handler::In::SetStatus { + status: self.status, + }), + })); + } + } + + fn determine_relay_status_from_external_address(&mut self) { + let old = self.status; + + self.status = match (self.external_addresses.as_slice(), self.status) { + ([], Status::Enable) => { + tracing::debug!("disabling protocol advertisement because we no longer have any confirmed external addresses"); + Status::Disable + } + ([], Status::Disable) => { + // Previously disabled because of no external addresses. + Status::Disable + } + (confirmed_external_addresses, Status::Disable) => { + debug_assert!( + !confirmed_external_addresses.is_empty(), + "Previous match arm handled empty list" + ); + tracing::debug!("advertising protocol because we are now externally reachable"); + Status::Enable + } + (confirmed_external_addresses, Status::Enable) => { + debug_assert!( + !confirmed_external_addresses.is_empty(), + "Previous match arm handled empty list" + ); + + Status::Enable + } + }; + + if self.status != old { + self.reconfigure_relay_status(); + self.queued_actions + .push_back(ToSwarm::GenerateEvent(Event::StatusChanged { + status: self.status, + })); + } + } + + fn on_connection_established( + &mut self, + ConnectionEstablished { + peer_id, + connection_id, + .. + }: ConnectionEstablished, + ) { + self.connections + .entry(peer_id) + .or_default() + .insert(connection_id, Reservation::None); + } + fn on_connection_closed( &mut self, ConnectionClosed { @@ -282,8 +407,8 @@ impl Behaviour { .. }: ConnectionClosed, ) { - if let hash_map::Entry::Occupied(mut peer) = self.reservations.entry(peer_id) { - if peer.get_mut().remove(&connection_id) { + if let hash_map::Entry::Occupied(mut peer) = self.connections.entry(peer_id) { + if peer.get_mut().remove(&connection_id).is_some() { self.queued_actions .push_back(ToSwarm::GenerateEvent(Event::ReservationClosed { src_peer_id: peer_id, @@ -294,6 +419,13 @@ impl Behaviour { } } + if let hash_map::Entry::Occupied(mut peer) = self.connections.entry(peer_id) { + peer.get_mut().remove(&connection_id); + if peer.get().is_empty() { + peer.remove(); + } + } + for circuit in self .circuits .remove_by_connection(peer_id, connection_id) @@ -337,6 +469,7 @@ impl NetworkBehaviour for Behaviour { local_addr: local_addr.clone(), send_back_addr: remote_addr.clone(), }, + self.status, ))) } @@ -364,14 +497,25 @@ impl NetworkBehaviour for Behaviour { role_override, port_use, }, + self.status, ))) } fn on_swarm_event(&mut self, event: FromSwarm) { - self.external_addresses.on_swarm_event(&event); + let changed = self.external_addresses.on_swarm_event(&event); - if let FromSwarm::ConnectionClosed(connection_closed) = event { - self.on_connection_closed(connection_closed) + if self.auto_status_change && changed { + self.determine_relay_status_from_external_address(); + } + + match event { + FromSwarm::ConnectionEstablished(connection_established) => { + self.on_connection_established(connection_established) + } + FromSwarm::ConnectionClosed(connection_closed) => { + self.on_connection_closed(connection_closed) + } + _ => {} } } @@ -405,16 +549,16 @@ impl NetworkBehaviour for Behaviour { // `max_reservations_per_peer`. (!renewed && self - .reservations + .connections .get(&event_source) - .map(|cs| cs.len()) + .map(|cs| cs.get(&connection).iter().filter(|status| status.is_active()).count()) .unwrap_or(0) > self.config.max_reservations_per_peer) // Deny if it exceeds `max_reservations`. || self - .reservations + .connections .values() - .map(|cs| cs.len()) + .map(|cs| cs.get(&connection).iter().filter(|status| status.is_active()).count()) .sum::() >= self.config.max_reservations // Deny if it exceeds the allowed rate of reservations. @@ -435,10 +579,10 @@ impl NetworkBehaviour for Behaviour { } } else { // Accept reservation. - self.reservations + self.connections .entry(event_source) .or_default() - .insert(connection); + .insert(connection, Reservation::Active); ToSwarm::NotifyHandler { handler: NotifyHandler::One(connection), @@ -464,10 +608,10 @@ impl NetworkBehaviour for Behaviour { handler::Event::ReservationReqAccepted { renewed } => { // Ensure local eventual consistent reservation state matches handler (source of // truth). - self.reservations - .entry(event_source) - .or_default() - .insert(connection); + self.connections + .get_mut(&event_source) + .expect("valid connection") + .insert(connection, Reservation::Active); self.queued_actions.push_back(ToSwarm::GenerateEvent( Event::ReservationReqAccepted { @@ -503,7 +647,7 @@ impl NetworkBehaviour for Behaviour { )); } handler::Event::ReservationTimedOut {} => { - match self.reservations.entry(event_source) { + match self.connections.entry(event_source) { hash_map::Entry::Occupied(mut peer) => { peer.get_mut().remove(&connection); if peer.get().is_empty() { @@ -556,11 +700,13 @@ impl NetworkBehaviour for Behaviour { status: proto::Status::RESOURCE_LIMIT_EXCEEDED, }), } - } else if let Some(dst_conn) = self - .reservations + } else if let Some((dst_conn, status)) = self + .connections .get(&inbound_circuit_req.dst()) .and_then(|cs| cs.iter().next()) { + assert_eq!(*status, Reservation::Active); + // Accept circuit request if reservation present. let circuit_id = self.circuits.insert(Circuit { status: CircuitStatus::Accepting, @@ -718,11 +864,16 @@ impl NetworkBehaviour for Behaviour { } #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))] - fn poll(&mut self, _: &mut Context<'_>) -> Poll>> { + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { if let Some(to_swarm) = self.queued_actions.pop_front() { return Poll::Ready(to_swarm); } + self.waker = Some(cx.waker().clone()); + Poll::Pending } } diff --git a/protocols/relay/src/behaviour/handler.rs b/protocols/relay/src/behaviour/handler.rs index af130c35516..2560900b0bc 100644 --- a/protocols/relay/src/behaviour/handler.rs +++ b/protocols/relay/src/behaviour/handler.rs @@ -33,7 +33,10 @@ use futures::{ stream::{FuturesUnordered, StreamExt}, }; use futures_timer::Delay; -use libp2p_core::{upgrade::ReadyUpgrade, ConnectedPoint, Multiaddr}; +use libp2p_core::{ + upgrade::{DeniedUpgrade, ReadyUpgrade}, + ConnectedPoint, Multiaddr, +}; use libp2p_identity::PeerId; use libp2p_swarm::{ handler::{ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound}, @@ -43,7 +46,7 @@ use libp2p_swarm::{ use web_time::Instant; use crate::{ - behaviour::CircuitId, + behaviour::{self, CircuitId}, copy_future::CopyFuture, proto, protocol::{inbound_hop, outbound_stop}, @@ -87,6 +90,9 @@ pub enum In { dst_stream: Stream, dst_pending_data: Bytes, }, + SetStatus { + status: behaviour::Status, + }, } impl fmt::Debug for In { @@ -137,6 +143,10 @@ impl fmt::Debug for In { .field("circuit_id", circuit_id) .field("dst_peer_id", dst_peer_id) .finish(), + In::SetStatus { status } => f + .debug_struct("In::SetStatus") + .field("status", status) + .finish(), } } } @@ -385,10 +395,12 @@ pub struct Handler { CircuitId, Result, >, + + status: behaviour::Status, } impl Handler { - pub fn new(config: Config, endpoint: ConnectedPoint) -> Handler { + pub fn new(config: Config, endpoint: ConnectedPoint, status: behaviour::Status) -> Handler { Handler { inbound_workers: futures_bounded::FuturesSet::new( STREAM_TIMEOUT, @@ -409,6 +421,7 @@ impl Handler { active_reservation: Default::default(), pending_connect_requests: Default::default(), active_connect_requests: Default::default(), + status, } } @@ -496,13 +509,18 @@ type Futures = FuturesUnordered>; impl ConnectionHandler for Handler { type FromBehaviour = In; type ToBehaviour = Event; - type InboundProtocol = ReadyUpgrade; + type InboundProtocol = Either, DeniedUpgrade>; type InboundOpenInfo = (); type OutboundProtocol = ReadyUpgrade; type OutboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()) + match self.status { + behaviour::Status::Enable => { + SubstreamProtocol::new(Either::Left(ReadyUpgrade::new(HOP_PROTOCOL_NAME)), ()) + } + behaviour::Status::Disable => SubstreamProtocol::new(Either::Right(DeniedUpgrade), ()), + } } fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { @@ -594,6 +612,7 @@ impl ConnectionHandler for Handler { .boxed(), ); } + In::SetStatus { status } => self.status = status, } } @@ -890,7 +909,7 @@ impl ConnectionHandler for Handler { ) { match event { ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { - protocol: stream, + protocol: futures::future::Either::Left(stream), .. }) => { self.on_fully_negotiated_inbound(stream); diff --git a/protocols/relay/src/lib.rs b/protocols/relay/src/lib.rs index 515fb40ef4b..13bcee45204 100644 --- a/protocols/relay/src/lib.rs +++ b/protocols/relay/src/lib.rs @@ -39,7 +39,9 @@ mod proto { }; } -pub use behaviour::{rate_limiter::RateLimiter, Behaviour, CircuitId, Config, Event, StatusCode}; +pub use behaviour::{ + rate_limiter::RateLimiter, Behaviour, CircuitId, Config, Event, Status, StatusCode, +}; pub use protocol::{HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME}; /// Types related to the relay protocol inbound.