Skip to content

Commit

Permalink
llmq: Implement Intra-Quorum Relay connections for qrecsig-s (#4020)
Browse files Browse the repository at this point in the history
* llmq: Implement Intra-Quorum Relay connections for qrecsig-s

Avoid relaying recsigs to non-masternodes.

* test: Simplify loop

* test: Assert connections are equal to llmq_size instead of static number

* test: Drop redundant brackets

* test: Fix typo

Co-authored-by: xdustinface <xdustinfacex@gmail.com>
  • Loading branch information
UdjinM6 and xdustinface authored Mar 15, 2021
1 parent bc31371 commit 95e8492
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 34 deletions.
10 changes: 10 additions & 0 deletions src/evo/mnauth.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,16 @@ void CMNAuth::ProcessMessage(CNode* pnode, const std::string& strCommand, CDataS
pnode->verifiedPubKeyHash = dmn->pdmnState->pubKeyOperator.GetHash();
}

if (!pnode->m_masternode_iqr_connection && connman.IsMasternodeQuorumRelayMember(pnode->verifiedProRegTxHash)) {
// Tell our peer that we're interested in plain LLMQ recovered signatures.
// Otherwise the peer would only announce/send messages resulting from QRECSIG,
// e.g. InstantSend locks or ChainLocks. SPV and regular full nodes should not send
// this message as they are usually only interested in the higher level messages.
const CNetMsgMaker msgMaker(pnode->GetSendVersion());
connman.PushMessage(pnode, msgMaker.Make(NetMsgType::QSENDRECSIGS, true));
pnode->m_masternode_iqr_connection = true;
}

LogPrint(BCLog::NET_NETCONN, "CMNAuth::%s -- Valid MNAUTH for %s, peer=%d\n", __func__, mnauth.proRegTxHash.ToString(), pnode->GetId());
}
}
Expand Down
14 changes: 8 additions & 6 deletions src/llmq/quorums_signing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -724,12 +724,14 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptr<const CRecovered
pendingReconstructedRecoveredSigs.erase(recoveredSig->GetHash());
}

CInv inv(MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash());
g_connman->ForEachNode([&](CNode* pnode) {
if (pnode->nVersion >= LLMQS_PROTO_VERSION && pnode->fSendRecSigs) {
pnode->PushInventory(inv);
}
});
if (fMasternodeMode) {
CInv inv(MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash());
g_connman->ForEachNode([&](CNode* pnode) {
if (pnode->nVersion >= LLMQS_PROTO_VERSION && pnode->fSendRecSigs) {
pnode->PushInventory(inv);
}
});
}

for (auto& l : listeners) {
l->HandleNewRecoveredSig(*recoveredSig);
Expand Down
6 changes: 6 additions & 0 deletions src/llmq/quorums_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,16 @@ void CLLMQUtils::EnsureQuorumConnections(Consensus::LLMQType llmqType, const CBl
}

std::set<uint256> connections;
std::set<uint256> relayMembers;
if (isMember) {
connections = CLLMQUtils::GetQuorumConnections(llmqType, pindexQuorum, myProTxHash, true);
relayMembers = CLLMQUtils::GetQuorumRelayMembers(llmqType, pindexQuorum, myProTxHash, true);
} else {
auto cindexes = CLLMQUtils::CalcDeterministicWatchConnections(llmqType, pindexQuorum, members.size(), 1);
for (auto idx : cindexes) {
connections.emplace(members[idx]->proTxHash);
}
relayMembers = connections;
}
if (!connections.empty()) {
if (!g_connman->HasMasternodeQuorumNodes(llmqType, pindexQuorum->GetBlockHash()) && LogAcceptCategory(BCLog::LLMQ)) {
Expand All @@ -215,6 +218,9 @@ void CLLMQUtils::EnsureQuorumConnections(Consensus::LLMQType llmqType, const CBl
}
g_connman->SetMasternodeQuorumNodes(llmqType, pindexQuorum->GetBlockHash(), connections);
}
if (!relayMembers.empty()) {
g_connman->SetMasternodeQuorumRelayMembers(llmqType, pindexQuorum->GetBlockHash(), relayMembers);
}
}

void CLLMQUtils::AddQuorumProbeConnections(Consensus::LLMQType llmqType, const CBlockIndex *pindexQuorum, const uint256 &myProTxHash)
Expand Down
47 changes: 43 additions & 4 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1341,11 +1341,11 @@ void CConnman::DisconnectNodes()
}

