Skip to content

Commit

Permalink
Merge pull request #3450 from codablock/pr_fix_3
Browse files Browse the repository at this point in the history
Fix handling of disconnected lingering nodes
  • Loading branch information
codablock authored Apr 22, 2020
2 parents 96faa81 + d6b69db commit 7c4ebe4
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 45 deletions.
1 change: 1 addition & 0 deletions src/compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ typedef unsigned int SOCKET;
#define WSAENOTSOCK EBADF
#define INVALID_SOCKET (SOCKET)(~0)
#define SOCKET_ERROR -1
#define SD_SEND SHUT_WR
#endif

#ifdef WIN32
Expand Down
86 changes: 55 additions & 31 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,43 +323,55 @@ bool IsReachable(const CNetAddr& addr)
}


CNode* CConnman::FindNode(const CNetAddr& ip)
/**
 * Look up a node in vNodes by network address.
 *
 * Walks vNodes while holding cs_vNodes and returns the first node whose
 * `addr`, cast to CNetAddr, compares equal to `ip`.
 *
 * @param ip                     Address to match against each node's `addr`.
 * @param fExcludeDisconnecting  When true, nodes with fDisconnect set are skipped.
 * @return The first matching node, or nullptr if no node matches.
 *
 * NOTE(review): the pointer is handed back to the caller after the scan;
 * what keeps the node alive afterwards is not visible here -- confirm at call sites.
 */
CNode* CConnman::FindNode(const CNetAddr& ip, bool fExcludeDisconnecting)
{
LOCK(cs_vNodes);
for (CNode* pnode : vNodes) {
// Ignore nodes already flagged for disconnect when the caller asked for that.
if (fExcludeDisconnecting && pnode->fDisconnect) {
continue;
}
if ((CNetAddr)pnode->addr == ip) {
return pnode;
}
}
return nullptr;
}

CNode* CConnman::FindNode(const CSubNet& subNet)
/**
 * Look up a node in vNodes by subnet membership.
 *
 * Walks vNodes while holding cs_vNodes and returns the first node for which
 * `subNet.Match()` accepts the node's `addr` cast to CNetAddr.
 *
 * @param subNet                 Subnet to test each node's address against.
 * @param fExcludeDisconnecting  When true, nodes with fDisconnect set are skipped.
 * @return The first matching node, or nullptr if no node matches.
 */
CNode* CConnman::FindNode(const CSubNet& subNet, bool fExcludeDisconnecting)
{
LOCK(cs_vNodes);
for (CNode* pnode : vNodes) {
// Ignore nodes already flagged for disconnect when the caller asked for that.
if (fExcludeDisconnecting && pnode->fDisconnect) {
continue;
}
if (subNet.Match((CNetAddr)pnode->addr)) {
return pnode;
}
}
return nullptr;
}

CNode* CConnman::FindNode(const std::string& addrName)
/**
 * Look up a node in vNodes by address name.
 *
 * Walks vNodes while holding cs_vNodes and returns the first node whose
 * GetAddrName() result equals `addrName`.
 *
 * @param addrName               Name to compare with each node's GetAddrName().
 * @param fExcludeDisconnecting  When true, nodes with fDisconnect set are skipped.
 * @return The first matching node, or nullptr if no node matches.
 */
CNode* CConnman::FindNode(const std::string& addrName, bool fExcludeDisconnecting)
{
LOCK(cs_vNodes);
for (CNode* pnode : vNodes) {
// Ignore nodes already flagged for disconnect when the caller asked for that.
if (fExcludeDisconnecting && pnode->fDisconnect) {
continue;
}
if (pnode->GetAddrName() == addrName) {
return pnode;
}
}
return nullptr;
}

