Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 108 additions & 43 deletions src/llmq/signing_shares.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,26 +196,45 @@ CSigSharesManager::CSigSharesManager(CConnman& connman, CChainState& chainstate,

// Defaulted destructor, defined out of line in this translation unit.
CSigSharesManager::~CSigSharesManager() = default;

void CSigSharesManager::StartWorkerThread()
void CSigSharesManager::Start()
{
// can't start new thread if we have one running already
if (workThread.joinable()) {
// can't start if threads are already running
if (housekeepingThread.joinable() || dispatcherThread.joinable()) {
assert(false);
}

workThread = std::thread(&util::TraceThread, "sigshares", [this] { WorkThreadMain(); });
// Initialize worker pool
int workerCount = std::clamp(static_cast<int>(std::thread::hardware_concurrency() / 2), 1, 4);
workerPool.resize(workerCount);
RenameThreadPool(workerPool, "sigsh-work");

// Start housekeeping thread
housekeepingThread = std::thread(&util::TraceThread, "sigsh-maint",
[this] { HousekeepingThreadMain(); });

// Start dispatcher thread
dispatcherThread = std::thread(&util::TraceThread, "sigsh-dispat",
[this] { WorkDispatcherThreadMain(); });
}

void CSigSharesManager::StopWorkerThread()
void CSigSharesManager::Stop()
{
// make sure to call InterruptWorkerThread() first
if (!workInterrupt) {
assert(false);
}

if (workThread.joinable()) {
workThread.join();
// Join threads FIRST to stop any pending push() calls
if (housekeepingThread.joinable()) {
housekeepingThread.join();
}
if (dispatcherThread.joinable()) {
dispatcherThread.join();
}

// Then stop worker pool (now safe, no more push() calls)
workerPool.clear_queue();
workerPool.stop(true);
}

void CSigSharesManager::RegisterAsRecoveredSigsListener()
Expand Down Expand Up @@ -1611,60 +1630,106 @@ void CSigSharesManager::BanNode(NodeId nodeId)
nodeState.banned = true;
}

void CSigSharesManager::WorkThreadMain()
void CSigSharesManager::HousekeepingThreadMain()
{
int64_t lastSendTime = 0;

while (!workInterrupt) {
RemoveBannedNodeStates();
SendMessages();
Cleanup();

bool fMoreWork = ProcessPendingSigShares();
SignPendingSigShares();
workInterrupt.sleep_for(std::chrono::milliseconds(100));
}
}

if (TicksSinceEpoch<std::chrono::milliseconds>(SystemClock::now()) - lastSendTime > 100) {
SendMessages();
lastSendTime = TicksSinceEpoch<std::chrono::milliseconds>(SystemClock::now());
}
void CSigSharesManager::WorkDispatcherThreadMain()
{
while (!workInterrupt) {
// Dispatch all pending signs (individual tasks)
DispatchPendingSigns();

Cleanup();
// If there's processing work, spawn a helper worker
DispatchPendingProcessing();

// TODO Wakeup when pending signing is needed?
if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
return;
}
// Always sleep briefly between checks
workInterrupt.sleep_for(std::chrono::milliseconds(10));
}
}

void CSigSharesManager::AsyncSign(CQuorumCPtr quorum, const uint256& id, const uint256& msgHash)
void CSigSharesManager::DispatchPendingSigns()
{
LOCK(cs_pendingSigns);
pendingSigns.emplace_back(std::move(quorum), id, msgHash);
// Swap out entire vector to avoid lock thrashing
std::vector<PendingSignatureData> signs;
{
LOCK(cs_pendingSigns);
signs.swap(pendingSigns);
}

// Dispatch all signs to worker pool
for (auto& work : signs) {
if (workInterrupt) break;

workerPool.push([this, work = std::move(work)](int) {
SignAndProcessSingleShare(std::move(work));
});
}
}

void CSigSharesManager::SignPendingSigShares()
void CSigSharesManager::DispatchPendingProcessing()
{
std::vector<PendingSignatureData> v;
WITH_LOCK(cs_pendingSigns, v.swap(pendingSigns));

for (const auto& [pQuorum, id, msgHash] : v) {
auto opt_sigShare = CreateSigShare(*pQuorum, id, msgHash);

if (opt_sigShare.has_value() && opt_sigShare->sigShare.Get().IsValid()) {
auto& sigShare = *opt_sigShare;
ProcessSigShare(sigShare, pQuorum);

if (IsAllMembersConnectedEnabled(pQuorum->params.type, m_sporkman)) {
LOCK(cs);
auto& session = signedSessions[sigShare.GetSignHash()];
session.sigShare = std::move(sigShare);
session.quorum = pQuorum;
session.nextAttemptTime = 0;
session.attempt = 0;
}
// Check if there's work, spawn a helper if so
bool hasWork = false;
{
LOCK(cs);
hasWork = std::any_of(nodeStates.begin(), nodeStates.end(),
[](const auto& entry) {
return !entry.second.pendingIncomingSigShares.Empty();
});
}

if (hasWork) {
// Work exists - spawn a worker to help!
workerPool.push([this](int) {
ProcessPendingSigSharesLoop();
});
}
}

void CSigSharesManager::ProcessPendingSigSharesLoop()
{
while (!workInterrupt) {
bool moreWork = ProcessPendingSigShares();

if (!moreWork) {
return; // No work found, exit immediately
}
}
Comment on lines +1699 to +1705
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe simplify a bit?

Suggested change
while (!workInterrupt) {
bool moreWork = ProcessPendingSigShares();
if (!moreWork) {
return; // No work found, exit immediately
}
}
bool moreWork = true;
while (!workInterrupt && moreWork) {
moreWork = ProcessPendingSigShares();
}

}

// Handles one queued signing request: creates the signature share via
// CreateSigShare() and, if a valid share was produced, passes it to
// ProcessSigShare(). When IsAllMembersConnectedEnabled() is true for the
// quorum type, the share is additionally recorded in signedSessions (under cs)
// with its attempt bookkeeping reset to zero.
void CSigSharesManager::SignAndProcessSingleShare(PendingSignatureData work)
{
    auto created = CreateSigShare(*work.quorum, work.id, work.msgHash);
    if (!created.has_value() || !created->sigShare.Get().IsValid()) {
        // No usable share was produced; nothing further to do for this request.
        return;
    }

    auto& share = *created;
    ProcessSigShare(share, work.quorum);

    if (!IsAllMembersConnectedEnabled(work.quorum->params.type, m_sporkman)) {
        return;
    }

    LOCK(cs);
    // Resolve the session entry first: the sign hash is read from the share
    // before the share itself is moved into the session.
    auto& signedSession = signedSessions[share.GetSignHash()];
    signedSession.sigShare = std::move(share);
    signedSession.quorum = work.quorum;
    signedSession.nextAttemptTime = 0;
    signedSession.attempt = 0;
}

// Appends a signing request (quorum, id, msgHash) to pendingSigns while holding
// cs_pendingSigns. No signing is performed here; the request is only queued.
void CSigSharesManager::AsyncSign(CQuorumCPtr quorum, const uint256& id, const uint256& msgHash)
{
    WITH_LOCK(cs_pendingSigns, pendingSigns.emplace_back(std::move(quorum), id, msgHash));
}

std::optional<CSigShare> CSigSharesManager::CreateSigShareForSingleMember(const CQuorum& quorum, const uint256& id, const uint256& msgHash) const
{
cxxtimer::Timer t(true);
Expand Down
25 changes: 19 additions & 6 deletions src/llmq/signing_shares.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define BITCOIN_LLMQ_SIGNING_SHARES_H

#include <bls/bls.h>
#include <ctpl_stl.h>
#include <evo/types.h>
#include <llmq/signhash.h>
#include <llmq/signing.h>
Expand Down Expand Up @@ -361,7 +362,7 @@ class CSignedSession
int attempt{0};
};

class CSigSharesManager : public CRecoveredSigsListener
class CSigSharesManager : public llmq::CRecoveredSigsListener
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: seems unrelated

{
private:
static constexpr int64_t SESSION_NEW_SHARES_TIMEOUT{60};
Expand All @@ -380,7 +381,9 @@ class CSigSharesManager : public CRecoveredSigsListener

Mutex cs;

std::thread workThread;
mutable ctpl::thread_pool workerPool;
std::thread housekeepingThread;
std::thread dispatcherThread;
CThreadInterrupt workInterrupt;

SigShareMap<CSigShare> sigShares GUARDED_BY(cs);
Expand Down Expand Up @@ -426,8 +429,8 @@ class CSigSharesManager : public CRecoveredSigsListener
const CQuorumManager& _qman, const CSporkManager& sporkman);
~CSigSharesManager() override;

void StartWorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!cs);
void StopWorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!cs);
void Start() EXCLUSIVE_LOCKS_REQUIRED(!cs);
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!cs);
void RegisterAsRecoveredSigsListener() EXCLUSIVE_LOCKS_REQUIRED(!cs);
void UnregisterAsRecoveredSigsListener() EXCLUSIVE_LOCKS_REQUIRED(!cs);
void InterruptWorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!cs);
Expand Down Expand Up @@ -500,8 +503,18 @@ class CSigSharesManager : public CRecoveredSigsListener
void CollectSigSharesToSendConcentrated(std::unordered_map<NodeId, std::vector<CSigShare>>& sigSharesToSend, const std::vector<CNode*>& vNodes) EXCLUSIVE_LOCKS_REQUIRED(cs);
void CollectSigSharesToAnnounce(std::unordered_map<NodeId, Uint256HashMap<CSigSharesInv>>& sigSharesToAnnounce)
EXCLUSIVE_LOCKS_REQUIRED(cs);
void SignPendingSigShares() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs);
void WorkThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs);

// Thread main functions
void HousekeepingThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs);
void WorkDispatcherThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs);

// Dispatcher functions
void DispatchPendingSigns() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns);
void DispatchPendingProcessing() EXCLUSIVE_LOCKS_REQUIRED(!cs);

// Worker pool task functions
void ProcessPendingSigSharesLoop() EXCLUSIVE_LOCKS_REQUIRED(!cs);
void SignAndProcessSingleShare(PendingSignatureData work) EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs);
};
} // namespace llmq

Expand Down
4 changes: 2 additions & 2 deletions src/masternode/active/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ void ActiveContext::Start(CConnman& connman, PeerManager& peerman)
{
m_llmq_ctx.qdkgsman->StartThreads(connman, peerman);
shareman->RegisterAsRecoveredSigsListener();
shareman->StartWorkerThread();
shareman->Start();
}

void ActiveContext::Stop()
{
shareman->StopWorkerThread();
shareman->Stop();
shareman->UnregisterAsRecoveredSigsListener();
m_llmq_ctx.qdkgsman->StopThreads();
}
Loading