Skip to content

Commit b6738f5

Browse files
committed
fix(network): dedupe concurrent connection attempts
1 parent a4a834f commit b6738f5

File tree

1 file changed

+135
-38
lines changed

1 file changed

+135
-38
lines changed

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 135 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ impl P2pConnManager {
396396
.await?;
397397

398398
// Wait for connection to be established (with timeout)
399-
match timeout(Duration::from_secs(5), result.recv()).await {
399+
match timeout(Duration::from_secs(20), result.recv()).await {
400400
Ok(Some(Ok(_))) => {
401401
// Connection established, try sending again
402402
// IMPORTANT: Use single get() call to avoid TOCTOU race
@@ -491,13 +491,15 @@ impl P2pConnManager {
491491
"Cleaning up in-progress connection reservations"
492492
);
493493

494-
for (addr, mut callback) in state.awaiting_connection.drain() {
495-
tracing::debug!(%addr, "Notifying awaiting connection of shutdown");
494+
for (addr, mut callbacks) in state.awaiting_connection.drain() {
495+
tracing::debug!(%addr, callbacks = callbacks.len(), "Notifying awaiting connection of shutdown");
496496
// Best effort notification - ignore errors since we're shutting down anyway
497497
// The callback sender will handle cleanup on their side
498-
let _ = callback
499-
.send_result(Err(HandshakeError::ChannelClosed))
500-
.await;
498+
for mut callback in callbacks.drain(..) {
499+
let _ = callback
500+
.send_result(Err(HandshakeError::ChannelClosed))
501+
.await;
502+
}
501503
}
502504

503505
tracing::info!("Cleanup complete, exiting event loop");
@@ -1019,24 +1021,107 @@ impl P2pConnManager {
10191021
}
10201022
tracing::debug!(tx = %tx, "Blocked addresses: {:?}, peer addr: {}", blocked_addrs, peer.addr);
10211023
}
1022-
state.awaiting_connection.insert(peer.addr, callback);
1024+
match state.awaiting_connection.entry(peer.addr) {
1025+
std::collections::hash_map::Entry::Occupied(mut callbacks) => {
1026+
tracing::debug!(
1027+
tx = %tx,
1028+
remote = %peer.addr,
1029+
pending = callbacks.get().len(),
1030+
"Connection already pending, queuing additional requester"
1031+
);
1032+
callbacks.get_mut().push(callback);
1033+
return Ok(());
1034+
}
1035+
std::collections::hash_map::Entry::Vacant(entry) => {
1036+
entry.insert(vec![callback]);
1037+
}
1038+
}
10231039
let res = timeout(
10241040
Duration::from_secs(10),
10251041
handshake_handler_msg.establish_conn(peer.clone(), tx, is_gw),
10261042
)
1027-
.await
1028-
.inspect_err(|error| {
1029-
tracing::error!(tx = %tx, "Failed to establish connection: {:?}", error);
1030-
})?;
1043+
.await;
10311044
match res {
1032-
Ok(()) => {
1033-
tracing::debug!(tx = %tx,
1045+
Ok(Ok(())) => {
1046+
tracing::debug!(
1047+
tx = %tx,
10341048
"Successfully initiated connection process for peer: {:?}",
10351049
peer
10361050
);
10371051
Ok(())
10381052
}
1039-
Err(e) => Err(anyhow::Error::msg(e)),
1053+
Ok(Err(e)) => {
1054+
tracing::warn!(
1055+
tx = %tx,
1056+
remote = %peer.addr,
1057+
error = ?e,
1058+
"Handshake establish_conn returned error"
1059+
);
1060+
if let Some(callbacks) = state.awaiting_connection.remove(&peer.addr) {
1061+
let mut callbacks = callbacks.into_iter();
1062+
if let Some(mut cb) = callbacks.next() {
1063+
cb.send_result(Err(e))
1064+
.await
1065+
.inspect_err(|send_err| {
1066+
tracing::debug!(
1067+
remote = %peer.addr,
1068+
error = ?send_err,
1069+
"Failed to deliver handshake error to awaiting callback"
1070+
);
1071+
})
1072+
.ok();
1073+
}
1074+
for mut cb in callbacks {
1075+
cb.send_result(Err(HandshakeError::ConnectionClosed(peer.addr)))
1076+
.await
1077+
.inspect_err(|send_err| {
1078+
tracing::debug!(
1079+
remote = %peer.addr,
1080+
error = ?send_err,
1081+
"Failed to deliver fallback handshake error to awaiting callback"
1082+
);
1083+
})
1084+
.ok();
1085+
}
1086+
}
1087+
Ok(())
1088+
}
1089+
Err(elapsed) => {
1090+
tracing::warn!(
1091+
tx = %tx,
1092+
remote = %peer.addr,
1093+
elapsed = ?elapsed,
1094+
"Timeout while establishing connection"
1095+
);
1096+
if let Some(callbacks) = state.awaiting_connection.remove(&peer.addr) {
1097+
let mut iter = callbacks.into_iter();
1098+
if let Some(mut cb) = iter.next() {
1099+
cb.send_result(Err(HandshakeError::ConnectionClosed(peer.addr)))
1100+
.await
1101+
.inspect_err(|send_err| {
1102+
tracing::debug!(
1103+
remote = %peer.addr,
1104+
error = ?send_err,
1105+
"Failed to deliver connection timeout to awaiting callback"
1106+
);
1107+
})
1108+
.ok();
1109+
}
1110+
for mut cb in iter {
1111+
cb.send_result(Err(HandshakeError::ChannelClosed))
1112+
.await
1113+
.inspect_err(|send_err| {
1114+
tracing::debug!(
1115+
remote = %peer.addr,
1116+
error = ?send_err,
1117+
"Failed to deliver fallback connection timeout to awaiting callback"
1118+
);
1119+
})
1120+
.ok();
1121+
}
1122+
}
1123+
Ok(())
1124+
}
10401125
}
10411126
}
10421127

@@ -1171,26 +1256,36 @@ impl P2pConnManager {
11711256
return Err(error.into());
11721257
}
11731258
}
1174-
if let Some(mut r) = state.awaiting_connection.remove(&peer_id.addr) {
1175-
// Don't propagate channel closed errors - just log and continue
1176-
// The receiver may have timed out or been cancelled, which shouldn't crash the node
1177-
r.send_result(Err(error))
1178-
.await
1179-
.inspect_err(|e| {
1180-
tracing::warn!(%peer_id, "Failed to send connection error notification - receiver may have timed out: {:?}", e);
1181-
})
1182-
.ok();
1259+
if let Some(callbacks) = state.awaiting_connection.remove(&peer_id.addr) {
1260+
let mut callbacks = callbacks.into_iter();
1261+
if let Some(mut r) = callbacks.next() {
1262+
// Don't propagate channel closed errors - just log and continue
1263+
// The receiver may have timed out or been cancelled, which shouldn't crash the node
1264+
r.send_result(Err(error))
1265+
.await
1266+
.inspect_err(|e| {
1267+
tracing::warn!(%peer_id, "Failed to send connection error notification - receiver may have timed out: {:?}", e);
1268+
})
1269+
.ok();
1270+
}
1271+
for mut r in callbacks {
1272+
if let Err(e) = r.send_result(Err(HandshakeError::ChannelClosed)).await {
1273+
tracing::debug!(%peer_id, "Failed to send fallback connection error notification: {:?}", e);
1274+
}
1275+
}
11831276
}
11841277
}
11851278
HandshakeEvent::RemoveTransaction(tx) => {
11861279
state.transient_conn.remove(&tx);
11871280
}
11881281
HandshakeEvent::OutboundGatewayConnectionRejected { peer_id } => {
11891282
tracing::info!(%peer_id, "Connection rejected by peer");
1190-
if let Some(mut r) = state.awaiting_connection.remove(&peer_id.addr) {
1191-
// Don't propagate channel closed errors - just log and continue
1192-
if let Err(e) = r.send_result(Err(HandshakeError::ChannelClosed)).await {
1193-
tracing::debug!(%peer_id, "Failed to send rejection notification: {:?}", e);
1283+
if let Some(callbacks) = state.awaiting_connection.remove(&peer_id.addr) {
1284+
for mut r in callbacks {
1285+
// Don't propagate channel closed errors - just log and continue
1286+
if let Err(e) = r.send_result(Err(HandshakeError::ChannelClosed)).await {
1287+
tracing::debug!(%peer_id, "Failed to send rejection notification: {:?}", e);
1288+
}
11941289
}
11951290
}
11961291
}
@@ -1224,8 +1319,8 @@ impl P2pConnManager {
12241319
select_stream: &mut priority_select::ProductionPrioritySelectStream,
12251320
remaining_checks: Option<usize>,
12261321
) -> anyhow::Result<()> {
1227-
if let Some(mut cb) = state.awaiting_connection.remove(&peer_id.addr) {
1228-
let peer_id = if let Some(peer_id) = self
1322+
if let Some(callbacks) = state.awaiting_connection.remove(&peer_id.addr) {
1323+
let resolved_peer_id = if let Some(peer_id) = self
12291324
.bridge
12301325
.op_manager
12311326
.ring
@@ -1240,14 +1335,16 @@ impl P2pConnManager {
12401335
let key = (*self.bridge.op_manager.ring.connection_manager.pub_key).clone();
12411336
PeerId::new(self_addr, key)
12421337
};
1243-
timeout(
1244-
Duration::from_secs(60),
1245-
cb.send_result(Ok((peer_id, remaining_checks))),
1246-
)
1247-
.await
1248-
.inspect_err(|error| {
1249-
tracing::error!("Failed to send connection result: {:?}", error);
1250-
})??;
1338+
for mut cb in callbacks {
1339+
timeout(
1340+
Duration::from_secs(60),
1341+
cb.send_result(Ok((resolved_peer_id.clone(), remaining_checks))),
1342+
)
1343+
.await
1344+
.inspect_err(|error| {
1345+
tracing::error!("Failed to send connection result: {:?}", error);
1346+
})??;
1347+
}
12511348
} else {
12521349
tracing::warn!(%peer_id, "No callback for connection established");
12531350
}
@@ -1526,7 +1623,7 @@ struct EventListenerState {
15261623
tx_to_client: HashMap<Transaction, HashSet<ClientId>>,
15271624
client_waiting_transaction: Vec<(WaitingTransaction, HashSet<ClientId>)>,
15281625
transient_conn: HashMap<Transaction, SocketAddr>,
1529-
awaiting_connection: HashMap<SocketAddr, Box<dyn ConnectResultSender>>,
1626+
awaiting_connection: HashMap<SocketAddr, Vec<Box<dyn ConnectResultSender>>>,
15301627
pending_op_results: HashMap<Transaction, Sender<NetMessage>>,
15311628
}
15321629

0 commit comments

Comments
 (0)