Skip to content

Commit

Permalink
net: use an interface class rather than signals for message processing
Browse files Browse the repository at this point in the history
Drop boost signals in favor of a stateful class. This will allow the message
processing loop to actually move to net_processing in a future step.

Github-Pull: bitcoin#10756
Rebased-From: 8ad663c
  • Loading branch information
theuni authored and MarcoFalke committed Nov 2, 2017
1 parent 8aee55a commit dc897e5
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 125 deletions.
5 changes: 2 additions & 3 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,10 @@ void Shutdown()
#endif
MapPort(false);
UnregisterValidationInterface(peerLogic.get());
peerLogic.reset();
g_connman.reset();
peerLogic.reset();

StopTorControl();
UnregisterNodeSignals(GetNodeSignals());
if (fDumpMempoolLater && gArgs.GetArg("-persistmempool", DEFAULT_PERSIST_MEMPOOL)) {
DumpMempool();
}
Expand Down Expand Up @@ -1277,7 +1276,6 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)

peerLogic.reset(new PeerLogicValidation(&connman));
RegisterValidationInterface(peerLogic.get());
RegisterNodeSignals(GetNodeSignals());

// sanitize comments per BIP-0014, format user agent and check total size
std::vector<std::string> uacomments;
Expand Down Expand Up @@ -1668,6 +1666,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
connOptions.nMaxFeeler = 1;
connOptions.nBestHeight = chainActive.Height();
connOptions.uiInterface = &uiInterface;
connOptions.m_msgproc = peerLogic.get();
connOptions.nSendBufferMaxSize = 1000*gArgs.GetArg("-maxsendbuffer", DEFAULT_MAXSENDBUFFER);
connOptions.nReceiveFloodSize = 1000*gArgs.GetArg("-maxreceivebuffer", DEFAULT_MAXRECEIVEBUFFER);

Expand Down
20 changes: 9 additions & 11 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ std::string strSubVersion;

limitedmap<uint256, int64_t> mapAlreadyAskedFor(MAX_INV_SZ);

// Signals for message handling
static CNodeSignals g_signals;
CNodeSignals& GetNodeSignals() { return g_signals; }

void CConnman::AddOneShot(const std::string& strDest)
{
LOCK(cs_vOneShots);
Expand Down Expand Up @@ -1114,7 +1110,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addr, CalculateKeyedNetGroup(addr), nonce, addr_bind, "", true);
pnode->AddRef();
pnode->fWhitelisted = whitelisted;
GetNodeSignals().InitializeNode(pnode, this);
m_msgproc->InitializeNode(pnode, this);

LogPrint(BCLog::NET, "connection from %s accepted\n", addr.ToString());

Expand Down Expand Up @@ -1966,7 +1962,7 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
if (manual_connection)
pnode->m_manual_connection = true;

GetNodeSignals().InitializeNode(pnode, this);
m_msgproc->InitializeNode(pnode, this);
{
LOCK(cs_vNodes);
vNodes.push_back(pnode);
Expand Down Expand Up @@ -1996,16 +1992,16 @@ void CConnman::ThreadMessageHandler()
continue;

// Receive messages
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, this, flagInterruptMsgProc);
bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, this, flagInterruptMsgProc);
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (flagInterruptMsgProc)
return;

// Send messages
{
LOCK(pnode->cs_sendProcessing);
GetNodeSignals().SendMessages(pnode, this, flagInterruptMsgProc);
m_msgproc->SendMessages(pnode, this, flagInterruptMsgProc);
}

if (flagInterruptMsgProc)
return;
}
Expand Down Expand Up @@ -2324,6 +2320,7 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
//
// Start threads
//
assert(m_msgproc);
InterruptSocks5(false);
interruptNet.reset();
flagInterruptMsgProc = false;
Expand Down Expand Up @@ -2443,9 +2440,10 @@ void CConnman::DeleteNode(CNode* pnode)
{
assert(pnode);
bool fUpdateConnectionTime = false;
GetNodeSignals().FinalizeNode(pnode->GetId(), fUpdateConnectionTime);
if(fUpdateConnectionTime)
m_msgproc->FinalizeNode(pnode->GetId(), fUpdateConnectionTime);
if(fUpdateConnectionTime) {
addrman.Connected(pnode->addr);
}
delete pnode;
}

Expand Down
25 changes: 13 additions & 12 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include <arpa/inet.h>
#endif

#include <boost/signals2/signal.hpp>

class CScheduler;
class CNode;
Expand Down Expand Up @@ -116,7 +115,7 @@ struct CSerializedNetMsg
std::string command;
};


