12
12
#include " txmempool.h"
13
13
#include " masternode-sync.h"
14
14
#include " net_processing.h"
15
- #include " scheduler.h"
16
15
#include " spork.h"
17
16
#include " validation.h"
18
17
24
23
#include " instantx.h"
25
24
26
25
#include < boost/algorithm/string/replace.hpp>
26
+ #include < boost/thread.hpp>
27
27
28
28
namespace llmq
29
29
{
@@ -208,24 +208,45 @@ CInstantSendLockPtr CInstantSendDb::GetInstantSendLockByInput(const COutPoint& o
208
208
209
209
// //////////////
210
210
211
- CInstantSendManager::CInstantSendManager (CScheduler* _scheduler, CDBWrapper& _llmqDb) :
212
- scheduler (_scheduler),
211
+ CInstantSendManager::CInstantSendManager (CDBWrapper& _llmqDb) :
213
212
db (_llmqDb)
214
213
{
214
+ workInterrupt.reset ();
215
215
}
216
216
217
217
CInstantSendManager::~CInstantSendManager ()
218
218
{
219
219
}
220
220
221
- void CInstantSendManager::RegisterAsRecoveredSigsListener ()
221
+ void CInstantSendManager::Start ()
222
222
{
223
+ // can't start new thread if we have one running already
224
+ if (workThread.joinable ()) {
225
+ assert (false );
226
+ }
227
+
228
+ workThread = std::thread (&TraceThread<std::function<void ()> >, " instantsend" , std::function<void ()>(std::bind (&CInstantSendManager::WorkThreadMain, this )));
229
+
223
230
quorumSigningManager->RegisterRecoveredSigsListener (this );
224
231
}
225
232
226
- void CInstantSendManager::UnregisterAsRecoveredSigsListener ()
233
+ void CInstantSendManager::Stop ()
227
234
{
228
235
quorumSigningManager->UnregisterRecoveredSigsListener (this );
236
+
237
+ // make sure to call InterruptWorkerThread() first
238
+ if (!workInterrupt) {
239
+ assert (false );
240
+ }
241
+
242
+ if (workThread.joinable ()) {
243
+ workThread.join ();
244
+ }
245
+ }
246
+
247
+ void CInstantSendManager::InterruptWorkerThread ()
248
+ {
249
+ workInterrupt ();
229
250
}
230
251
231
252
bool CInstantSendManager::ProcessTx (const CTransaction& tx, const Consensus::Params& params)
@@ -552,13 +573,6 @@ void CInstantSendManager::ProcessMessageInstantSendLock(CNode* pfrom, const llmq
552
573
islock.txid .ToString (), hash.ToString (), pfrom->id );
553
574
554
575
pendingInstantSendLocks.emplace (hash, std::make_pair (pfrom->id , std::move (islock)));
555
-
556
- if (!hasScheduledProcessPending) {
557
- hasScheduledProcessPending = true ;
558
- scheduler->scheduleFromNow ([&] {
559
- ProcessPendingInstantSendLocks ();
560
- }, 100 );
561
- }
562
576
}
563
577
564
578
bool CInstantSendManager::PreVerifyInstantSendLock (NodeId nodeId, const llmq::CInstantSendLock& islock, bool & retBan)
@@ -581,20 +595,23 @@ bool CInstantSendManager::PreVerifyInstantSendLock(NodeId nodeId, const llmq::CI
581
595
return true ;
582
596
}
583
597
584
- void CInstantSendManager::ProcessPendingInstantSendLocks ()
598
+ bool CInstantSendManager::ProcessPendingInstantSendLocks ()
585
599
{
586
600
auto llmqType = Params ().GetConsensus ().llmqForInstantSend ;
587
601
588
602
decltype (pendingInstantSendLocks) pend;
589
603
590
604
{
591
605
LOCK (cs);
592
- hasScheduledProcessPending = false ;
593
606
pend = std::move (pendingInstantSendLocks);
594
607
}
595
608
609
+ if (pend.empty ()) {
610
+ return false ;
611
+ }
612
+
596
613
if (!IsNewInstantSendEnabled ()) {
597
- return ;
614
+ return false ;
598
615
}
599
616
600
617
int tipHeight;
@@ -621,7 +638,7 @@ void CInstantSendManager::ProcessPendingInstantSendLocks()
621
638
auto quorum = quorumSigningManager->SelectQuorumForSigning (llmqType, tipHeight, id);
622
639
if (!quorum) {
623
640
// should not happen, but if one fails to select, all others will also fail to select
624
- return ;
641
+ return false ;
625
642
}
626
643
uint256 signHash = CLLMQUtils::BuildSignHash (llmqType, quorum->qc .quorumHash , id, islock.txid );
627
644
batchVerifier.PushMessage (nodeId, hash, signHash, islock.sig , quorum->qc .quorumPublicKey );
@@ -679,6 +696,8 @@ void CInstantSendManager::ProcessPendingInstantSendLocks()
679
696
}
680
697
}
681
698
}
699
+
700
+ return true ;
682
701
}
683
702
684
703
void CInstantSendManager::ProcessInstantSendLock (NodeId from, const uint256& hash, const CInstantSendLock& islock)
@@ -1052,6 +1071,21 @@ bool CInstantSendManager::GetConflictingTx(const CTransaction& tx, uint256& retC
1052
1071
return false ;
1053
1072
}
1054
1073
1074
+ void CInstantSendManager::WorkThreadMain ()
1075
+ {
1076
+ while (!workInterrupt) {
1077
+ bool didWork = false ;
1078
+
1079
+ didWork |= ProcessPendingInstantSendLocks ();
1080
+
1081
+ if (!didWork) {
1082
+ if (!workInterrupt.sleep_for (std::chrono::milliseconds (100 ))) {
1083
+ return ;
1084
+ }
1085
+ }
1086
+ }
1087
+ }
1088
+
1055
1089
bool IsOldInstantSendEnabled ()
1056
1090
{
1057
1091
return sporkManager.IsSporkActive (SPORK_2_INSTANTSEND_ENABLED) && !sporkManager.IsSporkActive (SPORK_20_INSTANTSEND_LLMQ_BASED);
0 commit comments