Skip to content

Commit

Permalink
llmq|rpc|test|version: Implement P2P messages QGETDATA <-> QDATA (#3953)
Browse files Browse the repository at this point in the history
* version: Bump PROTOCOL_VERSION and MIN_MASTERNODE_PROTO_VERSION

* version: Introduce LLMQ_DATA_MESSAGES_VERSION for QGETDATA/QDATA support

* test: Bump MY_VERSION to 70219 (LLMQ_DATA_MESSAGES_VERSION)

* llmq: Introduce CQuorumDataRequest as wrapper for QGETDATA requests

* llmq: Implement CQuorum::{SetVerificationVector, SetSecretKeyShare}

* llmq|net|protocol: Implement QGETDATA/QDATA P2P messages

* llmq: Restrict processing QGETDATA/QDATA to masternodes only

* llmq: Implement request limiting for QGETDATA/QDATA

* llmq: Implement CQuorumManger::RequestQuorumData

* rpc: Implement "quorum getdata" as wrapper around QGETDATA

Allows to trigger sending QGETDATA messages to connected peers by RPC.

* test: Handle QGETDATA/QDATA messages in mininode

* test: Add data structures to support QGETDATA/QDATA

* test: Add some helper in test_framework.py

* test: Implement tests for QGETDATA/QDATA in p2p_quorum_data.py

* test: Add p2p_quorum_data.py to BASE_SCRIPTS

* llmq|test: Add QWATCH support for QGETDATA/QDATA

* llmq: Store CQuorumPtr in cache, not CQuorumCPtr

* llmq: Fix cache usage after recent changes

* Use uacomment to create/find specific p2ps

* No need to use network adjusted time here, GetTime should be enough

* rpc: check proTxHash

* minor tweaks

* test: Adjustments after 4e27d65

* llmq: Rename and improve error lambda in CQuorumManager::ProcessMessage

* llmq: Process QDATA if -watchquorums is enabled

* test: Handle qwatch messages in mininode

* test: Add test for -watchquorums support

* test: Just some empty lines

* test: Properly stop the p2p network thread at the end of the test

* rpc: Adjust "quorum getdata" parameter descriptions

Co-authored-by: PastaPastaPasta <6443210+PastaPastaPasta@users.noreply.github.com>

* rpc: Fix optionality of proTxHash in "quorum getdata" command

* test: Test optionality of proTxHash for "quorum getdata" command

* test: Be more specific about imports in p2p_quorum_data.py

* llmq|rpc: Add some comments about the request.GetDataMask checks

* test: Some more empty lines

* rpc: One more parameter description

Co-authored-by: PastaPastaPasta <6443210+PastaPastaPasta@users.noreply.github.com>

* test: Unify assert statements / drop parentheses for all of them

* fix typo

Signed-off-by: pasta <pasta@dashboost.org>

* adjust some line wrapping to 80 chars

Signed-off-by: pasta <pasta@dashboost.org>

* tests: Seperate out into dif atomic methods, add logging

Signed-off-by: pasta <pasta@dashboost.org>

* test: Avoid restarting masternodes, just let available requests expire

Just takes a lot time and isn't required imo.

* test: Drop redundant code/tests after separation

This was introduced in 9e224ec

* test: Merge three tests

"test_mnauth_restriction", "test_invalid_messages" and "test_invalid_unexpected_qdata" with the resulting name "test_basics" because i don't feel like DKG recovery thing should be part of a test called "test_invalid_messages" and giving it an own test probably wouldn't make a lot sense because it would still depend on "test_invalid_messages". I also think there is no need for a separated "test_invalid_unexpected_qdata".

* test: Rename test_ratelimiting_banscore -> test_request_limit

* test: Apply python style

* test: Wrap all at 120 characters

Thats the default "draw annoying warnings" setting for PyCharm (and IMO a reasonable line length).

* test: Move some variables

* test: Optimize for speed

* tests: use wait_until in get_mininode_id

* test: Don't use `!=` to check for `None`

Co-authored-by: UdjinM6 <UdjinM6@users.noreply.github.com>
Co-authored-by: PastaPastaPasta <6443210+PastaPastaPasta@users.noreply.github.com>
Co-authored-by: pasta <pasta@dashboost.org>
  • Loading branch information
4 people authored Jan 28, 2021
1 parent 5d4431c commit 21cfb4c
Show file tree
Hide file tree
Showing 12 changed files with 1,065 additions and 6 deletions.
266 changes: 264 additions & 2 deletions src/llmq/quorums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include <chainparams.h>
#include <init.h>
#include <masternode/masternode-sync.h>
#include <net_processing.h>
#include <netmessagemaker.h>
#include <univalue.h>
#include <validation.h>

Expand All @@ -28,6 +30,9 @@ static const std::string DB_QUORUM_QUORUM_VVEC = "q_Qqvvec";

CQuorumManager* quorumManager;

CCriticalSection cs_data_requests;
static std::unordered_map<std::pair<uint256, bool>, CQuorumDataRequest, StaticSaltedHasher> mapQuorumDataRequests;

static uint256 MakeQuorumKey(const CQuorum& q)
{
CHashWriter hw(SER_NETWORK, 0);
Expand All @@ -53,6 +58,24 @@ void CQuorum::Init(const CFinalCommitment& _qc, const CBlockIndex* _pindexQuorum
minedBlockHash = _minedBlockHash;
}

bool CQuorum::SetVerificationVector(const BLSVerificationVector& quorumVecIn)
{
if (::SerializeHash(quorumVecIn) != qc.quorumVvecHash) {
return false;
}
quorumVvec = std::make_shared<BLSVerificationVector>(quorumVecIn);
return true;
}

bool CQuorum::SetSecretKeyShare(const CBLSSecretKey& secretKeyShare)
{
if (!secretKeyShare.IsValid() || (secretKeyShare.GetPublicKey() != GetPubKeyShare(GetMemberIndex(activeMasternodeInfo.proTxHash)))) {
return false;
}
skShare = secretKeyShare;
return true;
}

bool CQuorum::IsMember(const uint256& proTxHash) const
{
for (auto& dmn : members) {
Expand Down Expand Up @@ -168,6 +191,17 @@ void CQuorumManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitial
for (auto& p : Params().GetConsensus().llmqs) {
EnsureQuorumConnections(p.first, pindexNew);
}

// Cleanup expired data requests
LOCK(cs_data_requests);
auto it = mapQuorumDataRequests.begin();
while (it != mapQuorumDataRequests.end()) {
if (it->second.IsExpired()) {
it = mapQuorumDataRequests.erase(it);
} else {
++it;
}
}
}

void CQuorumManager::EnsureQuorumConnections(Consensus::LLMQType llmqType, const CBlockIndex* pindexNew) const
Expand Down Expand Up @@ -238,7 +272,7 @@ CQuorumPtr CQuorumManager::BuildQuorumFromCommitment(const Consensus::LLMQType l
CQuorum::StartCachePopulatorThread(quorum);
}

mapQuorumsCache[llmqType].emplace(quorumHash, quorum);
mapQuorumsCache[llmqType].insert(quorumHash, quorum);

return quorum;
}
Expand Down Expand Up @@ -284,6 +318,43 @@ bool CQuorumManager::HasQuorum(Consensus::LLMQType llmqType, const uint256& quor
return quorumBlockProcessor->HasMinedCommitment(llmqType, quorumHash);
}

bool CQuorumManager::RequestQuorumData(CNode* pFrom, Consensus::LLMQType llmqType, const CBlockIndex* pQuorumIndex, uint16_t nDataMask, const uint256& proTxHash)
{
if (pFrom->nVersion < LLMQ_DATA_MESSAGES_VERSION) {
LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Version must be %d or greater.\n", __func__, LLMQ_DATA_MESSAGES_VERSION);
return false;
}
if (pFrom == nullptr || (pFrom->verifiedProRegTxHash.IsNull() && !pFrom->qwatch)) {
LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- pFrom is neither a verified masternode nor a qwatch connection\n", __func__);
return false;
}
if (Params().GetConsensus().llmqs.count(llmqType) == 0) {
LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Invalid llmqType: %d\n", __func__, llmqType);
return false;
}
if (pQuorumIndex == nullptr) {
LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Invalid pQuorumIndex: nullptr\n", __func__);
return false;
}
if (GetQuorum(llmqType, pQuorumIndex) == nullptr) {
LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Quorum not found: %s, %d\n", __func__, pQuorumIndex->GetBlockHash().ToString(), llmqType);
return false;
}

LOCK(cs_data_requests);
auto key = std::make_pair(pFrom->verifiedProRegTxHash, true);
auto it = mapQuorumDataRequests.emplace(key, CQuorumDataRequest(llmqType, pQuorumIndex->GetBlockHash(), nDataMask, proTxHash));
if (!it.second && !it.first->second.IsExpired()) {
LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Already requested\n", __func__);
return false;
}

CNetMsgMaker msgMaker(pFrom->GetSendVersion());
g_connman->PushMessage(pFrom, msgMaker.Make(NetMsgType::QGETDATA, it.first->second));

return true;
}

std::vector<CQuorumCPtr> CQuorumManager::ScanQuorums(Consensus::LLMQType llmqType, size_t nCountRequested) const
{
const CBlockIndex* pindex;
Expand Down Expand Up @@ -381,12 +452,203 @@ CQuorumCPtr CQuorumManager::GetQuorum(Consensus::LLMQType llmqType, const CBlock
}

LOCK(quorumsCacheCs);
CQuorumCPtr pQuorum;
CQuorumPtr pQuorum;
if (mapQuorumsCache[llmqType].get(quorumHash, pQuorum)) {
return pQuorum;
}

return BuildQuorumFromCommitment(llmqType, pindexQuorum);
}

void CQuorumManager::ProcessMessage(CNode* pFrom, const std::string& strCommand, CDataStream& vRecv)
{
auto strFunc = __func__;
auto errorHandler = [&](const std::string strError, int nScore = 10) {
LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- %s: %s, from peer=%d\n", strFunc, strCommand, strError, pFrom->GetId());
if (nScore > 0) {
LOCK(cs_main);
Misbehaving(pFrom->GetId(), nScore);
}
};

if (strCommand == NetMsgType::QGETDATA) {

if (!fMasternodeMode || pFrom == nullptr || (pFrom->verifiedProRegTxHash.IsNull() && !pFrom->qwatch)) {
errorHandler("Not a verified masternode or a qwatch connection");
return;
}

CQuorumDataRequest request;
vRecv >> request;

auto sendQDATA = [&](CQuorumDataRequest::Errors nError = CQuorumDataRequest::Errors::UNDEFINED,
const CDataStream& body = CDataStream(SER_NETWORK, PROTOCOL_VERSION)) {
request.SetError(nError);
CDataStream ssResponse(SER_NETWORK, pFrom->GetSendVersion(), request, body);
g_connman->PushMessage(pFrom, CNetMsgMaker(pFrom->GetSendVersion()).Make(NetMsgType::QDATA, ssResponse));
};

{
LOCK2(cs_main, cs_data_requests);
auto key = std::make_pair(pFrom->verifiedProRegTxHash, false);
auto it = mapQuorumDataRequests.find(key);
if (it == mapQuorumDataRequests.end()) {
it = mapQuorumDataRequests.emplace(key, request).first;
} else if(it->second.IsExpired()) {
it->second = request;
} else {
errorHandler("Request limit exceeded", 25);
}
}

if (Params().GetConsensus().llmqs.count(request.GetLLMQType()) == 0) {
sendQDATA(CQuorumDataRequest::Errors::QUORUM_TYPE_INVALID);
return;
}

const CBlockIndex* pQuorumIndex{nullptr};
{
LOCK(cs_main);
pQuorumIndex = LookupBlockIndex(request.GetQuorumHash());
}
if (pQuorumIndex == nullptr) {
sendQDATA(CQuorumDataRequest::Errors::QUORUM_BLOCK_NOT_FOUND);
return;
}

const CQuorumCPtr pQuorum = GetQuorum(request.GetLLMQType(), pQuorumIndex);
if (pQuorum == nullptr) {
sendQDATA(CQuorumDataRequest::Errors::QUORUM_NOT_FOUND);
return;
}

CDataStream ssResponseData(SER_NETWORK, pFrom->GetSendVersion());

// Check if request wants QUORUM_VERIFICATION_VECTOR data
if (request.GetDataMask() & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR) {

if (!pQuorum->quorumVvec) {
sendQDATA(CQuorumDataRequest::Errors::QUORUM_VERIFICATION_VECTOR_MISSING);
return;
}

ssResponseData << *pQuorum->quorumVvec;
}

// Check if request wants ENCRYPTED_CONTRIBUTIONS data
if (request.GetDataMask() & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS) {

int memberIdx = pQuorum->GetMemberIndex(request.GetProTxHash());
if (memberIdx == -1) {
sendQDATA(CQuorumDataRequest::Errors::MASTERNODE_IS_NO_MEMBER);
return;
}

std::vector<CBLSIESEncryptedObject<CBLSSecretKey>> vecEncrypted;
if (!quorumDKGSessionManager->GetEncryptedContributions(request.GetLLMQType(), pQuorumIndex, pQuorum->qc.validMembers, request.GetProTxHash(), vecEncrypted)) {
sendQDATA(CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING);
return;
}

ssResponseData << vecEncrypted;
}

sendQDATA(CQuorumDataRequest::Errors::NONE, ssResponseData);
return;
}

if (strCommand == NetMsgType::QDATA) {

bool fIsWatching = gArgs.GetBoolArg("-watchquorums", DEFAULT_WATCH_QUORUMS);
if ((!fMasternodeMode && !fIsWatching) || pFrom == nullptr || (pFrom->verifiedProRegTxHash.IsNull() && !pFrom->qwatch)) {
errorHandler("Not a verified masternode or a qwatch connection");
return;
}

CQuorumDataRequest request;
vRecv >> request;

{
LOCK2(cs_main, cs_data_requests);
auto it = mapQuorumDataRequests.find(std::make_pair(pFrom->verifiedProRegTxHash, true));
if (it == mapQuorumDataRequests.end()) {
errorHandler("Not requested");
return;
}
if (it->second.IsProcessed()) {
errorHandler("Already received");
return;
}
if (request != it->second) {
errorHandler("Not like requested");
return;
}
it->second.SetProcessed();
}

if (request.GetError() != CQuorumDataRequest::Errors::NONE) {
errorHandler(strprintf("Error %d", request.GetError()), 0);
return;
}

CQuorumPtr pQuorum;
{
LOCK(quorumsCacheCs);
if (!mapQuorumsCache[request.GetLLMQType()].get(request.GetQuorumHash(), pQuorum)) {
errorHandler("Quorum not found", 0); // Don't bump score because we asked for it
return;
}
}

// Check if request has QUORUM_VERIFICATION_VECTOR data
if (request.GetDataMask() & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR) {

BLSVerificationVector verficationVector;
vRecv >> verficationVector;

if (pQuorum->SetVerificationVector(verficationVector)) {
CQuorum::StartCachePopulatorThread(pQuorum);
} else {
errorHandler("Invalid quorum verification vector");
return;
}
}

// Check if request has ENCRYPTED_CONTRIBUTIONS data
if (request.GetDataMask() & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS) {

if (pQuorum->quorumVvec->size() != pQuorum->params.threshold) {
errorHandler("No valid quorum verification vector available", 0); // Don't bump score because we asked for it
return;
}

int memberIdx = pQuorum->GetMemberIndex(request.GetProTxHash());
if (memberIdx == -1) {
errorHandler("Not a member of the quorum", 0); // Don't bump score because we asked for it
return;
}

std::vector<CBLSIESEncryptedObject<CBLSSecretKey>> vecEncrypted;
vRecv >> vecEncrypted;

BLSSecretKeyVector vecSecretKeys;
vecSecretKeys.resize(vecEncrypted.size());
for (size_t i = 0; i < vecEncrypted.size(); ++i) {
if (!vecEncrypted[i].Decrypt(memberIdx, *activeMasternodeInfo.blsKeyOperator, vecSecretKeys[i], PROTOCOL_VERSION)) {
errorHandler("Failed to decrypt");
return;
}
}

CBLSSecretKey secretKeyShare = blsWorker.AggregateSecretKeys(vecSecretKeys);
if (!pQuorum->SetSecretKeyShare(secretKeyShare)) {
errorHandler("Invalid secret key share received");
return;
}
}
pQuorum->WriteContributions(evoDb);
return;
}
}

} // namespace llmq
Loading

0 comments on commit 21cfb4c

Please sign in to comment.