Skip to content

Commit 349cd08

Browse files
committed
fix: keep peers running after handshake drop
1 parent f12ecf5 commit 349cd08

File tree

1 file changed

+104
-8
lines changed

1 file changed

+104
-8
lines changed

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 104 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -465,8 +465,7 @@ impl P2pConnManager {
465465
}
466466
ConnEvent::ClosedChannel(reason) => {
467467
match reason {
468-
ChannelCloseReason::Handshake
469-
| ChannelCloseReason::Bridge
468+
ChannelCloseReason::Bridge
470469
| ChannelCloseReason::Controller
471470
| ChannelCloseReason::Notification
472471
| ChannelCloseReason::OpExecution => {
@@ -942,10 +941,13 @@ impl P2pConnManager {
942941
Ok(EventResult::Continue)
943942
}
944943
Err(handshake_error) => {
945-
tracing::error!(?handshake_error, "Handshake handler error");
946-
Ok(EventResult::Event(
947-
ConnEvent::ClosedChannel(ChannelCloseReason::Handshake).into(),
948-
))
944+
tracing::warn!(
945+
?handshake_error,
946+
"Handshake handler yielded error; cleaning up pending connections"
947+
);
948+
self.handle_handshake_failure(handshake_error, state)
949+
.await?;
950+
Ok(EventResult::Continue)
949951
}
950952
}
951953
}
@@ -1462,6 +1464,102 @@ impl P2pConnManager {
14621464
Ok(())
14631465
}
14641466

1467+
async fn handle_handshake_failure(
1468+
&mut self,
1469+
handshake_error: HandshakeError,
1470+
state: &mut EventListenerState,
1471+
) -> anyhow::Result<()> {
1472+
match handshake_error {
1473+
HandshakeError::ConnectionClosed(addr) => {
1474+
let pending_txs = state
1475+
.awaiting_connection_txs
1476+
.remove(&addr)
1477+
.unwrap_or_default();
1478+
if let Some(callbacks) = state.awaiting_connection.remove(&addr) {
1479+
tracing::info!(
1480+
remote = %addr,
1481+
callbacks = callbacks.len(),
1482+
pending_txs = ?pending_txs,
1483+
"Notifying callbacks after handshake connection closed"
1484+
);
1485+
1486+
let mut callbacks = callbacks.into_iter();
1487+
if let Some(mut cb) = callbacks.next() {
1488+
cb.send_result(Err(HandshakeError::ConnectionClosed(addr)))
1489+
.await
1490+
.inspect_err(|err| {
1491+
tracing::debug!(
1492+
remote = %addr,
1493+
error = ?err,
1494+
"Failed to notify primary handshake callback"
1495+
);
1496+
})
1497+
.ok();
1498+
}
1499+
1500+
for mut cb in callbacks {
1501+
cb.send_result(Err(HandshakeError::ChannelClosed))
1502+
.await
1503+
.inspect_err(|err| {
1504+
tracing::debug!(
1505+
remote = %addr,
1506+
error = ?err,
1507+
"Failed to notify fallback handshake callback"
1508+
);
1509+
})
1510+
.ok();
1511+
}
1512+
}
1513+
1514+
// Drop any pending transient transactions bound to this address
1515+
state
1516+
.transient_conn
1517+
.retain(|_, socket_addr| socket_addr != &addr);
1518+
}
1519+
HandshakeError::ChannelClosed => {
1520+
if !state.awaiting_connection.is_empty() {
1521+
tracing::warn!(
1522+
awaiting = state.awaiting_connection.len(),
1523+
"Handshake channel closed; notifying all pending callbacks"
1524+
);
1525+
}
1526+
1527+
let awaiting = std::mem::take(&mut state.awaiting_connection);
1528+
let awaiting_txs = std::mem::take(&mut state.awaiting_connection_txs);
1529+
1530+
for (addr, callbacks) in awaiting {
1531+
let pending_txs = awaiting_txs.get(&addr).cloned().unwrap_or_default();
1532+
tracing::debug!(
1533+
remote = %addr,
1534+
callbacks = callbacks.len(),
1535+
pending_txs = ?pending_txs,
1536+
"Delivering channel-closed notification to pending callbacks"
1537+
);
1538+
for mut cb in callbacks {
1539+
cb.send_result(Err(HandshakeError::ChannelClosed))
1540+
.await
1541+
.inspect_err(|err| {
1542+
tracing::debug!(
1543+
remote = %addr,
1544+
error = ?err,
1545+
"Failed to deliver channel-closed handshake notification"
1546+
);
1547+
})
1548+
.ok();
1549+
}
1550+
}
1551+
}
1552+
other => {
1553+
tracing::warn!(
1554+
?other,
1555+
"Unhandled handshake error without socket association"
1556+
);
1557+
}
1558+
}
1559+
1560+
Ok(())
1561+
}
1562+
14651563
async fn try_to_forward(&mut self, forward_to: &PeerId, msg: NetMessage) -> anyhow::Result<()> {
14661564
if let Some(peer) = self.connections.get(forward_to) {
14671565
tracing::debug!(%forward_to, %msg, "Forwarding message to peer");
@@ -1851,8 +1949,6 @@ pub(super) enum ConnEvent {
18511949

18521950
#[derive(Debug)]
18531951
pub(super) enum ChannelCloseReason {
1854-
/// Handshake channel closed - potentially transient, continue operation
1855-
Handshake,
18561952
/// Internal bridge channel closed - critical, must shutdown gracefully
18571953
Bridge,
18581954
/// Node controller channel closed - critical, must shutdown gracefully

0 commit comments

Comments
 (0)