Skip to content

Commit f605400

Browse files
committed
refactor: reduce PeerManager usage in InstantSend worker functions
So far we cannot stop using `PeerManager` entirely the triggers are internal (i.e. not coming from P2P interactions) but we can extend and embrace MessageProcessingResult, to at least isolate its usage as best we can.
1 parent 5d9ca20 commit f605400

File tree

5 files changed

+51
-24
lines changed

5 files changed

+51
-24
lines changed

src/instantsend/instantsend.cpp

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ MessageProcessingResult CInstantSendManager::ProcessMessageInstantSendLock(const
172172
return ret;
173173
}
174174

175-
bool CInstantSendManager::ProcessPendingInstantSendLocks(PeerManager& peerman)
175+
bool CInstantSendManager::ProcessPendingInstantSendLocks(std::vector<std::pair<NodeId, MessageProcessingResult>>& peer_activity)
176176
{
177177
decltype(pendingInstantSendLocks) pend;
178178
bool fMoreWork{false};
@@ -217,7 +217,7 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks(PeerManager& peerman)
217217
auto dkgInterval = llmq_params.dkgInterval;
218218

219219
// First check against the current active set and don't ban
220-
auto badISLocks = ProcessPendingInstantSendLocks(llmq_params, peerman, /*signOffset=*/0, pend, false);
220+
auto badISLocks = ProcessPendingInstantSendLocks(llmq_params, /*signOffset=*/0, /*ban=*/false, pend, peer_activity);
221221
if (!badISLocks.empty()) {
222222
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- doing verification on old active set\n", __func__);
223223

@@ -230,16 +230,16 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks(PeerManager& peerman)
230230
}
231231
}
232232
// Now check against the previous active set and perform banning if this fails
233-
ProcessPendingInstantSendLocks(llmq_params, peerman, dkgInterval, pend, true);
233+
ProcessPendingInstantSendLocks(llmq_params, dkgInterval, /*ban=*/true, pend, peer_activity);
234234
}
235235

236236
return fMoreWork;
237237
}
238238

239239
std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPendingInstantSendLocks(
240-
const Consensus::LLMQParams& llmq_params, PeerManager& peerman, int signOffset,
240+
const Consensus::LLMQParams& llmq_params, int signOffset, bool ban,
241241
const std::unordered_map<uint256, std::pair<NodeId, instantsend::InstantSendLockPtr>, StaticSaltedHasher>& pend,
242-
bool ban)
242+
std::vector<std::pair<NodeId, MessageProcessingResult>>& peer_activity)
243243
{
244244
CBLSBatchVerifier<NodeId, uint256> batchVerifier(false, true, 8);
245245
std::unordered_map<uint256, CRecoveredSig, StaticSaltedHasher> recSigs;
@@ -315,7 +315,7 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
315315
for (const auto& nodeId : batchVerifier.badSources) {
316316
// Let's not be too harsh, as the peer might simply be unlucky and might have sent us an old lock which
317317
// does not validate anymore due to changed quorums
318-
peerman.Misbehaving(nodeId, 20);
318+
peer_activity.push_back({nodeId, MisbehavingError{20}});
319319
}
320320
}
321321
for (const auto& p : pend) {
@@ -330,7 +330,7 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
330330
continue;
331331
}
332332

333-
ProcessInstantSendLock(nodeId, peerman, hash, islock);
333+
peer_activity.push_back({nodeId, ProcessInstantSendLock(nodeId, hash, islock)});
334334

335335
// See comment further on top. We pass a reconstructed recovered sig to the signing manager to avoid
336336
// double-verification of the sig.
@@ -348,8 +348,8 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
348348
return badISLocks;
349349
}
350350

351-
void CInstantSendManager::ProcessInstantSendLock(NodeId from, PeerManager& peerman, const uint256& hash,
352-
const instantsend::InstantSendLockPtr& islock)
351+
MessageProcessingResult CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& hash,
352+
const instantsend::InstantSendLockPtr& islock)
353353
{
354354
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: processing islock, peer=%d\n",
355355
__func__, islock->txid.ToString(), hash.ToString(), from);
@@ -358,12 +358,12 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, PeerManager& peerm
358358
m_signer->ClearLockFromQueue(islock);
359359
}
360360
if (db.KnownInstantSendLock(hash)) {
361-
return;
361+
return {};
362362
}
363363