class NetEventsInterface;
class CConnman
{
public:
Expand All @@ -138,6 +137,7 @@ class CConnman
int nMaxFeeler = 0;
int nBestHeight = 0;
CClientUIInterface* uiInterface = nullptr;
NetEventsInterface* m_msgproc = nullptr;
unsigned int nSendBufferMaxSize = 0;
unsigned int nReceiveFloodSize = 0;
uint64_t nMaxOutboundTimeframe = 0;
Expand All @@ -156,6 +156,7 @@ class CConnman
nMaxFeeler = connOptions.nMaxFeeler;
nBestHeight = connOptions.nBestHeight;
clientInterface = connOptions.uiInterface;
m_msgproc = connOptions.m_msgproc;
nSendBufferMaxSize = connOptions.nSendBufferMaxSize;
nReceiveFloodSize = connOptions.nReceiveFloodSize;
nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe;
Expand Down Expand Up @@ -396,6 +397,7 @@ class CConnman
int nMaxFeeler;
std::atomic<int> nBestHeight;
CClientUIInterface* clientInterface;
NetEventsInterface* m_msgproc;

/** SipHasher seeds for deterministic randomness */
const uint64_t nSeed0, nSeed1;
Expand Down Expand Up @@ -436,19 +438,18 @@ struct CombinerAll
}
};

// Signals for message handling
struct CNodeSignals
/**
* Interface for message handling
*/
class NetEventsInterface
{
boost::signals2::signal<bool (CNode*, CConnman*, std::atomic<bool>&), CombinerAll> ProcessMessages;
boost::signals2::signal<bool (CNode*, CConnman*, std::atomic<bool>&), CombinerAll> SendMessages;
boost::signals2::signal<void (CNode*, CConnman*)> InitializeNode;
boost::signals2::signal<void (NodeId, bool&)> FinalizeNode;
public:
virtual bool ProcessMessages(CNode* pnode, CConnman* connman, std::atomic<bool>& interrupt) = 0;
virtual bool SendMessages(CNode* pnode, CConnman* connman, std::atomic<bool>& interrupt) = 0;
virtual void InitializeNode(CNode* pnode, CConnman* connman) = 0;
virtual void FinalizeNode(NodeId id, bool& update_connection_time) = 0;
};


CNodeSignals& GetNodeSignals();


enum
{
LOCAL_NONE, // unknown
Expand Down
112 changes: 46 additions & 66 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,6 @@ namespace {
std::deque<std::pair<int64_t, MapRelay::iterator>> vRelayExpiration;
} // namespace

//////////////////////////////////////////////////////////////////////////////
//
// Registration of network node signals.
//

namespace {

struct CBlockReject {
Expand Down Expand Up @@ -265,49 +260,6 @@ void PushNodeVersion(CNode *pnode, CConnman* connman, int64_t nTime)
}
}

void InitializeNode(CNode *pnode, CConnman* connman) {
CAddress addr = pnode->addr;
std::string addrName = pnode->GetAddrName();
NodeId nodeid = pnode->GetId();
{
LOCK(cs_main);
mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName)));
}
if(!pnode->fInbound)
PushNodeVersion(pnode, connman, GetTime());
}

void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) {
fUpdateConnectionTime = false;
LOCK(cs_main);
CNodeState *state = State(nodeid);

if (state->fSyncStarted)
nSyncStarted--;

if (state->nMisbehavior == 0 && state->fCurrentlyConnected) {
fUpdateConnectionTime = true;
}

for (const QueuedBlock& entry : state->vBlocksInFlight) {
mapBlocksInFlight.erase(entry.hash);
}
EraseOrphansFor(nodeid);
nPreferredDownload -= state->fPreferredDownload;
nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0);
assert(nPeersWithValidatedDownloads >= 0);

mapNodeState.erase(nodeid);

if (mapNodeState.empty()) {
// Do a consistency check after the last peer is removed.
assert(mapBlocksInFlight.empty());
assert(nPreferredDownload == 0);
assert(nPeersWithValidatedDownloads == 0);
}
LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid);
}

// Requires cs_main.
// Returns a bool indicating whether we requested this block.
// Also used if a block was /not/ received and timed out or started with another peer
Expand Down Expand Up @@ -543,6 +495,50 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<con

} // namespace

void PeerLogicValidation::InitializeNode(CNode *pnode, CConnman* connman) {
CAddress addr = pnode->addr;
std::string addrName = pnode->GetAddrName();
NodeId nodeid = pnode->GetId();
{
LOCK(cs_main);
mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName)));
}
if(!pnode->fInbound)
PushNodeVersion(pnode, connman, GetTime());
}

void PeerLogicValidation::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) {
fUpdateConnectionTime = false;
LOCK(cs_main);
CNodeState *state = State(nodeid);
assert(state != nullptr);

if (state->fSyncStarted)
nSyncStarted--;

if (state->nMisbehavior == 0 && state->fCurrentlyConnected) {
fUpdateConnectionTime = true;
}

for (const QueuedBlock& entry : state->vBlocksInFlight) {
mapBlocksInFlight.erase(entry.hash);
}
EraseOrphansFor(nodeid);
nPreferredDownload -= state->fPreferredDownload;
nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0);
assert(nPeersWithValidatedDownloads >= 0);

