Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ BITCOIN_CORE_H = \
llmq/snapshot.h \
llmq/types.h \
llmq/utils.h \
llmq/observer/context.h \
logging.h \
logging/timer.h \
mapport.h \
Expand Down Expand Up @@ -553,6 +554,7 @@ libbitcoin_node_a_SOURCES = \
llmq/signing_shares.cpp \
llmq/snapshot.cpp \
llmq/utils.cpp \
llmq/observer/context.cpp \
mapport.cpp \
masternode/active/context.cpp \
masternode/active/notificationinterface.cpp \
Expand Down
6 changes: 0 additions & 6 deletions src/chainlock/chainlock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ CChainLocksHandler::~CChainLocksHandler()

void CChainLocksHandler::Start(const llmq::CInstantSendManager& isman)
{
if (auto signer = m_signer.load(std::memory_order_acquire); signer) {
signer->Start();
}
scheduler->scheduleEvery(
[&]() {
auto signer = m_signer.load(std::memory_order_acquire);
Expand All @@ -83,9 +80,6 @@ void CChainLocksHandler::Start(const llmq::CInstantSendManager& isman)
void CChainLocksHandler::Stop()
{
scheduler->stop();
if (auto signer = m_signer.load(std::memory_order_acquire); signer) {
signer->Stop();
}
}

bool CChainLocksHandler::AlreadyHave(const CInv& inv) const
Expand Down
4 changes: 2 additions & 2 deletions src/chainlock/signing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ ChainLockSigner::ChainLockSigner(CChainState& chainstate, ChainLockSignerParent&

ChainLockSigner::~ChainLockSigner() = default;

void ChainLockSigner::Start()
void ChainLockSigner::RegisterRecoveryInterface()
{
m_sigman.RegisterRecoveredSigsListener(this);
}

void ChainLockSigner::Stop()
void ChainLockSigner::UnregisterRecoveryInterface()
{
m_sigman.UnregisterRecoveredSigsListener(this);
}
Expand Down
4 changes: 2 additions & 2 deletions src/chainlock/signing.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ class ChainLockSigner final : public llmq::CRecoveredSigsListener
llmq::CSigSharesManager& shareman, CSporkManager& sporkman, const CMasternodeSync& mn_sync);
~ChainLockSigner();

void Start();
void Stop();
void RegisterRecoveryInterface();
void UnregisterRecoveryInterface();

void EraseFromBlockHashTxidMap(const uint256& hash)
EXCLUSIVE_LOCKS_REQUIRED(!cs_signer);
Expand Down
1 change: 0 additions & 1 deletion src/dsnotificationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ void CDSNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, con

m_llmq_ctx->isman->UpdatedBlockTip(pindexNew);
m_llmq_ctx->clhandler->UpdatedBlockTip(*m_llmq_ctx->isman);
m_llmq_ctx->qdkgsman->UpdatedBlockTip(pindexNew, fInitialDownload);
m_llmq_ctx->qman->UpdatedBlockTip(pindexNew, m_connman, fInitialDownload);

if (m_govman.IsValid()) {
Expand Down
80 changes: 55 additions & 25 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@
#include <instantsend/net_instantsend.h>
#include <llmq/context.h>
#include <llmq/dkgsessionmgr.h>
#include <llmq/options.h>
#include <llmq/net_signing.h>
#include <llmq/options.h>
#include <llmq/observer/context.h>
#include <masternode/active/context.h>
#include <masternode/active/notificationinterface.h>
#include <masternode/meta.h>
Expand Down Expand Up @@ -258,9 +259,6 @@ void Interrupt(NodeContext& node)
if (node.peerman) {
node.peerman->InterruptHandlers();
}
if (node.llmq_ctx) {
node.llmq_ctx->Interrupt();
}
InterruptMapPort();
if (node.connman)
node.connman->Interrupt();
Expand Down Expand Up @@ -350,6 +348,10 @@ void PrepareShutdown(NodeContext& node)
// CValidationInterface callbacks, flush them...
GetMainSignals().FlushBackgroundCallbacks();

if (node.observer_ctx) {
UnregisterValidationInterface(node.observer_ctx.get());
}

if (g_active_notification_interface) {
UnregisterValidationInterface(g_active_notification_interface.get());
g_active_notification_interface.reset();
Expand All @@ -366,6 +368,7 @@ void PrepareShutdown(NodeContext& node)

// After all scheduled tasks have been flushed, destroy pointers
// and reset all to nullptr.
node.observer_ctx.reset();
node.active_ctx.reset();
node.mn_sync.reset();
node.sporkman.reset();
Expand Down Expand Up @@ -1409,9 +1412,8 @@ bool AppInitParameterInteraction(const ArgsManager& args)
}

try {
const bool fRecoveryEnabled{llmq::QuorumDataRecoveryEnabled()};
const bool fQuorumVvecRequestsEnabled{llmq::GetEnabledQuorumVvecSyncEntries().size() > 0};
if (!fRecoveryEnabled && fQuorumVvecRequestsEnabled) {
const bool fQuorumVvecRequestsEnabled{llmq::GetEnabledQuorumVvecSyncEntries(args).size() > 0};
if (!args.GetBoolArg("-llmq-data-recovery", llmq::DEFAULT_ENABLE_QUORUM_DATA_RECOVERY) && fQuorumVvecRequestsEnabled) {
InitWarning(Untranslated("-llmq-qvvec-sync set but recovery is disabled due to -llmq-data-recovery=0"));
}
} catch (const std::invalid_argument& e) {
Expand Down Expand Up @@ -1956,6 +1958,9 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
fReindex = args.GetBoolArg("-reindex", false);
bool fReindexChainState = args.GetBoolArg("-reindex-chainstate", false);

const bool quorums_recovery = args.GetBoolArg("-llmq-data-recovery", llmq::DEFAULT_ENABLE_QUORUM_DATA_RECOVERY);
const bool quorums_watch = args.GetBoolArg("-watchquorums", llmq::DEFAULT_WATCH_QUORUMS);

// cache size calculations
CacheSizes cache_sizes = CalculateCacheSizes(args, g_enabled_filter_types.size());

Expand Down Expand Up @@ -2023,13 +2028,27 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
args.GetBoolArg("-spentindex", DEFAULT_SPENTINDEX),
args.GetBoolArg("-timestampindex", DEFAULT_TIMESTAMPINDEX),
chainparams.GetConsensus(),
llmq::GetEnabledQuorumVvecSyncEntries(args),
fReindexChainState,
cache_sizes.block_tree_db,
cache_sizes.coins_db,
cache_sizes.coins,
/*block_tree_db_in_memory=*/false,
/*coins_db_in_memory=*/false,
/*dash_dbs_in_memory=*/false,
quorums_recovery,
quorums_watch,
/*bls_threads=*/[&args]() -> int8_t {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:
I would suggest a refactoring, that takes quorums_recovery, quorums_watch, bls_threads, macrecsigsage and move them to something like struct llmq::Options inside llmq/options.h

Bit amount of bool & int arguments makes code difficult to change and support

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be done as part of dash#7066 or subsequent follow-up.

int8_t threads = args.GetIntArg("-parbls", llmq::DEFAULT_BLSCHECK_THREADS);
if (threads <= 0) {
// -parbls=0 means autodetect (number of cores - 1 validator threads)
// -parbls=-n means "leave n cores free" (number of cores - n - 1 validator threads)
threads += GetNumCores();
}
// Subtract 1 because the main thread counts towards the par threads
return std::clamp<int8_t>(threads - 1, 0, llmq::MAX_BLSCHECK_THREADS);
}(),
args.GetIntArg("-maxrecsigsage", llmq::DEFAULT_MAX_RECOVERED_SIGS_AGE),
/*shutdown_requested=*/ShutdownRequested,
/*coins_error_cb=*/[]() {
uiInterface.ThreadSafeMessageBox(
Expand Down Expand Up @@ -2175,45 +2194,56 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
assert(!node.dstxman);
node.dstxman = std::make_unique<CDSTXManager>();

assert(!node.cj_walletman);
if (!node.mn_activeman) {
node.cj_walletman = CJWalletManager::make(chainman, *node.dmnman, *node.mn_metaman, *node.mempool, *node.mn_sync,
*node.llmq_ctx->isman, !ignores_incoming_txs);
}
if (node.cj_walletman) {
RegisterValidationInterface(node.cj_walletman.get());
}

assert(!node.peerman);
node.peerman = PeerManager::make(chainparams, *node.connman, *node.addrman, node.banman.get(), *node.dstxman,
chainman, *node.mempool, *node.mn_metaman, *node.mn_sync,
*node.govman, *node.sporkman, node.mn_activeman.get(), node.dmnman,
node.active_ctx, node.cj_walletman.get(), node.llmq_ctx, ignores_incoming_txs);
*node.govman, *node.sporkman, node.mn_activeman.get(), node.active_ctx, node.dmnman,
node.cj_walletman, node.llmq_ctx, node.observer_ctx, ignores_incoming_txs);
RegisterValidationInterface(node.peerman.get());

g_ds_notification_interface = std::make_unique<CDSNotificationInterface>(
*node.connman, *node.dstxman, *node.mn_sync, *node.govman, chainman, node.dmnman, node.llmq_ctx
);
RegisterValidationInterface(g_ds_notification_interface.get());

// ********************************************************* Step 7c: Setup masternode mode
// ********************************************************* Step 7c: Setup masternode mode or watch-only mode
assert(!node.active_ctx);
assert(!g_active_notification_interface);
assert(!node.observer_ctx);

node.peerman->AddExtraHandler(std::make_unique<NetInstantSend>(node.peerman.get(), *node.llmq_ctx->isman, *node.llmq_ctx->qman, chainman.ActiveChainstate()));
node.peerman->AddExtraHandler(std::make_unique<NetSigning>(node.peerman.get(), *node.llmq_ctx->sigman));
if (node.mn_activeman) {
std::unique_ptr<CCoinJoinServer> cj_server = std::make_unique<CCoinJoinServer>(node.peerman.get(), chainman, *node.connman, *node.dmnman, *node.dstxman, *node.mn_metaman, *node.mempool, *node.mn_activeman, *node.mn_sync, *node.llmq_ctx->isman);

node.active_ctx = std::make_unique<ActiveContext>(chainman, *node.connman, *node.dmnman, *node.govman,
const util::DbWrapperParams dash_db_params{.path = args.GetDataDirNet(), .memory = false, .wipe = (fReindex || fReindexChainState)};
if (node.mn_activeman) {
auto cj_server = std::make_unique<CCoinJoinServer>(node.peerman.get(), chainman, *node.connman, *node.dmnman, *node.dstxman, *node.mn_metaman,
*node.mempool, *node.mn_activeman, *node.mn_sync, *node.llmq_ctx->isman);
node.active_ctx = std::make_unique<ActiveContext>(*cj_server, *node.connman, *node.dmnman, *node.govman, chainman, *node.mn_metaman,
*node.mnhf_manager, *node.sporkman, *node.mempool, *node.llmq_ctx, *node.peerman,
*node.mn_activeman, *node.mn_sync, *cj_server);
*node.mn_activeman, *node.mn_sync, dash_db_params, quorums_watch);
node.peerman->AddExtraHandler(std::move(cj_server));
g_active_notification_interface = std::make_unique<ActiveNotificationInterface>(*node.active_ctx, *node.mn_activeman);
RegisterValidationInterface(g_active_notification_interface.get());
} else if (quorums_watch) {
node.observer_ctx = std::make_unique<llmq::ObserverContext>(*node.llmq_ctx->bls_worker, *node.dmnman, *node.mn_metaman, *node.llmq_ctx->dkg_debugman,
*node.llmq_ctx->quorum_block_processor, *node.llmq_ctx->qman, *node.llmq_ctx->qsnapman, chainman,
*node.sporkman, dash_db_params);
RegisterValidationInterface(node.observer_ctx.get());
}

// ********************************************************* Step 7d: Setup other Dash services

assert(!node.cj_walletman);
if (!node.active_ctx) {
// Can return nullptr if built without wallet support, must check before use
node.cj_walletman = CJWalletManager::make(chainman, *node.dmnman, *node.mn_metaman, *node.mempool, *node.mn_sync,
*node.llmq_ctx->isman, !ignores_incoming_txs);
}

if (node.cj_walletman) {
RegisterValidationInterface(node.cj_walletman.get());
}

bool fLoadCacheFiles = !(fReindex || fReindexChainState) && (chainman.ActiveChain().Tip() != nullptr);

if (!node.netfulfilledman->LoadCache(fLoadCacheFiles)) {
Expand Down Expand Up @@ -2307,7 +2337,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)

// ********************************************************* Step 10a: schedule Dash-specific tasks

node.llmq_ctx->Start(*node.peerman);
node.llmq_ctx->Start();
node.peerman->StartHandlers();
if (node.active_ctx) node.active_ctx->Start(*node.connman, *node.peerman);

Expand All @@ -2317,7 +2347,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
node.peerman->ScheduleHandlers(*node.scheduler);

if (node.mn_activeman) {
node.scheduler->scheduleEvery(std::bind(&llmq::CDKGSessionManager::CleanupOldContributions, std::ref(*node.llmq_ctx->qdkgsman)), std::chrono::hours{1});
node.scheduler->scheduleEvery(std::bind(&llmq::CDKGSessionManager::CleanupOldContributions, std::ref(*node.active_ctx->qdkgsman)), std::chrono::hours{1});
}

if (node.cj_walletman) {
Expand Down
2 changes: 1 addition & 1 deletion src/instantsend/instantsend.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent
}
void DisconnectSigner() { m_signer.store(nullptr, std::memory_order_release); }

instantsend::InstantSendSigner* Signer() const { return m_signer.load(); }
instantsend::InstantSendSigner* Signer() const { return m_signer.load(std::memory_order_acquire); }

private:
void AddNonLockedTx(const CTransactionRef& tx, const CBlockIndex* pindexMined)
Expand Down
8 changes: 0 additions & 8 deletions src/instantsend/net_instantsend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,10 @@ void NetInstantSend::Start()
}

workThread = std::thread(&util::TraceThread, "isman", [this] { WorkThreadMain(); });

if (auto signer = m_is_manager.Signer(); signer) {
signer->Start();
}
}

void NetInstantSend::Stop()
{
if (auto signer = m_is_manager.Signer(); signer) {
signer->Stop();
}

// make sure to call Interrupt() first
if (!workInterrupt) {
assert(false);
Expand Down
4 changes: 2 additions & 2 deletions src/instantsend/signing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ InstantSendSigner::InstantSendSigner(CChainState& chainstate, llmq::CChainLocksH

InstantSendSigner::~InstantSendSigner() = default;

void InstantSendSigner::Start()
void InstantSendSigner::RegisterRecoveryInterface()
{
m_sigman.RegisterRecoveredSigsListener(this);
}

void InstantSendSigner::Stop()
void InstantSendSigner::UnregisterRecoveryInterface()
{
m_sigman.UnregisterRecoveredSigsListener(this);
}
Expand Down
4 changes: 2 additions & 2 deletions src/instantsend/signing.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ class InstantSendSigner final : public llmq::CRecoveredSigsListener
CTxMemPool& mempool, const CMasternodeSync& mn_sync);
~InstantSendSigner();

void Start();
void Stop();
void RegisterRecoveryInterface();
void UnregisterRecoveryInterface();

void ClearInputsFromQueue(const Uint256HashSet& ids) EXCLUSIVE_LOCKS_REQUIRED(!cs_input_requests);

Expand Down
15 changes: 1 addition & 14 deletions src/llmq/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,13 @@ static const std::string DB_MINED_COMMITMENT_BY_INVERSED_HEIGHT_Q_INDEXED = "q_m
static const std::string DB_BEST_BLOCK_UPGRADE = "q_bbu2";

CQuorumBlockProcessor::CQuorumBlockProcessor(CChainState& chainstate, CDeterministicMNManager& dmnman, CEvoDB& evoDb,
CQuorumSnapshotManager& qsnapman) :
CQuorumSnapshotManager& qsnapman, int8_t bls_threads) :
m_chainstate{chainstate},
m_dmnman{dmnman},
m_evoDb{evoDb},
m_qsnapman{qsnapman}
{
utils::InitQuorumsCache(mapHasMinedCommitmentCache);

int bls_threads = gArgs.GetIntArg("-parbls", DEFAULT_BLSCHECK_THREADS);
if (bls_threads <= 0) {
// -parbls=0 means autodetect (number of cores - 1 validator threads)
// -parbls=-n means "leave n cores free" (number of cores - n - 1 validator threads)
bls_threads += GetNumCores();
}
// Subtract 1 because the main thread counts towards the par threads
bls_threads = std::max(bls_threads - 1, 0);

// Number of script-checking threads <= MAX_BLSCHECK_THREADS
bls_threads = std::min(bls_threads, MAX_BLSCHECK_THREADS);

LogPrintf("BLS verification uses %d additional threads\n", bls_threads);
m_bls_queue.StartWorkerThreads(bls_threads);
}
Expand Down
2 changes: 1 addition & 1 deletion src/llmq/blockprocessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class CQuorumBlockProcessor
CQuorumBlockProcessor(const CQuorumBlockProcessor&) = delete;
CQuorumBlockProcessor& operator=(const CQuorumBlockProcessor&) = delete;
explicit CQuorumBlockProcessor(CChainState& chainstate, CDeterministicMNManager& dmnman, CEvoDB& evoDb,
CQuorumSnapshotManager& qsnapman);
CQuorumSnapshotManager& qsnapman, int8_t bls_threads);
~CQuorumBlockProcessor();

[[nodiscard]] MessageProcessingResult ProcessMessage(const CNode& peer, std::string_view msg_type, CDataStream& vRecv)
Expand Down
Loading
Loading