Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions src/llmq/quorums_dkgsessionhandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,8 @@ void CDKGPendingMessages::Clear()

//////

CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params, ctpl::thread_pool& _messageHandlerPool, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager) :
CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager) :
params(_params),
messageHandlerPool(_messageHandlerPool),
blsWorker(_blsWorker),
dkgManager(_dkgManager),
curSession(std::make_shared<CDKGSession>(_params, _blsWorker, _dkgManager)),
Expand All @@ -95,18 +94,13 @@ CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params, ctp
pendingJustifications((size_t)_params.size * 2, MSG_QUORUM_JUSTIFICATION),
pendingPrematureCommitments((size_t)_params.size * 2, MSG_QUORUM_PREMATURE_COMMITMENT)
{
phaseHandlerThread = std::thread([this] {
RenameThread(strprintf("dash-q-phase-%d", (uint8_t)params.type).c_str());
PhaseHandlerThread();
});
if (params.type == Consensus::LLMQ_NONE) {
throw std::runtime_error("Can't initialize CDKGSessionHandler with LLMQ_NONE type.");
}
}

CDKGSessionHandler::~CDKGSessionHandler()
{
stopRequested = true;
if (phaseHandlerThread.joinable()) {
phaseHandlerThread.join();
}
}

void CDKGSessionHandler::UpdatedBlockTip(const CBlockIndex* pindexNew)
Expand Down Expand Up @@ -145,6 +139,24 @@ void CDKGSessionHandler::ProcessMessage(CNode* pfrom, const std::string& strComm
}
}

void CDKGSessionHandler::StartThread()
{
if (phaseHandlerThread.joinable()) {
throw std::runtime_error("Tried to start an already started CDKGSessionHandler thread.");
}

std::string threadName = strprintf("q-phase-%d", params.type);
phaseHandlerThread = std::thread(&TraceThread<std::function<void()> >, threadName, std::function<void()>(std::bind(&CDKGSessionHandler::PhaseHandlerThread, this)));
}

void CDKGSessionHandler::StopThread()
{
stopRequested = true;
if (phaseHandlerThread.joinable()) {
phaseHandlerThread.join();
}
}

bool CDKGSessionHandler::InitNewQuorum(const CBlockIndex* pindexQuorum)
{
//AssertLockHeld(cs_main);
Expand Down
6 changes: 4 additions & 2 deletions src/llmq/quorums_dkgsessionhandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ class CDKGSessionHandler
std::atomic<bool> stopRequested{false};

const Consensus::LLMQParams& params;
ctpl::thread_pool& messageHandlerPool;
CBLSWorker& blsWorker;
CDKGSessionManager& dkgManager;

Expand All @@ -120,12 +119,15 @@ class CDKGSessionHandler
CDKGPendingMessages pendingPrematureCommitments;

public:
CDKGSessionHandler(const Consensus::LLMQParams& _params, ctpl::thread_pool& _messageHandlerPool, CBLSWorker& blsWorker, CDKGSessionManager& _dkgManager);
CDKGSessionHandler(const Consensus::LLMQParams& _params, CBLSWorker& blsWorker, CDKGSessionManager& _dkgManager);
~CDKGSessionHandler();

void UpdatedBlockTip(const CBlockIndex *pindexNew);
void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);

void StartThread();
void StopThread();

private:
bool InitNewQuorum(const CBlockIndex* pindexQuorum);

Expand Down
22 changes: 12 additions & 10 deletions src/llmq/quorums_dkgsessionmgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,29 @@ CDKGSessionManager::CDKGSessionManager(CDBWrapper& _llmqDb, CBLSWorker& _blsWork
llmqDb(_llmqDb),
blsWorker(_blsWorker)
{
for (const auto& qt : Params().GetConsensus().llmqs) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why's this called qt? maybe quorum or quorumThread would be clearer (I'm not actually sure what exactly this variable represents)

Copy link
Author

@xdustinface xdustinface Jul 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no idea why its called like that i just moved it around a bit. But it represents a key/value pair from the Consensus::Params::llmqs map which contains LLMQType, LLMQParams pairs for the available LLMQs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we make it not auto so it's clearer what this is referring to. IMO we should only really be using auto where by the name of it and the surrounding code it's obvious. I guess it's obvious in the sense you can look at llmqs and find what that is...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

guess it's obvious in the sense you can look at llmqs and find what

Hm.. so for me that auto style isn't an issue at all because the declaration of llmqs is only a click away. But thats obviously a personal preference thing and at the end anyway kind of unrelated to this PR though because it's used like this all over the place :D

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's fine how it is. Just makes code review easier when it's obvious what variables mean

dkgSessionHandlers.emplace(std::piecewise_construct,
std::forward_as_tuple(qt.first),
std::forward_as_tuple(qt.second, blsWorker, *this));
}
}

CDKGSessionManager::~CDKGSessionManager()
{
}

void CDKGSessionManager::StartMessageHandlerPool()
void CDKGSessionManager::StartThreads()
{
for (const auto& qt : Params().GetConsensus().llmqs) {
dkgSessionHandlers.emplace(std::piecewise_construct,
std::forward_as_tuple(qt.first),
std::forward_as_tuple(qt.second, messageHandlerPool, blsWorker, *this));
for (auto& it : dkgSessionHandlers) {
it.second.StartThread();
}

messageHandlerPool.resize(2);
RenameThreadPool(messageHandlerPool, "dash-q-msg");
}

void CDKGSessionManager::StopMessageHandlerPool()
void CDKGSessionManager::StopThreads()
{
messageHandlerPool.stop(true);
for (auto& it : dkgSessionHandlers) {
it.second.StopThread();
}
}

void CDKGSessionManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitialDownload)
Expand Down
5 changes: 2 additions & 3 deletions src/llmq/quorums_dkgsessionmgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ class CDKGSessionManager
private:
CDBWrapper& llmqDb;
CBLSWorker& blsWorker;
ctpl::thread_pool messageHandlerPool;

std::map<Consensus::LLMQType, CDKGSessionHandler> dkgSessionHandlers;

Expand All @@ -50,8 +49,8 @@ class CDKGSessionManager
CDKGSessionManager(CDBWrapper& _llmqDb, CBLSWorker& _blsWorker);
~CDKGSessionManager();

void StartMessageHandlerPool();
void StopMessageHandlerPool();
void StartThreads();
void StopThreads();

void UpdatedBlockTip(const CBlockIndex *pindexNew, bool fInitialDownload);

Expand Down
4 changes: 2 additions & 2 deletions src/llmq/quorums_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void StartLLMQSystem()
blsWorker->Start();
}
if (quorumDKGSessionManager) {
quorumDKGSessionManager->StartMessageHandlerPool();
quorumDKGSessionManager->StartThreads();
}
if (quorumSigSharesManager) {
quorumSigSharesManager->RegisterAsRecoveredSigsListener();
Expand All @@ -97,7 +97,7 @@ void StopLLMQSystem()
quorumSigSharesManager->UnregisterAsRecoveredSigsListener();
}
if (quorumDKGSessionManager) {
quorumDKGSessionManager->StopMessageHandlerPool();
quorumDKGSessionManager->StopThreads();
}
if (blsWorker) {
blsWorker->Stop();
Expand Down