if (fLogIPs) {
LogPrintf("ThreadSocketHandler -- removing node: peer=%d addr=%s nRefCount=%d fInbound=%d m_masternode_connection=%d\n",
pnode->GetId(), pnode->addr.ToString(), pnode->GetRefCount(), pnode->fInbound, pnode->m_masternode_connection);
LogPrintf("ThreadSocketHandler -- removing node: peer=%d addr=%s nRefCount=%d fInbound=%d m_masternode_connection=%d m_masternode_iqr_connection=%d\n",
pnode->GetId(), pnode->addr.ToString(), pnode->GetRefCount(), pnode->fInbound, pnode->m_masternode_connection, pnode->m_masternode_iqr_connection);
} else {
LogPrintf("ThreadSocketHandler -- removing node: peer=%d nRefCount=%d fInbound=%d m_masternode_connection=%d\n",
pnode->GetId(), pnode->GetRefCount(), pnode->fInbound, pnode->m_masternode_connection);
LogPrintf("ThreadSocketHandler -- removing node: peer=%d nRefCount=%d fInbound=%d m_masternode_connection=%d m_masternode_iqr_connection=%d\n",
pnode->GetId(), pnode->GetRefCount(), pnode->fInbound, pnode->m_masternode_connection, pnode->m_masternode_iqr_connection);
}

// remove from vNodes
Expand Down Expand Up @@ -3501,6 +3501,30 @@ void CConnman::SetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint
}
}

void CConnman::SetMasternodeQuorumRelayMembers(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::set<uint256>& proTxHashes)
{
{
LOCK(cs_vPendingMasternodes);
auto it = masternodeQuorumRelayMembers.emplace(std::make_pair(llmqType, quorumHash), proTxHashes);
if (!it.second) {
it.first->second = proTxHashes;
}
}

// Update existing connections
ForEachNode([&](CNode* pnode) {
if (!pnode->verifiedProRegTxHash.IsNull() && !pnode->m_masternode_iqr_connection && IsMasternodeQuorumRelayMember(pnode->verifiedProRegTxHash)) {
// Tell our peer that we're interested in plain LLMQ recovered signatures.
// Otherwise the peer would only announce/send messages resulting from QRECSIG,
// e.g. InstantSend locks or ChainLocks. SPV and regular full nodes should not send
// this message as they are usually only interested in the higher level messages.
const CNetMsgMaker msgMaker(pnode->GetSendVersion());
PushMessage(pnode, msgMaker.Make(NetMsgType::QSENDRECSIGS, true));
pnode->m_masternode_iqr_connection = true;
}
});
}

bool CConnman::HasMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash)
{
LOCK(cs_vPendingMasternodes);
Expand Down Expand Up @@ -3546,6 +3570,7 @@ void CConnman::RemoveMasternodeQuorumNodes(Consensus::LLMQType llmqType, const u
{
LOCK(cs_vPendingMasternodes);
masternodeQuorumNodes.erase(std::make_pair(llmqType, quorumHash));
masternodeQuorumRelayMembers.erase(std::make_pair(llmqType, quorumHash));
}

bool CConnman::IsMasternodeQuorumNode(const CNode* pnode)
Expand Down Expand Up @@ -3578,6 +3603,20 @@ bool CConnman::IsMasternodeQuorumNode(const CNode* pnode)
return false;
}

bool CConnman::IsMasternodeQuorumRelayMember(const uint256& protxHash)
{
if (protxHash.IsNull()) {
return false;
}
LOCK(cs_vPendingMasternodes);
for (const auto& p : masternodeQuorumRelayMembers) {
if (p.second.count(protxHash)) {
return true;
}
}
return false;
}

void CConnman::AddPendingProbeConnections(const std::set<uint256> &proTxHashes)
{
LOCK(cs_vPendingMasternodes);
Expand Down
5 changes: 5 additions & 0 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,12 +417,14 @@ friend class CNode;

bool AddPendingMasternode(const uint256& proTxHash);
void SetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::set<uint256>& proTxHashes);
void SetMasternodeQuorumRelayMembers(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::set<uint256>& proTxHashes);
bool HasMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash);
std::set<uint256> GetMasternodeQuorums(Consensus::LLMQType llmqType);
// also returns QWATCH nodes
std::set<NodeId> GetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash) const;
void RemoveMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash);
bool IsMasternodeQuorumNode(const CNode* pnode);
bool IsMasternodeQuorumRelayMember(const uint256& protxHash);
void AddPendingProbeConnections(const std::set<uint256>& proTxHashes);