mapNodeState.erase(nodeid);

if (mapNodeState.empty()) {
// Do a consistency check after the last peer is removed.
assert(mapBlocksInFlight.empty());
assert(nPreferredDownload == 0);
assert(nPeersWithValidatedDownloads == 0);
}
LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid);
}

bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
LOCK(cs_main);
CNodeState *state = State(nodeid);
Expand All @@ -558,22 +554,6 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
return true;
}

void RegisterNodeSignals(CNodeSignals& nodeSignals)
{
nodeSignals.ProcessMessages.connect(&ProcessMessages);
nodeSignals.SendMessages.connect(&SendMessages);
nodeSignals.InitializeNode.connect(&InitializeNode);
nodeSignals.FinalizeNode.connect(&FinalizeNode);
}

void UnregisterNodeSignals(CNodeSignals& nodeSignals)
{
nodeSignals.ProcessMessages.disconnect(&ProcessMessages);
nodeSignals.SendMessages.disconnect(&SendMessages);
nodeSignals.InitializeNode.disconnect(&InitializeNode);
nodeSignals.FinalizeNode.disconnect(&FinalizeNode);
}

//////////////////////////////////////////////////////////////////////////////
//
// mapOrphanTransactions
Expand Down Expand Up @@ -2672,7 +2652,7 @@ static bool SendRejectsAndCheckIfBanned(CNode* pnode, CConnman* connman)
return false;
}

bool ProcessMessages(CNode* pfrom, CConnman* connman, const std::atomic<bool>& interruptMsgProc)
bool PeerLogicValidation::ProcessMessages(CNode* pfrom, CConnman* connman, std::atomic<bool>& interruptMsgProc)
{
const CChainParams& chainparams = Params();
//
Expand Down Expand Up @@ -2809,7 +2789,7 @@ class CompareInvMempoolOrder
}
};

bool SendMessages(CNode* pto, CConnman* connman, const std::atomic<bool>& interruptMsgProc)
bool PeerLogicValidation::SendMessages(CNode* pto, CConnman* connman, std::atomic<bool>& interruptMsgProc)
{
const Consensus::Params& consensusParams = Params().GetConsensus();
{
Expand Down
36 changes: 17 additions & 19 deletions src/net_processing.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,32 @@ static const unsigned int DEFAULT_BLOCK_RECONSTRUCTION_EXTRA_TXN = 100;
static constexpr int64_t HEADERS_DOWNLOAD_TIMEOUT_BASE = 15 * 60 * 1000000; // 15 minutes
static constexpr int64_t HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER = 1000; // 1ms/header

/** Register with a network node to receive its signals */
void RegisterNodeSignals(CNodeSignals& nodeSignals);
/** Unregister a network node */
void UnregisterNodeSignals(CNodeSignals& nodeSignals);

class PeerLogicValidation : public CValidationInterface {
class PeerLogicValidation : public CValidationInterface, public NetEventsInterface {
private:
CConnman* connman;

public:
PeerLogicValidation(CConnman* connmanIn);
explicit PeerLogicValidation(CConnman* connman);

void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected, const std::vector<CTransactionRef>& vtxConflicted) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void BlockChecked(const CBlock& block, const CValidationState& state) override;
void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& pblock) override;


void InitializeNode(CNode* pnode, CConnman* connman) override;
void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) override;
/** Process protocol messages received from a given node */
bool ProcessMessages(CNode* pfrom, CConnman* connman, std::atomic<bool>& interrupt) override;
/**
* Send queued protocol messages to be sent to a give node.
*
* @param[in] pto The node which we are sending messages to.
* @param[in] connman The connection manager for that node.
* @param[in] interrupt Interrupt condition for processing threads
* @return True if there is more work to be done
*/
bool SendMessages(CNode* pto, CConnman* connman, std::atomic<bool>& interrupt) override;
};

struct CNodeStateStats {
Expand All @@ -52,16 +62,4 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats);
/** Increase a node's misbehavior score. */
void Misbehaving(NodeId nodeid, int howmuch);

/** Process protocol messages received from a given node */
bool ProcessMessages(CNode* pfrom, CConnman* connman, const std::atomic<bool>& interrupt);
/**
* Send queued protocol messages to be sent to a give node.
*
* @param[in] pto The node which we are sending messages to.
* @param[in] connman The connection manager for that node.
* @param[in] interrupt Interrupt condition for processing threads
* @return True if there is more work to be done
*/
bool SendMessages(CNode* pto, CConnman* connman, const std::atomic<bool>& interrupt);

#endif // BITCOIN_NET_PROCESSING_H
Loading

0 comments on commit dc897e5

Please sign in to comment.