Skip to content

Commit 90b1b71

Browse files
codablockUdjinM6
authored andcommitted
Move processing of InstantSend locks into its own worker thread (#2857)
* Let ProcessPendingInstantSendLocks return true when it did some work * Introduce own worker thread for CInstantSendManager Instead of using the scheduler. * Remove scheduler from CInstantSendManager * Add missing reset() call for workInterrupt
1 parent ae78360 commit 90b1b71

File tree

3 files changed

+66
-27
lines changed

3 files changed

+66
-27
lines changed

src/llmq/quorums_init.cpp

+6-3
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests, bool f
3636
quorumSigSharesManager = new CSigSharesManager();
3737
quorumSigningManager = new CSigningManager(*llmqDb, unitTests);
3838
chainLocksHandler = new CChainLocksHandler(scheduler);
39-
quorumInstantSendManager = new CInstantSendManager(scheduler, *llmqDb);
39+
quorumInstantSendManager = new CInstantSendManager(*llmqDb);
4040
}
4141

4242
void DestroyLLMQSystem()
@@ -84,14 +84,14 @@ void StartLLMQSystem()
8484
chainLocksHandler->Start();
8585
}
8686
if (quorumInstantSendManager) {
87-
quorumInstantSendManager->RegisterAsRecoveredSigsListener();
87+
quorumInstantSendManager->Start();
8888
}
8989
}
9090

9191
void StopLLMQSystem()
9292
{
9393
if (quorumInstantSendManager) {
94-
quorumInstantSendManager->UnregisterAsRecoveredSigsListener();
94+
quorumInstantSendManager->Stop();
9595
}
9696
if (chainLocksHandler) {
9797
chainLocksHandler->Stop();
@@ -113,6 +113,9 @@ void InterruptLLMQSystem()
113113
if (quorumSigSharesManager) {
114114
quorumSigSharesManager->InterruptWorkerThread();
115115
}
116+
if (quorumInstantSendManager) {
117+
quorumInstantSendManager->InterruptWorkerThread();
118+
}
116119
}
117120

118121
}

src/llmq/quorums_instantsend.cpp

+50-16
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
#include "txmempool.h"
1313
#include "masternode-sync.h"
1414
#include "net_processing.h"
15-
#include "scheduler.h"
1615
#include "spork.h"
1716
#include "validation.h"
1817

@@ -24,6 +23,7 @@
2423
#include "instantx.h"
2524

2625
#include <boost/algorithm/string/replace.hpp>
26+
#include <boost/thread.hpp>
2727

2828
namespace llmq
2929
{
@@ -208,24 +208,45 @@ CInstantSendLockPtr CInstantSendDb::GetInstantSendLockByInput(const COutPoint& o
208208

209209
////////////////
210210

211-
CInstantSendManager::CInstantSendManager(CScheduler* _scheduler, CDBWrapper& _llmqDb) :
212-
scheduler(_scheduler),
211+
CInstantSendManager::CInstantSendManager(CDBWrapper& _llmqDb) :
213212
db(_llmqDb)
214213
{
214+
workInterrupt.reset();
215215
}
216216

217217
CInstantSendManager::~CInstantSendManager()
218218
{
219219
}
220220

221-
void CInstantSendManager::RegisterAsRecoveredSigsListener()
221+
void CInstantSendManager::Start()
222222
{
223+
// can't start new thread if we have one running already
224+
if (workThread.joinable()) {
225+
assert(false);
226+
}
227+
228+
workThread = std::thread(&TraceThread<std::function<void()> >, "instantsend", std::function<void()>(std::bind(&CInstantSendManager::WorkThreadMain, this)));
229+
223230
quorumSigningManager->RegisterRecoveredSigsListener(this);
224231
}
225232

226-
void CInstantSendManager::UnregisterAsRecoveredSigsListener()
233+
void CInstantSendManager::Stop()
227234
{
228235
quorumSigningManager->UnregisterRecoveredSigsListener(this);
236+
237+
// make sure to call InterruptWorkerThread() first
238+
if (!workInterrupt) {
239+
assert(false);
240+
}
241+
242+
if (workThread.joinable()) {
243+
workThread.join();
244+
}
245+
}
246+
247+
void CInstantSendManager::InterruptWorkerThread()
248+
{
249+
workInterrupt();
229250
}
230251

231252
bool CInstantSendManager::ProcessTx(const CTransaction& tx, const Consensus::Params& params)
@@ -552,13 +573,6 @@ void CInstantSendManager::ProcessMessageInstantSendLock(CNode* pfrom, const llmq
552573
islock.txid.ToString(), hash.ToString(), pfrom->id);
553574

554575
pendingInstantSendLocks.emplace(hash, std::make_pair(pfrom->id, std::move(islock)));
555-
556-
if (!hasScheduledProcessPending) {
557-
hasScheduledProcessPending = true;
558-
scheduler->scheduleFromNow([&] {
559-
ProcessPendingInstantSendLocks();
560-
}, 100);
561-
}
562576
}
563577

564578
bool CInstantSendManager::PreVerifyInstantSendLock(NodeId nodeId, const llmq::CInstantSendLock& islock, bool& retBan)
@@ -581,20 +595,23 @@ bool CInstantSendManager::PreVerifyInstantSendLock(NodeId nodeId, const llmq::CI
581595
return true;
582596
}
583597

584-
void CInstantSendManager::ProcessPendingInstantSendLocks()
598+
bool CInstantSendManager::ProcessPendingInstantSendLocks()
585599
{
586600
auto llmqType = Params().GetConsensus().llmqForInstantSend;
587601

588602
decltype(pendingInstantSendLocks) pend;
589603

590604
{
591605
LOCK(cs);
592-
hasScheduledProcessPending = false;
593606
pend = std::move(pendingInstantSendLocks);
594607
}
595608

609+
if (pend.empty()) {
610+
return false;
611+
}
612+
596613
if (!IsNewInstantSendEnabled()) {
597-
return;
614+
return false;
598615
}
599616

600617
int tipHeight;
@@ -621,7 +638,7 @@ void CInstantSendManager::ProcessPendingInstantSendLocks()
621638
auto quorum = quorumSigningManager->SelectQuorumForSigning(llmqType, tipHeight, id);
622639
if (!quorum) {
623640
// should not happen, but if one fails to select, all others will also fail to select
624-
return;
641+
return false;
625642
}
626643
uint256 signHash = CLLMQUtils::BuildSignHash(llmqType, quorum->qc.quorumHash, id, islock.txid);
627644
batchVerifier.PushMessage(nodeId, hash, signHash, islock.sig, quorum->qc.quorumPublicKey);
@@ -679,6 +696,8 @@ void CInstantSendManager::ProcessPendingInstantSendLocks()
679696
}
680697
}
681698
}
699+
700+
return true;
682701
}
683702

