diff --git a/src/init.cpp b/src/init.cpp index c3b329a42403f4..34abe2e0f7ce31 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -242,7 +242,11 @@ void PrepareShutdown() } #endif MapPort(false); + + // Because these depend on each-other, we make sure that neither can be + // using the other before destroying them. UnregisterValidationInterface(peerLogic.get()); + g_connman->Stop(); peerLogic.reset(); if (g_connman) { // make sure to stop all threads before g_connman is reset to nullptr as these threads might still be accessing it @@ -262,7 +266,6 @@ void PrepareShutdown() flatdb6.Dump(sporkManager); } - UnregisterNodeSignals(GetNodeSignals()); if (fDumpMempoolLater && gArgs.GetArg("-persistmempool", DEFAULT_PERSIST_MEMPOOL)) { DumpMempool(); } @@ -1578,7 +1581,6 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler) peerLogic.reset(new PeerLogicValidation(&connman)); RegisterValidationInterface(peerLogic.get()); - RegisterNodeSignals(GetNodeSignals()); // sanitize comments per BIP-0014, format user agent and check total size std::vector uacomments; @@ -2118,6 +2120,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler) connOptions.nMaxFeeler = 1; connOptions.nBestHeight = chainActive.Height(); connOptions.uiInterface = &uiInterface; + connOptions.m_msgproc = peerLogic.get(); connOptions.nSendBufferMaxSize = 1000*gArgs.GetArg("-maxsendbuffer", DEFAULT_MAXSENDBUFFER); connOptions.nReceiveFloodSize = 1000*gArgs.GetArg("-maxreceivebuffer", DEFAULT_MAXRECEIVEBUFFER); diff --git a/src/net.cpp b/src/net.cpp index 0bb668ba96680a..6228800088ce76 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -99,10 +99,6 @@ std::string strSubVersion; unordered_limitedmap mapAlreadyAskedFor(MAX_INV_SZ, MAX_INV_SZ * 2); -// Signals for message handling -static CNodeSignals g_signals; -CNodeSignals& GetNodeSignals() { return g_signals; } - void CConnman::AddOneShot(const std::string& strDest) { LOCK(cs_vOneShots); @@ -1202,7 +1198,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) { CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addr, CalculateKeyedNetGroup(addr), nonce, addr_bind, "", true); pnode->AddRef(); pnode->fWhitelisted = whitelisted; - GetNodeSignals().InitializeNode(pnode, *this); + m_msgproc->InitializeNode(pnode); if (fLogIPs) { LogPrint(BCLog::NET, "connection from %s accepted\n", addr.ToString()); @@ -2206,7 +2202,7 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai if (fConnectToMasternode) pnode->fMasternode = true; - GetNodeSignals().InitializeNode(pnode, *this); + m_msgproc->InitializeNode(pnode); { LOCK(cs_vNodes); vNodes.push_back(pnode); @@ -2233,16 +2229,16 @@ void CConnman::ThreadMessageHandler() continue; // Receive messages - bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); + bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc); fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); if (flagInterruptMsgProc) return; - // Send messages { LOCK(pnode->cs_sendProcessing); - GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc); + m_msgproc->SendMessages(pnode, flagInterruptMsgProc); } + if (flagInterruptMsgProc) return; } @@ -2565,6 +2561,7 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) // // Start threads // + assert(m_msgproc); InterruptSocks5(false); interruptNet.reset(); flagInterruptMsgProc = false; @@ -2735,9 +2732,10 @@ void CConnman::DeleteNode(CNode* pnode) { assert(pnode); bool fUpdateConnectionTime = false; - GetNodeSignals().FinalizeNode(pnode->GetId(), fUpdateConnectionTime); - if(fUpdateConnectionTime) + m_msgproc->FinalizeNode(pnode->GetId(), fUpdateConnectionTime); + if(fUpdateConnectionTime) { addrman.Connected(pnode->addr); + } delete pnode; } diff --git a/src/net.h b/src/net.h index 6b694d9ab7e269..101d57a2486c62 100644 --- a/src/net.h +++ b/src/net.h @@ -37,7 +37,6 @@ #include #endif -#include // "Optimistic send" was introduced in the beginning of the Bitcoin project. I assume this was done because it was // thought that "send" would be very cheap when the send buffer is empty. This is not true, as shown by profiling. @@ -142,7 +141,7 @@ struct CSerializedNetMsg std::string command; }; - +class NetEventsInterface; class CConnman { public: @@ -164,6 +163,7 @@ class CConnman int nMaxFeeler = 0; int nBestHeight = 0; CClientUIInterface* uiInterface = nullptr; + NetEventsInterface* m_msgproc = nullptr; unsigned int nSendBufferMaxSize = 0; unsigned int nReceiveFloodSize = 0; uint64_t nMaxOutboundTimeframe = 0; @@ -184,6 +184,7 @@ class CConnman nMaxFeeler = connOptions.nMaxFeeler; nBestHeight = connOptions.nBestHeight; clientInterface = connOptions.uiInterface; + m_msgproc = connOptions.m_msgproc; nSendBufferMaxSize = connOptions.nSendBufferMaxSize; nReceiveFloodSize = connOptions.nReceiveFloodSize; nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe; @@ -546,6 +547,7 @@ class CConnman int nMaxFeeler; std::atomic nBestHeight; CClientUIInterface* clientInterface; + NetEventsInterface* m_msgproc; /** SipHasher seeds for deterministic randomness */ const uint64_t nSeed0, nSeed1; @@ -593,19 +595,18 @@ struct CombinerAll } }; -// Signals for message handling -struct CNodeSignals +/** + * Interface for message handling + */ +class NetEventsInterface { - boost::signals2::signal&), CombinerAll> ProcessMessages; - boost::signals2::signal&), CombinerAll> SendMessages; - boost::signals2::signal InitializeNode; - boost::signals2::signal FinalizeNode; +public: + virtual bool ProcessMessages(CNode* pnode, std::atomic& interrupt) = 0; + virtual bool SendMessages(CNode* pnode, std::atomic& interrupt) = 0; + virtual void InitializeNode(CNode* pnode) = 0; + virtual void FinalizeNode(NodeId id, bool& update_connection_time) = 0; }; - -CNodeSignals& GetNodeSignals(); - - enum { LOCAL_NONE, // unknown diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 06cd79d61b5603..092863b23f8521 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -161,11 +161,6 @@ namespace { std::deque> vRelayExpiration; } // namespace -////////////////////////////////////////////////////////////////////////////// -// -// Registration of network node signals. -// - namespace { struct CBlockReject { @@ -272,7 +267,7 @@ void UpdatePreferredDownload(CNode* node, CNodeState* state) nPreferredDownload += state->fPreferredDownload; } -void PushNodeVersion(CNode *pnode, CConnman& connman, int64_t nTime) +void PushNodeVersion(CNode *pnode, CConnman* connman, int64_t nTime) { ServiceFlags nLocalNodeServices = pnode->GetLocalServices(); uint64_t nonce = pnode->GetLocalNonce(); @@ -290,7 +285,7 @@ void PushNodeVersion(CNode *pnode, CConnman& connman, int64_t nTime) pnode->sentMNAuthChallenge = mnauthChallenge; } - connman.PushMessage(pnode, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalNodeServices, nTime, addrYou, addrMe, + connman->PushMessage(pnode, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalNodeServices, nTime, addrYou, addrMe, nonce, strSubVersion, nNodeStartingHeight, ::fRelayTxes, mnauthChallenge)); if (fLogIPs) { @@ -300,50 +295,6 @@ void PushNodeVersion(CNode *pnode, CConnman& connman, int64_t nTime) } } -void InitializeNode(CNode *pnode, CConnman& connman) { - CAddress addr = pnode->addr; - std::string addrName = pnode->GetAddrName(); - NodeId nodeid = pnode->GetId(); - { - LOCK(cs_main); - mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName))); - } - if(!pnode->fInbound) - PushNodeVersion(pnode, connman, GetTime()); -} - -void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) { - fUpdateConnectionTime = false; - LOCK(cs_main); - CNodeState *state = State(nodeid); - assert(state != nullptr); - - if (state->fSyncStarted) - nSyncStarted--; - - if (state->nMisbehavior == 0 && state->fCurrentlyConnected) { - fUpdateConnectionTime = true; - } - - for (const QueuedBlock& entry : state->vBlocksInFlight) { - mapBlocksInFlight.erase(entry.hash); - } - EraseOrphansFor(nodeid); - nPreferredDownload -= state->fPreferredDownload; - nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0); - assert(nPeersWithValidatedDownloads >= 0); - - mapNodeState.erase(nodeid); - - if (mapNodeState.empty()) { - // Do a consistency check after the last peer is removed. - assert(mapBlocksInFlight.empty()); - assert(nPreferredDownload == 0); - assert(nPeersWithValidatedDownloads == 0); - } - LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid); -} - // Requires cs_main. // Returns a bool indicating whether we requested this block. // Also used if a block was /not/ received and timed out or started with another peer @@ -439,7 +390,7 @@ void UpdateBlockAvailability(NodeId nodeid, const uint256 &hash) { } } -void MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid, CConnman& connman) { +void MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid, CConnman* connman) { AssertLockHeld(cs_main); CNodeState* nodestate = State(nodeid); if (!nodestate || !nodestate->fSupportsDesiredCmpctVersion) { @@ -454,20 +405,20 @@ void MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid, CConnman& connman) { return; } } - connman.ForNode(nodeid, [&connman](CNode* pfrom){ + connman->ForNode(nodeid, [connman](CNode* pfrom){ bool fAnnounceUsingCMPCTBLOCK = false; uint64_t nCMPCTBLOCKVersion = 1; if (lNodesAnnouncingHeaderAndIDs.size() >= 3) { // As per BIP152, we only get 3 of our peers to announce // blocks using compact encodings. - connman.ForNode(lNodesAnnouncingHeaderAndIDs.front(), [&connman, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion](CNode* pnodeStop){ - connman.PushMessage(pnodeStop, CNetMsgMaker(pnodeStop->GetSendVersion()).Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion)); + connman->ForNode(lNodesAnnouncingHeaderAndIDs.front(), [connman, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion](CNode* pnodeStop){ + connman->PushMessage(pnodeStop, CNetMsgMaker(pnodeStop->GetSendVersion()).Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion)); return true; }); lNodesAnnouncingHeaderAndIDs.pop_front(); } fAnnounceUsingCMPCTBLOCK = true; - connman.PushMessage(pfrom, CNetMsgMaker(pfrom->GetSendVersion()).Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion)); + connman->PushMessage(pfrom, CNetMsgMaker(pfrom->GetSendVersion()).Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion)); lNodesAnnouncingHeaderAndIDs.push_back(pfrom->GetId()); return true; }); @@ -576,6 +527,50 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vectoraddr; + std::string addrName = pnode->GetAddrName(); + NodeId nodeid = pnode->GetId(); + { + LOCK(cs_main); + mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName))); + } + if(!pnode->fInbound) + PushNodeVersion(pnode, connman, GetTime()); +} + +void PeerLogicValidation::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) { + fUpdateConnectionTime = false; + LOCK(cs_main); + CNodeState *state = State(nodeid); + assert(state != nullptr); + + if (state->fSyncStarted) + nSyncStarted--; + + if (state->nMisbehavior == 0 && state->fCurrentlyConnected) { + fUpdateConnectionTime = true; + } + + for (const QueuedBlock& entry : state->vBlocksInFlight) { + mapBlocksInFlight.erase(entry.hash); + } + EraseOrphansFor(nodeid); + nPreferredDownload -= state->fPreferredDownload; + nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0); + assert(nPeersWithValidatedDownloads >= 0); + + mapNodeState.erase(nodeid); + + if (mapNodeState.empty()) { + // Do a consistency check after the last peer is removed. + assert(mapBlocksInFlight.empty()); + assert(nPreferredDownload == 0); + assert(nPeersWithValidatedDownloads == 0); + } + LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid); +} + bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) { LOCK(cs_main); CNodeState *state = State(nodeid); @@ -591,22 +586,6 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) { return true; } -void RegisterNodeSignals(CNodeSignals& nodeSignals) -{ - nodeSignals.ProcessMessages.connect(&ProcessMessages); - nodeSignals.SendMessages.connect(&SendMessages); - nodeSignals.InitializeNode.connect(&InitializeNode); - nodeSignals.FinalizeNode.connect(&FinalizeNode); -} - -void UnregisterNodeSignals(CNodeSignals& nodeSignals) -{ - nodeSignals.ProcessMessages.disconnect(&ProcessMessages); - nodeSignals.SendMessages.disconnect(&SendMessages); - nodeSignals.InitializeNode.disconnect(&InitializeNode); - nodeSignals.FinalizeNode.disconnect(&FinalizeNode); -} - ////////////////////////////////////////////////////////////////////////////// // // mapOrphanTransactions @@ -921,7 +900,7 @@ void PeerLogicValidation::BlockChecked(const CBlock& block, const CValidationSta !IsInitialBlockDownload() && mapBlocksInFlight.count(hash) == mapBlocksInFlight.size()) { if (it != mapBlockSource.end()) { - MaybeSetPeerAsAnnouncingHeaderAndIDs(it->second.first, *connman); + MaybeSetPeerAsAnnouncingHeaderAndIDs(it->second.first, connman); } } if (it != mapBlockSource.end()) @@ -1017,7 +996,7 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma // Use deterministic randomness to send to the same nodes for 24 hours // at a time so the addrKnowns of the chosen nodes prevent repeats uint64_t hashAddr = addr.GetHash(); - const CSipHasher hasher = connman.GetDeterministicRandomizer(RANDOMIZER_ID_ADDRESS_RELAY).Write(hashAddr << 32).Write((GetTime() + hashAddr) / (24*60*60)); + const CSipHasher hasher = connman->GetDeterministicRandomizer(RANDOMIZER_ID_ADDRESS_RELAY).Write(hashAddr << 32).Write((GetTime() + hashAddr) / (24*60*60)); FastRandomContext insecure_rand; std::array,2> best{{{0, nullptr}, {0, nullptr}}}; @@ -1042,10 +1021,10 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma } }; - connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc)); + connman->ForEachNodeThen(std::move(sortfunc), std::move(pushfunc)); } -void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensusParams, const CInv& inv, CConnman& connman, const std::atomic& interruptMsgProc) +void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensusParams, const CInv& inv, CConnman* connman, const std::atomic& interruptMsgProc) { bool send = false; std::shared_ptr a_recent_block; @@ -1089,7 +1068,7 @@ void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensus const CNetMsgMaker msgMaker(pfrom->GetSendVersion()); // disconnect node in case we have reached the outbound limit for serving historical blocks // never disconnect whitelisted nodes - if (send && connman.OutboundTargetReached(true) && ( ((pindexBestHeader != nullptr) && (pindexBestHeader->GetBlockTime() - mi->second->GetBlockTime() > HISTORICAL_BLOCK_AGE)) || inv.type == MSG_FILTERED_BLOCK) && !pfrom->fWhitelisted) + if (send && connman->OutboundTargetReached(true) && ( ((pindexBestHeader != nullptr) && (pindexBestHeader->GetBlockTime() - mi->second->GetBlockTime() > HISTORICAL_BLOCK_AGE)) || inv.type == MSG_FILTERED_BLOCK) && !pfrom->fWhitelisted) { LogPrint(BCLog::NET, "historical block serving limit reached, disconnect peer=%d\n", pfrom->GetId()); @@ -1112,7 +1091,7 @@ void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensus pblock = pblockRead; } if (inv.type == MSG_BLOCK) - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCK, *pblock)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCK, *pblock)); else if (inv.type == MSG_FILTERED_BLOCK) { bool sendMerkleBlock = false; @@ -1125,7 +1104,7 @@ void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensus } } if (sendMerkleBlock) { - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::MERKLEBLOCK, merkleBlock)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::MERKLEBLOCK, merkleBlock)); // CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see // This avoids hurting performance by pointlessly requiring a round-trip // Note that there is currently no way for a node to request any single transactions we didn't send here - @@ -1134,7 +1113,7 @@ void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensus // however we MUST always provide at least what the remote peer needs typedef std::pair PairType; for (PairType& pair : merkleBlock.vMatchedTxn) - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::TX, *pblock->vtx[pair.first])); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::TX, *pblock->vtx[pair.first])); } // else // no response @@ -1147,13 +1126,13 @@ void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensus // instead we respond with the full, non-compact block. if (CanDirectFetch(consensusParams) && mi->second->nHeight >= chainActive.Height() - MAX_CMPCTBLOCK_DEPTH) { if (a_recent_compact_block && a_recent_compact_block->header.GetHash() == mi->second->GetBlockHash()) { - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::CMPCTBLOCK, *a_recent_compact_block)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::CMPCTBLOCK, *a_recent_compact_block)); } else { CBlockHeaderAndShortTxIDs cmpctblock(*pblock); - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::CMPCTBLOCK, cmpctblock)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::CMPCTBLOCK, cmpctblock)); } } else { - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCK, *pblock)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCK, *pblock)); } } @@ -1165,7 +1144,7 @@ void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensus // wait for other stuff first. std::vector vInv; vInv.push_back(CInv(MSG_BLOCK, chainActive.Tip()->GetBlockHash())); - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::INV, vInv)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::INV, vInv)); pfrom->hashContinue.SetNull(); } } @@ -1199,14 +1178,14 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam if (inv.type == MSG_TX) { auto mi = mapRelay.find(inv.hash); if (mi != mapRelay.end()) { - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::TX, *mi->second)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::TX, *mi->second)); push = true; } else if (pfrom->timeLastMempoolReq) { auto txinfo = mempool.info(inv.hash); // To protect privacy, do not answer getdata using the mempool when // that TX couldn't have been INVed in reply to a MEMPOOL request. if (txinfo.tx && txinfo.nTime <= pfrom->timeLastMempoolReq) { - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::TX, *txinfo.tx)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::TX, *txinfo.tx)); push = true; } } @@ -1351,11 +1330,11 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam // do that because they want to know about (and store and rebroadcast and // risk analyze) the dependencies of transactions relevant to them, without // having to download the entire memory pool. - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::NOTFOUND, vNotFound)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::NOTFOUND, vNotFound)); } } -inline void static SendBlockTransactions(const CBlock& block, const BlockTransactionsRequest& req, CNode* pfrom, CConnman& connman) { +inline void static SendBlockTransactions(const CBlock& block, const BlockTransactionsRequest& req, CNode* pfrom, CConnman* connman) { BlockTransactions resp(req); for (size_t i = 0; i < req.indexes.size(); i++) { if (req.indexes[i] >= block.vtx.size()) { @@ -1368,10 +1347,10 @@ inline void static SendBlockTransactions(const CBlock& block, const BlockTransac } LOCK(cs_main); CNetMsgMaker msgMaker(pfrom->GetSendVersion()); - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCKTXN, resp)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCKTXN, resp)); } -bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman, const std::atomic& interruptMsgProc) +bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, const std::atomic& interruptMsgProc) { LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->GetId()); if (gArgs.IsArgSet("-dropmessagestest") && GetRand(gArgs.GetArg("-dropmessagestest", 0)) == 0) @@ -1432,7 +1411,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // Each connection can only send one version message if (pfrom->nVersion != 0) { - connman.PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_DUPLICATE, std::string("Duplicate version message"))); + connman->PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_DUPLICATE, std::string("Duplicate version message"))); LOCK(cs_main); Misbehaving(pfrom->GetId(), 1); return false; @@ -1456,12 +1435,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr nServices = ServiceFlags(nServiceInt); if (!pfrom->fInbound) { - connman.SetServices(pfrom->addr, nServices); + connman->SetServices(pfrom->addr, nServices); } if (pfrom->nServicesExpected & ~nServices) { LogPrint(BCLog::NET, "peer=%d does not offer the expected services (%08x offered, %08x expected); disconnecting\n", pfrom->GetId(), nServices, pfrom->nServicesExpected); - connman.PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_NONSTANDARD, + connman->PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_NONSTANDARD, strprintf("Expected to offer services %08x", pfrom->nServicesExpected))); pfrom->fDisconnect = true; return false; @@ -1471,7 +1450,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr { // disconnect from peers older than this proto version LogPrintf("peer=%d using obsolete version %i; disconnecting\n", pfrom->GetId(), nVersion); - connman.PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_OBSOLETE, + connman->PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_OBSOLETE, strprintf("Version must be %d or greater", MIN_PEER_PROTO_VERSION))); pfrom->fDisconnect = true; return false; @@ -1495,7 +1474,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr vRecv >> pfrom->receivedMNAuthChallenge; } // Disconnect if we connected to ourself - if (pfrom->fInbound && !connman.CheckIncomingNonce(nNonce)) + if (pfrom->fInbound && !connman->CheckIncomingNonce(nNonce)) { LogPrintf("connected to self at %s, disconnecting\n", pfrom->addr.ToString()); pfrom->fDisconnect = true; @@ -1524,7 +1503,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr } } - connman.PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERACK)); + connman->PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERACK)); pfrom->nServices = nServices; pfrom->SetAddrLocal(addrMe); @@ -1569,12 +1548,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr } // Get recent addresses - if (pfrom->fOneShot || pfrom->nVersion >= CADDR_TIME_VERSION || connman.GetAddressCount() < 1000) + if (pfrom->fOneShot || pfrom->nVersion >= CADDR_TIME_VERSION || connman->GetAddressCount() < 1000) { - connman.PushMessage(pfrom, CNetMsgMaker(nSendVersion).Make(NetMsgType::GETADDR)); + connman->PushMessage(pfrom, CNetMsgMaker(nSendVersion).Make(NetMsgType::GETADDR)); pfrom->fGetAddr = true; } - connman.MarkAddressGood(pfrom->addr); + connman->MarkAddressGood(pfrom->addr); } std::string remoteAddr; @@ -1629,7 +1608,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // We send this to non-NODE NETWORK peers as well, because even // non-NODE NETWORK peers can announce blocks (such as pruning // nodes) - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::SENDHEADERS)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::SENDHEADERS)); } if (pfrom->nVersion >= SHORT_IDS_BLOCKS_VERSION) { @@ -1640,12 +1619,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // they may wish to request compact blocks from us bool fAnnounceUsingCMPCTBLOCK = false; uint64_t nCMPCTBLOCKVersion = 1; - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion)); } if (pfrom->nVersion >= SENDDSQUEUE_PROTO_VERSION) { // Tell our peer that he should send us PrivateSend queue messages - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::SENDDSQUEUE, true)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::SENDDSQUEUE, true)); } else { // older nodes do not support SENDDSQUEUE and expect us to always send PrivateSend queue messages // TODO we can remove this compatibility code in 0.15.0 @@ -1681,7 +1660,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr vRecv >> vAddr; // Don't want addr from older versions unless seeding - if (pfrom->nVersion < CADDR_TIME_VERSION && connman.GetAddressCount() > 1000) + if (pfrom->nVersion < CADDR_TIME_VERSION && connman->GetAddressCount() > 1000) return true; if (vAddr.size() > 1000) { @@ -1714,7 +1693,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr if (fReachable) vAddrOk.push_back(addr); } - connman.AddNewAddresses(vAddrOk, pfrom->addr, 2 * 60 * 60); + connman->AddNewAddresses(vAddrOk, pfrom->addr, 2 * 60 * 60); if (vAddr.size() < 1000) pfrom->fGetAddr = false; if (pfrom->fOneShot) @@ -1814,7 +1793,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // fell back to inv we probably have a reorg which we should get the headers for first, // we now only provide a getheaders response here. When we receive the headers, we will // then ask for the blocks we need. - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), inv.hash)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), inv.hash)); LogPrint(BCLog::NET, "getheaders (%d) %s to peer=%d\n", pindexBestHeader->nHeight, inv.hash.ToString(), pfrom->GetId()); } } @@ -2046,7 +2025,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // will re-announce the new block via headers (or compact blocks again) // in the SendMessages logic. nodestate->pindexBestHeaderSent = pindex ? pindex : chainActive.Tip(); - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::HEADERS, vHeaders)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::HEADERS, vHeaders)); } @@ -2269,7 +2248,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr pfrom->GetId(), FormatStateMessage(state)); if (state.GetRejectCode() > 0 && state.GetRejectCode() < REJECT_INTERNAL) // Never send AcceptToMemoryPool's internal codes over P2P - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::REJECT, strCommand, (unsigned char)state.GetRejectCode(), + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::REJECT, strCommand, (unsigned char)state.GetRejectCode(), state.GetRejectReason().substr(0, MAX_REJECT_MESSAGE_LENGTH), inv.hash)); if (nDoS > 0) { Misbehaving(pfrom->GetId(), nDoS); @@ -2288,7 +2267,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr if (mapBlockIndex.find(cmpctblock.header.hashPrevBlock) == mapBlockIndex.end()) { // Doesn't connect (or is genesis), instead of DoSing in AcceptBlockHeader, request deeper headers if (!IsInitialBlockDownload()) - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256())); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256())); return true; } } @@ -2343,7 +2322,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // so we just grab the block via normal getdata std::vector vInv(1); vInv[0] = CInv(MSG_BLOCK, cmpctblock.header.GetHash()); - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); } return true; } @@ -2381,7 +2360,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // Duplicate txindexes, the block is now in-flight, so just request it std::vector vInv(1); vInv[0] = CInv(MSG_BLOCK, cmpctblock.header.GetHash()); - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); return true; } @@ -2398,7 +2377,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr fProcessBLOCKTXN = true; } else { req.blockhash = pindex->GetBlockHash(); - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req)); } } else { // This block is either already in flight from a different @@ -2424,7 +2403,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // mempool will probably be useless - request the block normally std::vector vInv(1); vInv[0] = CInv(MSG_BLOCK, cmpctblock.header.GetHash()); - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); return true; } else { // If this was an announce-cmpctblock, we want the same treatment as a header message @@ -2498,7 +2477,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // Might have collided, fall back to getdata now :( std::vector invs; invs.push_back(CInv(MSG_BLOCK, resp.blockhash)); - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, invs)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, invs)); } else { // Block is either okay, or possibly we received // READ_STATUS_CHECKBLOCK_FAILED. @@ -2579,7 +2558,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // nUnconnectingHeaders gets reset back to 0. if (mapBlockIndex.find(headers[0].hashPrevBlock) == mapBlockIndex.end() && nCount < MAX_BLOCKS_TO_ANNOUNCE) { nodestate->nUnconnectingHeaders++; - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256())); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256())); LogPrint(BCLog::NET, "received header %s: missing prev block %s, sending getheaders (%d) to end (peer=%d, nUnconnectingHeaders=%d)\n", headers[0].GetHash().ToString(), headers[0].hashPrevBlock.ToString(), @@ -2634,7 +2613,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // TODO: optimize: if pindexLast is an ancestor of chainActive.Tip or pindexBestHeader, continue // from there instead. LogPrint(BCLog::NET, "more getheaders (%d) to end to peer=%d (startheight:%d)\n", pindexLast->nHeight, pfrom->GetId(), pfrom->nStartingHeight); - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexLast), uint256())); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexLast), uint256())); } bool fCanDirectFetch = CanDirectFetch(chainparams.GetConsensus()); @@ -2682,7 +2661,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // In any case, we want to download using a compact block, not a regular one vGetData[0] = CInv(MSG_CMPCT_BLOCK, vGetData[0].hash); } - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vGetData)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vGetData)); } } } @@ -2743,7 +2722,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr pfrom->fSentAddr = true; pfrom->vAddrToSend.clear(); - std::vector vAddr = connman.GetAddresses(); + std::vector vAddr = connman->GetAddresses(); FastRandomContext insecure_rand; for (const CAddress &addr : vAddr) pfrom->PushAddress(addr, insecure_rand); @@ -2759,7 +2738,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr return true; } - if (connman.OutboundTargetReached(false) && !pfrom->fWhitelisted) + if (connman->OutboundTargetReached(false) && !pfrom->fWhitelisted) { LogPrint(BCLog::NET, "mempool request with bandwidth limit reached, disconnect peer=%d\n", pfrom->GetId()); pfrom->fDisconnect = true; @@ -2788,7 +2767,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // it, if the remote node sends a ping once per second and this node takes 5 // seconds to respond to each, the 5th ping the remote sends would appear to // return very quickly. - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::PONG, nonce)); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::PONG, nonce)); } } @@ -2976,13 +2955,13 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr return true; } -static bool SendRejectsAndCheckIfBanned(CNode* pnode, CConnman& connman) +static bool SendRejectsAndCheckIfBanned(CNode* pnode, CConnman* connman) { AssertLockHeld(cs_main); CNodeState &state = *State(pnode->GetId()); for (const CBlockReject& reject : state.rejects) { - connman.PushMessage(pnode, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, (std::string)NetMsgType::BLOCK, reject.chRejectCode, reject.strRejectReason, reject.hashBlock)); + connman->PushMessage(pnode, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, (std::string)NetMsgType::BLOCK, reject.chRejectCode, reject.strRejectReason, reject.hashBlock)); } state.rejects.clear(); @@ -2998,7 +2977,7 @@ static bool SendRejectsAndCheckIfBanned(CNode* pnode, CConnman& connman) LogPrintf("Warning: not banning local peer %s!\n", pnode->GetLogString()); else { - connman.Ban(pnode->addr, BanReasonNodeMisbehaving); + connman->Ban(pnode->addr, BanReasonNodeMisbehaving); } } return true; @@ -3006,7 +2985,7 @@ static bool SendRejectsAndCheckIfBanned(CNode* pnode, CConnman& connman) return false; } -bool ProcessMessages(CNode* pfrom, CConnman& connman, const std::atomic& interruptMsgProc) +bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic& interruptMsgProc) { const CChainParams& chainparams = Params(); // @@ -3040,7 +3019,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, const std::atomic& i // Just take one message msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE; - pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize(); + pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman->GetReceiveFloodSize(); fMoreWork = !pfrom->vProcessMsg.empty(); } CNetMessage& msg(msgs.front()); @@ -3089,7 +3068,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, const std::atomic& i } catch (const std::ios_base::failure& e) { - connman.PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_MALFORMED, std::string("error parsing message"))); + connman->PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_MALFORMED, std::string("error parsing message"))); if (strstr(e.what(), "end of data")) { // Allow exceptions from under-length message on vRecv @@ -3140,7 +3119,7 @@ class CompareInvMempoolOrder } }; -bool SendMessages(CNode* pto, CConnman& connman, const std::atomic& interruptMsgProc) +bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptMsgProc) { const Consensus::Params& consensusParams = Params().GetConsensus(); { @@ -3172,11 +3151,11 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic& interr pto->nPingUsecStart = GetTimeMicros(); if (pto->nVersion > BIP0031_VERSION) { pto->nPingNonceSent = nonce; - connman.PushMessage(pto, msgMaker.Make(NetMsgType::PING, nonce)); + connman->PushMessage(pto, msgMaker.Make(NetMsgType::PING, nonce)); } else { // Peer is too old to support ping command with nonce, pong will never arrive. pto->nPingNonceSent = 0; - connman.PushMessage(pto, msgMaker.Make(NetMsgType::PING)); + connman->PushMessage(pto, msgMaker.Make(NetMsgType::PING)); } } @@ -3211,14 +3190,14 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic& interr // receiver rejects addr messages larger than 1000 if (vAddr.size() >= 1000) { - connman.PushMessage(pto, msgMaker.Make(NetMsgType::ADDR, vAddr)); + connman->PushMessage(pto, msgMaker.Make(NetMsgType::ADDR, vAddr)); vAddr.clear(); } } } pto->vAddrToSend.clear(); if (!vAddr.empty()) - connman.PushMessage(pto, msgMaker.Make(NetMsgType::ADDR, vAddr)); + connman->PushMessage(pto, msgMaker.Make(NetMsgType::ADDR, vAddr)); // we only send the big addr message once if (pto->vAddrToSend.capacity() > 40) pto->vAddrToSend.shrink_to_fit(); @@ -3245,7 +3224,7 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic& interr if (pindexStart->pprev) pindexStart = pindexStart->pprev; LogPrint(BCLog::NET, "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, pto->GetId(), pto->nStartingHeight); - connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexStart), uint256())); + connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexStart), uint256())); } } @@ -3254,7 +3233,7 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic& interr // transactions become unconfirmed and spams other nodes. if (!fReindex && !fImporting && !IsInitialBlockDownload()) { - GetMainSignals().Broadcast(nTimeBestReceived, &connman); + GetMainSignals().Broadcast(nTimeBestReceived, connman); } // @@ -3344,7 +3323,7 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic& interr { LOCK(cs_most_recent_block); if (most_recent_block_hash == pBestIndex->GetBlockHash()) { - connman.PushMessage(pto, msgMaker.Make(NetMsgType::CMPCTBLOCK, *most_recent_compact_block)); + connman->PushMessage(pto, msgMaker.Make(NetMsgType::CMPCTBLOCK, *most_recent_compact_block)); fGotBlockFromCache = true; } } @@ -3353,7 +3332,7 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic& interr bool ret = ReadBlockFromDisk(block, pBestIndex, consensusParams); assert(ret); CBlockHeaderAndShortTxIDs cmpctblock(block); - connman.PushMessage(pto, msgMaker.Make(NetMsgType::CMPCTBLOCK, cmpctblock)); + connman->PushMessage(pto, msgMaker.Make(NetMsgType::CMPCTBLOCK, cmpctblock)); } state.pindexBestHeaderSent = pBestIndex; } else if (state.fPreferHeaders) { @@ -3366,7 +3345,7 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic& interr LogPrint(BCLog::NET, "%s: sending header %s to peer=%d\n", __func__, vHeaders.front().GetHash().ToString(), pto->GetId()); } - connman.PushMessage(pto, msgMaker.Make(NetMsgType::HEADERS, vHeaders)); + connman->PushMessage(pto, msgMaker.Make(NetMsgType::HEADERS, vHeaders)); state.pindexBestHeaderSent = pBestIndex; } else fRevertToInv = true; @@ -3412,7 +3391,7 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic& interr for (const uint256& hash : pto->vInventoryBlockToSend) { vInv.push_back(CInv(MSG_BLOCK, hash)); if (vInv.size() == MAX_INV_SZ) { - connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); + connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); vInv.clear(); } } @@ -3455,7 +3434,7 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic& interr vInv.push_back(inv); if (vInv.size() == MAX_INV_SZ) { LogPrint(BCLog::NET, "SendMessages -- pushing inv's: count=%d peer=%d\n", vInv.size(), pto->GetId()); - connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); + connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); vInv.clear(); } } @@ -3513,7 +3492,7 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic& interr } } if (vInv.size() == MAX_INV_SZ) { - connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); + connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); vInv.clear(); } pto->filterInventoryKnown.insert(hash); @@ -3535,7 +3514,7 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic& interr pto->vInventoryOtherToSend.clear(); } if (!vInv.empty()) - connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); + connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); // Detect whether we're stalling nNow = GetTimeMicros(); @@ -3631,7 +3610,7 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic& interr vGetData.push_back(inv); if (vGetData.size() >= 1000) { - connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); + connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); LogPrint(BCLog::NET, "SendMessages -- GETDATA -- pushed size = %lu peer=%d\n", vGetData.size(), pto->GetId()); vGetData.clear(); } @@ -3644,7 +3623,7 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic& interr } pto->vecAskFor.erase(pto->vecAskFor.begin(), it); if (!vGetData.empty()) { - connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); + connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); LogPrint(BCLog::NET, "SendMessages -- GETDATA -- pushed size = %lu peer=%d\n", vGetData.size(), pto->GetId()); } diff --git a/src/net_processing.h b/src/net_processing.h index 65ddf12fe6d189..ddd051a0023a2f 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -24,22 +24,31 @@ static constexpr int64_t HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER = 1000; // 1ms/head /** Default number of orphan+recently-replaced txn to keep around for block reconstruction */ static const unsigned int DEFAULT_BLOCK_RECONSTRUCTION_EXTRA_TXN = 100; -/** Register with a network node to receive its signals */ -void RegisterNodeSignals(CNodeSignals& nodeSignals); -/** Unregister a network node */ -void UnregisterNodeSignals(CNodeSignals& nodeSignals); - class PeerLogicValidation : public CValidationInterface { private: CConnman* connman; public: - explicit PeerLogicValidation(CConnman* connmanIn); + explicit PeerLogicValidation(CConnman* connman); void BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindexConnected, const std::vector& vtxConflicted) override; void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; void BlockChecked(const CBlock& block, const CValidationState& state) override; void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr& pblock) override; + + + void InitializeNode(CNode* pnode) override; + void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) override; + /** Process protocol messages received from a given node */ + bool ProcessMessages(CNode* pfrom, std::atomic& interrupt) override; + /** + * Send queued protocol messages to be sent to a give node. + * + * @param[in] pto The node which we are sending messages to. + * @param[in] interrupt Interrupt condition for processing threads + * @return True if there is more work to be done + */ + bool SendMessages(CNode* pto, std::atomic& interrupt) override; }; struct CNodeStateStats { @@ -55,16 +64,4 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats); void Misbehaving(NodeId nodeid, int howmuch); bool IsBanned(NodeId nodeid); -/** Process protocol messages received from a given node */ -bool ProcessMessages(CNode* pfrom, CConnman& connman, const std::atomic& interrupt); -/** - * Send queued protocol messages to be sent to a give node. - * - * @param[in] pto The node which we are sending messages to. - * @param[in] connman The connection manager for that node. - * @param[in] interrupt Interrupt condition for processing threads - * @return True if there is more work to be done - */ -bool SendMessages(CNode* pto, CConnman& connman, const std::atomic& interrupt); - #endif // BITCOIN_NET_PROCESSING_H diff --git a/src/test/DoS_tests.cpp b/src/test/DoS_tests.cpp index 961b18c45425e5..ac898466440c9d 100644 --- a/src/test/DoS_tests.cpp +++ b/src/test/DoS_tests.cpp @@ -50,26 +50,26 @@ BOOST_AUTO_TEST_CASE(DoS_banning) CAddress addr1(ip(0xa0b0c001), NODE_NONE); CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 0, 0, CAddress(), "", true); dummyNode1.SetSendVersion(PROTOCOL_VERSION); - GetNodeSignals().InitializeNode(&dummyNode1, *connman); + peerLogic->InitializeNode(&dummyNode1); dummyNode1.nVersion = 1; dummyNode1.fSuccessfullyConnected = true; Misbehaving(dummyNode1.GetId(), 100); // Should get banned - SendMessages(&dummyNode1, *connman, interruptDummy); + peerLogic->SendMessages(&dummyNode1, interruptDummy); BOOST_CHECK(connman->IsBanned(addr1)); BOOST_CHECK(!connman->IsBanned(ip(0xa0b0c001|0x0000ff00))); // Different IP, not banned CAddress addr2(ip(0xa0b0c002), NODE_NONE); CNode dummyNode2(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr2, 1, 1, CAddress(), "", true); dummyNode2.SetSendVersion(PROTOCOL_VERSION); - GetNodeSignals().InitializeNode(&dummyNode2, *connman); + peerLogic->InitializeNode(&dummyNode2); dummyNode2.nVersion = 1; dummyNode2.fSuccessfullyConnected = true; Misbehaving(dummyNode2.GetId(), 50); - SendMessages(&dummyNode2, *connman, interruptDummy); + peerLogic->SendMessages(&dummyNode2, interruptDummy); BOOST_CHECK(!connman->IsBanned(addr2)); // 2 not banned yet... BOOST_CHECK(connman->IsBanned(addr1)); // ... but 1 still should be Misbehaving(dummyNode2.GetId(), 50); - SendMessages(&dummyNode2, *connman, interruptDummy); + peerLogic->SendMessages(&dummyNode2, interruptDummy); BOOST_CHECK(connman->IsBanned(addr2)); } @@ -82,17 +82,17 @@ BOOST_AUTO_TEST_CASE(DoS_banscore) CAddress addr1(ip(0xa0b0c001), NODE_NONE); CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 3, 1, CAddress(), "", true); dummyNode1.SetSendVersion(PROTOCOL_VERSION); - GetNodeSignals().InitializeNode(&dummyNode1, *connman); + peerLogic->InitializeNode(&dummyNode1); dummyNode1.nVersion = 1; dummyNode1.fSuccessfullyConnected = true; Misbehaving(dummyNode1.GetId(), 100); - SendMessages(&dummyNode1, *connman, interruptDummy); + peerLogic->SendMessages(&dummyNode1, interruptDummy); BOOST_CHECK(!connman->IsBanned(addr1)); Misbehaving(dummyNode1.GetId(), 10); - SendMessages(&dummyNode1, *connman, interruptDummy); + peerLogic->SendMessages(&dummyNode1, interruptDummy); BOOST_CHECK(!connman->IsBanned(addr1)); Misbehaving(dummyNode1.GetId(), 1); - SendMessages(&dummyNode1, *connman, interruptDummy); + peerLogic->SendMessages(&dummyNode1, interruptDummy); BOOST_CHECK(connman->IsBanned(addr1)); gArgs.ForceSetArg("-banscore", std::to_string(DEFAULT_BANSCORE_THRESHOLD)); } @@ -108,12 +108,12 @@ BOOST_AUTO_TEST_CASE(DoS_bantime) CAddress addr(ip(0xa0b0c001), NODE_NONE); CNode dummyNode(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr, 4, 4, CAddress(), "", true); dummyNode.SetSendVersion(PROTOCOL_VERSION); - GetNodeSignals().InitializeNode(&dummyNode, *connman); + peerLogic->InitializeNode(&dummyNode); dummyNode.nVersion = 1; dummyNode.fSuccessfullyConnected = true; Misbehaving(dummyNode.GetId(), 100); - SendMessages(&dummyNode, *connman, interruptDummy); + peerLogic->SendMessages(&dummyNode, interruptDummy); BOOST_CHECK(connman->IsBanned(addr)); SetMockTime(nStartTime+60*60); diff --git a/src/test/test_dash.cpp b/src/test/test_dash.cpp index e7054324a7b96a..7d002341226df2 100644 --- a/src/test/test_dash.cpp +++ b/src/test/test_dash.cpp @@ -59,7 +59,6 @@ BasicTestingSetup::~BasicTestingSetup() delete evoDb; ECC_Stop(); - g_connman.reset(); } TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(chainName) @@ -96,17 +95,18 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha nScriptCheckThreads = 3; for (int i=0; i < nScriptCheckThreads-1; i++) threadGroup.create_thread(&ThreadScriptCheck); - RegisterNodeSignals(GetNodeSignals()); + peerLogic.reset(new PeerLogicValidation(connman)); } TestingSetup::~TestingSetup() { - UnregisterNodeSignals(GetNodeSignals()); llmq::InterruptLLMQSystem(); threadGroup.interrupt_all(); threadGroup.join_all(); GetMainSignals().FlushBackgroundCallbacks(); GetMainSignals().UnregisterBackgroundSignalScheduler(); + g_connman.reset(); + peerLogic.reset(); UnloadBlockIndex(); delete pcoinsTip; llmq::DestroyLLMQSystem(); diff --git a/src/test/test_dash.h b/src/test/test_dash.h index c0eb3a4103461f..87cede97883ec3 100644 --- a/src/test/test_dash.h +++ b/src/test/test_dash.h @@ -50,12 +50,14 @@ struct BasicTestingSetup { * Included are data directory, coins database, script check threads setup. */ class CConnman; +class PeerLogicValidation; struct TestingSetup: public BasicTestingSetup { CCoinsViewDB *pcoinsdbview; fs::path pathTemp; boost::thread_group threadGroup; CConnman* connman; CScheduler scheduler; + std::unique_ptr peerLogic; explicit TestingSetup(const std::string& chainName = CBaseChainParams::MAIN); ~TestingSetup();