364364
if (const auto sameTxIsLock = db.GetInstantSendLockByTxid(islock->txid)) {
365365
// can happen, nothing to do
366-
return;
366+
return {};
367367
}
368368
for (const auto& in : islock->inputs) {
369369
const auto sameOutpointIsLock = db.GetInstantSendLockByInput(in);
@@ -386,7 +386,7 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, PeerManager& peerm
386386
if (pindexMined != nullptr && clhandler.HasChainLock(pindexMined->nHeight, pindexMined->GetBlockHash())) {
387387
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txlock=%s, islock=%s: dropping islock as it already got a ChainLock in block %s, peer=%d\n", __func__,
388388
islock->txid.ToString(), hash.ToString(), hashBlock.ToString(), from);
389-
return;
389+
return {};
390390
}
391391
}
392392

@@ -417,15 +417,17 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, PeerManager& peerm
417417
mempool.AddTransactionsUpdated(1);
418418
}
419419

420+
MessageProcessingResult ret{/*is_masternode*/m_signer != nullptr};
420421
CInv inv(MSG_ISDLOCK, hash);
421422
if (found_transaction) {
422-
peerman.RelayInvFiltered(inv, *tx, ISDLOCK_PROTO_VERSION);
423+
ret.m_inv_filter = std::make_pair(inv, tx);
423424
} else {
424425
// we don't have the TX yet, so we only filter based on txid. Later when that TX arrives, we will re-announce
425426
// with the TX taken into account.
426-
peerman.RelayInvFiltered(inv, islock->txid, ISDLOCK_PROTO_VERSION);
427-
peerman.AskPeersForTransaction(islock->txid, /*is_masternode=*/m_signer != nullptr);
427+
ret.m_inv_filter = std::make_pair(inv, islock->txid);
428+
ret.m_request_tx = islock->txid;
428429
}
430+
return ret;
429431
}
430432