CNode* CConnman::FindNode(const CService& addr)
CNode* CConnman::FindNode(const CService& addr, bool fExcludeDisconnecting)
{
LOCK(cs_vNodes);
for (CNode* pnode : vNodes) {
if (fExcludeDisconnecting && pnode->fDisconnect) {
continue;
}
if ((CService)pnode->addr == addr) {
return pnode;
}
Expand Down Expand Up @@ -1263,7 +1275,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
void CConnman::DisconnectNodes()
{
{
LOCK2(cs_vNodes, cs_vNodesDisconnected);
LOCK(cs_vNodes);

if (!fNetworkActive) {
// Disconnect any connected nodes
Expand All @@ -1281,18 +1293,34 @@ void CConnman::DisconnectNodes()
CNode* pnode = *it;
if (pnode->fDisconnect)
{
if (pnode->nDisconnectLingerTime == 0) {
// let's not immediately close the socket but instead wait for at least 100ms so that there is a
// chance to flush all/some pending data. Otherwise the other side might not receive REJECT messages
// that were pushed right before setting fDisconnect=true
// Flushing must happen in two places to ensure data can be received by the other side:
// 1. vSendMsg must be empty and all messages sent via send(). This is ensured by SocketHandler()
// being called before DisconnectNodes and also by the linger time
// 2. Internal socket send buffers must be flushed. This is ensured solely by the linger time
pnode->nDisconnectLingerTime = GetTimeMillis() + 100;
continue;
} else if (GetTimeMillis() < pnode->nDisconnectLingerTime) {
continue;
// If we were the ones who initiated the disconnect, we must assume that the other side wants to see
// pending messages. If the other side initiated the disconnect (or disconnected after we've shut down
// the socket), we can be pretty sure that they are not interested in any pending messages anymore and
// thus can immediately close the socket.
if (!pnode->fOtherSideDisconnected) {
if (pnode->nDisconnectLingerTime == 0) {
// let's not immediately close the socket but instead wait for at least 100ms so that there is a
// chance to flush all/some pending data. Otherwise the other side might not receive REJECT messages
// that were pushed right before setting fDisconnect=true
// Flushing must happen in two places to ensure data can be received by the other side:
// 1. vSendMsg must be empty and all messages sent via send(). This is ensured by SocketHandler()
// being called before DisconnectNodes and also by the linger time
// 2. Internal socket send buffers must be flushed. This is ensured solely by the linger time
pnode->nDisconnectLingerTime = GetTimeMillis() + 100;
}
if (GetTimeMillis() < pnode->nDisconnectLingerTime) {
// everything flushed to the kernel?
if (!pnode->fSocketShutdown && pnode->nSendMsgSize == 0) {
LOCK(pnode->cs_hSocket);
if (pnode->hSocket != INVALID_SOCKET) {
// Give the other side a chance to detect the disconnect as early as possible (recv() will return 0)
::shutdown(pnode->hSocket, SD_SEND);
}
pnode->fSocketShutdown = true;
}
++it;
continue;
}
}

if (fLogIPs) {
Expand Down Expand Up @@ -1320,10 +1348,7 @@ void CConnman::DisconnectNodes()
}
}
}
std::vector<CNode*> vNodesToDelete;
{
LOCK(cs_vNodesDisconnected);

// Delete disconnected nodes
std::list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected;
for (auto it = vNodesDisconnected.begin(); it != vNodesDisconnected.end(); )
Expand All @@ -1343,18 +1368,14 @@ void CConnman::DisconnectNodes()
}
if (fDelete) {
it = vNodesDisconnected.erase(it);
vNodesToDelete.emplace_back(pnode);
DeleteNode(pnode);
}
}
if (!fDelete) {
++it;
}
}
}
// Call DeleteNode without any locks held
for (auto pnode : vNodesToDelete) {
DeleteNode(pnode);
}
}

void CConnman::NotifyNumConnectionsChanged()
Expand Down Expand Up @@ -1816,6 +1837,7 @@ size_t CConnman::SocketRecvData(CNode *pnode)
LogPrint(BCLog::NET, "socket closed\n");
}
LOCK(cs_vNodes);
pnode->fOtherSideDisconnected = true; // avoid lingering
pnode->CloseSocketDisconnect(this);
}
else if (nBytes < 0)
Expand All @@ -1827,6 +1849,7 @@ size_t CConnman::SocketRecvData(CNode *pnode)
if (!pnode->fDisconnect)
LogPrintf("socket recv error %s\n", NetworkErrorString(nErr));
LOCK(cs_vNodes);
pnode->fOtherSideDisconnected = true; // avoid lingering
pnode->CloseSocketDisconnect(this);
}
}
Expand All @@ -1849,9 +1872,9 @@ void CConnman::ThreadSocketHandler()
ForEachNode(AllNodes, [&](CNode* pnode) {
InactivityCheck(pnode);
});
DisconnectNodes();
nLastCleanupNodes = GetTimeMillis();
}
DisconnectNodes();
NotifyNumConnectionsChanged();
}
}
Expand Down Expand Up @@ -2567,9 +2590,6 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
if (!fNetworkActive) {
return;
}
// Ensure nodes with fDisconnect==true are actually disconnected and evicted, otherwise we might end up finding that
// node here when we're re-connecting, which would cause OpenNetworkConnection to bail out
DisconnectNodes();
if (!pszDest) {
// banned or exact match?
if (IsBanned(addrConnect) || FindNode(addrConnect.ToStringIPPort()))
Expand Down Expand Up @@ -3286,11 +3306,12 @@ void CConnman::AddPendingProbeConnections(const std::set<uint256> &proTxHashes)
size_t CConnman::GetNodeCount(NumConnections flags)
{
LOCK(cs_vNodes);
if (flags == CConnman::CONNECTIONS_ALL) // Shortcut if we want total
return vNodes.size();

int nNum = 0;
for (const auto& pnode : vNodes) {
if (pnode->fDisconnect) {
continue;
}
if (flags & (pnode->fInbound ? CONNECTIONS_IN : CONNECTIONS_OUT)) {
nNum++;
}
Expand All @@ -3310,6 +3331,9 @@ void CConnman::GetNodeStats(std::vector<CNodeStats>& vstats)
LOCK(cs_vNodes);
vstats.reserve(vNodes.size());
for (CNode* pnode : vNodes) {
if (pnode->fDisconnect) {
continue;
}
vstats.emplace_back();
pnode->copyStats(vstats.back());
}
Expand Down
13 changes: 7 additions & 6 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,6 @@ friend class CNode;

void WakeMessageHandler();
void WakeSelect();
void DisconnectNodes();

/** Attempts to obfuscate tx time through exponentially distributed emitting.
Works assuming that a single interval is used.
Expand All @@ -482,6 +481,7 @@ friend class CNode;
void ThreadOpenConnections(std::vector<std::string> connect);
void ThreadMessageHandler();
void AcceptConnection(const ListenSocket& hListenSocket);
void DisconnectNodes();
void NotifyNumConnectionsChanged();
void InactivityCheck(CNode *pnode);
bool GenerateSelectSet(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set);
Expand All @@ -497,10 +497,10 @@ friend class CNode;

uint64_t CalculateKeyedNetGroup(const CAddress& ad) const;

CNode* FindNode(const CNetAddr& ip);
CNode* FindNode(const CSubNet& subNet);
CNode* FindNode(const std::string& addrName);
CNode* FindNode(const CService& addr);
CNode* FindNode(const CNetAddr& ip, bool fExcludeDisconnecting = true);
CNode* FindNode(const CSubNet& subNet, bool fExcludeDisconnecting = true);
CNode* FindNode(const std::string& addrName, bool fExcludeDisconnecting = true);
CNode* FindNode(const CService& addr, bool fExcludeDisconnecting = true);

bool AttemptToEvictConnection();
CNode* ConnectNode(CAddress addrConnect, const char *pszDest = nullptr, bool fCountFailure = false);
Expand Down Expand Up @@ -567,7 +567,6 @@ friend class CNode;
std::list<CNode*> vNodesDisconnected;
std::unordered_map<SOCKET, CNode*> mapSocketToNode;
mutable CCriticalSection cs_vNodes;
mutable CCriticalSection cs_vNodesDisconnected;
std::atomic<NodeId> nLastNodeId;
unsigned int nPrevNodeCount;

Expand Down Expand Up @@ -841,6 +840,8 @@ class CNode
std::atomic_bool fSuccessfullyConnected;
std::atomic_bool fDisconnect;
std::atomic<int64_t> nDisconnectLingerTime{0};
std::atomic_bool fSocketShutdown{false};
std::atomic_bool fOtherSideDisconnected { false };
// We use fRelayTxes for two purposes -
// a) it allows us to not relay tx invs before receiving the peer's version message
// b) the peer may tell us in its version message that we should not relay tx invs
Expand Down
7 changes: 0 additions & 7 deletions src/rpc/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ UniValue getconnectioncount(const JSONRPCRequest& request)
if(!g_connman)
throw JSONRPCError(RPC_CLIENT_P2P_DISABLED, "Error: Peer-to-peer functionality missing or disabled");

g_connman->DisconnectNodes();
return (int)g_connman->GetNodeCount(CConnman::CONNECTIONS_ALL);
}

Expand Down Expand Up @@ -127,8 +126,6 @@ UniValue getpeerinfo(const JSONRPCRequest& request)
if(!g_connman)
throw JSONRPCError(RPC_CLIENT_P2P_DISABLED, "Error: Peer-to-peer functionality missing or disabled");

g_connman->DisconnectNodes();

std::vector<CNodeStats> vstats;
g_connman->GetNodeStats(vstats);

Expand Down Expand Up @@ -470,10 +467,6 @@ UniValue getnetworkinfo(const JSONRPCRequest& request)
+ HelpExampleRpc("getnetworkinfo", "")
);

if (g_connman) {
g_connman->DisconnectNodes();
}

LOCK(cs_main);
UniValue obj(UniValue::VOBJ);
obj.push_back(Pair("version", CLIENT_VERSION));
Expand Down
2 changes: 1 addition & 1 deletion test/functional/p2p-timeouts.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def run_test(self):
no_verack_node.send_message(msg_ping())
no_version_node.send_message(msg_ping())

sleep(31)
sleep(32)

assert not no_verack_node.connected
assert not no_version_node.connected
Expand Down

0 comments on commit 7c4ebe4

Please sign in to comment.