Skip to content

Commit c93ade5

Browse files
kwvgknst
andcommitted
refactor: reduce PeerManager usage in InstantSend worker functions
So far we cannot stop using `PeerManager` entirely the triggers are internal (i.e. not coming from P2P interactions) but we can extend and embrace MessageProcessingResult, to at least isolate its usage as best we can. Co-Authored-By: Konstantin Akimov <knstqq@gmail.com>
1 parent 9b2c916 commit c93ade5

File tree

4 files changed

+59
-25
lines changed

4 files changed

+59
-25
lines changed

src/instantsend/instantsend.cpp

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -169,13 +169,13 @@ MessageProcessingResult CInstantSendManager::ProcessMessage(NodeId from, std::st
169169
return ret;
170170
}
171171

172-
bool CInstantSendManager::ProcessPendingInstantSendLocks(PeerManager& peerman)
172+
instantsend::PendingState CInstantSendManager::ProcessPendingInstantSendLocks()
173173
{
174174
decltype(pendingInstantSendLocks) pend;
175-
bool fMoreWork{false};
175+
instantsend::PendingState ret;
176176

177177
if (!IsInstantSendEnabled()) {
178-
return false;
178+
return ret;
179179
}
180180

181181
{
@@ -190,7 +190,7 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks(PeerManager& peerman)
190190
for (const auto& [islockHash, nodeid_islptr_pair] : pendingInstantSendLocks) {
191191
// Check if we've reached max count
192192
if (pend.size() >= maxCount) {
193-
fMoreWork = true;
193+
ret.m_pending_work = true;
194194
break;
195195
}
196196
pend.emplace(islockHash, std::move(nodeid_islptr_pair));
@@ -203,7 +203,8 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks(PeerManager& peerman)
203203
}
204204

205205
if (pend.empty()) {
206-
return false;
206+
ret.m_pending_work = false;
207+
return ret;
207208
}
208209

209210
// TODO Investigate if leaving this is ok
@@ -214,7 +215,7 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks(PeerManager& peerman)
214215
auto dkgInterval = llmq_params.dkgInterval;
215216

216217
// First check against the current active set and don't ban
217-
auto badISLocks = ProcessPendingInstantSendLocks(llmq_params, peerman, /*signOffset=*/0, pend, false);
218+
auto badISLocks = ProcessPendingInstantSendLocks(llmq_params, /*signOffset=*/0, /*ban=*/false, pend, ret.m_peer_activity);
218219
if (!badISLocks.empty()) {
219220
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- doing verification on old active set\n", __func__);
220221

@@ -227,16 +228,16 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks(PeerManager& peerman)
227228
}
228229
}
229230
// Now check against the previous active set and perform banning if this fails
230-
ProcessPendingInstantSendLocks(llmq_params, peerman, dkgInterval, pend, true);
231+
ProcessPendingInstantSendLocks(llmq_params, dkgInterval, /*ban=*/true, pend, ret.m_peer_activity);
231232
}
232233

233-
return fMoreWork;
234+
return ret;
234235
}
235236

236237
std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPendingInstantSendLocks(
237-
const Consensus::LLMQParams& llmq_params, PeerManager& peerman, int signOffset,
238+
const Consensus::LLMQParams& llmq_params, int signOffset, bool ban,
238239
const std::unordered_map<uint256, std::pair<NodeId, instantsend::InstantSendLockPtr>, StaticSaltedHasher>& pend,
239-
bool ban)
240+
std::vector<std::pair<NodeId, MessageProcessingResult>>& peer_activity)
240241
{
241242
CBLSBatchVerifier<NodeId, uint256> batchVerifier(false, true, 8);
242243
std::unordered_map<uint256, CRecoveredSig, StaticSaltedHasher> recSigs;
@@ -312,7 +313,7 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
312313
for (const auto& nodeId : batchVerifier.badSources) {
313314
// Let's not be too harsh, as the peer might simply be unlucky and might have sent us an old lock which
314315
// does not validate anymore due to changed quorums
315-
peerman.Misbehaving(nodeId, 20);
316+
peer_activity.emplace_back(nodeId, MisbehavingError{20});
316317
}
317318
}
318319
for (const auto& p : pend) {
@@ -327,7 +328,7 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
327328
continue;
328329
}
329330

