Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions src/llmq/quorums_signing_shares.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1157,13 +1157,13 @@ bool CSigSharesManager::SendMessages()
llmq::utils::BuildSignHash(sigSesAnn).ToString(), sigSesAnn.sessionId, pnode->GetId());
msgs.emplace_back(sigSesAnn);
if (msgs.size() == MAX_MSGS_CNT_QSIGSESANN) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs), false);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs));
msgs.clear();
didSend = true;
}
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs), false);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs));
didSend = true;
}
}
Expand All @@ -1177,13 +1177,13 @@ bool CSigSharesManager::SendMessages()
p.first.ToString(), p.second.ToString(), pnode->GetId());
msgs.emplace_back(std::move(p.second));
if (msgs.size() == MAX_MSGS_CNT_QGETSIGSHARES) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs), false);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs));
msgs.clear();
didSend = true;
}
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs), false);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs));
didSend = true;
}
}
Expand All @@ -1197,7 +1197,7 @@ bool CSigSharesManager::SendMessages()
LogPrint(BCLog::LLMQ, "CSigSharesManager::SendMessages -- QBSIGSHARES signHash=%s, inv={%s}, node=%d\n",
p.first.ToString(), p.second.ToInvString(), pnode->GetId());
if (totalSigsCount + p.second.sigShares.size() > MAX_MSGS_TOTAL_BATCHED_SIGS) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs), false);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs));
msgs.clear();
totalSigsCount = 0;
didSend = true;
Expand All @@ -1206,7 +1206,7 @@ bool CSigSharesManager::SendMessages()
msgs.emplace_back(std::move(p.second));
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, std::move(msgs)), false);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, std::move(msgs)));
didSend = true;
}
}
Expand All @@ -1220,13 +1220,13 @@ bool CSigSharesManager::SendMessages()
p.first.ToString(), p.second.ToString(), pnode->GetId());
msgs.emplace_back(std::move(p.second));
if (msgs.size() == MAX_MSGS_CNT_QSIGSHARESINV) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs), false);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs));
msgs.clear();
didSend = true;
}
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs), false);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs));
didSend = true;
}
}
Expand All @@ -1239,13 +1239,13 @@ bool CSigSharesManager::SendMessages()
sigShare.GetSignHash().ToString(), pnode->GetId());
msgs.emplace_back(std::move(sigShare));
if (msgs.size() == MAX_MSGS_SIG_SHARES) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs), false);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs));
msgs.clear();
didSend = true;
}
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs), false);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs));
didSend = true;
}
}
Expand Down
58 changes: 45 additions & 13 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1172,9 +1172,33 @@ void CConnman::DisconnectNodes()
}

