Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 27 additions & 21 deletions crates/core/src/operations/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1106,32 +1106,38 @@ where
// However, we still respect max_connections - this only applies when there's capacity.
const EARLY_NETWORK_THRESHOLD: usize = 4;
let has_capacity = num_connections + num_reserved < max_connections;
let is_early_network = is_gateway && accepted && num_connections < EARLY_NETWORK_THRESHOLD;

if num_connections == 0 || (is_early_network && has_capacity) {
if num_reserved == 1 && is_gateway && accepted {
tracing::info!(
tx = %id,
joiner = %joiner.peer,
connections = num_connections,
has_capacity = %has_capacity,
"Gateway early network: accepting connection directly (will register immediately)",
);
let connectivity_info = ConnectivityInfo::new_bootstrap(
joiner.clone(),
1, // Single check for direct connection
);
return Ok(Some(ConnectState::AwaitingConnectivity(connectivity_info)));
} else if num_connections == 0 {
if is_gateway
&& accepted
&& (num_connections == 0 || (num_connections < EARLY_NETWORK_THRESHOLD && has_capacity))
{
if num_reserved != 1 {
tracing::debug!(
tx = %id,
joiner = %joiner.peer,
is_gateway = %is_gateway,
num_reserved = %num_reserved,
"Cannot forward or accept: no existing connections, or reserved connections pending",
num_reserved,
"Gateway bootstrap registration proceeding despite reserved count"
);
return Ok(None);
}
tracing::info!(
tx = %id,
joiner = %joiner.peer,
connections = num_connections,
has_capacity = %has_capacity,
"Gateway early network: accepting connection directly (will register immediately)",
);
let connectivity_info = ConnectivityInfo::new_bootstrap(joiner.clone(), 1); // Single check for direct connection
return Ok(Some(ConnectState::AwaitingConnectivity(connectivity_info)));
}

if num_connections == 0 {
tracing::debug!(
tx = %id,
joiner = %joiner.peer,
is_gateway = %is_gateway,
num_reserved = %num_reserved,
"Cannot forward or accept: no existing connections, or reserved connections pending",
);
return Ok(None);
}

// Try to forward the connection request to an existing peer
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/ring/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,6 @@ impl Ring {
);
error
})?;

if live_tx.is_none() {
let conns = self.connection_manager.get_open_connections();
tracing::warn!(
Expand Down
86 changes: 36 additions & 50 deletions crates/core/tests/connectivity.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::bail;
use anyhow::{bail, Context};
use freenet::test_utils::{self, make_get, make_put, TestContext};
use freenet_macros::freenet_test;
use freenet_stdlib::{
Expand Down Expand Up @@ -273,6 +273,18 @@ async fn test_three_node_network_connectivity(ctx: &mut TestContext) -> TestResu
let peer1 = ctx.node("peer1")?;
let peer2 = ctx.node("peer2")?;

let peer1_public_port = peer1.network_port.context(
"peer1 missing network port; auto_connect_peers requires public_port for mesh connectivity",
)?;
let peer2_public_port = peer2.network_port.context(
"peer2 missing network port; auto_connect_peers requires public_port for mesh connectivity",
)?;
tracing::info!(
peer1_port = peer1_public_port,
peer2_port = peer2_public_port,
"Verified peer network ports for direct connectivity"
);

let gateway_ws_port = gateway.ws_port;
let peer1_ws_port = peer1.ws_port;
let peer2_ws_port = peer2.ws_port;
Expand All @@ -298,34 +310,14 @@ async fn test_three_node_network_connectivity(ctx: &mut TestContext) -> TestResu

// Retry loop to wait for full mesh connectivity
const MAX_RETRIES: usize = 30;
const DIRECT_WAIT_ATTEMPTS: usize = 3;
const RETRY_DELAY: Duration = Duration::from_secs(1);
let mut retry_count = 0;
let mut direct_mesh_established = false;
let mut fell_back_to_gateway = false;
let mut minimal_connectivity_ready = false;

loop {
retry_count += 1;
if retry_count > MAX_RETRIES {
if minimal_connectivity_ready {
tracing::warn!(
"Max retries ({}) reached; continuing with gateway-mediated topology.",
MAX_RETRIES
);
fell_back_to_gateway = true;
break;
} else {
bail!(
"Failed to establish minimum connectivity after {} seconds",
MAX_RETRIES * 2
);
}
}
let mut mesh_established = false;
let mut last_snapshot = (String::new(), String::new(), String::new());

for attempt in 1..=MAX_RETRIES {
tracing::info!(
"Attempt {}/{}: Querying all nodes for connected peers...",
retry_count,
attempt,
MAX_RETRIES
);

Expand Down Expand Up @@ -367,45 +359,39 @@ async fn test_three_node_network_connectivity(ctx: &mut TestContext) -> TestResu
tracing::debug!("Peer1 peers: {:?}", peer1_peers);
tracing::debug!("Peer2 peers: {:?}", peer2_peers);

last_snapshot = (
format!("{:?}", gw_peers),
format!("{:?}", peer1_peers),
format!("{:?}", peer2_peers),
);

let gateway_sees_all = gw_peers.len() >= 2;
let peer1_connected = !peer1_peers.is_empty();
let peer2_connected = !peer2_peers.is_empty();
let peer1_direct = peer1_peers.len() >= 2;
let peer2_direct = peer2_peers.len() >= 2;

if gateway_sees_all && peer1_connected && peer2_connected {
minimal_connectivity_ready = true;
if peer1_direct && peer2_direct {
tracing::info!("✅ Full mesh connectivity established!");
direct_mesh_established = true;
break;
}
}

if !direct_mesh_established
&& minimal_connectivity_ready
&& retry_count >= DIRECT_WAIT_ATTEMPTS
{
tracing::warn!(
"Peer topology stabilized via gateway only (peer1 direct: {}, peer2 direct: {}). Proceeding with fallback.",
peer1_direct,
peer2_direct
);
fell_back_to_gateway = true;
if gateway_sees_all && peer1_direct && peer2_direct {
tracing::info!("✅ Full mesh connectivity established!");
mesh_established = true;
break;
}

tracing::info!("Network not fully connected yet, waiting...");
tokio::time::sleep(RETRY_DELAY).await;
}

if !mesh_established {
bail!(
"Failed to establish full mesh connectivity after {} attempts. Gateway peers: {}; peer1 peers: {}; peer2 peers: {}",
MAX_RETRIES,
last_snapshot.0,
last_snapshot.1,
last_snapshot.2
);
}

// Verify functionality with PUT/GET
tracing::info!("Verifying network functionality with PUT/GET operations");

if fell_back_to_gateway && !direct_mesh_established {
tracing::warn!("Gateway-mediated routing is being exercised; direct peer links were not observed within {} attempts.", DIRECT_WAIT_ATTEMPTS);
}

make_put(&mut client1, wrapped_state.clone(), contract.clone(), false).await?;
let resp = tokio::time::timeout(Duration::from_secs(60), client1.recv()).await;
match resp {
Expand Down
Loading