size_t GetNodeCount(NumConnections num);
Expand Down Expand Up @@ -584,6 +586,7 @@ friend class CNode;
CCriticalSection cs_vAddedNodes;
std::vector<uint256> vPendingMasternodes;
std::map<std::pair<Consensus::LLMQType, uint256>, std::set<uint256>> masternodeQuorumNodes; // protected by cs_vPendingMasternodes
std::map<std::pair<Consensus::LLMQType, uint256>, std::set<uint256>> masternodeQuorumRelayMembers; // protected by cs_vPendingMasternodes
std::set<uint256> masternodePendingProbes;
mutable CCriticalSection cs_vPendingMasternodes;
std::vector<CNode*> vNodes;
Expand Down Expand Up @@ -893,6 +896,8 @@ class CNode
bool m_masternode_connection;
// If 'true' this node will be disconnected after MNAUTH
bool m_masternode_probe_connection;
// If 'true', we identified it as an intra-quorum relay connection
bool m_masternode_iqr_connection{false};
CSemaphoreGrant grantOutbound;
CCriticalSection cs_filter;
std::unique_ptr<CBloomFilter> pfilter PT_GUARDED_BY(cs_filter);
Expand Down
8 changes: 0 additions & 8 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2379,14 +2379,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
pfrom->fSendDSQueue = true;
}

if (pfrom->nVersion >= LLMQS_PROTO_VERSION && !pfrom->m_masternode_connection) {
// Tell our peer that we're interested in plain LLMQ recovered signatures.
// Otherwise the peer would only announce/send messages resulting from QRECSIG,
// e.g. InstantSend locks or ChainLocks. SPV nodes should not send this message
// as they are usually only interested in the higher level messages
connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::QSENDRECSIGS, true));
}

if (gArgs.GetBoolArg("-watchquorums", llmq::DEFAULT_WATCH_QUORUMS) && !pfrom->m_masternode_connection) {
connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::QWATCH));
}
Expand Down
7 changes: 5 additions & 2 deletions test/functional/feature_llmq_is_cl_conflicts.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,13 @@ def create_chainlock(self, height, block):
request_id = hash256(request_id_buf)[::-1].hex()
message_hash = block.hash

quorum_member = None
for mn in self.mninfo:
mn.node.quorum('sign', 100, request_id, message_hash)
res = mn.node.quorum('sign', 100, request_id, message_hash)
if res and quorum_member is None:
quorum_member = mn

recSig = self.get_recovered_sig(request_id, message_hash)
recSig = self.get_recovered_sig(request_id, message_hash, node=quorum_member.node)
clsig = msg_clsig(height, block.sha256, hex_str_to_bytes(recSig['sig']))
return clsig

Expand Down
24 changes: 18 additions & 6 deletions test/functional/feature_llmq_signing.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,20 @@ def run_test(self):

self.mine_quorum()

if self.options.spork21:
assert self.mninfo[0].node.getconnectioncount() == self.llmq_size

id = "0000000000000000000000000000000000000000000000000000000000000001"
msgHash = "0000000000000000000000000000000000000000000000000000000000000002"
msgHashConflict = "0000000000000000000000000000000000000000000000000000000000000003"

def check_sigs(hasrecsigs, isconflicting1, isconflicting2):
for node in self.nodes:
if node.quorum("hasrecsig", 100, id, msgHash) != hasrecsigs:
for mn in self.mninfo:
if mn.node.quorum("hasrecsig", 100, id, msgHash) != hasrecsigs:
return False
if node.quorum("isconflicting", 100, id, msgHash) != isconflicting1:
if mn.node.quorum("isconflicting", 100, id, msgHash) != isconflicting1:
return False
if node.quorum("isconflicting", 100, id, msgHashConflict) != isconflicting2:
if mn.node.quorum("isconflicting", 100, id, msgHashConflict) != isconflicting2:
return False
return True