// Disconnect unused nodes
std::vector<CNode*> vNodesCopy = vNodes;
for (CNode* pnode : vNodesCopy) {
if (pnode->fDisconnect) {
for (auto it = vNodes.begin(); it != vNodes.end(); )
{
CNode* pnode = *it;
if (pnode->fDisconnect)
{
if (pnode->nDisconnectLingerTime == 0) {
// let's not immediately close the socket but instead wait for at least 100ms so that there is a
// chance to flush all/some pending data. Otherwise the other side might not receive REJECT messages
// that were pushed right before setting fDisconnect=true
// Flushing must happen in two places to ensure data can be received by the other side:
// 1. vSendMsg must be empty and all messages sent via send(). This is ensured by SocketHandler()
// being called before DisconnectNodes and also by the linger time
// 2. Internal socket send buffers must be flushed. This is ensured solely by the linger time
pnode->nDisconnectLingerTime = GetTimeMillis() + 100;
continue;
} else if (GetTimeMillis() < pnode->nDisconnectLingerTime) {
continue;
}

if (fLogIPs) {
LogPrintf("ThreadSocketHandler -- removing node: peer=%d addr=%s nRefCount=%d fInbound=%d fMasternode=%d\n",
pnode->GetId(), pnode->addr.ToString(), pnode->GetRefCount(), pnode->fInbound, fMasterNode);
} else {
LogPrintf("ThreadSocketHandler -- removing node: peer=%d nRefCount=%d fInbound=%d fMasternode=%d\n",
pnode->GetId(), pnode->GetRefCount(), pnode->fInbound, fMasterNode);
}

// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());

Expand Down Expand Up @@ -1483,7 +1507,8 @@ void CConnman::SocketHandler()
sendSet = send_set.count(pnode->hSocket) > 0;
errorSet = error_set.count(pnode->hSocket) > 0;
}
if (recvSet || errorSet) {
if (!pnode->fDisconnect && (recvSet || errorSet))
{
// typical socket buffer is 8K-64K
char pchBuf[0x10000];
int nBytes = 0;
Expand Down Expand Up @@ -1547,10 +1572,21 @@ void CConnman::SocketHandler()

void CConnman::ThreadSocketHandler()
{
while (!interruptNet) {
DisconnectNodes();
NotifyNumConnectionsChanged();
int64_t nLastCleanupNodes = 0;

while (!interruptNet)
{
// Handle sockets before we do the next round of disconnects. This allows us to flush send buffers one last time
// before actually closing sockets. Receiving is however skipped in case a peer is pending to be disconnected
SocketHandler();
if (GetTimeMillis() - nLastCleanupNodes > 1000) {
ForEachNode(AllNodes, [&](CNode* pnode) {
InactivityCheck(pnode);
});
DisconnectNodes();
nLastCleanupNodes = GetTimeMillis();
}
NotifyNumConnectionsChanged();
}
}

Expand Down Expand Up @@ -2750,7 +2786,7 @@ bool CConnman::NodeFullyConnected(const CNode* pnode)
return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
}

void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend)
void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
{
size_t nMessageSize = msg.data.size();
size_t nTotalSize = nMessageSize + CMessageHeader::HEADER_SIZE;
Expand All @@ -2768,7 +2804,6 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOpti
{
LOCK(pnode->cs_vSend);
bool hasPendingData = !pnode->vSendMsg.empty();
bool optimisticSend(allowOptimisticSend && pnode->vSendMsg.empty());

//log total amount of bytes per command
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
Expand All @@ -2780,11 +2815,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOpti
if (nMessageSize)
pnode->vSendMsg.push_back(std::move(msg.data));

// If write queue empty, attempt "optimistic write"
if (optimisticSend == true)
nBytesSent = SocketSendData(pnode);
// wake up select() call in case there was no pending data before (so it was not selecting this socket for sending)
else if (!hasPendingData && wakeupSelectNeeded)
if (!hasPendingData && wakeupSelectNeeded)
WakeSelect();
}
if (nBytesSent)
Expand Down
26 changes: 14 additions & 12 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,8 @@
#include <arpa/inet.h>
#endif

// "Optimistic send" was introduced in the beginning of the Bitcoin project. I assume this was done because it was
// thought that "send" would be very cheap when the send buffer is empty. This is not true, as shown by profiling.
// When a lot of load is seen on the network, the "send" call done in the message handler thread can easily use up 20%
// of time, effectively blocking things that could be done in parallel. We have introduced a way to wake up the select()
// call in the network thread, which allows us to disable optimistic send without introducing an artificial latency/delay
// when sending data. This however only works on non-WIN32 platforms for now. When we add support for WIN32 platforms,
// we can completely remove optimistic send.
#ifdef WIN32
#define DEFAULT_ALLOW_OPTIMISTIC_SEND true
#else
#define DEFAULT_ALLOW_OPTIMISTIC_SEND false

#ifndef WIN32
#define USE_WAKEUP_PIPE
#endif

Expand Down Expand Up @@ -224,7 +215,7 @@ class CConnman
bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func);
bool ForNode(const CService& addr, const std::function<bool(const CNode* pnode)>& cond, const std::function<bool(CNode* pnode)>& func);

void PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend = DEFAULT_ALLOW_OPTIMISTIC_SEND);
void PushMessage(CNode* pnode, CSerializedNetMsg&& msg);

template<typename Callable>
bool ForEachNodeContinueIf(Callable&& func)
Expand Down Expand Up @@ -273,6 +264,16 @@ class CConnman
}
};

template<typename Condition, typename Callable>
void ForEachNode(const Condition& cond, Callable&& func) const
{
LOCK(cs_vNodes);
for (auto&& node : vNodes) {
if (cond(node))
func(node);
}
};

template<typename Callable, typename CallableAfter>
void ForEachNodeThen(Callable&& pre, CallableAfter&& post)
{
Expand Down Expand Up @@ -720,6 +721,7 @@ class CNode
std::atomic_bool m_wants_addrv2{false};
std::atomic_bool fSuccessfullyConnected;
std::atomic_bool fDisconnect;
std::atomic<int64_t> nDisconnectLingerTime{0};
// We use fRelayTxes for two purposes -
// a) it allows us to not relay tx invs before receiving the peer's version message
// b) the peer may tell us in their version message that we should not relay tx invs
Expand Down
13 changes: 12 additions & 1 deletion src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,11 @@ class CNodeBlocks
size_t maxAvg;
};


struct CBlockReject {
unsigned char chRejectCode;
std::string strRejectReason;
uint256 hashBlock;
};

/**
* Maintain validation-specific state about nodes, protected by cs_main, instead
Expand All @@ -210,6 +214,8 @@ struct CNodeState {
bool fShouldBan;
//! String name of this peer (debugging/logging purposes).
const std::string name;
//! List of asynchronously-determined block rejections to notify this peer about.
std::vector<CBlockReject> rejects;
//! The best known block we know this peer has announced.
const CBlockIndex* pindexBestKnownBlock;
//! The hash of the last unknown block this peer has announced.
Expand Down Expand Up @@ -2232,6 +2238,11 @@ static bool DisconnectIfBanned(CNode* pnode, CConnman* connman)
AssertLockHeld(cs_main);
CNodeState &state = *State(pnode->GetId());

for (const CBlockReject& reject : state.rejects) {
connman->PushMessage(pnode, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, (std::string)NetMsgType::BLOCK, reject.chRejectCode, reject.strRejectReason, reject.hashBlock));
}
state.rejects.clear();

if (state.fShouldBan) {
state.fShouldBan = false;
if (pnode->fWhitelisted) {
Expand Down
2 changes: 2 additions & 0 deletions src/protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace NetMsgType
{
const char* VERSION = "version";
const char* VERACK = "verack";
const char* REJECT = "reject";
const char* ADDR = "addr";
const char* ADDRV2="addrv2";
const char* SENDADDRV2="sendaddrv2";
Expand Down Expand Up @@ -76,6 +77,7 @@ const char* CLSIG = "clsig";
const static std::string allNetMessageTypes[] = {
NetMsgType::VERSION,
NetMsgType::VERACK,
NetMsgType::REJECT,
NetMsgType::ADDR,
NetMsgType::ADDRV2,
NetMsgType::SENDADDRV2,
Expand Down
1 change: 1 addition & 0 deletions src/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ extern const char* VERSION;
* @see https://bitcoin.org/en/developer-reference#verack
*/
extern const char* VERACK;
extern const char* REJECT;
/**
* The addr (IP address) message relays connection information for peers on the
* network.
Expand Down
Loading