From a9e90055e5a30f0b697e085b1a450ba29964c797 Mon Sep 17 00:00:00 2001 From: Liquid369 Date: Mon, 25 Nov 2024 07:34:59 -0600 Subject: [PATCH 1/4] Handle sockets one last time before closing sockets --- src/net.cpp | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index a04687b5b36e5..4679069c173b6 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1483,7 +1483,8 @@ void CConnman::SocketHandler() sendSet = send_set.count(pnode->hSocket) > 0; errorSet = error_set.count(pnode->hSocket) > 0; } - if (recvSet || errorSet) { + if (!pnode->fDisconnect && (recvSet || errorSet)) + { // typical socket buffer is 8K-64K char pchBuf[0x10000]; int nBytes = 0; @@ -1547,10 +1548,21 @@ void CConnman::SocketHandler() void CConnman::ThreadSocketHandler() { - while (!interruptNet) { - DisconnectNodes(); - NotifyNumConnectionsChanged(); + int64_t nLastCleanupNodes = 0; + + while (!interruptNet) + { + // Handle sockets before we do the next round of disconnects. This allows us to flush send buffers one last time + // before actually closing sockets. Receiving is however skipped in case a peer is pending to be disconnected SocketHandler(); + if (GetTimeMillis() - nLastCleanupNodes > 1000) { + ForEachNode(AllNodes, [&](CNode* pnode) { + InactivityCheck(pnode); + }); + DisconnectNodes(); + nLastCleanupNodes = GetTimeMillis(); + } + NotifyNumConnectionsChanged(); } } From 72c4f5fc60299310bc2e91ae3364e6a092abf9e9 Mon Sep 17 00:00:00 2001 From: Liquid369 Date: Mon, 25 Nov 2024 07:36:43 -0600 Subject: [PATCH 2/4] Remove support for optimistic send This was used in only one remaining place, and only to ensure that reject messages are sent before closing sockets. This is solved by the previous commit now. 
--- src/llmq/quorums_signing_shares.cpp | 35 ++++++++++++++++++++++------- src/net.cpp | 8 ++----- src/net.h | 13 ++--------- src/net_processing.cpp | 5 +++++ 4 files changed, 36 insertions(+), 25 deletions(-) diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index d23c2f7291f42..c9ca2e2c4f7f9 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -1157,13 +1157,13 @@ bool CSigSharesManager::SendMessages() llmq::utils::BuildSignHash(sigSesAnn).ToString(), sigSesAnn.sessionId, pnode->GetId()); msgs.emplace_back(sigSesAnn); if (msgs.size() == MAX_MSGS_CNT_QSIGSESANN) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs)); msgs.clear(); didSend = true; } } if (!msgs.empty()) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs)); didSend = true; } } @@ -1177,13 +1177,13 @@ bool CSigSharesManager::SendMessages() p.first.ToString(), p.second.ToString(), pnode->GetId()); msgs.emplace_back(std::move(p.second)); if (msgs.size() == MAX_MSGS_CNT_QGETSIGSHARES) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs)); msgs.clear(); didSend = true; } } if (!msgs.empty()) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs)); didSend = true; } } @@ -1197,7 +1197,7 @@ bool CSigSharesManager::SendMessages() LogPrint(BCLog::LLMQ, "CSigSharesManager::SendMessages -- QBSIGSHARES signHash=%s, inv={%s}, node=%d\n", p.first.ToString(), p.second.ToInvString(), pnode->GetId()); if (totalSigsCount + p.second.sigShares.size() > MAX_MSGS_TOTAL_BATCHED_SIGS) { - 
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs)); msgs.clear(); totalSigsCount = 0; didSend = true; @@ -1206,7 +1206,7 @@ bool CSigSharesManager::SendMessages() msgs.emplace_back(std::move(p.second)); } if (!msgs.empty()) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, std::move(msgs)), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, std::move(msgs))); didSend = true; } } @@ -1220,13 +1220,32 @@ bool CSigSharesManager::SendMessages() p.first.ToString(), p.second.ToString(), pnode->GetId()); msgs.emplace_back(std::move(p.second)); if (msgs.size() == MAX_MSGS_CNT_QSIGSHARESINV) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs)); msgs.clear(); didSend = true; } } if (!msgs.empty()) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs)); + didSend = true; + } + } + + auto lt = sigSharesToSend.find(pnode->GetId()); + if (lt != sigSharesToSend.end()) { + std::vector msgs; + for (auto& sigShare : lt->second) { + LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::SendMessages -- QSIGSHARE signHash=%s, node=%d\n", + sigShare.GetSignHash().ToString(), pnode->GetId()); + msgs.emplace_back(std::move(sigShare)); + if (msgs.size() == MAX_MSGS_SIG_SHARES) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs)); + msgs.clear(); + didSend = true; + } + } + if (!msgs.empty()) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs)); didSend = true; } } diff --git a/src/net.cpp b/src/net.cpp index 4679069c173b6..7ace78432b72a 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2762,7 +2762,7 @@ bool CConnman::NodeFullyConnected(const CNode* pnode) return 
pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; } -void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend) +void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) { size_t nMessageSize = msg.data.size(); size_t nTotalSize = nMessageSize + CMessageHeader::HEADER_SIZE; @@ -2780,7 +2780,6 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOpti { LOCK(pnode->cs_vSend); bool hasPendingData = !pnode->vSendMsg.empty(); - bool optimisticSend(allowOptimisticSend && pnode->vSendMsg.empty()); //log total amount of bytes per command pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize; @@ -2792,11 +2791,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOpti if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data)); - // If write queue empty, attempt "optimistic write" - if (optimisticSend == true) - nBytesSent = SocketSendData(pnode); // wake up select() call in case there was no pending data before (so it was not selecting this socket for sending) - else if (!hasPendingData && wakeupSelectNeeded) + if (!hasPendingData && wakeupSelectNeeded) WakeSelect(); } if (nBytesSent) diff --git a/src/net.h b/src/net.h index 2b25df53f4dd2..3b7a9f56d96aa 100644 --- a/src/net.h +++ b/src/net.h @@ -36,17 +36,8 @@ #include #endif -// "Optimistic send" was introduced in the beginning of the Bitcoin project. I assume this was done because it was -// thought that "send" would be very cheap when the send buffer is empty. This is not true, as shown by profiling. -// When a lot of load is seen on the network, the "send" call done in the message handler thread can easily use up 20% -// of time, effectively blocking things that could be done in parallel. We have introduced a way to wake up the select() -// call in the network thread, which allows us to disable optimistic send without introducing an artificial latency/delay -// when sending data. 
This however only works on non-WIN32 platforms for now. When we add support for WIN32 platforms, -// we can completely remove optimistic send. -#ifdef WIN32 -#define DEFAULT_ALLOW_OPTIMISTIC_SEND true -#else -#define DEFAULT_ALLOW_OPTIMISTIC_SEND false + +#ifndef WIN32 #define USE_WAKEUP_PIPE #endif diff --git a/src/net_processing.cpp b/src/net_processing.cpp index bdda9fdf27268..47eaec379ccc2 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2232,6 +2232,11 @@ static bool DisconnectIfBanned(CNode* pnode, CConnman* connman) AssertLockHeld(cs_main); CNodeState &state = *State(pnode->GetId()); + for (const CBlockReject& reject : state.rejects) { + connman->PushMessage(pnode, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, (std::string)NetMsgType::BLOCK, reject.chRejectCode, reject.strRejectReason, reject.hashBlock)); + } + state.rejects.clear(); + if (state.fShouldBan) { state.fShouldBan = false; if (pnode->fWhitelisted) { From 1f738689c2236eae8c018245533aaa14893689e5 Mon Sep 17 00:00:00 2001 From: Liquid369 Date: Mon, 25 Nov 2024 07:37:39 -0600 Subject: [PATCH 3/4] Add some linger time between fDisconnect=true and actually closing the socket --- src/net.cpp | 30 +++++++++++++++++++++++++++--- src/net.h | 1 + 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 7ace78432b72a..02b4a750d0362 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1172,9 +1172,33 @@ void CConnman::DisconnectNodes() } // Disconnect unused nodes - std::vector vNodesCopy = vNodes; - for (CNode* pnode : vNodesCopy) { - if (pnode->fDisconnect) { + for (auto it = vNodes.begin(); it != vNodes.end(); ) + { + CNode* pnode = *it; + if (pnode->fDisconnect) + { + if (pnode->nDisconnectLingerTime == 0) { + // let's not immediately close the socket but instead wait for at least 100ms so that there is a + // chance to flush all/some pending data. 
Otherwise the other side might not receive REJECT messages + // that were pushed right before setting fDisconnect=true + // Flushing must happen in two places to ensure data can be received by the other side: + // 1. vSendMsg must be empty and all messages sent via send(). This is ensured by SocketHandler() + // being called before DisconnectNodes and also by the linger time + // 2. Internal socket send buffers must be flushed. This is ensured solely by the linger time + pnode->nDisconnectLingerTime = GetTimeMillis() + 100; + continue; + } else if (GetTimeMillis() < pnode->nDisconnectLingerTime) { + continue; + } + + if (fLogIPs) { + LogPrintf("ThreadSocketHandler -- removing node: peer=%d addr=%s nRefCount=%d fInbound=%d fMasternode=%d\n", + pnode->GetId(), pnode->addr.ToString(), pnode->GetRefCount(), pnode->fInbound, pnode->fMasternode); + } else { + LogPrintf("ThreadSocketHandler -- removing node: peer=%d nRefCount=%d fInbound=%d fMasternode=%d\n", + pnode->GetId(), pnode->GetRefCount(), pnode->fInbound, pnode->fMasternode); + } + // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); diff --git a/src/net.h b/src/net.h index 3b7a9f56d96aa..1a4702ce26d60 100644 --- a/src/net.h +++ b/src/net.h @@ -711,6 +711,7 @@ class CNode std::atomic_bool m_wants_addrv2{false}; std::atomic_bool fSuccessfullyConnected; std::atomic_bool fDisconnect; + std::atomic<int64_t> nDisconnectLingerTime{0}; // We use fRelayTxes for two purposes - // a) it allows us to not relay tx invs before receiving the peer's version message // b) the peer may tell us in their version message that we should not relay tx invs From 588c04c97959b7470d1256db5ff1a26da16c169b Mon Sep 17 00:00:00 2001 From: Liquid369 Date: Tue, 14 Jan 2025 15:49:53 -0600 Subject: [PATCH 4/4] Missing from commit ab14f19 cherrypick --- src/llmq/quorums_signing_shares.cpp | 23 ++--------------------- src/net.cpp | 4 ++-- src/net.h | 12 +++++++++++- src/net_processing.cpp | 8 +++++++- 
src/protocol.cpp | 2 ++ src/protocol.h | 1 + 6 files changed, 25 insertions(+), 25 deletions(-) diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index c9ca2e2c4f7f9..9d3761f5c4d7f 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -1231,25 +1231,6 @@ bool CSigSharesManager::SendMessages() } } - auto lt = sigSharesToSend.find(pnode->GetId()); - if (lt != sigSharesToSend.end()) { - std::vector msgs; - for (auto& sigShare : lt->second) { - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::SendMessages -- QSIGSHARE signHash=%s, node=%d\n", - sigShare.GetSignHash().ToString(), pnode->GetId()); - msgs.emplace_back(std::move(sigShare)); - if (msgs.size() == MAX_MSGS_SIG_SHARES) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs)); - msgs.clear(); - didSend = true; - } - } - if (!msgs.empty()) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs)); - didSend = true; - } - } - auto lt = sigSharesToSend.find(pnode->GetId()); if (lt != sigSharesToSend.end()) { std::vector msgs; @@ -1258,13 +1239,13 @@ bool CSigSharesManager::SendMessages() sigShare.GetSignHash().ToString(), pnode->GetId()); msgs.emplace_back(std::move(sigShare)); if (msgs.size() == MAX_MSGS_SIG_SHARES) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs)); msgs.clear(); didSend = true; } } if (!msgs.empty()) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs)); didSend = true; } } diff --git a/src/net.cpp b/src/net.cpp index 02b4a750d0362..fbdfa9e7bb69f 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1193,10 +1193,10 @@ void CConnman::DisconnectNodes() if (fLogIPs) { LogPrintf("ThreadSocketHandler -- removing node: peer=%d addr=%s nRefCount=%d fInbound=%d fMasternode=%d\n", 
- pnode->GetId(), pnode->addr.ToString(), pnode->GetRefCount(), pnode->fInbound, pnode->fMasternode); + pnode->GetId(), pnode->addr.ToString(), pnode->GetRefCount(), pnode->fInbound, fMasterNode); } else { LogPrintf("ThreadSocketHandler -- removing node: peer=%d nRefCount=%d fInbound=%d fMasternode=%d\n", - pnode->GetId(), pnode->GetRefCount(), pnode->fInbound, pnode->fMasternode); + pnode->GetId(), pnode->GetRefCount(), pnode->fInbound, fMasterNode); } // remove from vNodes diff --git a/src/net.h b/src/net.h index 1a4702ce26d60..5c4338b90bdf5 100644 --- a/src/net.h +++ b/src/net.h @@ -215,7 +215,7 @@ class CConnman bool ForNode(NodeId id, std::function func); bool ForNode(const CService& addr, const std::function& cond, const std::function& func); - void PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend = DEFAULT_ALLOW_OPTIMISTIC_SEND); + void PushMessage(CNode* pnode, CSerializedNetMsg&& msg); template bool ForEachNodeContinueIf(Callable&& func) @@ -264,6 +264,16 @@ class CConnman } }; + template<typename Condition, typename Callable> + void ForEachNode(const Condition& cond, Callable&& func) const + { + LOCK(cs_vNodes); + for (auto&& node : vNodes) { + if (cond(node)) + func(node); + } + }; + template void ForEachNodeThen(Callable&& pre, CallableAfter&& post) { diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 47eaec379ccc2..b38d5103c31fc 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -191,7 +191,11 @@ class CNodeBlocks size_t maxAvg; }; - +struct CBlockReject { + unsigned char chRejectCode; + std::string strRejectReason; + uint256 hashBlock; +}; /** * Maintain validation-specific state about nodes, protected by cs_main, instead @@ -210,6 +214,8 @@ struct CNodeState { bool fShouldBan; //! String name of this peer (debugging/logging purposes). const std::string name; + //! List of asynchronously-determined block rejections to notify this peer about. + std::vector<CBlockReject> rejects; //! The best known block we know this peer has announced. 
const CBlockIndex* pindexBestKnownBlock; //! The hash of the last unknown block this peer has announced. diff --git a/src/protocol.cpp b/src/protocol.cpp index f42735c358588..64ec67ffc8359 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -18,6 +18,7 @@ namespace NetMsgType { const char* VERSION = "version"; const char* VERACK = "verack"; +const char* REJECT = "reject"; const char* ADDR = "addr"; const char* ADDRV2="addrv2"; const char* SENDADDRV2="sendaddrv2"; @@ -76,6 +77,7 @@ const char* CLSIG = "clsig"; const static std::string allNetMessageTypes[] = { NetMsgType::VERSION, NetMsgType::VERACK, + NetMsgType::REJECT, NetMsgType::ADDR, NetMsgType::ADDRV2, NetMsgType::SENDADDRV2, diff --git a/src/protocol.h b/src/protocol.h index fe50c53a8e518..2b073cb19f921 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -75,6 +75,7 @@ extern const char* VERSION; * @see https://bitcoin.org/en/developer-reference#verack */ extern const char* VERACK; +extern const char* REJECT; /** * The addr (IP address) message relays connection information for peers on the * network.