From 21cfb4c9349ecca9bb1dc104196d57e997eb01b8 Mon Sep 17 00:00:00 2001
From: dustinface <35775977+xdustinface@users.noreply.github.com>
Date: Thu, 28 Jan 2021 23:33:18 +0100
Subject: [PATCH] llmq|rpc|test|version: Implement P2P messages QGETDATA <->
 QDATA (#3953)

* version: Bump PROTOCOL_VERSION and MIN_MASTERNODE_PROTO_VERSION

* version: Introduce LLMQ_DATA_MESSAGES_VERSION for QGETDATA/QDATA support

* test: Bump MY_VERSION to 70219 (LLMQ_DATA_MESSAGES_VERSION)

* llmq: Introduce CQuorumDataRequest as wrapper for QGETDATA requests

* llmq: Implement CQuorum::{SetVerificationVector, SetSecretKeyShare}

* llmq|net|protocol: Implement QGETDATA/QDATA P2P messages

* llmq: Restrict processing QGETDATA/QDATA to masternodes only

* llmq: Implement request limiting for QGETDATA/QDATA

* llmq: Implement CQuorumManager::RequestQuorumData

* rpc: Implement "quorum getdata" as wrapper around QGETDATA

Allows triggering QGETDATA messages to connected peers via RPC.

* test: Handle QGETDATA/QDATA messages in mininode

* test: Add data structures to support QGETDATA/QDATA

* test: Add some helpers in test_framework.py

* test: Implement tests for QGETDATA/QDATA in p2p_quorum_data.py

* test: Add p2p_quorum_data.py to BASE_SCRIPTS

* llmq|test: Add QWATCH support for QGETDATA/QDATA

* llmq: Store CQuorumPtr in cache, not CQuorumCPtr

* llmq: Fix cache usage after recent changes

* Use uacomment to create/find specific p2ps

* No need to use network adjusted time here, GetTime should be enough

* rpc: check proTxHash

* minor tweaks

* test: Adjustments after 4e27d6513e0073ed848ede262cfec82a9134abc0

* llmq: Rename and improve error lambda in CQuorumManager::ProcessMessage

* llmq: Process QDATA if -watchquorums is enabled

* test: Handle qwatch messages in mininode

* test: Add test for -watchquorums support

* test: Just some empty lines

* test: Properly stop the p2p network thread at the end of the test

* rpc: Adjust "quorum getdata" parameter descriptions

Co-authored-by: PastaPastaPasta <6443210+PastaPastaPasta@users.noreply.github.com>

* rpc: Fix optionality of proTxHash in "quorum getdata" command

* test: Test optionality of proTxHash for "quorum getdata" command

* test: Be more specific about imports in p2p_quorum_data.py

* llmq|rpc: Add some comments about the request.GetDataMask checks

* test: Some more empty lines

* rpc: One more parameter description

Co-authored-by: PastaPastaPasta <6443210+PastaPastaPasta@users.noreply.github.com>

* test: Unify assert statements / drop parentheses for all of them

* fix typo

Signed-off-by: pasta

* adjust some line wrapping to 80 chars

Signed-off-by: pasta

* tests: Separate out into different atomic methods, add logging

Signed-off-by: pasta

* test: Avoid restarting masternodes, just let available requests expire

Just takes a lot of time and isn't required imo.

* test: Drop redundant code/tests after separation

This was introduced in 9e224ec2f2ef4a58adaf0f9d4ffe110e379718ef

* test: Merge three tests "test_mnauth_restriction", "test_invalid_messages"
and "test_invalid_unexpected_qdata" with the resulting name "test_basics"
because I don't feel like the DKG recovery thing should be part of a test
called "test_invalid_messages", and giving it its own test probably wouldn't
make a lot of sense because it would still depend on "test_invalid_messages".
I also think there is no need for a separate "test_invalid_unexpected_qdata".
* test: Rename test_ratelimiting_banscore -> test_request_limit

* test: Apply python style

* test: Wrap all at 120 characters

That's the default "draw annoying warnings" setting for PyCharm (and IMO a
reasonable line length).

* test: Move some variables

* test: Optimize for speed

* tests: use wait_until in get_mininode_id

* test: Don't use `!=` to check for `None`

Co-authored-by: UdjinM6
Co-authored-by: PastaPastaPasta <6443210+PastaPastaPasta@users.noreply.github.com>
Co-authored-by: pasta
---
 src/llmq/quorums.cpp                          | 266 +++++++++-
 src/llmq/quorums.h                            | 111 ++++-
 src/net_processing.cpp                        |   1 +
 src/protocol.cpp                              |   4 +
 src/protocol.h                                |   2 +
 src/rpc/rpcquorums.cpp                        |  53 ++
 src/version.h                                 |   7 +-
 test/functional/p2p_quorum_data.py            | 460 ++++++++++++++++++
 test/functional/test_framework/messages.py    | 127 ++++-
 test/functional/test_framework/mininode.py    |   7 +
 .../test_framework/test_framework.py          |  32 ++
 test/functional/test_runner.py                |   1 +
 12 files changed, 1065 insertions(+), 6 deletions(-)
 create mode 100755 test/functional/p2p_quorum_data.py
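Before the per-file changes, a minimal sketch of the end-to-end flow these messages enable, written in the terms of the test code this patch adds: msg_qgetdata comes from test_framework/messages.py below, wait_for_qdata/get_qdata are the QuorumDataInterface helpers from p2p_quorum_data.py, llmqType 100 is the regtest quorum type the tests use, and fetch_quorum_data itself is a hypothetical helper, not part of the patch.

    from test_framework.messages import msg_qgetdata

    QUORUM_VERIFICATION_VECTOR = 0x01  # dataMask bit: quorum verification vector
    ENCRYPTED_CONTRIBUTIONS = 0x02     # dataMask bit: a member's encrypted contributions

    def fetch_quorum_data(p2p, quorum_hash_int, protx_hash_int):
        # Request both data items (dataMask 0x03) for an llmqType-100 quorum and
        # return the QDATA response recorded by the QuorumDataInterface helpers.
        request = msg_qgetdata(quorum_hash_int, 100,
                               QUORUM_VERIFICATION_VECTOR | ENCRYPTED_CONTRIBUTIONS,
                               protx_hash_int)
        p2p.send_message(request)
        p2p.wait_for_qdata()
        return p2p.get_qdata()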
diff --git a/src/llmq/quorums.cpp b/src/llmq/quorums.cpp
index 008c03c6ef2d5..7c37eb58d7a7e 100644
--- a/src/llmq/quorums.cpp
+++ b/src/llmq/quorums.cpp
@@ -15,6 +15,8 @@
 #include
 #include
 #include
+#include
+#include
 
 #include
 #include
@@ -28,6 +30,9 @@ static const std::string DB_QUORUM_QUORUM_VVEC = "q_Qqvvec";
 
 CQuorumManager* quorumManager;
 
+CCriticalSection cs_data_requests;
+static std::unordered_map<std::pair<uint256, bool>, CQuorumDataRequest, StaticSaltedHasher> mapQuorumDataRequests;
+
 static uint256 MakeQuorumKey(const CQuorum& q)
 {
     CHashWriter hw(SER_NETWORK, 0);
@@ -53,6 +58,24 @@ void CQuorum::Init(const CFinalCommitment& _qc, const CBlockIndex* _pindexQuorum
     minedBlockHash = _minedBlockHash;
 }
 
+bool CQuorum::SetVerificationVector(const BLSVerificationVector& quorumVecIn)
+{
+    if (::SerializeHash(quorumVecIn) != qc.quorumVvecHash) {
+        return false;
+    }
+    quorumVvec = std::make_shared<BLSVerificationVector>(quorumVecIn);
+    return true;
+}
+
+bool CQuorum::SetSecretKeyShare(const CBLSSecretKey& secretKeyShare)
+{
+    if (!secretKeyShare.IsValid() || (secretKeyShare.GetPublicKey() != GetPubKeyShare(GetMemberIndex(activeMasternodeInfo.proTxHash)))) {
+        return false;
+    }
+    skShare = secretKeyShare;
+    return true;
+}
+
 bool CQuorum::IsMember(const uint256& proTxHash) const
 {
     for (auto& dmn : members) {
@@ -168,6 +191,17 @@ void CQuorumManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitial
     for (auto& p : Params().GetConsensus().llmqs) {
         EnsureQuorumConnections(p.first, pindexNew);
     }
+
+    // Cleanup expired data requests
+    LOCK(cs_data_requests);
+    auto it = mapQuorumDataRequests.begin();
+    while (it != mapQuorumDataRequests.end()) {
+        if (it->second.IsExpired()) {
+            it = mapQuorumDataRequests.erase(it);
+        } else {
+            ++it;
+        }
+    }
 }
 
 void CQuorumManager::EnsureQuorumConnections(Consensus::LLMQType llmqType, const CBlockIndex* pindexNew) const
@@ -238,7 +272,7 @@ CQuorumPtr CQuorumManager::BuildQuorumFromCommitment(const Consensus::LLMQType l
         CQuorum::StartCachePopulatorThread(quorum);
     }
 
-    mapQuorumsCache[llmqType].emplace(quorumHash, quorum);
+    mapQuorumsCache[llmqType].insert(quorumHash, quorum);
 
     return quorum;
 }
@@ -284,6 +318,43 @@ bool CQuorumManager::HasQuorum(Consensus::LLMQType llmqType, const uint256& quor
     return quorumBlockProcessor->HasMinedCommitment(llmqType, quorumHash);
 }
 
+bool CQuorumManager::RequestQuorumData(CNode* pFrom, Consensus::LLMQType llmqType, const CBlockIndex* pQuorumIndex, uint16_t nDataMask, const uint256& proTxHash)
+{
+    if (pFrom->nVersion < LLMQ_DATA_MESSAGES_VERSION) {
+        LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Version must be %d or greater.\n", __func__, LLMQ_DATA_MESSAGES_VERSION);
+        return false;
+    }
+    if (pFrom == nullptr || (pFrom->verifiedProRegTxHash.IsNull() && !pFrom->qwatch)) {
+        LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- pFrom is neither a verified masternode nor a qwatch connection\n", __func__);
+        return false;
+    }
+    if (Params().GetConsensus().llmqs.count(llmqType) == 0) {
+        LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Invalid llmqType: %d\n", __func__, llmqType);
+        return false;
+    }
+    if (pQuorumIndex == nullptr) {
+        LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Invalid pQuorumIndex: nullptr\n", __func__);
+        return false;
+    }
+    if (GetQuorum(llmqType, pQuorumIndex) == nullptr) {
+        LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Quorum not found: %s, %d\n", __func__, pQuorumIndex->GetBlockHash().ToString(), llmqType);
+        return false;
+    }
+
+    LOCK(cs_data_requests);
+    auto key = std::make_pair(pFrom->verifiedProRegTxHash, true);
+    auto it = mapQuorumDataRequests.emplace(key, CQuorumDataRequest(llmqType, pQuorumIndex->GetBlockHash(), nDataMask, proTxHash));
+    if (!it.second && !it.first->second.IsExpired()) {
+        LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Already requested\n", __func__);
+        return false;
+    }
+
+    CNetMsgMaker msgMaker(pFrom->GetSendVersion());
+    g_connman->PushMessage(pFrom, msgMaker.Make(NetMsgType::QGETDATA, it.first->second));
+
+    return true;
+}
+
 std::vector<CQuorumCPtr> CQuorumManager::ScanQuorums(Consensus::LLMQType llmqType, size_t nCountRequested) const
 {
     const CBlockIndex* pindex;
@@ -381,7 +452,7 @@ CQuorumCPtr CQuorumManager::GetQuorum(Consensus::LLMQType llmqType, const CBlock
     }
 
     LOCK(quorumsCacheCs);
-    CQuorumCPtr pQuorum;
+    CQuorumPtr pQuorum;
     if (mapQuorumsCache[llmqType].get(quorumHash, pQuorum)) {
         return pQuorum;
     }
@@ -389,4 +460,195 @@ CQuorumCPtr CQuorumManager::GetQuorum(Consensus::LLMQType llmqType, const CBlock
 
     return BuildQuorumFromCommitment(llmqType, pindexQuorum);
 }
 
+void CQuorumManager::ProcessMessage(CNode* pFrom, const std::string& strCommand, CDataStream& vRecv)
+{
+    auto strFunc = __func__;
+    auto errorHandler = [&](const std::string strError, int nScore = 10) {
+        LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- %s: %s, from peer=%d\n", strFunc, strCommand, strError, pFrom->GetId());
+        if (nScore > 0) {
+            LOCK(cs_main);
+            Misbehaving(pFrom->GetId(), nScore);
+        }
+    };
+
+    if (strCommand == NetMsgType::QGETDATA) {
+
+        if (!fMasternodeMode || pFrom == nullptr || (pFrom->verifiedProRegTxHash.IsNull() && !pFrom->qwatch)) {
+            errorHandler("Not a verified masternode or a qwatch connection");
+            return;
+        }
+
+        CQuorumDataRequest request;
+        vRecv >> request;
+
+        auto sendQDATA = [&](CQuorumDataRequest::Errors nError = CQuorumDataRequest::Errors::UNDEFINED,
+                             const CDataStream& body = CDataStream(SER_NETWORK, PROTOCOL_VERSION)) {
+            request.SetError(nError);
+            CDataStream ssResponse(SER_NETWORK, pFrom->GetSendVersion(), request, body);
+            g_connman->PushMessage(pFrom, CNetMsgMaker(pFrom->GetSendVersion()).Make(NetMsgType::QDATA, ssResponse));
+        };
+
+        {
+            LOCK2(cs_main, cs_data_requests);
+            auto key = std::make_pair(pFrom->verifiedProRegTxHash, false);
+            auto it = mapQuorumDataRequests.find(key);
+            if (it == mapQuorumDataRequests.end()) {
+                it = mapQuorumDataRequests.emplace(key, request).first;
+            } else if (it->second.IsExpired()) {
+                it->second = request;
+            } else {
+                errorHandler("Request limit exceeded", 25);
+            }
+        }
+
+        if (Params().GetConsensus().llmqs.count(request.GetLLMQType()) == 0) {
+            sendQDATA(CQuorumDataRequest::Errors::QUORUM_TYPE_INVALID);
+            return;
+        }
+
+        const CBlockIndex* pQuorumIndex{nullptr};
+        {
+            LOCK(cs_main);
+            pQuorumIndex = LookupBlockIndex(request.GetQuorumHash());
+        }
+        if (pQuorumIndex == nullptr) {
+            sendQDATA(CQuorumDataRequest::Errors::QUORUM_BLOCK_NOT_FOUND);
+            return;
+        }
+
+        const CQuorumCPtr pQuorum = GetQuorum(request.GetLLMQType(), pQuorumIndex);
+        if (pQuorum == nullptr) {
+            sendQDATA(CQuorumDataRequest::Errors::QUORUM_NOT_FOUND);
+            return;
+        }
+
+        CDataStream ssResponseData(SER_NETWORK, pFrom->GetSendVersion());
+
+        // Check if request wants QUORUM_VERIFICATION_VECTOR data
+        if (request.GetDataMask() & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR) {
+
+            if (!pQuorum->quorumVvec) {
+                sendQDATA(CQuorumDataRequest::Errors::QUORUM_VERIFICATION_VECTOR_MISSING);
+                return;
+            }
+
+            ssResponseData << *pQuorum->quorumVvec;
+        }
+
+        // Check if request wants ENCRYPTED_CONTRIBUTIONS data
+        if (request.GetDataMask() & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS) {
+
+            int memberIdx = pQuorum->GetMemberIndex(request.GetProTxHash());
+            if (memberIdx == -1) {
+                sendQDATA(CQuorumDataRequest::Errors::MASTERNODE_IS_NO_MEMBER);
+                return;
+            }
+
+            std::vector<CBLSIESEncryptedObject<CBLSSecretKey>> vecEncrypted;
+            if (!quorumDKGSessionManager->GetEncryptedContributions(request.GetLLMQType(), pQuorumIndex, pQuorum->qc.validMembers, request.GetProTxHash(), vecEncrypted)) {
+                sendQDATA(CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING);
+                return;
+            }
+
+            ssResponseData << vecEncrypted;
+        }
+
+        sendQDATA(CQuorumDataRequest::Errors::NONE, ssResponseData);
+        return;
+    }
+
+    if (strCommand == NetMsgType::QDATA) {
+
+        bool fIsWatching = gArgs.GetBoolArg("-watchquorums", DEFAULT_WATCH_QUORUMS);
+        if ((!fMasternodeMode && !fIsWatching) || pFrom == nullptr || (pFrom->verifiedProRegTxHash.IsNull() && !pFrom->qwatch)) {
+            errorHandler("Not a verified masternode or a qwatch connection");
+            return;
+        }
+
+        CQuorumDataRequest request;
+        vRecv >> request;
+
+        {
+            LOCK2(cs_main, cs_data_requests);
+            auto it = mapQuorumDataRequests.find(std::make_pair(pFrom->verifiedProRegTxHash, true));
+            if (it == mapQuorumDataRequests.end()) {
+                errorHandler("Not requested");
+                return;
+            }
+            if (it->second.IsProcessed()) {
+                errorHandler("Already received");
+                return;
+            }
+            if (request != it->second) {
+                errorHandler("Not like requested");
+                return;
+            }
+            it->second.SetProcessed();
+        }
+
+        if (request.GetError() != CQuorumDataRequest::Errors::NONE) {
+            errorHandler(strprintf("Error %d", request.GetError()), 0);
+            return;
+        }
+
+        CQuorumPtr pQuorum;
+        {
+            LOCK(quorumsCacheCs);
+            if (!mapQuorumsCache[request.GetLLMQType()].get(request.GetQuorumHash(), pQuorum)) {
+                errorHandler("Quorum not found", 0); // Don't bump score because we asked for it
+                return;
+            }
+        }
+
+        // Check if request has QUORUM_VERIFICATION_VECTOR data
+        if (request.GetDataMask() & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR) {
+
+            BLSVerificationVector verificationVector;
+            vRecv >> verificationVector;
+
+            if (pQuorum->SetVerificationVector(verificationVector)) {
+                CQuorum::StartCachePopulatorThread(pQuorum);
+            } else {
+                errorHandler("Invalid quorum verification vector");
+                return;
+            }
+        }
+
+        // Check if request has ENCRYPTED_CONTRIBUTIONS data
+        if (request.GetDataMask() & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS) {
+
+            if (pQuorum->quorumVvec->size() != pQuorum->params.threshold) {
+                errorHandler("No valid quorum verification vector available", 0); // Don't bump score because we asked for it
+                return;
+            }
+
+            int memberIdx = pQuorum->GetMemberIndex(request.GetProTxHash());
+            if (memberIdx == -1) {
+                errorHandler("Not a member of the quorum", 0); // Don't bump score because we asked for it
+                return;
+            }
+
+            std::vector<CBLSIESEncryptedObject<CBLSSecretKey>> vecEncrypted;
+            vRecv >> vecEncrypted;
+
+            BLSSecretKeyVector vecSecretKeys;
+            vecSecretKeys.resize(vecEncrypted.size());
+            for (size_t i = 0; i < vecEncrypted.size(); ++i) {
+                if (!vecEncrypted[i].Decrypt(memberIdx, *activeMasternodeInfo.blsKeyOperator, vecSecretKeys[i], PROTOCOL_VERSION)) {
+                    errorHandler("Failed to decrypt");
+                    return;
+                }
+            }
+
+            CBLSSecretKey secretKeyShare = blsWorker.AggregateSecretKeys(vecSecretKeys);
+            if (!pQuorum->SetSecretKeyShare(secretKeyShare)) {
+                errorHandler("Invalid secret key share received");
+                return;
+            }
+        }
+        pQuorum->WriteContributions(evoDb);
+        return;
+    }
+}
+
 } // namespace llmq
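The request limiting above is purely time-based: mapQuorumDataRequests keeps one entry per (proRegTxHash, direction) key, IsExpired() compares against EXPIRATION_TIMEOUT (300 seconds), and an over-limit QGETDATA costs 25 banscore points while still being answered. A hypothetical helper, mirroring test_request_limit in the functional test below (test_qgetdata, wait_for_banscore and bump_mocktime are the patch's own test helpers):

    def exercise_request_limit(self, p2p, qgetdata_vvec, node, peer_id):
        p2p.test_qgetdata(qgetdata_vvec, 0, self.llmq_threshold, 0)  # first request: answered
        wait_for_banscore(node, peer_id, 0)
        self.bump_mocktime(299)  # 299s < 300s, the stored request has not expired yet
        p2p.test_qgetdata(qgetdata_vvec, 0, self.llmq_threshold, 0)  # still answered, score +25
        wait_for_banscore(node, peer_id, 25)
        self.bump_mocktime(1)    # now >= EXPIRATION_TIMEOUT, the slot is free again
        p2p.test_qgetdata(qgetdata_vvec, 0, self.llmq_threshold, 0)  # accepted, score unchanged
        wait_for_banscore(node, peer_id, 25)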
diff --git a/src/llmq/quorums.h b/src/llmq/quorums.h
index 8a64e9ffa4554..43a43465ae372 100644
--- a/src/llmq/quorums.h
+++ b/src/llmq/quorums.h
@@ -22,6 +22,106 @@ namespace llmq
 
 class CDKGSessionManager;
 
+/**
+ * An object of this class represents a QGETDATA request or a QDATA response header
+ */
+class CQuorumDataRequest
+{
+public:
+
+    enum Flags : uint16_t {
+        QUORUM_VERIFICATION_VECTOR = 0x0001,
+        ENCRYPTED_CONTRIBUTIONS = 0x0002,
+    };
+    enum Errors : uint8_t {
+        NONE = 0x00,
+        QUORUM_TYPE_INVALID = 0x01,
+        QUORUM_BLOCK_NOT_FOUND = 0x02,
+        QUORUM_NOT_FOUND = 0x03,
+        MASTERNODE_IS_NO_MEMBER = 0x04,
+        QUORUM_VERIFICATION_VECTOR_MISSING = 0x05,
+        ENCRYPTED_CONTRIBUTIONS_MISSING = 0x06,
+        UNDEFINED = 0xFF,
+    };
+
+private:
+    Consensus::LLMQType llmqType;
+    uint256 quorumHash;
+    uint16_t nDataMask;
+    uint256 proTxHash;
+    Errors nError;
+
+    int64_t nTime;
+    bool fProcessed;
+
+    static const int64_t EXPIRATION_TIMEOUT{300};
+
+public:
+
+    CQuorumDataRequest() : nTime(GetTime()) {}
+    CQuorumDataRequest(const Consensus::LLMQType llmqTypeIn, const uint256& quorumHashIn, const uint16_t nDataMaskIn, const uint256& proTxHashIn = uint256()) :
+        llmqType(llmqTypeIn),
+        quorumHash(quorumHashIn),
+        nDataMask(nDataMaskIn),
+        proTxHash(proTxHashIn),
+        nError(UNDEFINED),
+        nTime(GetTime()),
+        fProcessed(false) {}
+
+    ADD_SERIALIZE_METHODS;
+
+    template <typename Stream, typename Operation>
+    inline void SerializationOp(Stream& s, Operation ser_action)
+    {
+        READWRITE(llmqType);
+        READWRITE(quorumHash);
+        READWRITE(nDataMask);
+        READWRITE(proTxHash);
+        if (ser_action.ForRead()) {
+            try {
+                READWRITE(nError);
+            } catch (...) {
+                nError = UNDEFINED;
+            }
+        } else if (nError != UNDEFINED) {
+            READWRITE(nError);
+        }
+    }
+
+    const Consensus::LLMQType GetLLMQType() const { return llmqType; }
+    const uint256& GetQuorumHash() const { return quorumHash; }
+    const uint16_t GetDataMask() const { return nDataMask; }
+    const uint256& GetProTxHash() const { return proTxHash; }
+
+    void SetError(Errors nErrorIn) { nError = nErrorIn; }
+    const Errors GetError() const { return nError; }
+
+    bool IsExpired() const
+    {
+        return (GetTime() - nTime) >= EXPIRATION_TIMEOUT;
+    }
+    bool IsProcessed() const
+    {
+        return fProcessed;
+    }
+    void SetProcessed()
+    {
+        fProcessed = true;
+    }
+
+    bool operator==(const CQuorumDataRequest& other)
+    {
+        return llmqType == other.llmqType &&
+               quorumHash == other.quorumHash &&
+               nDataMask == other.nDataMask &&
+               proTxHash == other.proTxHash;
+    }
+    bool operator!=(const CQuorumDataRequest& other)
+    {
+        return !(*this == other);
+    }
+};
+
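The SerializationOp above gives QGETDATA and QDATA a shared header; the trailing error byte is written only when nError is set (responses), and a missing byte on read simply leaves nError as UNDEFINED. A hedged sketch of the resulting wire layout, in test-framework terms (assuming the one-byte LLMQType used elsewhere in the protocol; ser_uint256 is the framework's hash serializer, and ser_quorum_data_request is a hypothetical name):

    import struct
    from test_framework.messages import ser_uint256

    def ser_quorum_data_request(llmq_type, quorum_hash, data_mask, protx_hash, error=None):
        # Shared header: llmqType (uint8), quorumHash (uint256), dataMask (uint16),
        # proTxHash (uint256). QDATA responses append the error byte (uint8).
        r = struct.pack("<B", llmq_type)
        r += ser_uint256(quorum_hash)
        r += struct.pack("<H", data_mask)
        r += ser_uint256(protx_hash)
        if error is not None:
            r += struct.pack("<B", error)
        return r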
 /**
  * An object of this class represents a quorum which was mined on-chain (through a quorum commitment)
  * It at least contains information about the members and the quorum public key which is needed to verify recovered
@@ -57,6 +157,9 @@ class CQuorum
     ~CQuorum();
     void Init(const CFinalCommitment& _qc, const CBlockIndex* _pindexQuorum, const uint256& _minedBlockHash, const std::vector<CDeterministicMNCPtr>& _members);
 
+    bool SetVerificationVector(const BLSVerificationVector& quorumVecIn);
+    bool SetSecretKeyShare(const CBLSSecretKey& secretKeyShare);
+
     bool IsMember(const uint256& proTxHash) const;
     bool IsValidMember(const uint256& proTxHash) const;
     int GetMemberIndex(const uint256& proTxHash) const;
@@ -86,7 +189,7 @@ class CQuorumManager
     CDKGSessionManager& dkgManager;
 
     mutable CCriticalSection quorumsCacheCs;
-    mutable std::map<Consensus::LLMQType, unordered_lru_cache<uint256, CQuorumCPtr, StaticSaltedHasher>> mapQuorumsCache;
+    mutable std::map<Consensus::LLMQType, unordered_lru_cache<uint256, CQuorumPtr, StaticSaltedHasher>> mapQuorumsCache;
     mutable std::map<Consensus::LLMQType, unordered_lru_cache<uint256, std::vector<CQuorumCPtr>, StaticSaltedHasher>> scanQuorumsCache;
 
 public:
@@ -94,8 +197,12 @@ class CQuorumManager
 
     void UpdatedBlockTip(const CBlockIndex *pindexNew, bool fInitialDownload) const;
 
+    void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv);
+
     static bool HasQuorum(Consensus::LLMQType llmqType, const uint256& quorumHash);
 
+    bool RequestQuorumData(CNode* pFrom, Consensus::LLMQType llmqType, const CBlockIndex* pQuorumIndex, uint16_t nDataMask, const uint256& proTxHash = uint256());
+
     // all these methods will lock cs_main for a short period of time
     CQuorumCPtr GetQuorum(Consensus::LLMQType llmqType, const uint256& quorumHash) const;
     std::vector<CQuorumCPtr> ScanQuorums(Consensus::LLMQType llmqType, size_t nCountRequested) const;
@@ -117,4 +224,6 @@ extern CQuorumManager* quorumManager;
 
 } // namespace llmq
 
+template<> struct is_serializable_enum<llmq::CQuorumDataRequest::Errors> : std::true_type {};
+
 #endif // BITCOIN_LLMQ_QUORUMS_H
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index 6e64b37bf3f33..065d86214df1c 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -3584,6 +3584,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
         CMNAuth::ProcessMessage(pfrom, strCommand, vRecv, *connman);
         llmq::quorumBlockProcessor->ProcessMessage(pfrom, strCommand, vRecv);
         llmq::quorumDKGSessionManager->ProcessMessage(pfrom, strCommand, vRecv);
+        llmq::quorumManager->ProcessMessage(pfrom, strCommand, vRecv);
         llmq::quorumSigSharesManager->ProcessMessage(pfrom, strCommand, vRecv);
         llmq::quorumSigningManager->ProcessMessage(pfrom, strCommand, vRecv);
         llmq::chainLocksHandler->ProcessMessage(pfrom, strCommand, vRecv);
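On the Python side, the mininode framework (see the test/functional/test_framework/mininode.py entry in the diffstat) routes each registered command to an on_<command> callback and records it in last_message/message_count. A hypothetical minimal observer, assuming that callback convention; the real test below uses QuorumDataInterface instead:

    from test_framework.mininode import P2PInterface

    class QDataObserver(P2PInterface):
        def on_qdata(self, message):
            # Invoked for every received "qdata"; the base class has already stored
            # it in self.last_message["qdata"] and bumped self.message_count["qdata"].
            print("qdata error=%d" % message.error)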
diff --git a/src/protocol.cpp b/src/protocol.cpp
index bd7665b4aedc5..31a06be840ffa 100644
--- a/src/protocol.cpp
+++ b/src/protocol.cpp
@@ -72,6 +72,8 @@ const char *QGETSIGSHARES="qgetsigs";
 const char *QBSIGSHARES="qbsigs";
 const char *QSIGREC="qsigrec";
 const char *QSIGSHARE="qsigshare";
+const char* QGETDATA = "qgetdata";
+const char* QDATA = "qdata";
 const char *CLSIG="clsig";
 const char *ISLOCK="islock";
 const char *MNAUTH="mnauth";
@@ -139,6 +141,8 @@ const static std::string allNetMessageTypes[] = {
     NetMsgType::QBSIGSHARES,
     NetMsgType::QSIGREC,
     NetMsgType::QSIGSHARE,
+    NetMsgType::QGETDATA,
+    NetMsgType::QDATA,
     NetMsgType::CLSIG,
     NetMsgType::ISLOCK,
     NetMsgType::MNAUTH,
diff --git a/src/protocol.h b/src/protocol.h
index d26258f84f7dd..9ab34c64887b0 100644
--- a/src/protocol.h
+++ b/src/protocol.h
@@ -268,6 +268,8 @@ extern const char *QGETSIGSHARES;
 extern const char *QBSIGSHARES;
 extern const char *QSIGREC;
 extern const char *QSIGSHARE;
+extern const char* QGETDATA;
+extern const char* QDATA;
 extern const char *CLSIG;
 extern const char *ISLOCK;
 extern const char *MNAUTH;
diff --git a/src/rpc/rpcquorums.cpp b/src/rpc/rpcquorums.cpp
index be0f7830b6110..d24eab394358c 100644
--- a/src/rpc/rpcquorums.cpp
+++ b/src/rpc/rpcquorums.cpp
@@ -548,6 +548,57 @@ UniValue quorum_dkgsimerror(const JSONRPCRequest& request)
     return UniValue();
 }
 
+void quorum_getdata_help()
+{
+    throw std::runtime_error(
+            "quorum getdata nodeId llmqType \"quorumHash\" dataMask ( \"proTxHash\" )\n"
+            "Send a QGETDATA message to the specified peer.\n"
+            "\nArguments:\n"
+            "1. nodeId        (integer, required) The internal nodeId of the peer to request quorum data from.\n"
+            "2. llmqType      (integer, required) The quorum type related to the quorum data being requested.\n"
+            "3. \"quorumHash\"  (string, required) The quorum hash related to the quorum data being requested.\n"
+            "4. dataMask      (integer, required) Specify what data to request.\n"
+            "                 Possible values: 1 - Request quorum verification vector\n"
+            "                                  2 - Request encrypted contributions for member defined by \"proTxHash\". \"proTxHash\" must be specified if this option is used.\n"
+            "                                  3 - Request both, 1 and 2\n"
+            "5. \"proTxHash\"   (string, optional) The proTxHash the contributions will be requested for. Must be member of the specified LLMQ.\n"
+    );
+}
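For reference, this is how the functional test below drives the command through the Python RPC wrapper (the nodeId 3 and the hashes are placeholders; dataMask 3 requests both items and therefore requires the proTxHash):

    # Request only the verification vector (dataMask 1, proTxHash optional):
    node.quorum("getdata", 3, 100, quorum_hash, 0x01)
    # Request verification vector plus mn1's encrypted contributions (dataMask 3):
    node.quorum("getdata", 3, 100, quorum_hash, 0x03, mn1.proTxHash)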
+
+UniValue quorum_getdata(const JSONRPCRequest& request)
+{
+    if (request.fHelp || (request.params.size() < 5 || request.params.size() > 6)) {
+        quorum_getdata_help();
+    }
+
+    NodeId nodeId = ParseInt64V(request.params[1], "nodeId");
+    Consensus::LLMQType llmqType = static_cast<Consensus::LLMQType>(ParseInt32V(request.params[2], "llmqType"));
+    uint256 quorumHash = ParseHashV(request.params[3], "quorumHash");
+    uint16_t nDataMask = static_cast<uint16_t>(ParseInt32V(request.params[4], "dataMask"));
+    uint256 proTxHash;
+
+    // Check if request wants ENCRYPTED_CONTRIBUTIONS data
+    if (nDataMask & llmq::CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS) {
+        if (!request.params[5].isNull()) {
+            proTxHash = ParseHashV(request.params[5], "proTxHash");
+            if (proTxHash.IsNull()) {
+                throw JSONRPCError(RPC_INVALID_PARAMETER, "proTxHash invalid");
+            }
+        } else {
+            throw JSONRPCError(RPC_INVALID_PARAMETER, "proTxHash missing");
+        }
+    }
+
+    const CBlockIndex* pQuorumIndex{nullptr};
+    {
+        LOCK(cs_main);
+        pQuorumIndex = LookupBlockIndex(quorumHash);
+    }
+    return g_connman->ForNode(nodeId, [&](CNode* pNode) {
+        return llmq::quorumManager->RequestQuorumData(pNode, llmqType, pQuorumIndex, nDataMask, proTxHash);
+    });
+}
+
 [[ noreturn ]] void quorum_help()
 {
@@ -597,6 +648,8 @@ UniValue quorum(const JSONRPCRequest& request)
         return quorum_selectquorum(request);
     } else if (command == "dkgsimerror") {
         return quorum_dkgsimerror(request);
+    } else if (command == "getdata") {
+        return quorum_getdata(request);
     } else {
         quorum_help();
     }
diff --git a/src/version.h b/src/version.h
index 5c8d3f2d772cf..7a14f4a4f0778 100644
--- a/src/version.h
+++ b/src/version.h
@@ -11,7 +11,7 @@
  */
 
-static const int PROTOCOL_VERSION = 70218;
+static const int PROTOCOL_VERSION = 70219;
 
 //! initial proto version, to be increased after version/verack negotiation
 static const int INIT_PROTO_VERSION = 209;
@@ -20,7 +20,7 @@ static const int INIT_PROTO_VERSION = 209;
 static const int MIN_PEER_PROTO_VERSION = 70213;
 
 //! minimum proto version of masternode to accept in DKGs
-static const int MIN_MASTERNODE_PROTO_VERSION = 70218;
+static const int MIN_MASTERNODE_PROTO_VERSION = 70219;
 
 //! nTime field added to CAddress, starting with this version;
 //! if possible, avoid requesting addresses nodes older than this
@@ -36,4 +36,7 @@ static const int SENDDSQUEUE_PROTO_VERSION = 70214;
 //! protocol version is included in MNAUTH starting with this version
 static const int MNAUTH_NODE_VER_VERSION = 70218;
 
+//! introduction of QGETDATA/QDATA messages
+static const int LLMQ_DATA_MESSAGES_VERSION = 70219;
+
 #endif // BITCOIN_VERSION_H
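The three version bumps act together: peers now negotiate 70219, masternodes below it are rejected from DKGs, and LLMQ_DATA_MESSAGES_VERSION is the gate CQuorumManager::RequestQuorumData checks before sending QGETDATA (the test framework's MY_VERSION is raised to the same value in messages.py below). A one-line illustration of that gate; peer_supports_quorum_data is a hypothetical helper:

    LLMQ_DATA_MESSAGES_VERSION = 70219  # mirrors src/version.h above

    def peer_supports_quorum_data(peer_version):
        # Peers on an older protocol version are never sent QGETDATA
        # (first check in CQuorumManager::RequestQuorumData).
        return peer_version >= LLMQ_DATA_MESSAGES_VERSION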
diff --git a/test/functional/p2p_quorum_data.py b/test/functional/p2p_quorum_data.py
new file mode 100755
index 0000000000000..41c60876fbcd2
--- /dev/null
+++ b/test/functional/p2p_quorum_data.py
@@ -0,0 +1,460 @@
+#!/usr/bin/env python3
+# Copyright (c) 2021 The Dash Core developers
+# Distributed under the MIT software license, see the accompanying
+# file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+import time
+
+from test_framework.messages import msg_qgetdata, msg_qwatch
+from test_framework.mininode import (
+    mininode_lock,
+    network_thread_start,
+    network_thread_join,
+    P2PInterface,
+)
+from test_framework.test_framework import DashTestFramework
+from test_framework.util import (
+    assert_equal,
+    assert_raises_rpc_error,
+    connect_nodes,
+    force_finish_mnsync,
+    wait_until,
+)
+
+'''
+p2p_quorum_data.py
+
+Tests QGETDATA/QDATA functionality
+'''
+
+# Possible error values of QDATA
+QUORUM_TYPE_INVALID = 1
+QUORUM_BLOCK_NOT_FOUND = 2
+QUORUM_NOT_FOUND = 3
+MASTERNODE_IS_NO_MEMBER = 4
+QUORUM_VERIFICATION_VECTOR_MISSING = 5
+ENCRYPTED_CONTRIBUTIONS_MISSING = 6
+
+# Used to overwrite MNAUTH for mininode connections
+fake_mnauth_1 = ["cecf37bf0ec05d2d22cb8227f88074bb882b94cd2081ba318a5a444b1b15b9fd",
+                 "087ba00bf61135f3860c4944a0debabe186ef82628fbe4ceaed1ad51d672c58dde14ea4b321efe0b89257a40322bc972"]
+fake_mnauth_2 = ["6ad7ed7a2d6c2c1db30fc364114602b36b2730a9aa96d8f11f1871a9cee37378",
+                 "122463411a86362966a5161805f24cf6a0eef08a586b8e00c4f0ad0b084c5bb3f5c9a60ee5ffc78db2313897e3ab2223"]
+
+# Used to distinguish mininode connections
+uacomment_m3_1 = "MN3_1"
+uacomment_m3_2 = "MN3_2"
+
+
+def assert_qdata(qdata, qgetdata, error, len_vvec=0, len_contributions=0):
+    assert qdata is not None and qgetdata is not None
+    assert_equal(qdata.quorum_type, qgetdata.quorum_type)
+    assert_equal(qdata.quorum_hash, qgetdata.quorum_hash)
+    assert_equal(qdata.data_mask, qgetdata.data_mask)
+    assert_equal(qdata.protx_hash, qgetdata.protx_hash)
+    assert_equal(qdata.error, error)
+    assert_equal(len(qdata.quorum_vvec), len_vvec)
+    assert_equal(len(qdata.enc_contributions), len_contributions)
+
+
+def wait_for_banscore(node, peer_id, expected_score):
+    def get_score():
+        for peer in node.getpeerinfo():
+            if peer["id"] == peer_id:
+                return peer["banscore"]
+        return None
+    wait_until(lambda: get_score() == expected_score, timeout=6)
+
+
+def p2p_connection(node, uacomment=None):
+    return node.add_p2p_connection(QuorumDataInterface(), uacomment=uacomment)
+
+
+def get_mininode_id(node, uacomment=None):
+    def get_id():
+        for p in node.getpeerinfo():
+            for p2p in node.p2ps:
+                if uacomment is not None and p2p.uacomment != uacomment:
+                    continue
+                if p["subver"] == p2p.strSubVer.decode():
+                    return p["id"]
+        return None
+    wait_until(lambda: get_id() is not None, timeout=10)
+    return get_id()
+
+
+def mnauth(node, node_id, protx_hash, operator_pubkey):
+    assert node.mnauth(node_id, protx_hash, operator_pubkey)
+    mnauth_peer_id = None
+    for peer in node.getpeerinfo():
+        if "verified_proregtx_hash" in peer and peer["verified_proregtx_hash"] == protx_hash:
+            assert_equal(mnauth_peer_id, None)
+            mnauth_peer_id = peer["id"]
+    assert_equal(mnauth_peer_id, node_id)
+
+
+class QuorumDataInterface(P2PInterface):
+    def __init__(self):
+        super().__init__()
+
+    def test_qgetdata(self, qgetdata, expected_error=0, len_vvec=0, len_contributions=0, response_expected=True):
+        self.send_message(qgetdata)
+        self.wait_for_qdata(message_expected=response_expected)
+        if response_expected:
+            assert_qdata(self.get_qdata(), qgetdata, expected_error, len_vvec, len_contributions)
+
+    def wait_for_qgetdata(self, timeout=3, message_expected=True):
+        def test_function():
+            return self.message_count["qgetdata"]
+        wait_until(test_function, timeout=timeout, lock=mininode_lock, do_assert=message_expected)
+        self.message_count["qgetdata"] = 0
+        if not message_expected:
+            assert not self.message_count["qgetdata"]
+
+    def get_qdata(self):
+        return self.last_message["qdata"]
+
+    def wait_for_qdata(self, timeout=10, message_expected=True):
+        def test_function():
+            return self.message_count["qdata"]
+        wait_until(test_function, timeout=timeout, lock=mininode_lock, do_assert=message_expected)
+        self.message_count["qdata"] = 0
+        if not message_expected:
+            assert not self.message_count["qdata"]
+
+
+class QuorumDataMessagesTest(DashTestFramework):
+    def set_test_params(self):
+        self.set_dash_test_params(4, 3, fast_dip3_enforcement=True)
+
+    def restart_mn(self, mn, reindex=False):
+        args = self.extra_args[mn.nodeIdx] + ['-masternodeblsprivkey=%s' % mn.keyOperator]
+        if reindex:
+            args.append('-reindex')
+        self.restart_node(mn.nodeIdx, args)
+        force_finish_mnsync(mn.node)
+        connect_nodes(mn.node, 0)
+        self.sync_blocks()
+
+    def run_test(self):
+
+        def force_request_expire(bump_seconds=self.quorum_data_request_expiration_timeout + 1):
+            self.bump_mocktime(bump_seconds)
+            # Test with/without expired request cleanup
+            if node0.getblockcount() % 2:
+                node0.generate(1)
+                self.sync_blocks()
+
+        def test_basics():
+            self.log.info("Testing basics of QGETDATA/QDATA")
+            p2p_node0 = p2p_connection(node0)
+            p2p_mn1 = p2p_connection(mn1.node)
+            network_thread_start()
+            p2p_node0.wait_for_verack()
+            p2p_mn1.wait_for_verack()
+            id_p2p_node0 = get_mininode_id(node0)
+            id_p2p_mn1 = get_mininode_id(mn1.node)
+
+            # Ensure that both nodes start with zero ban score
+            wait_for_banscore(node0, id_p2p_node0, 0)
+            wait_for_banscore(mn1.node, id_p2p_mn1, 0)
+
+            self.log.info("Check that normal node doesn't respond to qgetdata "
+                          "and does bump our score")
+            p2p_node0.test_qgetdata(qgetdata_all, response_expected=False)
+            wait_for_banscore(node0, id_p2p_node0, 10)
+            # The masternode should not respond to qgetdata for non-masternode connections
+            self.log.info("Check that masternode doesn't respond to "
+                          "non-masternode connection and does bump our score")
+            p2p_mn1.test_qgetdata(qgetdata_all, response_expected=False)
+            wait_for_banscore(mn1.node, id_p2p_mn1, 10)
+            # Open a fake MNAUTH authenticated P2P connection to the masternode to allow qgetdata
+            node0.disconnect_p2ps()
+            mn1.node.disconnect_p2ps()
+            network_thread_join()
+            p2p_mn1 = p2p_connection(mn1.node)
+            network_thread_start()
+            p2p_mn1.wait_for_verack()
+            id_p2p_mn1 = get_mininode_id(mn1.node)
+            mnauth(mn1.node, id_p2p_mn1, fake_mnauth_1[0], fake_mnauth_1[1])
+            # The masternode should now respond to qgetdata requests
+            self.log.info("Request verification vector")
+            p2p_mn1.test_qgetdata(qgetdata_vvec, 0, self.llmq_threshold, 0)
+            wait_for_banscore(mn1.node, id_p2p_mn1, 0)
+            # Note: our banscore is bumped as we are requesting too rapidly,
+            # however the node still returns the data
+            self.log.info("Request encrypted contributions")
+            p2p_mn1.test_qgetdata(qgetdata_contributions, 0, 0, self.llmq_size)
+            wait_for_banscore(mn1.node, id_p2p_mn1, 25)
+            # Request both
+            # Note: our banscore is bumped as we are requesting too rapidly,
+            # however the node still returns the data
+            self.log.info("Request both")
+            p2p_mn1.test_qgetdata(qgetdata_all, 0, self.llmq_threshold, self.llmq_size)
+            wait_for_banscore(mn1.node, id_p2p_mn1, 50)
+            mn1.node.disconnect_p2ps()
+            network_thread_join()
+            self.log.info("Test ban score increase for invalid / unexpected QDATA")
+            p2p_mn1 = p2p_connection(mn1.node)
+            p2p_mn2 = p2p_connection(mn2.node)
+            network_thread_start()
+            p2p_mn1.wait_for_verack()
+            p2p_mn2.wait_for_verack()
+            id_p2p_mn1 = get_mininode_id(mn1.node)
+            id_p2p_mn2 = get_mininode_id(mn2.node)
+            mnauth(mn1.node, id_p2p_mn1, fake_mnauth_1[0], fake_mnauth_1[1])
+            mnauth(mn2.node, id_p2p_mn2, fake_mnauth_2[0], fake_mnauth_2[1])
+            wait_for_banscore(mn1.node, id_p2p_mn1, 0)
+            p2p_mn2.test_qgetdata(qgetdata_all, 0, self.llmq_threshold, self.llmq_size)
+            qdata_valid = p2p_mn2.get_qdata()
+            # - Not requested
+            p2p_mn1.send_message(qdata_valid)
+            time.sleep(1)
+            wait_for_banscore(mn1.node, id_p2p_mn1, 10)
+            # - Already received
+            force_request_expire()
+            assert mn1.node.quorum("getdata", id_p2p_mn1, 100, quorum_hash, 0x03, mn1.proTxHash)
+            p2p_mn1.wait_for_qgetdata()
+            p2p_mn1.send_message(qdata_valid)
+            time.sleep(1)
+            p2p_mn1.send_message(qdata_valid)
+            wait_for_banscore(mn1.node, id_p2p_mn1, 20)
+            # - Not like requested
+            force_request_expire()
+            assert mn1.node.quorum("getdata", id_p2p_mn1, 100, quorum_hash, 0x03, mn1.proTxHash)
+            p2p_mn1.wait_for_qgetdata()
+            qdata_invalid_request = qdata_valid
+            qdata_invalid_request.data_mask = 2
+            p2p_mn1.send_message(qdata_invalid_request)
+            wait_for_banscore(mn1.node, id_p2p_mn1, 30)
+            # - Invalid verification vector
+            force_request_expire()
+            assert mn1.node.quorum("getdata", id_p2p_mn1, 100, quorum_hash, 0x03, mn1.proTxHash)
+            p2p_mn1.wait_for_qgetdata()
+            qdata_invalid_vvec = qdata_valid
+            qdata_invalid_vvec.quorum_vvec.pop()
+            p2p_mn1.send_message(qdata_invalid_vvec)
+            wait_for_banscore(mn1.node, id_p2p_mn1, 40)
+            # - Invalid contributions
+            force_request_expire()
+            assert mn1.node.quorum("getdata", id_p2p_mn1, 100, quorum_hash, 0x03, mn1.proTxHash)
+            p2p_mn1.wait_for_qgetdata()
+            qdata_invalid_contribution = qdata_valid
+            qdata_invalid_contribution.enc_contributions.pop()
+            p2p_mn1.send_message(qdata_invalid_contribution)
+            wait_for_banscore(mn1.node, id_p2p_mn1, 50)
+            mn1.node.disconnect_p2ps()
+            mn2.node.disconnect_p2ps()
+            network_thread_join()
+            self.log.info("Test all available error codes")
+            p2p_mn1 = p2p_connection(mn1.node)
+            network_thread_start()
+            p2p_mn1.wait_for_verack()
+            id_p2p_mn1 = get_mininode_id(mn1.node)
+            mnauth(mn1.node, id_p2p_mn1, fake_mnauth_1[0], fake_mnauth_1[1])
+            qgetdata_invalid_type = msg_qgetdata(quorum_hash_int, 103, 0x01, protx_hash_int)
+            qgetdata_invalid_block = msg_qgetdata(protx_hash_int, 100, 0x01, protx_hash_int)
+            qgetdata_invalid_quorum = msg_qgetdata(int(mn1.node.getblockhash(0), 16), 100, 0x01, protx_hash_int)
+            qgetdata_invalid_no_member = msg_qgetdata(quorum_hash_int, 100, 0x02, quorum_hash_int)
+            p2p_mn1.test_qgetdata(qgetdata_invalid_type, QUORUM_TYPE_INVALID)
+            p2p_mn1.test_qgetdata(qgetdata_invalid_block, QUORUM_BLOCK_NOT_FOUND)
+            p2p_mn1.test_qgetdata(qgetdata_invalid_quorum, QUORUM_NOT_FOUND)
+            p2p_mn1.test_qgetdata(qgetdata_invalid_no_member, MASTERNODE_IS_NO_MEMBER)
+            # The last two error cases require the node to miss its DKG data so we just reindex the node.
+            mn1.node.disconnect_p2ps()
+            network_thread_join()
+            self.restart_mn(mn1, reindex=True)
+            # Re-connect to the masternode
+            p2p_mn1 = p2p_connection(mn1.node)
+            p2p_mn2 = p2p_connection(mn2.node)
+            network_thread_start()
+            p2p_mn1.wait_for_verack()
+            p2p_mn2.wait_for_verack()
+            id_p2p_mn1 = get_mininode_id(mn1.node)
+            id_p2p_mn2 = get_mininode_id(mn2.node)
+            assert id_p2p_mn1 is not None
+            assert id_p2p_mn2 is not None
+            mnauth(mn1.node, id_p2p_mn1, fake_mnauth_1[0], fake_mnauth_1[1])
+            mnauth(mn2.node, id_p2p_mn2, fake_mnauth_2[0], fake_mnauth_2[1])
+            # Validate the DKG data is missing
+            p2p_mn1.test_qgetdata(qgetdata_vvec, QUORUM_VERIFICATION_VECTOR_MISSING)
+            p2p_mn1.test_qgetdata(qgetdata_contributions, ENCRYPTED_CONTRIBUTIONS_MISSING)
+            self.log.info("Test DKG data recovery with QDATA")
+            # Now that mn1 is missing its DKG data try to recover it by querying the data from mn2 and then sending it
+            # to mn1 with a direct QDATA message.
+            #
+            # mininode - QGETDATA -> mn2 - QDATA -> mininode - QDATA -> mn1
+            #
+            # However, mn1 only accepts self requested QDATA messages, that's why we trigger mn1 - QGETDATA -> mininode
+            # via the RPC command "quorum getdata".
+            #
+            # Get the required DKG data for mn1
+            p2p_mn2.test_qgetdata(qgetdata_all, 0, self.llmq_threshold, self.llmq_size)
+            # Trigger mn1 - QGETDATA -> p2p_mn1
+            assert mn1.node.quorum("getdata", id_p2p_mn1, 100, quorum_hash, 0x03, mn1.proTxHash)
+            # Wait until mn1 sent the QGETDATA to p2p_mn1
+            p2p_mn1.wait_for_qgetdata()
+            # Send the QDATA received from mn2 to mn1
+            p2p_mn1.send_message(p2p_mn2.get_qdata())
+            # Now mn1 should have its data back!
+            self.wait_for_quorum_data([mn1], 100, quorum_hash, recover=False)
+            # Restart one more time and make sure data gets saved to db
+            mn1.node.disconnect_p2ps()
+            mn2.node.disconnect_p2ps()
+            network_thread_join()
+            self.restart_mn(mn1)
+            self.wait_for_quorum_data([mn1], 100, quorum_hash, recover=False)
+
+        # Test request limiting / banscore increase
+        def test_request_limit():
+
+            def test_send_from_two_to_one(send_1, expected_score_1, send_2, expected_score_2, clear_requests=False):
+                if clear_requests:
+                    force_request_expire()
+                if send_1:
+                    p2p_mn3_1.test_qgetdata(qgetdata_vvec, 0, self.llmq_threshold, 0)
+                if send_2:
+                    p2p_mn3_2.test_qgetdata(qgetdata_vvec, 0, self.llmq_threshold, 0)
+                wait_for_banscore(mn3.node, id_p2p_mn3_1, expected_score_1)
+                wait_for_banscore(mn3.node, id_p2p_mn3_2, expected_score_2)
+
+            self.log.info("Test request limiting / banscore increases")
+
+            p2p_mn1 = p2p_connection(mn1.node)
+            network_thread_start()
+            p2p_mn1.wait_for_verack()
+            id_p2p_mn1 = get_mininode_id(mn1.node)
+            mnauth(mn1.node, id_p2p_mn1, fake_mnauth_1[0], fake_mnauth_1[1])
+            p2p_mn1.test_qgetdata(qgetdata_vvec, 0, self.llmq_threshold, 0)
+            wait_for_banscore(mn1.node, id_p2p_mn1, 0)
+            force_request_expire(299)  # This shouldn't clear requests, next request should bump score
+            p2p_mn1.test_qgetdata(qgetdata_vvec, 0, self.llmq_threshold, 0)
+            wait_for_banscore(mn1.node, id_p2p_mn1, 25)
+            force_request_expire(1)  # This should clear the requests now, next request should not bump score
+            p2p_mn1.test_qgetdata(qgetdata_vvec, 0, self.llmq_threshold, 0)
+            wait_for_banscore(mn1.node, id_p2p_mn1, 25)
+            mn1.node.disconnect_p2ps()
+            network_thread_join()
+            # Requesting one QDATA with mn1 and mn2 from mn3 should not result
+            # in a banscore increase for either of them.
+            p2p_mn3_1 = p2p_connection(mn3.node, uacomment_m3_1)
+            p2p_mn3_2 = p2p_connection(mn3.node, uacomment_m3_2)
+            network_thread_start()
+            p2p_mn3_1.wait_for_verack()
+            p2p_mn3_2.wait_for_verack()
+            id_p2p_mn3_1 = get_mininode_id(mn3.node, uacomment_m3_1)
+            id_p2p_mn3_2 = get_mininode_id(mn3.node, uacomment_m3_2)
+            assert id_p2p_mn3_1 != id_p2p_mn3_2
+            mnauth(mn3.node, id_p2p_mn3_1, fake_mnauth_1[0], fake_mnauth_1[1])
+            mnauth(mn3.node, id_p2p_mn3_2, fake_mnauth_2[0], fake_mnauth_2[1])
+            # Now try some {mn1, mn2} - QGETDATA -> mn3 combinations to make
+            # sure the request limit works connection based
+            test_send_from_two_to_one(False, 0, True, 0, True)
+            test_send_from_two_to_one(True, 0, True, 25)
+            test_send_from_two_to_one(True, 25, False, 25)
+            test_send_from_two_to_one(False, 25, True, 25, True)
+            test_send_from_two_to_one(True, 25, True, 50)
+            test_send_from_two_to_one(True, 50, True, 75)
+            test_send_from_two_to_one(True, 50, True, 75, True)
+            test_send_from_two_to_one(True, 75, False, 75)
+            test_send_from_two_to_one(False, 75, True, None)
+            # mn1 should still have a score of 75
+            wait_for_banscore(mn3.node, id_p2p_mn3_1, 75)
+            # mn2 should be "banned" now
+            wait_until(lambda: not p2p_mn3_2.is_connected, timeout=10)
+            mn3.node.disconnect_p2ps()
+            network_thread_join()
+
+        # Test that QWATCH connections are also allowed to query data but all
+        # QWATCH connections share one request limit slot
+        def test_qwatch_connections():
+            self.log.info("Test QWATCH connections")
+            force_request_expire()
+            p2p_mn3_1 = p2p_connection(mn3.node, uacomment_m3_1)
+            p2p_mn3_2 = p2p_connection(mn3.node, uacomment_m3_2)
+            network_thread_start()
+            p2p_mn3_1.wait_for_verack()
+            p2p_mn3_2.wait_for_verack()
+            id_p2p_mn3_1 = get_mininode_id(mn3.node, uacomment_m3_1)
+            id_p2p_mn3_2 = get_mininode_id(mn3.node, uacomment_m3_2)
+            assert id_p2p_mn3_1 != id_p2p_mn3_2
+
+            wait_for_banscore(mn3.node, id_p2p_mn3_1, 0)
+            wait_for_banscore(mn3.node, id_p2p_mn3_2, 0)
+
+            # Send QWATCH for both connections
+            p2p_mn3_1.send_message(msg_qwatch())
+            p2p_mn3_2.send_message(msg_qwatch())
+
+            # Now send alternating and make sure they share the same request limit
+            p2p_mn3_1.test_qgetdata(qgetdata_all, 0, self.llmq_threshold, self.llmq_size)
+            wait_for_banscore(mn3.node, id_p2p_mn3_1, 0)
+            p2p_mn3_2.test_qgetdata(qgetdata_all, 0, self.llmq_threshold, self.llmq_size)
+            wait_for_banscore(mn3.node, id_p2p_mn3_2, 25)
+            p2p_mn3_1.test_qgetdata(qgetdata_all, 0, self.llmq_threshold, self.llmq_size)
+            wait_for_banscore(mn3.node, id_p2p_mn3_1, 25)
+            mn3.node.disconnect_p2ps()
+            network_thread_join()
+
+        def test_watchquorums():
+            self.log.info("Test -watchquorums support")
+            for extra_args in [[], ["-watchquorums"]]:
+                self.restart_node(0, self.extra_args[0] + extra_args)
+                for i in range(self.num_nodes - 1):
+                    connect_nodes(node0, i + 1)
+                p2p_node0 = p2p_connection(node0)
+                p2p_mn2 = p2p_connection(mn2.node)
+                network_thread_start()
+                p2p_node0.wait_for_verack()
+                p2p_mn2.wait_for_verack()
+                id_p2p_node0 = get_mininode_id(node0)
+                id_p2p_mn2 = get_mininode_id(mn2.node)
+                mnauth(node0, id_p2p_node0, fake_mnauth_1[0], fake_mnauth_1[1])
+                mnauth(mn2.node, id_p2p_mn2, fake_mnauth_2[0], fake_mnauth_2[1])
+                p2p_mn2.test_qgetdata(qgetdata_all, 0, self.llmq_threshold, self.llmq_size)
+                assert node0.quorum("getdata", id_p2p_node0, 100, quorum_hash, 0x03, mn1.proTxHash)
+                p2p_node0.wait_for_qgetdata()
+                p2p_node0.send_message(p2p_mn2.get_qdata())
+                wait_for_banscore(node0, id_p2p_node0, (1 - len(extra_args)) * 10)
+                node0.disconnect_p2ps()
+                mn2.node.disconnect_p2ps()
+                network_thread_join()
+
+        def test_rpc_quorum_getdata_protx_hash():
+            self.log.info("Test optional proTxHash of `quorum getdata`")
+            assert_raises_rpc_error(-8, "proTxHash missing",
+                                    mn1.node.quorum, "getdata", 0, 100, quorum_hash, 0x02)
+            assert_raises_rpc_error(-8, "proTxHash invalid",
+                                    mn1.node.quorum, "getdata", 0, 100, quorum_hash, 0x03,
+                                    "0000000000000000000000000000000000000000000000000000000000000000")
+
+        # Enable DKG and disable ChainLocks
+        self.nodes[0].spork("SPORK_17_QUORUM_DKG_ENABLED", 0)
+        self.nodes[0].spork("SPORK_19_CHAINLOCKS_ENABLED", 4070908800)
+
+        self.wait_for_sporks_same()
+        quorum_hash = self.mine_quorum()
+
+        node0 = self.nodes[0]
+        mn1 = self.mninfo[0]
+        mn2 = self.mninfo[1]
+        mn3 = self.mninfo[2]
+
+        # Convert the hex values into integer values
+        quorum_hash_int = int(quorum_hash, 16)
+        protx_hash_int = int(mn1.proTxHash, 16)
+
+        # Valid requests
+        qgetdata_vvec = msg_qgetdata(quorum_hash_int, 100, 0x01, protx_hash_int)
+        qgetdata_contributions = msg_qgetdata(quorum_hash_int, 100, 0x02, protx_hash_int)
+        qgetdata_all = msg_qgetdata(quorum_hash_int, 100, 0x03, protx_hash_int)
+
+        test_basics()
+        test_request_limit()
+        test_qwatch_connections()
+        test_watchquorums()
+        test_rpc_quorum_getdata_protx_hash()
+
+
+if __name__ == '__main__':
+    QuorumDataMessagesTest().main()
diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py
index 6e6279ab97b17..5d9418d93fae6 100755
--- a/test/functional/test_framework/messages.py
+++ b/test/functional/test_framework/messages.py
@@ -27,7 +27,7 @@ import dash_hash
 MIN_VERSION_SUPPORTED = 60001
-MY_VERSION = 70214  # MIN_PEER_PROTO_VERSION
+MY_VERSION = 70219  # LLMQ_DATA_MESSAGES_VERSION
b"/python-mininode-tester:0.0.3%s/" MY_RELAY = 1 # from version 70001 onwards, fRelay should be appended to version messages (BIP37) @@ -1002,6 +1002,41 @@ def serialize(self): r += self.sigShare return r + +class CBLSPublicKey: + def __init__(self): + self.data = b'\\x0' * 48 + + def deserialize(self, f): + self.data = f.read(48) + + def serialize(self): + r = b"" + r += self.data + return r + + +class CBLSIESEncryptedSecretKey: + def __init__(self): + self.ephemeral_pubKey = b'\\x0' * 48 + self.iv = b'\\x0' * 32 + self.data = b'\\x0' * 32 + + def deserialize(self, f): + self.ephemeral_pubKey = f.read(48) + self.iv = f.read(32) + data_size = deser_compact_size(f) + self.data = f.read(data_size) + + def serialize(self): + r = b"" + r += self.ephemeral_pubKey + r += self.iv + r += ser_compact_size(len(self.data)) + r += self.data + return r + + # Objects that correspond to messages on the wire class msg_version(): command = b"version" @@ -1562,3 +1597,93 @@ def serialize(self): def __repr__(self): return "msg_qsigshare(sigShares=%d)" % (len(self.sig_shares)) + + +class msg_qwatch(): + command = b"qwatch" + + def __init__(self): + pass + + def deserialize(self, f): + pass + + def serialize(self): + return b"" + + def __repr__(self): + return "msg_qwatch()" + + +class msg_qgetdata(): + command = b"qgetdata" + + def __init__(self, quorum_hash=0, quorum_type=-1, data_mask=0, protx_hash=0): + self.quorum_hash = quorum_hash + self.quorum_type = quorum_type + self.data_mask = data_mask + self.protx_hash = protx_hash + + def deserialize(self, f): + self.quorum_type = struct.unpack("