Skip to content

Commit 947b863

Browse files
committed
fix(network): dedupe concurrent connection attempts
1 parent a7bfd06 commit 947b863

File tree

1 file changed

+135
-38
lines changed

1 file changed

+135
-38
lines changed

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 135 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -397,7 +397,7 @@ impl P2pConnManager {
397397
.await?;
398398

399399
// Wait for connection to be established (with timeout)
400-
match timeout(Duration::from_secs(5), result.recv()).await {
400+
match timeout(Duration::from_secs(20), result.recv()).await {
401401
Ok(Some(Ok(_))) => {
402402
// Connection established, try sending again
403403
// IMPORTANT: Use single get() call to avoid TOCTOU race
@@ -492,13 +492,15 @@ impl P2pConnManager {
492492
"Cleaning up in-progress connection reservations"
493493
);
494494

495-
for (addr, mut callback) in state.awaiting_connection.drain() {
496-
tracing::debug!(%addr, "Notifying awaiting connection of shutdown");
495+
for (addr, mut callbacks) in state.awaiting_connection.drain() {
496+
tracing::debug!(%addr, callbacks = callbacks.len(), "Notifying awaiting connection of shutdown");
497497
// Best effort notification - ignore errors since we're shutting down anyway
498498
// The callback sender will handle cleanup on their side
499-
let _ = callback
500-
.send_result(Err(HandshakeError::ChannelClosed))
501-
.await;
499+
for mut callback in callbacks.drain(..) {
500+
let _ = callback
501+
.send_result(Err(HandshakeError::ChannelClosed))
502+
.await;
503+
}
502504
}
503505

504506
tracing::info!("Cleanup complete, exiting event loop");
@@ -1020,24 +1022,107 @@ impl P2pConnManager {
10201022
}
10211023
tracing::debug!(tx = %tx, "Blocked addresses: {:?}, peer addr: {}", blocked_addrs, peer.addr);
10221024
}
1023-
state.awaiting_connection.insert(peer.addr, callback);
1025+
match state.awaiting_connection.entry(peer.addr) {
1026+
std::collections::hash_map::Entry::Occupied(mut callbacks) => {
1027+
tracing::debug!(
1028+
tx = %tx,
1029+
remote = %peer.addr,
1030+
pending = callbacks.get().len(),
1031+
"Connection already pending, queuing additional requester"
1032+
);
1033+
callbacks.get_mut().push(callback);
1034+
return Ok(());
1035+
}
1036+
std::collections::hash_map::Entry::Vacant(entry) => {
1037+
entry.insert(vec![callback]);
1038+
}
1039+
}
10241040
let res = timeout(
10251041
Duration::from_secs(10),
10261042
handshake_handler_msg.establish_conn(peer.clone(), tx, is_gw),
10271043
)
1028-
.await
1029-
.inspect_err(|error| {
1030-
tracing::error!(tx = %tx, "Failed to establish connection: {:?}", error);
1031-
})?;
1044+
.await;
10321045
match res {
1033-
Ok(()) => {
1034-
tracing::debug!(tx = %tx,
1046+
Ok(Ok(())) => {
1047+
tracing::debug!(
1048+
tx = %tx,
10351049
"Successfully initiated connection process for peer: {:?}",
10361050
peer
10371051
);
10381052
Ok(())
10391053
}
1040-
Err(e) => Err(anyhow::Error::msg(e)),
1054+
Ok(Err(e)) => {
1055+
tracing::warn!(
1056+
tx = %tx,
1057+
remote = %peer.addr,
1058+
error = ?e,
1059+
"Handshake establish_conn returned error"
1060+
);
1061+
if let Some(callbacks) = state.awaiting_connection.remove(&peer.addr) {
1062+
let mut callbacks = callbacks.into_iter();
1063+
if let Some(mut cb) = callbacks.next() {
1064+
cb.send_result(Err(e))
1065+
.await
1066+
.inspect_err(|send_err| {
1067+
tracing::debug!(
1068+
remote = %peer.addr,
1069+
error = ?send_err,
1070+
"Failed to deliver handshake error to awaiting callback"
1071+
);
1072+
})
1073+
.ok();
1074+
}
1075+
for mut cb in callbacks {
1076+
cb.send_result(Err(HandshakeError::ConnectionClosed(peer.addr)))
1077+
.await
1078+
.inspect_err(|send_err| {
1079+
tracing::debug!(
1080+
remote = %peer.addr,
1081+
error = ?send_err,
1082+
"Failed to deliver fallback handshake error to awaiting callback"
1083+
);
1084+
})
1085+
.ok();
1086+
}
1087+
}
1088+
Ok(())
1089+
}
1090+
Err(elapsed) => {
1091+
tracing::warn!(
1092+
tx = %tx,
1093+
remote = %peer.addr,
1094+
elapsed = ?elapsed,
1095+
"Timeout while establishing connection"
1096+
);
1097+
if let Some(callbacks) = state.awaiting_connection.remove(&peer.addr) {
1098+
let mut iter = callbacks.into_iter();
1099+
if let Some(mut cb) = iter.next() {
1100+
cb.send_result(Err(HandshakeError::ConnectionClosed(peer.addr)))
1101+
.await
1102+
.inspect_err(|send_err| {
1103+
tracing::debug!(
1104+
remote = %peer.addr,
1105+
error = ?send_err,
1106+
"Failed to deliver connection timeout to awaiting callback"
1107+
);
1108+
})
1109+
.ok();
1110+
}
1111+
for mut cb in iter {
1112+
cb.send_result(Err(HandshakeError::ChannelClosed))
1113+
.await
1114+
.inspect_err(|send_err| {
1115+
tracing::debug!(
1116+
remote = %peer.addr,
1117+
error = ?send_err,
1118+
"Failed to deliver fallback connection timeout to awaiting callback"
1119+
);
1120+
})
1121+
.ok();
1122+
}
1123+
}
1124+
Ok(())
1125+
}
10411126
}
10421127
}
10431128

@@ -1172,26 +1257,36 @@ impl P2pConnManager {
11721257
return Err(error.into());
11731258
}
11741259
}
1175-
if let Some(mut r) = state.awaiting_connection.remove(&peer_id.addr) {
1176-
// Don't propagate channel closed errors - just log and continue
1177-
// The receiver may have timed out or been cancelled, which shouldn't crash the node
1178-
r.send_result(Err(error))
1179-
.await
1180-
.inspect_err(|e| {
1181-
tracing::warn!(%peer_id, "Failed to send connection error notification - receiver may have timed out: {:?}", e);
1182-
})
1183-
.ok();
1260+
if let Some(callbacks) = state.awaiting_connection.remove(&peer_id.addr) {
1261+
let mut callbacks = callbacks.into_iter();
1262+
if let Some(mut r) = callbacks.next() {
1263+
// Don't propagate channel closed errors - just log and continue
1264+
// The receiver may have timed out or been cancelled, which shouldn't crash the node
1265+
r.send_result(Err(error))
1266+
.await
1267+
.inspect_err(|e| {
1268+
tracing::warn!(%peer_id, "Failed to send connection error notification - receiver may have timed out: {:?}", e);
1269+
})
1270+
.ok();
1271+
}
1272+
for mut r in callbacks {
1273+
if let Err(e) = r.send_result(Err(HandshakeError::ChannelClosed)).await {
1274+
tracing::debug!(%peer_id, "Failed to send fallback connection error notification: {:?}", e);
1275+
}
1276+
}
11841277
}
11851278
}
11861279
HandshakeEvent::RemoveTransaction(tx) => {
11871280
state.transient_conn.remove(&tx);
11881281
}
11891282
HandshakeEvent::OutboundGatewayConnectionRejected { peer_id } => {
11901283
tracing::info!(%peer_id, "Connection rejected by peer");
1191-
if let Some(mut r) = state.awaiting_connection.remove(&peer_id.addr) {
1192-
// Don't propagate channel closed errors - just log and continue
1193-
if let Err(e) = r.send_result(Err(HandshakeError::ChannelClosed)).await {
1194-
tracing::debug!(%peer_id, "Failed to send rejection notification: {:?}", e);
1284+
if let Some(callbacks) = state.awaiting_connection.remove(&peer_id.addr) {
1285+
for mut r in callbacks {
1286+
// Don't propagate channel closed errors - just log and continue
1287+
if let Err(e) = r.send_result(Err(HandshakeError::ChannelClosed)).await {
1288+
tracing::debug!(%peer_id, "Failed to send rejection notification: {:?}", e);
1289+
}
11951290
}
11961291
}
11971292
}
@@ -1225,8 +1320,8 @@ impl P2pConnManager {
12251320
select_stream: &mut priority_select::ProductionPrioritySelectStream,
12261321
remaining_checks: Option<usize>,
12271322
) -> anyhow::Result<()> {
1228-
if let Some(mut cb) = state.awaiting_connection.remove(&peer_id.addr) {
1229-
let peer_id = if let Some(peer_id) = self
1323+
if let Some(callbacks) = state.awaiting_connection.remove(&peer_id.addr) {
1324+
let resolved_peer_id = if let Some(peer_id) = self
12301325
.bridge
12311326
.op_manager
12321327
.ring
@@ -1241,14 +1336,16 @@ impl P2pConnManager {
12411336
let key = (*self.bridge.op_manager.ring.connection_manager.pub_key).clone();
12421337
PeerId::new(self_addr, key)
12431338
};
1244-
timeout(
1245-
Duration::from_secs(60),
1246-
cb.send_result(Ok((peer_id, remaining_checks))),
1247-
)
1248-
.await
1249-
.inspect_err(|error| {
1250-
tracing::error!("Failed to send connection result: {:?}", error);
1251-
})??;
1339+
for mut cb in callbacks {
1340+
timeout(
1341+
Duration::from_secs(60),
1342+
cb.send_result(Ok((resolved_peer_id.clone(), remaining_checks))),
1343+
)
1344+
.await
1345+
.inspect_err(|error| {
1346+
tracing::error!("Failed to send connection result: {:?}", error);
1347+
})??;
1348+
}
12521349
} else {
12531350
tracing::warn!(%peer_id, "No callback for connection established");
12541351
}
@@ -1527,7 +1624,7 @@ struct EventListenerState {
15271624
tx_to_client: HashMap<Transaction, HashSet<ClientId>>,
15281625
client_waiting_transaction: Vec<(WaitingTransaction, HashSet<ClientId>)>,
15291626
transient_conn: HashMap<Transaction, SocketAddr>,
1530-
awaiting_connection: HashMap<SocketAddr, Box<dyn ConnectResultSender>>,
1627+
awaiting_connection: HashMap<SocketAddr, Vec<Box<dyn ConnectResultSender>>>,
15311628
pending_op_results: HashMap<Transaction, Sender<NetMessage>>,
15321629
}
15331630

0 commit comments

Comments
 (0)