Skip to content

Commit

Permalink
Merge pull request zcash#6928 from daira/orphan-resolution
Browse files Browse the repository at this point in the history
Backport fix for orphan handling
  • Loading branch information
nuttycom authored Aug 22, 2024
2 parents f4cad28 + c890bb5 commit 1f4e4d6
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 84 deletions.
2 changes: 1 addition & 1 deletion qa/pull-tester/rpc-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

FLAKY_SCRIPTS = [
# These tests have intermittent failures that we haven't diagnosed yet.
'mempool_nu_activation.py',
'mempool_packages.py',
]

Expand Down Expand Up @@ -103,7 +104,6 @@
'rest.py',
'mempool_spendcoinbase.py',
'mempool_reorg.py',
'mempool_nu_activation.py',
'httpbasics.py',
'multi_rpc.py',
'zapwallettxes.py',
Expand Down
250 changes: 168 additions & 82 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,22 @@ CAmount nTxUnpaidActionLimit = DEFAULT_TX_UNPAID_ACTION_LIMIT;

CTxMemPool mempool(::minRelayTxFee);

struct IteratorComparator
{
template<typename I>
bool operator()(const I& a, const I& b) const
{
return &(*a) < &(*b);
}
};

struct COrphanTx {
CTransaction tx;
NodeId fromPeer;
int64_t nTimeExpire;
};
map<uint256, COrphanTx> mapOrphanTransactions GUARDED_BY(cs_main);;
map<uint256, set<uint256> > mapOrphanTransactionsByPrev GUARDED_BY(cs_main);;
map<uint256, COrphanTx> mapOrphanTransactions GUARDED_BY(cs_main);
map<COutPoint, set<map<uint256, COrphanTx>::iterator, IteratorComparator>> mapOrphanTransactionsByPrev GUARDED_BY(cs_main);
void EraseOrphansFor(NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(cs_main);

/**
Expand Down Expand Up @@ -665,40 +675,42 @@ bool AddOrphanTx(const CTransaction& tx, NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(c
// large transaction with a missing parent then we assume
// it will rebroadcast it later, after the parent transaction(s)
// have been mined or received.
// 10,000 orphans, each of which is at most 5,000 bytes big is
// at most 500 megabytes of orphans:
// 100 orphans, each of which is at most 99,999 bytes big is
// at most 10 megabytes of orphans and somewhat more byprev index (in the worst case):
unsigned int sz = GetSerializeSize(tx, SER_NETWORK, tx.nVersion);
if (sz > 5000)
if (sz >= 100000)
{
LogPrint("mempool", "ignoring large orphan tx (size: %u, hash: %s)\n", sz, hash.ToString());
return false;
}

mapOrphanTransactions[hash].tx = tx;
mapOrphanTransactions[hash].fromPeer = peer;
for (const CTxIn& txin : tx.vin)
mapOrphanTransactionsByPrev[txin.prevout.hash].insert(hash);
auto ret = mapOrphanTransactions.emplace(hash, COrphanTx{tx, peer, GetTime() + ORPHAN_TX_EXPIRE_TIME});
assert(ret.second);
for (const CTxIn& txin : tx.vin) {
mapOrphanTransactionsByPrev[txin.prevout].insert(ret.first);
}

LogPrint("mempool", "stored orphan tx %s (mapsz %u prevsz %u)\n", hash.ToString(),
LogPrint("mempool", "stored orphan tx %s (mapsz %u outsz %u)\n", hash.ToString(),
mapOrphanTransactions.size(), mapOrphanTransactionsByPrev.size());
return true;
}

void static EraseOrphanTx(uint256 hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
int static EraseOrphanTx(uint256 hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
map<uint256, COrphanTx>::iterator it = mapOrphanTransactions.find(hash);
if (it == mapOrphanTransactions.end())
return;
return 0;
for (const CTxIn& txin : it->second.tx.vin)
{
map<uint256, set<uint256> >::iterator itPrev = mapOrphanTransactionsByPrev.find(txin.prevout.hash);
auto itPrev = mapOrphanTransactionsByPrev.find(txin.prevout);
if (itPrev == mapOrphanTransactionsByPrev.end())
continue;
itPrev->second.erase(hash);
itPrev->second.erase(it);
if (itPrev->second.empty())
mapOrphanTransactionsByPrev.erase(itPrev);
}
mapOrphanTransactions.erase(it);
return 1;
}

void EraseOrphansFor(NodeId peer)
Expand All @@ -710,8 +722,7 @@ void EraseOrphansFor(NodeId peer)
map<uint256, COrphanTx>::iterator maybeErase = iter++; // increment to avoid iterator becoming invalid
if (maybeErase->second.fromPeer == peer)
{
EraseOrphanTx(maybeErase->second.tx.GetHash());
++nErased;
nErased += EraseOrphanTx(maybeErase->second.tx.GetHash());
}
}
if (nErased > 0) LogPrint("mempool", "Erased %d orphan tx from peer %d\n", nErased, peer);
Expand All @@ -721,6 +732,28 @@ void EraseOrphansFor(NodeId peer)
unsigned int LimitOrphanTxSize(unsigned int nMaxOrphans) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
unsigned int nEvicted = 0;
static int64_t nNextSweep;
int64_t nNow = GetTime();
if (nNextSweep <= nNow) {
// Sweep out expired orphan pool entries:
int nErased = 0;
assert(nNow <= INT64_MAX - ORPHAN_TX_EXPIRE_TIME);
int64_t nMinExpTime = nNow + ORPHAN_TX_EXPIRE_TIME - ORPHAN_TX_EXPIRE_INTERVAL;
map<uint256, COrphanTx>::iterator iter = mapOrphanTransactions.begin();
while (iter != mapOrphanTransactions.end())
{
map<uint256, COrphanTx>::iterator maybeErase = iter++;
if (maybeErase->second.nTimeExpire <= nNow) {
nErased += EraseOrphanTx(maybeErase->second.tx.GetHash());
} else {
nMinExpTime = std::min(maybeErase->second.nTimeExpire, nMinExpTime);
}
}
// Sweep again 5 minutes after the next entry that expires in order to batch the linear scan.
assert(nMinExpTime <= INT64_MAX - ORPHAN_TX_EXPIRE_INTERVAL);
nNextSweep = nMinExpTime + ORPHAN_TX_EXPIRE_INTERVAL;
if (nErased > 0) LogPrint("mempool", "Erased %d orphan tx due to expiration\n", nErased);
}
while (mapOrphanTransactions.size() > nMaxOrphans)
{
// Evict a random orphan:
Expand Down Expand Up @@ -3123,6 +3156,7 @@ bool ConnectBlock(const CBlock& block, CValidationState& state, CBlockIndex* pin
CCheckQueueControl<CScriptCheck> control(fExpensiveChecks && nScriptCheckThreads ? &scriptcheckqueue : NULL);

int64_t nTimeStart = GetTimeMicros();
std::vector<uint256> vOrphanErase;
CAmount nFees = 0;
int nInputs = 0;
unsigned int nSigOps = 0;
Expand Down Expand Up @@ -3227,6 +3261,17 @@ bool ConnectBlock(const CBlock& block, CValidationState& state, CBlockIndex* pin
allPrevOutputs.push_back(prevout);
}

// Which orphan pool entries must we evict?
for (size_t j = 0; j < tx.vin.size(); j++) {
auto itByPrev = mapOrphanTransactionsByPrev.find(tx.vin[j].prevout);
if (itByPrev == mapOrphanTransactionsByPrev.end()) continue;
for (auto mi = itByPrev->second.begin(); mi != itByPrev->second.end(); ++mi) {
const CTransaction& orphanTx = (*mi)->second.tx;
const uint256& orphanHash = orphanTx.GetHash();
vOrphanErase.push_back(orphanHash);
}
}

// insightexplorer
// https://github.com/bitpay/bitcoin/commit/017f548ea6d89423ef568117447e61dd5707ec42#diff-7ec3c68a81efff79b6ca22ac1f1eabbaR2597
if (fAddressIndex || fSpentIndex) {
Expand Down Expand Up @@ -3688,6 +3733,15 @@ bool ConnectBlock(const CBlock& block, CValidationState& state, CBlockIndex* pin
int64_t nTime3 = GetTimeMicros(); nTimeIndex += nTime3 - nTime2;
LogPrint("bench", " - Index writing: %.2fms [%.2fs]\n", 0.001 * (nTime3 - nTime2), nTimeIndex * 0.000001);

// Erase orphan transactions include or precluded by this block
if (vOrphanErase.size()) {
int nErased = 0;
for (uint256 &orphanHash : vOrphanErase) {
nErased += EraseOrphanTx(orphanHash);
}
LogPrint("mempool", "Erased %d orphan tx included or conflicted by block\n", nErased);
}

return true;
}

Expand Down Expand Up @@ -6824,6 +6878,65 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
}
}

void static ProcessOrphanTx(const CChainParams& chainparams, std::set<uint256>& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
AssertLockHeld(cs_main);
set<NodeId> setMisbehaving;
bool done = false;
while (!done && !orphan_work_set.empty()) {
const uint256 orphanHash = *orphan_work_set.begin();
orphan_work_set.erase(orphan_work_set.begin());

auto orphan_it = mapOrphanTransactions.find(orphanHash);
if (orphan_it == mapOrphanTransactions.end()) continue;

const CTransaction& orphanTx = orphan_it->second.tx;
NodeId fromPeer = orphan_it->second.fromPeer;
bool fMissingInputs2 = false;
// Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan
// resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get
// anyone relaying LegitTxX banned)
CValidationState stateDummy;

if (setMisbehaving.count(fromPeer)) continue;
if (AcceptToMemoryPool(chainparams, mempool, stateDummy, orphanTx, true, &fMissingInputs2))
{
LogPrint("mempool", " accepted orphan tx %s\n", orphanHash.ToString());
RelayTransaction(orphanTx);
for (unsigned int i = 0; i < orphanTx.vout.size(); i++) {
auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(orphanHash, i));
if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
for (const auto& elem : it_by_prev->second) {
orphan_work_set.insert(elem->first);
}
}
}
EraseOrphanTx(orphanHash);
done = true;
} else if (!fMissingInputs2) {
int nDos = 0;
if (stateDummy.IsInvalid(nDos) && nDos > 0) {
// Punish peer that gave us an invalid orphan tx
Misbehaving(fromPeer, nDos);
setMisbehaving.insert(fromPeer);
LogPrint("mempool", " invalid orphan tx %s\n", orphanHash.ToString());
}
// Has inputs but not accepted to mempool
// Probably non-standard or insufficient fee
LogPrint("mempool", " removed orphan tx %s\n", orphanHash.ToString());
// Add the wtxid of this transaction to our reject filter.
// Unlike upstream Bitcoin Core, we can unconditionally add
// these, as they are always bound to the entirety of the
// transaction regardless of version.
assert(recentRejects);
recentRejects->insert(orphanTx.GetWTxId().ToBytes());
EraseOrphanTx(orphanHash);
done = true;
}
mempool.check(pcoinsTip);
}
}

bool static ProcessMessage(const CChainParams& chainparams, CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived)
{
LogPrint("net", "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->id);
Expand Down Expand Up @@ -7346,8 +7459,6 @@ bool static ProcessMessage(const CChainParams& chainparams, CNode* pfrom, string
return true;
}

vector<uint256> vWorkQueue;
vector<uint256> vEraseQueue;
CTransaction tx;
vRecv >> tx;

Expand All @@ -7372,84 +7483,53 @@ bool static ProcessMessage(const CChainParams& chainparams, CNode* pfrom, string
{
mempool.check(pcoinsTip);
RelayTransaction(tx);
vWorkQueue.push_back(txid);
for (unsigned int i = 0; i < tx.vout.size(); i++) {
auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(txid, i));
if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
for (const auto& elem : it_by_prev->second) {
pfrom->orphan_work_set.insert(elem->first);
}
}
}

LogPrint("mempool", "AcceptToMemoryPool: peer=%d %s: accepted %s (poolsz %u txn, %u kB)\n",
pfrom->id, pfrom->cleanSubVer,
tx.GetHash().ToString(),
mempool.size(), mempool.DynamicMemoryUsage() / 1000);

// Recursively process any orphan transactions that depended on this one
set<NodeId> setMisbehaving;
for (unsigned int i = 0; i < vWorkQueue.size(); i++)
{
map<uint256, set<uint256> >::iterator itByPrev = mapOrphanTransactionsByPrev.find(vWorkQueue[i]);
if (itByPrev == mapOrphanTransactionsByPrev.end())
continue;
for (set<uint256>::iterator mi = itByPrev->second.begin();
mi != itByPrev->second.end();
++mi)
{
const uint256& orphanHash = *mi;
const CTransaction& orphanTx = mapOrphanTransactions[orphanHash].tx;
NodeId fromPeer = mapOrphanTransactions[orphanHash].fromPeer;
bool fMissingInputs2 = false;
// Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan
// resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get
// anyone relaying LegitTxX banned)
CValidationState stateDummy;


if (setMisbehaving.count(fromPeer))
continue;
if (AcceptToMemoryPool(chainparams, mempool, stateDummy, orphanTx, true, &fMissingInputs2))
{
LogPrint("mempool", " accepted orphan tx %s\n", orphanHash.ToString());
RelayTransaction(orphanTx);
vWorkQueue.push_back(orphanHash);
vEraseQueue.push_back(orphanHash);
}
else if (!fMissingInputs2)
{
int nDos = 0;
if (stateDummy.IsInvalid(nDos) && nDos > 0)
{
// Punish peer that gave us an invalid orphan tx
Misbehaving(fromPeer, nDos);
setMisbehaving.insert(fromPeer);
LogPrint("mempool", " invalid orphan tx %s\n", orphanHash.ToString());
}
// Has inputs but not accepted to mempool
// Probably non-standard or insufficient fee
LogPrint("mempool", " removed orphan tx %s\n", orphanHash.ToString());
vEraseQueue.push_back(orphanHash);
// Add the wtxid of this transaction to our reject filter.
// Unlike upstream Bitcoin Core, we can unconditionally add
// these, as they are always bound to the entirety of the
// transaction regardless of version.
assert(recentRejects);
recentRejects->insert(orphanTx.GetWTxId().ToBytes());
}
mempool.check(pcoinsTip);
}
}

for (uint256 hash : vEraseQueue)
EraseOrphanTx(hash);
ProcessOrphanTx(chainparams, pfrom->orphan_work_set);
}
// TODO: currently, prohibit joinsplits and shielded spends/outputs/actions from entering mapOrphans
else if (fMissingInputs &&
tx.vJoinSplit.empty() &&
!tx.GetSaplingBundle().IsPresent() &&
!tx.GetOrchardBundle().IsPresent())
{
AddOrphanTx(tx, pfrom->GetId());

// DoS prevention: do not allow mapOrphanTransactions to grow unbounded
unsigned int nMaxOrphanTx = (unsigned int)std::max((int64_t)0, GetArg("-maxorphantx", DEFAULT_MAX_ORPHAN_TRANSACTIONS));
unsigned int nEvicted = LimitOrphanTxSize(nMaxOrphanTx);
if (nEvicted > 0)
LogPrint("mempool", "mapOrphan overflow, removed %u tx\n", nEvicted);
bool fRejectedParents = false; // It may be the case that the orphan's parents have all been rejected
for (const CTxIn& txin : tx.vin) {
if (recentRejects->contains(txin.prevout.hash)) {
fRejectedParents = true;
break;
}
}
if (!fRejectedParents) {
for (const CTxIn& txin : tx.vin) {
CInv inv(MSG_TX, txin.prevout.hash);
pfrom->AddKnownTxId(inv.hash);
if (!AlreadyHave(inv)) pfrom->AskFor(inv);
}
AddOrphanTx(tx, pfrom->GetId());

// DoS prevention: do not allow mapOrphanTransactions and
// mapOrphanTransactionsByPrev to grow unbounded.
unsigned int nMaxOrphanTx = (unsigned int)std::max((int64_t)0, GetArg("-maxorphantx", DEFAULT_MAX_ORPHAN_TRANSACTIONS));
unsigned int nEvicted = LimitOrphanTxSize(nMaxOrphanTx);
if (nEvicted > 0)
LogPrint("mempool", "mapOrphan overflow, removed %u tx\n", nEvicted);
} else {
LogPrint("mempool", "not keeping orphan with rejected parents %s\n",tx.GetHash().ToString());
}
} else {
// Add the wtxid of this transaction to our reject filter.
// Unlike upstream Bitcoin Core, we can unconditionally add
Expand Down Expand Up @@ -7877,8 +7957,14 @@ bool ProcessMessages(const CChainParams& chainparams, CNode* pfrom)
if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom, chainparams.GetConsensus());

if (!pfrom->orphan_work_set.empty()) {
LOCK(cs_main);
ProcessOrphanTx(chainparams, pfrom->orphan_work_set);
}

// this maintains the order of responses
if (!pfrom->vRecvGetData.empty()) return fOk;
if (!pfrom->orphan_work_set.empty()) return true;

std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin();
while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
Expand Down
4 changes: 4 additions & 0 deletions src/main.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ static const CAmount HIGH_MAX_TX_FEE = 100 * HIGH_TX_FEE_PER_KB;
static const unsigned int LOW_LOGICAL_ACTIONS = 10;
/** Default for -maxorphantx, maximum number of orphan transactions kept in memory */
static const unsigned int DEFAULT_MAX_ORPHAN_TRANSACTIONS = 100;
/** Expiration time for orphan transactions in seconds */
static const int64_t ORPHAN_TX_EXPIRE_TIME = 20 * 60;
/** Minimum time between orphan transactions expire time checks in seconds */
static const int64_t ORPHAN_TX_EXPIRE_INTERVAL = 5 * 60;
/** Default for -limitancestorcount, max number of in-mempool ancestors */
static const unsigned int DEFAULT_ANCESTOR_LIMIT = 100;
/** Default for -limitancestorsize, maximum kilobytes of tx + all in-mempool ancestors */
Expand Down
Loading

0 comments on commit 1f4e4d6

Please sign in to comment.