Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Makefile.bench.include
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ bench_bench_dash_SOURCES = \
bench/ccoins_caching.cpp \
bench/gcs_filter.cpp \
bench/hashpadding.cpp \
bench/index_blockfilter.cpp \
bench/load_external.cpp \
bench/merkle_root.cpp \
bench/mempool_eviction.cpp \
Expand Down
43 changes: 43 additions & 0 deletions src/bench/index_blockfilter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2023-present The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or https://www.opensource.org/licenses/mit-license.php.

#include <bench/bench.h>

#include <addresstype.h>
#include <index/blockfilterindex.h>
#include <node/chainstate.h>
#include <node/context.h>
#include <test/util/setup_common.h>
#include <util/strencodings.h>

// Very simple block filter index sync benchmark, only using coinbase outputs.
static void BlockFilterIndexSync(benchmark::Bench& bench)
{
const auto test_setup = MakeNoLogFileContext<TestChain100Setup>();

// Create more blocks
int CHAIN_SIZE = 600;
CPubKey pubkey{ParseHex("02ed26169896db86ced4cbb7b3ecef9859b5952825adbeab998fb5b307e54949c9")};
CScript script = GetScriptForDestination(WitnessV0KeyHash(pubkey));
std::vector<CMutableTransaction> noTxns;
for (int i = 0; i < CHAIN_SIZE - 100; i++) {
test_setup->CreateAndProcessBlock(noTxns, script);
SetMockTime(GetTime() + 1);
}
assert(WITH_LOCK(::cs_main, return test_setup->m_node.chainman->ActiveHeight() == CHAIN_SIZE));

bench.minEpochIterations(5).run([&] {
BlockFilterIndex filter_index(interfaces::MakeChain(test_setup->m_node), BlockFilterType::BASIC,
/*n_cache_size=*/0, /*f_memory=*/false, /*f_wipe=*/true);
assert(filter_index.Init());
assert(!filter_index.BlockUntilSyncedToCurrentChain());
filter_index.Sync();

IndexSummary summary = filter_index.GetSummary();
assert(summary.synced);
assert(summary.best_block_hash == WITH_LOCK(::cs_main, return test_setup->m_node.chainman->ActiveTip()->GetBlockHash()));
});
}

BENCHMARK(BlockFilterIndexSync, benchmark::PriorityLevel::HIGH);
33 changes: 15 additions & 18 deletions src/index/base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ static const CBlockIndex* NextSyncBlock(const CBlockIndex* pindex_prev, CChain&
return chain.Next(chain.FindFork(pindex_prev));
}

void BaseIndex::ThreadSync()
void BaseIndex::Sync()
{
const CBlockIndex* pindex = m_best_block_index.load();
if (!m_synced) {
Expand All @@ -146,23 +146,20 @@ void BaseIndex::ThreadSync()
return;
}

{
LOCK(cs_main);
const CBlockIndex* pindex_next = NextSyncBlock(pindex, m_chainstate->m_chain);
if (!pindex_next) {
SetBestBlockIndex(pindex);
// No need to handle errors in Commit. See rationale above.
Commit();
m_synced = true;
break;
}
if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) {
FatalError("%s: Failed to rewind index %s to a previous chain tip",
__func__, GetName());
return;
}
pindex = pindex_next;
const CBlockIndex* pindex_next = WITH_LOCK(cs_main, return NextSyncBlock(pindex, m_chainstate->m_chain));
if (!pindex_next) {
SetBestBlockIndex(pindex);
m_synced = true;
// No need to handle errors in Commit. See rationale above.
Commit();
break;
}
if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) {
FatalErrorf("%s: Failed to rewind index %s to a previous chain tip", __func__, GetName());
return;
}
pindex = pindex_next;


CBlock block;
if (!ReadBlockFromDisk(block, pindex, consensus_params)) {
Expand Down Expand Up @@ -364,7 +361,7 @@ bool BaseIndex::Start(CChainState& active_chainstate)
return false;
}

m_thread_sync = std::thread(&util::TraceThread, GetName(), [this] { ThreadSync(); });
m_thread_sync = std::thread(&util::TraceThread, GetName(), [this] { Sync(); });
return true;
}

Expand Down
14 changes: 7 additions & 7 deletions src/index/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,6 @@ class BaseIndex : public CValidationInterface
std::thread m_thread_sync;
CThreadInterrupt m_interrupt;

/// Sync the index with the block index starting from the current best block.
/// Intended to be run in its own thread, m_thread_sync, and can be
/// interrupted with m_interrupt. Once the index gets in sync, the m_synced
/// flag is set and the BlockConnected ValidationInterface callback takes
/// over and the sync thread exits.
void ThreadSync();

/// Write the current index state (eg. chain block locator and subclass-specific items) to disk.
///
/// Recommendations for error handling:
Expand Down Expand Up @@ -132,6 +125,13 @@ class BaseIndex : public CValidationInterface
/// ValidationInterface so that it stays in sync with blockchain updates.
[[nodiscard]] bool Start(CChainState& active_chainstate);

/// Sync the index with the block index starting from the current best block.
/// Intended to be run in its own thread, m_thread_sync, and can be
/// interrupted with m_interrupt. Once the index gets in sync, the m_synced
/// flag is set and the BlockConnected ValidationInterface callback takes
/// over and the sync thread exits.
void Sync();

/// Stops the instance from staying in sync with blockchain updates.
void Stop();

