Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/llmq/quorums_signing_shares.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,8 @@ void CSigSharesManager::SendMessages()
CollectSigSharesToAnnounce(sigSharesToAnnounce);
}

bool didSend = false;

g_connman->ForEachNode([&](CNode* pnode) {
CNetMsgMaker msgMaker(pnode->GetSendVersion());

Expand All @@ -905,7 +907,8 @@ void CSigSharesManager::SendMessages()
assert(p.second.CountSet() != 0);
LogPrint("llmq", "CSigSharesManager::SendMessages -- QGETSIGSHARES inv={%s}, node=%d\n",
p.second.ToString(), pnode->id);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, p.second));
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, p.second), false);
didSend = true;
}
}

Expand All @@ -915,7 +918,8 @@ void CSigSharesManager::SendMessages()
assert(!p.second.sigShares.empty());
LogPrint("llmq", "CSigSharesManager::SendMessages -- QBSIGSHARES inv={%s}, node=%d\n",
p.second.ToInv().ToString(), pnode->id);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, p.second));
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, p.second), false);
didSend = true;
}
}

Expand All @@ -925,12 +929,17 @@ void CSigSharesManager::SendMessages()
assert(p.second.CountSet() != 0);
LogPrint("llmq", "CSigSharesManager::SendMessages -- QSIGSHARESINV inv={%s}, node=%d\n",
p.second.ToString(), pnode->id);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, p.second));
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, p.second), false);
didSend = true;
}
}

return true;
});

if (didSend) {
g_connman->WakeSelect();
}
}

void CSigSharesManager::Cleanup()
Expand Down
67 changes: 65 additions & 2 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,18 @@ void CConnman::ThreadSocketHandler()
SOCKET hSocketMax = 0;
bool have_fds = false;

#ifndef WIN32
// We add a pipe to the read set so that the select() call can be woken up from the outside
// This is done when data is available for sending and at the same time optimistic sending was disabled
// when pushing the data.
// This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to
// timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually
// run on Linux and friends.
FD_SET(wakeupPipe[0], &fdsetRecv);
hSocketMax = std::max(hSocketMax, (SOCKET)wakeupPipe[0]);
have_fds = true;
#endif

BOOST_FOREACH(const ListenSocket& hListenSocket, vhListenSocket) {
FD_SET(hListenSocket.socket, &fdsetRecv);
hSocketMax = std::max(hSocketMax, hListenSocket.socket);
Expand Down Expand Up @@ -1276,6 +1288,20 @@ void CConnman::ThreadSocketHandler()
return;
}

#ifndef WIN32
// drain the wakeup pipe
if (FD_ISSET(wakeupPipe[0], &fdsetRecv)) {
LogPrint("net", "woke up select()\n");
char buf[128];
while (true) {
int r = read(wakeupPipe[0], buf, sizeof(buf));
if (r <= 0) {
break;
}
}
}
#endif

//
// Accept new connections
//
Expand Down Expand Up @@ -1426,6 +1452,21 @@ void CConnman::WakeMessageHandler()
condMsgProc.notify_one();
}

void CConnman::WakeSelect()
{
#ifndef WIN32
if (wakeupPipe[1] == -1) {
return;
}

LogPrint("net", "waking up select()\n");

char buf[1];
if (write(wakeupPipe[1], buf, 1) != 1) {
LogPrint("net", "write to wakeupPipe failed\n");
}
#endif
}



Expand Down Expand Up @@ -2387,6 +2428,22 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
fMsgProcWake = false;
}

#ifndef WIN32
if (pipe(wakeupPipe) != 0) {
wakeupPipe[0] = wakeupPipe[1] = -1;
LogPrint("net", "pipe() for wakeupPipe failed\n");
} else {
int fFlags = fcntl(wakeupPipe[0], F_GETFL, 0);
if (fcntl(wakeupPipe[0], F_SETFL, fFlags | O_NONBLOCK) == -1) {
LogPrint("net", "fcntl for O_NONBLOCK on wakeupPipe failed\n");
}
fFlags = fcntl(wakeupPipe[1], F_GETFL, 0);
if (fcntl(wakeupPipe[1], F_SETFL, fFlags | O_NONBLOCK) == -1) {
LogPrint("net", "fcntl for O_NONBLOCK on wakeupPipe failed\n");
}
}
#endif

// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));

Expand Down Expand Up @@ -2512,6 +2569,12 @@ void CConnman::Stop()
semAddnode = NULL;
delete semMasternodeOutbound;
semMasternodeOutbound = NULL;

#ifndef WIN32
if (wakeupPipe[0] != -1) close(wakeupPipe[0]);
if (wakeupPipe[1] != -1) close(wakeupPipe[1]);
wakeupPipe[0] = wakeupPipe[1] = -1;
#endif
}

void CConnman::DeleteNode(CNode* pnode)
Expand Down Expand Up @@ -3054,7 +3117,7 @@ bool CConnman::NodeFullyConnected(const CNode* pnode)
return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
}

void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend)
{
size_t nMessageSize = msg.data.size();
size_t nTotalSize = nMessageSize + CMessageHeader::HEADER_SIZE;
Expand All @@ -3071,7 +3134,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
size_t nBytesSent = 0;
{
LOCK(pnode->cs_vSend);
bool optimisticSend(pnode->vSendMsg.empty());
bool optimisticSend(allowOptimisticSend && pnode->vSendMsg.empty());

//log total amount of bytes per command
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
Expand Down
9 changes: 8 additions & 1 deletion src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class CConnman

bool IsMasternodeOrDisconnectRequested(const CService& addr);

void PushMessage(CNode* pnode, CSerializedNetMsg&& msg);
void PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend = true);

template<typename Condition, typename Callable>
bool ForEachNodeContinueIf(const Condition& cond, Callable&& func)
Expand Down Expand Up @@ -408,6 +408,8 @@ class CConnman
unsigned int GetReceiveFloodSize() const;

void WakeMessageHandler();
void WakeSelect();

private:
struct ListenSocket {
SOCKET socket;
Expand Down Expand Up @@ -525,6 +527,11 @@ class CConnman

CThreadInterrupt interruptNet;

#ifndef WIN32
/** a pipe which is added to select() calls to wakeup before the timeout */
int wakeupPipe[2]{-1,-1};
#endif

std::thread threadDNSAddressSeed;
std::thread threadSocketHandler;
std::thread threadOpenAddedConnections;
Expand Down