431433
void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx)
@@ -925,7 +927,11 @@ void CInstantSendManager::WorkThreadMain(PeerManager& peerman)
925927
while (!workInterrupt) {
926928
bool fMoreWork = [&]() -> bool {
927929
if (!IsInstantSendEnabled()) return false;
928-
const bool more_work{ProcessPendingInstantSendLocks(peerman)};
930+
std::vector<std::pair<NodeId, MessageProcessingResult>> peer_activity{};
931+
const bool more_work{ProcessPendingInstantSendLocks(peer_activity)};
932+
for (const auto& [node_id, mpr] : peer_activity) {
933+
peerman.PostProcessMessage(std::move(mpr), node_id);
934+
}
929935
if (!m_signer) return more_work;
930936
// Construct set of non-locked transactions that are pending to retry
931937
std::vector<CTransactionRef> txns{};

src/instantsend/instantsend.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,15 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent
9898

9999
private:
100100
MessageProcessingResult ProcessMessageInstantSendLock(const NodeId& pfrom, const instantsend::InstantSendLockPtr& islock);
101-
bool ProcessPendingInstantSendLocks(PeerManager& peerman)
101+
bool ProcessPendingInstantSendLocks(std::vector<std::pair<NodeId, MessageProcessingResult>>& peer_activity)
102102
EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
103103

104104
std::unordered_set<uint256, StaticSaltedHasher> ProcessPendingInstantSendLocks(
105-
const Consensus::LLMQParams& llmq_params, PeerManager& peerman, int signOffset,
106-
const std::unordered_map<uint256, std::pair<NodeId, instantsend::InstantSendLockPtr>, StaticSaltedHasher>& pend, bool ban)
105+
const Consensus::LLMQParams& llmq_params, int signOffset, bool ban,
106+
const std::unordered_map<uint256, std::pair<NodeId, instantsend::InstantSendLockPtr>, StaticSaltedHasher>& pend,
107+
std::vector<std::pair<NodeId, MessageProcessingResult>>& peer_activity)
107108
EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
108-
void ProcessInstantSendLock(NodeId from, PeerManager& peerman, const uint256& hash, const instantsend::InstantSendLockPtr& islock)
109+
MessageProcessingResult ProcessInstantSendLock(NodeId from, const uint256& hash, const instantsend::InstantSendLockPtr& islock)
109110
EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
110111

111112
void AddNonLockedTx(const CTransactionRef& tx, const CBlockIndex* pindexMined)

src/net_processing.cpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -651,7 +651,7 @@ class PeerManagerImpl final : public PeerManager
651651

652652
/** Helpers to process result of external handlers of message */
653653
void ProcessPeerMsgRet(const PeerMsgRet& ret, CNode& pfrom) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
654-
void PostProcessMessage(MessageProcessingResult&& ret, NodeId node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
654+
void PostProcessMessage(const MessageProcessingResult&& ret, NodeId node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
655655

656656
/** Consider evicting an outbound peer based on the amount of time they've been behind our tip */
657657
void ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_msgproc_mutex);
@@ -3488,7 +3488,7 @@ void PeerManagerImpl::ProcessPeerMsgRet(const PeerMsgRet& ret, CNode& pfrom)
34883488
if (!ret) Misbehaving(pfrom.GetId(), ret.error().score, ret.error().message);
34893489
}
34903490

3491-
void PeerManagerImpl::PostProcessMessage(MessageProcessingResult&& result, NodeId node)
3491+
void PeerManagerImpl::PostProcessMessage(const MessageProcessingResult&& result, NodeId node)
34923492
{
34933493
if (result.m_error) {
34943494
Misbehaving(node, result.m_error->score, result.m_error->message);
@@ -3502,6 +3502,17 @@ void PeerManagerImpl::PostProcessMessage(MessageProcessingResult&& result, NodeI
35023502
if (result.m_inventory) {
35033503
RelayInv(result.m_inventory.value());
35043504
}
3505+
if (result.m_inv_filter) {
3506+
const auto& [inv, filter] = result.m_inv_filter.value();
3507+
if (std::holds_alternative<CTransactionRef>(filter)) {
3508+
RelayInvFiltered(inv, *std::get<CTransactionRef>(filter), ISDLOCK_PROTO_VERSION);
3509+
} else {
3510+
RelayInvFiltered(inv, std::get<uint256>(filter), ISDLOCK_PROTO_VERSION);
3511+
}
3512+
}
3513+
if (result.m_request_tx) {
3514+
AskPeersForTransaction(result.m_request_tx.value(), result.m_is_masternode);
3515+
}
35053516
}
35063517

35073518
void PeerManagerImpl::ProcessMessage(

src/net_processing.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ class PeerManager : public CValidationInterface, public NetEventsInterface
132132
const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0;
133133

134134
/** Finish message processing. Used for some specific messages */
135-
virtual void PostProcessMessage(MessageProcessingResult&& ret, NodeId node = -1) = 0;
135+
virtual void PostProcessMessage(const MessageProcessingResult&& ret, NodeId node = -1) = 0;
136136

137137
/** This function is used for testing the stale tip eviction logic, see denialofservice_tests.cpp */
138138
virtual void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) = 0;

src/protocol.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <cstdint>
1818
#include <limits>
1919
#include <string>
20+
#include <variant>
2021

2122
/** Message header.
2223
* (4) message start.
@@ -597,19 +598,27 @@ using PeerMsgRet = tl::expected<void, MisbehavingError>;
597598
*/
598599
struct MessageProcessingResult
599600
{
601+
bool m_is_masternode{false};
602+
600603
//! @m_error triggers Misbehaving error with score and optional message if not nullopt
601604
std::optional<MisbehavingError> m_error;
602605

603606
//! @m_inventory will relay this inventory to connected peers if not nullopt
604607
std::optional<CInv> m_inventory;
605608

609+
//! @m_inv_filter will relay this inventory if filter matches to connected peers if not nullopt
610+
std::optional<std::pair<CInv, std::variant<CTransactionRef, uint256>>> m_inv_filter;
611+
612+
//! @m_request_tx will ask connected peers to relay transaction if not nullopt
613+
std::optional<uint256> m_request_tx;
614+
606615
//! @m_transactions will relay transactions to peers which is ready to accept it (some peers does not accept transactions)
607616
std::vector<uint256> m_transactions;
608617

609618
//! @m_to_erase triggers EraseObjectRequest from PeerManager for this inventory if not nullopt
610619
std::optional<CInv> m_to_erase;
611620

612-
MessageProcessingResult() = default;
621+
MessageProcessingResult(bool is_masternode = false) : m_is_masternode{is_masternode} {}
613622
MessageProcessingResult(MisbehavingError error) :
614623
m_error(error)
615624
{}

0 commit comments

Comments
 (0)