diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index d23c2f7291f42..9d3761f5c4d7f 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -1157,13 +1157,13 @@ bool CSigSharesManager::SendMessages() llmq::utils::BuildSignHash(sigSesAnn).ToString(), sigSesAnn.sessionId, pnode->GetId()); msgs.emplace_back(sigSesAnn); if (msgs.size() == MAX_MSGS_CNT_QSIGSESANN) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs)); msgs.clear(); didSend = true; } } if (!msgs.empty()) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs)); didSend = true; } } @@ -1177,13 +1177,13 @@ bool CSigSharesManager::SendMessages() p.first.ToString(), p.second.ToString(), pnode->GetId()); msgs.emplace_back(std::move(p.second)); if (msgs.size() == MAX_MSGS_CNT_QGETSIGSHARES) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs)); msgs.clear(); didSend = true; } } if (!msgs.empty()) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs)); didSend = true; } } @@ -1197,7 +1197,7 @@ bool CSigSharesManager::SendMessages() LogPrint(BCLog::LLMQ, "CSigSharesManager::SendMessages -- QBSIGSHARES signHash=%s, inv={%s}, node=%d\n", p.first.ToString(), p.second.ToInvString(), pnode->GetId()); if (totalSigsCount + p.second.sigShares.size() > MAX_MSGS_TOTAL_BATCHED_SIGS) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs)); msgs.clear(); totalSigsCount = 0; didSend = true; @@ -1206,7 +1206,7 @@ bool CSigSharesManager::SendMessages() msgs.emplace_back(std::move(p.second)); } if (!msgs.empty()) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, std::move(msgs)), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, std::move(msgs))); didSend = true; } } @@ -1220,13 +1220,13 @@ bool CSigSharesManager::SendMessages() p.first.ToString(), p.second.ToString(), pnode->GetId()); msgs.emplace_back(std::move(p.second)); if (msgs.size() == MAX_MSGS_CNT_QSIGSHARESINV) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs)); msgs.clear(); didSend = true; } } if (!msgs.empty()) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs)); didSend = true; } } @@ -1239,13 +1239,13 @@ bool CSigSharesManager::SendMessages() sigShare.GetSignHash().ToString(), pnode->GetId()); msgs.emplace_back(std::move(sigShare)); if (msgs.size() == MAX_MSGS_SIG_SHARES) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs)); msgs.clear(); didSend = true; } } if (!msgs.empty()) { - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs), false); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs)); didSend = true; } } diff --git a/src/net.cpp b/src/net.cpp index a04687b5b36e5..fbdfa9e7bb69f 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1172,9 +1172,33 @@ void CConnman::DisconnectNodes() } // Disconnect unused nodes - std::vector vNodesCopy = vNodes; - for (CNode* pnode : vNodesCopy) { - if (pnode->fDisconnect) { + for (auto it = vNodes.begin(); it != vNodes.end(); ) + { + CNode* pnode = *it; + if (pnode->fDisconnect) + { + if (pnode->nDisconnectLingerTime == 0) { + // let's not immediately close the socket but instead wait for at least 100ms so that there is a + // chance to flush all/some pending data. Otherwise the other side might not receive REJECT messages + // that were pushed right before setting fDisconnect=true + // Flushing must happen in two places to ensure data can be received by the other side: + // 1. vSendMsg must be empty and all messages sent via send(). This is ensured by SocketHandler() + // being called before DisconnectNodes and also by the linger time + // 2. Internal socket send buffers must be flushed. This is ensured solely by the linger time + pnode->nDisconnectLingerTime = GetTimeMillis() + 100; + continue; + } else if (GetTimeMillis() < pnode->nDisconnectLingerTime) { + continue; + } + + if (fLogIPs) { + LogPrintf("ThreadSocketHandler -- removing node: peer=%d addr=%s nRefCount=%d fInbound=%d fMasternode=%d\n", + pnode->GetId(), pnode->addr.ToString(), pnode->GetRefCount(), pnode->fInbound, fMasterNode); + } else { + LogPrintf("ThreadSocketHandler -- removing node: peer=%d nRefCount=%d fInbound=%d fMasternode=%d\n", + pnode->GetId(), pnode->GetRefCount(), pnode->fInbound, fMasterNode); + } + // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); @@ -1483,7 +1507,8 @@ void CConnman::SocketHandler() sendSet = send_set.count(pnode->hSocket) > 0; errorSet = error_set.count(pnode->hSocket) > 0; } - if (recvSet || errorSet) { + if (!pnode->fDisconnect && (recvSet || errorSet)) + { // typical socket buffer is 8K-64K char pchBuf[0x10000]; int nBytes = 0; @@ -1547,10 +1572,21 @@ void CConnman::SocketHandler() void CConnman::ThreadSocketHandler() { - while (!interruptNet) { - DisconnectNodes(); - NotifyNumConnectionsChanged(); + int64_t nLastCleanupNodes = 0; + + while (!interruptNet) + { + // Handle sockets before we do the next round of disconnects. This allows us to flush send buffers one last time + // before actually closing sockets. Receiving is however skipped in case a peer is pending to be disconnected SocketHandler(); + if (GetTimeMillis() - nLastCleanupNodes > 1000) { + ForEachNode(AllNodes, [&](CNode* pnode) { + InactivityCheck(pnode); + }); + DisconnectNodes(); + nLastCleanupNodes = GetTimeMillis(); + } + NotifyNumConnectionsChanged(); } } @@ -2750,7 +2786,7 @@ bool CConnman::NodeFullyConnected(const CNode* pnode) return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; } -void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend) +void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) { size_t nMessageSize = msg.data.size(); size_t nTotalSize = nMessageSize + CMessageHeader::HEADER_SIZE; @@ -2768,7 +2804,6 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOpti { LOCK(pnode->cs_vSend); bool hasPendingData = !pnode->vSendMsg.empty(); - bool optimisticSend(allowOptimisticSend && pnode->vSendMsg.empty()); //log total amount of bytes per command pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize; @@ -2780,11 +2815,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOpti if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data)); - // If write queue empty, attempt "optimistic write" - if (optimisticSend == true) - nBytesSent = SocketSendData(pnode); // wake up select() call in case there was no pending data before (so it was not selecting this socket for sending) - else if (!hasPendingData && wakeupSelectNeeded) + if (!hasPendingData && wakeupSelectNeeded) WakeSelect(); } if (nBytesSent) diff --git a/src/net.h b/src/net.h index 2b25df53f4dd2..5c4338b90bdf5 100644 --- a/src/net.h +++ b/src/net.h @@ -36,17 +36,8 @@ #include #endif -// "Optimistic send" was introduced in the beginning of the Bitcoin project. I assume this was done because it was -// thought that "send" would be very cheap when the send buffer is empty. This is not true, as shown by profiling. -// When a lot of load is seen on the network, the "send" call done in the message handler thread can easily use up 20% -// of time, effectively blocking things that could be done in parallel. We have introduced a way to wake up the select() -// call in the network thread, which allows us to disable optimistic send without introducing an artificial latency/delay -// when sending data. This however only works on non-WIN32 platforms for now. When we add support for WIN32 platforms, -// we can completely remove optimistic send. -#ifdef WIN32 -#define DEFAULT_ALLOW_OPTIMISTIC_SEND true -#else -#define DEFAULT_ALLOW_OPTIMISTIC_SEND false + +#ifndef WIN32 #define USE_WAKEUP_PIPE #endif @@ -224,7 +215,7 @@ class CConnman bool ForNode(NodeId id, std::function func); bool ForNode(const CService& addr, const std::function& cond, const std::function& func); - void PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend = DEFAULT_ALLOW_OPTIMISTIC_SEND); + void PushMessage(CNode* pnode, CSerializedNetMsg&& msg); template bool ForEachNodeContinueIf(Callable&& func) @@ -273,6 +264,16 @@ class CConnman } }; + template + void ForEachNode(const Condition& cond, Callable&& func) const + { + LOCK(cs_vNodes); + for (auto&& node : vNodes) { + if (cond(node)) + func(node); + } + }; + template void ForEachNodeThen(Callable&& pre, CallableAfter&& post) { @@ -720,6 +721,7 @@ class CNode std::atomic_bool m_wants_addrv2{false}; std::atomic_bool fSuccessfullyConnected; std::atomic_bool fDisconnect; + std::atomic nDisconnectLingerTime{0}; // We use fRelayTxes for two purposes - // a) it allows us to not relay tx invs before receiving the peer's version message // b) the peer may tell us in their version message that we should not relay tx invs diff --git a/src/net_processing.cpp b/src/net_processing.cpp index bdda9fdf27268..b38d5103c31fc 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -191,7 +191,11 @@ class CNodeBlocks size_t maxAvg; }; - +struct CBlockReject { + unsigned char chRejectCode; + std::string strRejectReason; + uint256 hashBlock; +}; /** * Maintain validation-specific state about nodes, protected by cs_main, instead @@ -210,6 +214,8 @@ struct CNodeState { bool fShouldBan; //! String name of this peer (debugging/logging purposes). const std::string name; + //! List of asynchronously-determined block rejections to notify this peer about. + std::vector rejects; //! The best known block we know this peer has announced. const CBlockIndex* pindexBestKnownBlock; //! The hash of the last unknown block this peer has announced. @@ -2232,6 +2238,11 @@ static bool DisconnectIfBanned(CNode* pnode, CConnman* connman) AssertLockHeld(cs_main); CNodeState &state = *State(pnode->GetId()); + for (const CBlockReject& reject : state.rejects) { + connman->PushMessage(pnode, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, (std::string)NetMsgType::BLOCK, reject.chRejectCode, reject.strRejectReason, reject.hashBlock)); + } + state.rejects.clear(); + if (state.fShouldBan) { state.fShouldBan = false; if (pnode->fWhitelisted) { diff --git a/src/protocol.cpp b/src/protocol.cpp index f42735c358588..64ec67ffc8359 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -18,6 +18,7 @@ namespace NetMsgType { const char* VERSION = "version"; const char* VERACK = "verack"; +const char* REJECT = "reject"; const char* ADDR = "addr"; const char* ADDRV2="addrv2"; const char* SENDADDRV2="sendaddrv2"; @@ -76,6 +77,7 @@ const char* CLSIG = "clsig"; const static std::string allNetMessageTypes[] = { NetMsgType::VERSION, NetMsgType::VERACK, + NetMsgType::REJECT, NetMsgType::ADDR, NetMsgType::ADDRV2, NetMsgType::SENDADDRV2, diff --git a/src/protocol.h b/src/protocol.h index fe50c53a8e518..2b073cb19f921 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -75,6 +75,7 @@ extern const char* VERSION; * @see https://bitcoin.org/en/developer-reference#verack */ extern const char* VERACK; +extern const char* REJECT; /** * The addr (IP address) message relays connection information for peers on the * network.