330-
ProcessInstantSendLock(nodeId, peerman, hash, islock);
331+
peer_activity.emplace_back(nodeId, ProcessInstantSendLock(nodeId, hash, islock));
331332

332333
// See comment further on top. We pass a reconstructed recovered sig to the signing manager to avoid
333334
// double-verification of the sig.
@@ -345,8 +346,8 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
345346
return badISLocks;
346347
}
347348

348-
void CInstantSendManager::ProcessInstantSendLock(NodeId from, PeerManager& peerman, const uint256& hash,
349-
const instantsend::InstantSendLockPtr& islock)
349+
MessageProcessingResult CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& hash,
350+
const instantsend::InstantSendLockPtr& islock)
350351
{
351352
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: processing islock, peer=%d\n",
352353
__func__, islock->txid.ToString(), hash.ToString(), from);
@@ -355,12 +356,12 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, PeerManager& peerm
355356
m_signer->ClearLockFromQueue(islock);
356357
}
357358
if (db.KnownInstantSendLock(hash)) {
358-
return;
359+
return {};
359360
}
360361

361362
if (const auto sameTxIsLock = db.GetInstantSendLockByTxid(islock->txid)) {
362363
// can happen, nothing to do
363-
return;
364+
return {};
364365
}
365366
for (const auto& in : islock->inputs) {
366367
const auto sameOutpointIsLock = db.GetInstantSendLockByInput(in);
@@ -383,7 +384,7 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, PeerManager& peerm
383384
if (pindexMined != nullptr && clhandler.HasChainLock(pindexMined->nHeight, pindexMined->GetBlockHash())) {
384385
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txlock=%s, islock=%s: dropping islock as it already got a ChainLock in block %s, peer=%d\n", __func__,
385386
islock->txid.ToString(), hash.ToString(), hashBlock.ToString(), from);
386-
return;
387+
return {};
387388
}
388389
}
389390

@@ -414,15 +415,17 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, PeerManager& peerm
414415
mempool.AddTransactionsUpdated(1);
415416
}
416417

418+
MessageProcessingResult ret{};
417419
CInv inv(MSG_ISDLOCK, hash);
418420
if (found_transaction) {
419-
peerman.RelayInvFiltered(inv, *tx, ISDLOCK_PROTO_VERSION);
421+
ret.m_inv_filter = std::make_pair(inv, tx);
420422
} else {
421423
// we don't have the TX yet, so we only filter based on txid. Later when that TX arrives, we will re-announce
422424
// with the TX taken into account.
423-
peerman.RelayInvFiltered(inv, islock->txid, ISDLOCK_PROTO_VERSION);
424-
peerman.AskPeersForTransaction(islock->txid);
425+
ret.m_inv_filter = std::make_pair(inv, islock->txid);
426+
ret.m_request_tx = islock->txid;
425427
}
428+
return ret;
426429
}
427430