Expand Down Expand Up @@ -85,6 +88,8 @@ def assert_sigs_nochange(hasrecsigs, isconflicting1, isconflicting2, timeout):
sig_share.id = int(sig_share_rpc_1["id"], 16)
sig_share.msgHash = int(sig_share_rpc_1["msgHash"], 16)
sig_share.sigShare = hex_str_to_bytes(sig_share_rpc_1["signature"])
for mn in self.mninfo:
assert mn.node.getconnectioncount() == self.llmq_size
# Get the current recovery member of the quorum
q = self.nodes[0].quorum('selectquorum', 100, id)
mn = self.get_mninfo(q['recoveryMembers'][0])
Expand All @@ -100,8 +105,12 @@ def assert_sigs_nochange(hasrecsigs, isconflicting1, isconflicting2, timeout):

wait_for_sigs(True, False, True, 15)

if self.options.spork21:
mn.node.disconnect_p2ps()
network_thread_join()

# Test `quorum verify` rpc
node = self.nodes[0]
node = self.mninfo[0].node
recsig = node.quorum("getrecsig", 100, id, msgHash)
# Find quorum automatically
height = node.getblockcount()
Expand Down Expand Up @@ -156,7 +165,10 @@ def assert_sigs_nochange(hasrecsigs, isconflicting1, isconflicting2, timeout):
# Need to re-connect so that it later gets the recovered sig
mn.node.setnetworkactive(True)
connect_nodes(mn.node, 0)
# Make sure node0 has received qsendrecsigs from the previously isolated node
force_finish_mnsync(mn.node)
# Make sure intra-quorum connections were also restored
self.bump_mocktime(1) # need this to bypass quorum connection retry timeout
wait_until(lambda: mn.node.getconnectioncount() == self.llmq_size, timeout=10, sleep=2)
mn.node.ping()
wait_until(lambda: all('pingwait' not in peer for peer in mn.node.getpeerinfo()))
# Let 2 seconds pass so that the next node is used for recovery, which should succeed
Expand Down
1 change: 1 addition & 0 deletions test/functional/interface_zmq_dash.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def set_test_params(self):
# node0 creates all available ZMQ publisher
node0_extra_args = ["-zmqpub%s=%s" % (pub.value, self.address) for pub in ZMQPublisher]
node0_extra_args.append("-whitelist=127.0.0.1")
node0_extra_args.append("-watchquorums") # have to watch quorums to receive recsigs and trigger zmq

self.set_dash_test_params(4, 3, fast_dip3_enforcement=True, extra_args=[node0_extra_args, [], [], []])

Expand Down
18 changes: 10 additions & 8 deletions test/functional/test_framework/test_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -813,10 +813,13 @@ def create_islock(self, hextx):
request_id = hash256(request_id_buf)[::-1].hex()
message_hash = tx.hash

quorum_member = None
for mn in self.mninfo:
mn.node.quorum('sign', 100, request_id, message_hash)
res = mn.node.quorum('sign', 100, request_id, message_hash)
if (res and quorum_member is None):
quorum_member = mn

rec_sig = self.get_recovered_sig(request_id, message_hash)
rec_sig = self.get_recovered_sig(request_id, message_hash, node=quorum_member.node)
islock = msg_islock(inputs, tx.sha256, hex_str_to_bytes(rec_sig['sig']))
return islock

Expand Down Expand Up @@ -1065,17 +1068,16 @@ def mine_quorum(self, expected_connections=None, expected_members=None, expected
return new_quorum

def get_recovered_sig(self, rec_sig_id, rec_sig_msg_hash, llmq_type=100, node=None):
node = self.nodes[0] if node is None else node
rec_sig = None
# Note: recsigs aren't relayed to regular nodes by default,
# make sure to pick a mn as a node to query for recsigs.
node = self.mninfo[0].node if node is None else node
time_start = time.time()
while time.time() - time_start < 10:
try:
rec_sig = node.quorum('getrecsig', llmq_type, rec_sig_id, rec_sig_msg_hash)
break
return node.quorum('getrecsig', llmq_type, rec_sig_id, rec_sig_msg_hash)
except JSONRPCException:
time.sleep(0.1)
assert(rec_sig is not None)
return rec_sig
assert False

def get_quorum_masternodes(self, q):
qi = self.nodes[0].quorum('info', 100, q)
Expand Down

0 comments on commit 95e8492

Please sign in to comment.