Expand Down
65 changes: 48 additions & 17 deletions src/index/blockfilterindex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,21 @@ bool BlockFilterIndex::Init()
m_next_filter_pos.nFile = 0;
m_next_filter_pos.nPos = 0;
}
// Initialize last header cache if we have indexed blocks
CBlockLocator locator;
if (m_db->Read(DB_BEST_BLOCK, locator)) {
LOCK(cs_main);
const CBlockIndex* block = m_chainstate->FindForkInGlobalIndex(locator);
if (block) {
auto op_last_header = ReadFilterHeader(block->nHeight, block->GetBlockHash());
if (!op_last_header) {
return error("%s: Cannot read last block filter header; index may be corrupted",
__func__);
}
m_last_header = *op_last_header;
}
}

return BaseIndex::Init();
}

Expand Down Expand Up @@ -217,42 +232,51 @@ size_t BlockFilterIndex::WriteFilterToDisk(FlatFilePos& pos, const BlockFilter&
return data_size;
}

std::optional<uint256> BlockFilterIndex::ReadFilterHeader(int height, const uint256& expected_block_hash)
{
std::pair<uint256, DBVal> read_out;
if (!m_db->Read(DBHeightKey(height), read_out)) {
return std::nullopt;
}

if (read_out.first != expected_block_hash) {
return error("%s: previous block header belongs to unexpected block %s; expected %s",
__func__, read_out.first.ToString(), expected_block_hash.ToString());
}

return read_out.second.header;
}

bool BlockFilterIndex::WriteBlock(const CBlock& block, const CBlockIndex* pindex)
{
CBlockUndo block_undo;
uint256 prev_header;

if (pindex->nHeight > 0) {
if (!UndoReadFromDisk(block_undo, pindex)) {
return false;
}

std::pair<uint256, DBVal> read_out;
if (!m_db->Read(DBHeightKey(pindex->nHeight - 1), read_out)) {
return false;
}

uint256 expected_block_hash = pindex->pprev->GetBlockHash();
if (read_out.first != expected_block_hash) {
return error("%s: previous block header belongs to unexpected block %s; expected %s",
__func__, read_out.first.ToString(), expected_block_hash.ToString());
}

prev_header = read_out.second.header;
}

BlockFilter filter(m_filter_type, block, block_undo);

const uint256& header = filter.ComputeHeader(m_last_header);
bool res = Write(filter, pindex->nHeight, header);
if (res) m_last_header = header; // update last header
return res;
}

bool BlockFilterIndex::Write(const BlockFilter& filter, uint32_t block_height, const uint256& filter_header)
{
size_t bytes_written = WriteFilterToDisk(m_next_filter_pos, filter);
if (bytes_written == 0) return false;

std::pair<uint256, DBVal> value;
value.first = pindex->GetBlockHash();
value.first = filter.GetBlockHash();
value.second.hash = filter.GetHash();
value.second.header = filter.ComputeHeader(prev_header);
value.second.header = filter_header;
value.second.pos = m_next_filter_pos;

if (!m_db->Write(DBHeightKey(pindex->nHeight), value)) {
if (!m_db->Write(DBHeightKey(block_height), value)) {
return false;
}

Expand Down Expand Up @@ -306,6 +330,13 @@ bool BlockFilterIndex::Rewind(const CBlockIndex* current_tip, const CBlockIndex*
batch.Write(DB_FILTER_POS, m_next_filter_pos);
if (!m_db->WriteBatch(batch)) return false;

// Update cached header
auto op_last_header = ReadFilterHeader(new_tip->nHeight, new_tip->GetBlockHash());
if (!op_last_header) {
return error("%s: Failed to read filter header for rewound tip", __func__);
}
m_last_header = *op_last_header;

return BaseIndex::Rewind(current_tip, new_tip);
}

Expand Down
7 changes: 7 additions & 0 deletions src/index/blockfilterindex.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,15 @@ class BlockFilterIndex final : public BaseIndex
/** cache of block hash to filter header, to avoid disk access when responding to getcfcheckpt. */
std::unordered_map<uint256, uint256, FilterHeaderHasher> m_headers_cache GUARDED_BY(m_cs_headers_cache);

// Last computed header to avoid disk reads on every new block.
uint256 m_last_header{};

bool AllowPrune() const override { return true; }

bool Write(const BlockFilter& filter, uint32_t block_height, const uint256& filter_header);

std::optional<uint256> ReadFilterHeader(int height, const uint256& expected_block_hash);

protected:
bool Init() override;

Expand Down
21 changes: 6 additions & 15 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,8 @@ void Interrupt(NodeContext& node)
InterruptMapPort();
if (node.connman)
node.connman->Interrupt();
if (g_txindex) {
g_txindex->Interrupt();
}
ForEachBlockFilterIndex([](BlockFilterIndex& index) { index.Interrupt(); });
if (g_coin_stats_index) {
g_coin_stats_index->Interrupt();
for (auto* index : node.indexes) {
index->Interrupt();
}
}

Expand Down Expand Up @@ -328,16 +324,11 @@ void PrepareShutdown(NodeContext& node)
node.mn_metaman.reset();

// Stop and delete all indexes only after flushing background callbacks.
if (g_txindex) {
g_txindex->Stop();
g_txindex.reset();
}
if (g_coin_stats_index) {
g_coin_stats_index->Stop();
g_coin_stats_index.reset();
}
ForEachBlockFilterIndex([](BlockFilterIndex& index) { index.Stop(); });
for (auto* index : node.indexes) index->Stop();
if (g_txindex) g_txindex.reset();
if (g_coin_stats_index) g_coin_stats_index.reset();
DestroyAllBlockFilterIndexes();
node.indexes.clear(); // all instances are nullptr now

// Any future callbacks will be dropped. This should absolutely be safe - if
// missing a callback results in an unrecoverable situation, unclean shutdown
Expand Down
Loading