428431
void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx)
@@ -922,7 +925,10 @@ void CInstantSendManager::WorkThreadMain(PeerManager& peerman)
922925
while (!workInterrupt) {
923926
bool fMoreWork = [&]() -> bool {
924927
if (!IsInstantSendEnabled()) return false;
925-
const bool more_work{ProcessPendingInstantSendLocks(peerman)};
928+
auto [more_work, peer_activity] = ProcessPendingInstantSendLocks();
929+
for (auto& [node_id, mpr] : peer_activity) {
930+
peerman.PostProcessMessage(std::move(mpr), node_id);
931+
}
926932
if (!m_signer) return more_work;
927933
// Construct set of non-locked transactions that are pending to retry
928934
std::vector<CTransactionRef> txns{};

src/instantsend/instantsend.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,14 @@ class PeerManager;
3131
namespace Consensus {
3232
struct LLMQParams;
3333
} // namespace Consensus
34+
3435
namespace instantsend {
3536
class InstantSendSigner;
37+
38+
struct PendingState {
39+
bool m_pending_work{false};
40+
std::vector<std::pair<NodeId, MessageProcessingResult>> m_peer_activity{};
41+
};
3642
} // namespace instantsend
3743

3844
namespace llmq {
@@ -95,14 +101,16 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent
95101
void InterruptWorkerThread() { workInterrupt(); };
96102

97103
private:
98-
bool ProcessPendingInstantSendLocks(PeerManager& peerman)
104+
instantsend::PendingState ProcessPendingInstantSendLocks()
99105
EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
100106

101107
std::unordered_set<uint256, StaticSaltedHasher> ProcessPendingInstantSendLocks(
102-
const Consensus::LLMQParams& llmq_params, PeerManager& peerman, int signOffset,
103-
const std::unordered_map<uint256, std::pair<NodeId, instantsend::InstantSendLockPtr>, StaticSaltedHasher>& pend, bool ban)
108+
const Consensus::LLMQParams& llmq_params, int signOffset, bool ban,
109+
const std::unordered_map<uint256, std::pair<NodeId, instantsend::InstantSendLockPtr>, StaticSaltedHasher>& pend,
110+
std::vector<std::pair<NodeId, MessageProcessingResult>>& peer_activity)
104111
EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
105-
void ProcessInstantSendLock(NodeId from, PeerManager& peerman, const uint256& hash, const instantsend::InstantSendLockPtr& islock)
112+
MessageProcessingResult ProcessInstantSendLock(NodeId from, const uint256& hash,
113+
const instantsend::InstantSendLockPtr& islock)
106114
EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
107115

108116
void AddNonLockedTx(const CTransactionRef& tx, const CBlockIndex* pindexMined)

src/net_processing.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3492,6 +3492,19 @@ void PeerManagerImpl::PostProcessMessage(MessageProcessingResult&& result, NodeI
34923492
for (const auto& inv : result.m_inventory) {
34933493
RelayInv(inv);
34943494
}
3495+
if (result.m_inv_filter) {
3496+
const auto& [inv, filter] = result.m_inv_filter.value();
3497+
if (std::holds_alternative<CTransactionRef>(filter)) {
3498+
RelayInvFiltered(inv, *std::get<CTransactionRef>(filter), ISDLOCK_PROTO_VERSION);
3499+
} else if (std::holds_alternative<uint256>(filter)) {
3500+
RelayInvFiltered(inv, std::get<uint256>(filter), ISDLOCK_PROTO_VERSION);
3501+
} else {
3502+
assert(false);
3503+
}
3504+
}
3505+
if (result.m_request_tx) {
3506+
AskPeersForTransaction(result.m_request_tx.value());
3507+
}
34953508
}
34963509

34973510
void PeerManagerImpl::ProcessMessage(

src/protocol.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <cstdint>
1717
#include <limits>
1818
#include <string>
19+
#include <variant>
1920

2021
/** Message header.
2122
* (4) message start.
@@ -599,6 +600,12 @@ struct MessageProcessingResult
599600
//! @m_inventory will relay these inventories to connected peers
600601
std::vector<CInv> m_inventory;
601602

603+
//! @m_inv_filter will relay this inventory if filter matches to connected peers if not nullopt
604+
std::optional<std::pair<CInv, std::variant<CTransactionRef, uint256>>> m_inv_filter;
605+
606+
//! @m_request_tx will ask connected peers to relay transaction if not nullopt
607+
std::optional<uint256> m_request_tx;
608+
602609
//! @m_transactions will relay transactions to peers which is ready to accept it (some peers does not accept transactions)
603610
std::vector<uint256> m_transactions;
604611

0 commit comments

Comments
 (0)