684703
void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLock& islock)
@@ -1052,6 +1071,21 @@ bool CInstantSendManager::GetConflictingTx(const CTransaction& tx, uint256& retC
10521071
return false;
10531072
}
10541073

1074+
void CInstantSendManager::WorkThreadMain()
1075+
{
1076+
while (!workInterrupt) {
1077+
bool didWork = false;
1078+
1079+
didWork |= ProcessPendingInstantSendLocks();
1080+
1081+
if (!didWork) {
1082+
if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
1083+
return;
1084+
}
1085+
}
1086+
}
1087+
}
1088+
10551089
bool IsOldInstantSendEnabled()
10561090
{
10571091
return sporkManager.IsSporkActive(SPORK_2_INSTANTSEND_ENABLED) && !sporkManager.IsSporkActive(SPORK_20_INSTANTSEND_LLMQ_BASED);

src/llmq/quorums_instantsend.h

+10-8
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
#include <unordered_map>
1515
#include <unordered_set>
1616

17-
class CScheduler;
18-
1917
namespace llmq
2018
{
2119

@@ -72,9 +70,11 @@ class CInstantSendManager : public CRecoveredSigsListener
7270
{
7371
private:
7472
CCriticalSection cs;
75-
CScheduler* scheduler;
7673
CInstantSendDb db;
7774

75+
std::thread workThread;
76+
CThreadInterrupt workInterrupt;
77+
7878
/**
7979
* Request ids of inputs that we signed. Used to determine if a recovered signature belongs to an
8080
* in-progress input lock.
@@ -92,14 +92,14 @@ class CInstantSendManager : public CRecoveredSigsListener
9292

9393
// Incoming and not verified yet
9494
std::unordered_map<uint256, std::pair<NodeId, CInstantSendLock>> pendingInstantSendLocks;
95-
bool hasScheduledProcessPending{false};
9695

9796
public:
98-
CInstantSendManager(CScheduler* _scheduler, CDBWrapper& _llmqDb);
97+
CInstantSendManager(CDBWrapper& _llmqDb);
9998
~CInstantSendManager();
10099

101-
void RegisterAsRecoveredSigsListener();
102-
void UnregisterAsRecoveredSigsListener();
100+
void Start();
101+
void Stop();
102+
void InterruptWorkerThread();
103103

104104
public:
105105
bool ProcessTx(const CTransaction& tx, const Consensus::Params& params);
@@ -118,7 +118,7 @@ class CInstantSendManager : public CRecoveredSigsListener
118118
void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
119119
void ProcessMessageInstantSendLock(CNode* pfrom, const CInstantSendLock& islock, CConnman& connman);
120120
bool PreVerifyInstantSendLock(NodeId nodeId, const CInstantSendLock& islock, bool& retBan);
121-
void ProcessPendingInstantSendLocks();
121+
bool ProcessPendingInstantSendLocks();
122122
void ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLock& islock);
123123
void UpdateWalletTransaction(const uint256& txid, const CTransactionRef& tx);
124124

@@ -133,6 +133,8 @@ class CInstantSendManager : public CRecoveredSigsListener
133133

134134
bool AlreadyHave(const CInv& inv);
135135
bool GetInstantSendLockByHash(const uint256& hash, CInstantSendLock& ret);
136+
137+
void WorkThreadMain();
136138
};
137139

138140
extern CInstantSendManager* quorumInstantSendManager;

0 commit comments

Comments
 (0)