Skip to content

Commit

Permalink
Add prefix bloom filter support (vesoft-inc#2274)
Browse files Browse the repository at this point in the history
* Add prefix bloom filter support

* fix checkstyle

* Address review comments

* Changed option name

Co-authored-by: yaphet <darion.wang@vesoft.com>
Co-authored-by: dangleptr <37216992+dangleptr@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 7, 2020
1 parent dcf96d9 commit 12a54dc
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 0 deletions.
8 changes: 8 additions & 0 deletions etc/nebula-storaged.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@
# * kAll, Collect all stats
--rocksdb_stats_level=kExceptHistogramOrTimers

# Whether or not to enable rocksdb's prefix bloom filter, disabled by default.
--enable_rocksdb_prefix_filtering=false
# Whether or not to enable the whole key filtering.
--enable_rocksdb_whole_key_filtering=true
# The prefix length for each key to use as the filter value.
# can be 12 bytes(PartitionId + VertexID), or 16 bytes(PartitionId + VertexID + TagID/EdgeType).
--rocksdb_filtering_prefix_length=12

############## rocksdb Options ##############
# rocksdb DBOptions in json, each name and value of option is a string, given as "option_name":"option_value" separated by comma
--rocksdb_db_options={"max_subcompactions":"1","max_background_jobs":"1"}
Expand Down
8 changes: 8 additions & 0 deletions etc/nebula-storaged.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@
# * kAll, Collect all stats
--rocksdb_stats_level=kExceptHistogramOrTimers

# Whether or not to enable rocksdb's prefix bloom filter, disabled by default.
--enable_rocksdb_prefix_filtering=false
# Whether or not to enable the whole key filtering.
--enable_rocksdb_whole_key_filtering=true
# The prefix length for each key to use as the filter value.
# can be 12 bytes(PartitionId + VertexID), or 16 bytes(PartitionId + VertexID + TagID/EdgeType).
--rocksdb_filtering_prefix_length=12

############### misc ####################
--max_handlers_per_req=1
--heartbeat_interval_secs=10
Expand Down
3 changes: 3 additions & 0 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ ResultCode RocksEngine::range(const std::string& start,
const std::string& end,
std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
options.total_order_seek = true;
rocksdb::Iterator* iter = db_->NewIterator(options);
if (iter) {
iter->Seek(rocksdb::Slice(start));
Expand All @@ -204,6 +205,7 @@ ResultCode RocksEngine::range(const std::string& start,
ResultCode RocksEngine::prefix(const std::string& prefix,
std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
options.prefix_same_as_start = true;
rocksdb::Iterator* iter = db_->NewIterator(options);
if (iter) {
iter->Seek(rocksdb::Slice(prefix));
Expand All @@ -217,6 +219,7 @@ ResultCode RocksEngine::rangeWithPrefix(const std::string& start,
const std::string& prefix,
std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
options.prefix_same_as_start = true;
rocksdb::Iterator* iter = db_->NewIterator(options);
if (iter) {
iter->Seek(rocksdb::Slice(start));
Expand Down
35 changes: 35 additions & 0 deletions src/kvstore/RocksEngineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "rocksdb/concurrent_task_limiter.h"
#include "rocksdb/rate_limiter.h"
#include "base/Configuration.h"
#include "utils/NebulaKeyUtils.h"

// [WAL]
DEFINE_bool(rocksdb_disable_wal,
Expand Down Expand Up @@ -71,9 +72,38 @@ DEFINE_int32(num_compaction_threads, 0,
DEFINE_int32(rate_limit, 0,
"write limit in bytes per sec. The unit is MB. 0 means unlimited.");

DEFINE_bool(enable_rocksdb_prefix_filtering, false,
"Whether or not to enable rocksdb's prefix bloom filter.");
DEFINE_bool(enable_rocksdb_whole_key_filtering, true,
"Whether or not to enable the whole key filtering.");
DEFINE_int32(rocksdb_filtering_prefix_length, 12,
"The prefix length, default value is 12 bytes(PartitionID+VertexID).");

namespace nebula {
namespace kvstore {

class GraphPrefixTransform : public rocksdb::SliceTransform {
private:
size_t prefixLen_;
std::string name_;

public:
explicit GraphPrefixTransform(size_t prefixLen)
: prefixLen_(prefixLen),
name_("nebula.GraphPrefix." + std::to_string(prefixLen_)) {}

const char* Name() const override { return name_.c_str(); }

rocksdb::Slice Transform(const rocksdb::Slice& src) const override {
return rocksdb::Slice(src.data(), prefixLen_);
}

bool InDomain(const rocksdb::Slice& src) const override {
return src.size() >= prefixLen_ && NebulaKeyUtils::isDataKey(
folly::StringPiece(src.data(), 1));
}
};

static rocksdb::Status initRocksdbCompression(rocksdb::Options &baseOpts) {
static std::unordered_map<std::string, rocksdb::CompressionType> m = {
{ "no", rocksdb::kNoCompression },
Expand Down Expand Up @@ -194,6 +224,11 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options &baseOpts) {
bbtOpts.pin_l0_filter_and_index_blocks_in_cache =
baseOpts.compaction_style == rocksdb::CompactionStyle::kCompactionStyleLevel;
}
if (FLAGS_enable_rocksdb_prefix_filtering) {
baseOpts.prefix_extractor.reset(
new GraphPrefixTransform(FLAGS_rocksdb_filtering_prefix_length));
}
bbtOpts.whole_key_filtering = FLAGS_enable_rocksdb_whole_key_filtering;
baseOpts.table_factory.reset(NewBlockBasedTableFactory(bbtOpts));
baseOpts.create_if_missing = true;
return s;
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/RocksEngineConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ DECLARE_string(rocksdb_compression);
DECLARE_bool(enable_rocksdb_statistics);
DECLARE_string(rocksdb_stats_level);

DECLARE_bool(enable_rocksdb_prefix_filtering);
DECLARE_bool(enable_rocksdb_whole_key_filtering);
DECLARE_int32(rocksdb_filtering_prefix_length);

namespace nebula {
namespace kvstore {

Expand Down
15 changes: 15 additions & 0 deletions src/storage/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,21 @@ nebula_add_executable(
boost_regex
)

nebula_add_executable(
NAME
prefix_bloom_filter_bm
SOURCES
PrefixBloomFilterBenchmark.cpp
OBJECTS
${storage_test_deps}
LIBRARIES
${ROCKSDB_LIBRARIES}
${THRIFT_LIBRARIES}
follybenchmark
wangle
boost_regex
)


nebula_add_executable(
NAME
Expand Down
119 changes: 119 additions & 0 deletions src/storage/test/PrefixBloomFilterBenchmark.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "base/Base.h"
#include <rocksdb/db.h>
#include <folly/Benchmark.h>
#include "fs/TempDir.h"
#include "kvstore/RocksEngineConfig.h"
#include "storage/test/TestUtils.h"
#include "utils/NebulaKeyUtils.h"

DEFINE_int64(part_number, 20, "partition numbers");
DEFINE_int64(vertex_per_part, 100, "vertex count with each partition");

namespace nebula {
namespace storage {

void mockData(kvstore::KVStore *kv) {
LOG(INFO) << "Prepare data...";
for (PartitionID partId = 0; partId < FLAGS_part_number; partId++) {
std::vector<kvstore::KV> data;
for (int32_t vertexId = partId * FLAGS_vertex_per_part;
vertexId < (partId + 1) * FLAGS_vertex_per_part; vertexId++) {
for (TagID tagId = 3001; tagId < 3010; tagId++) {
auto key = NebulaKeyUtils::vertexKey(partId, vertexId, tagId, 1);
auto val = folly::stringPrintf("%d_%d", vertexId, tagId);
data.emplace_back(std::move(key), std::move(val));
}
}
folly::Baton<true, std::atomic> baton;
kv->asyncMultiPut(0, partId, std::move(data),
[&](kvstore::ResultCode code) {
baton.post();
folly::doNotOptimizeAway(code);
});
baton.wait();
kv->flush(0);
}
}

void testPrefixSeek(kvstore::KVStore *kv, int32_t iters) {
for (decltype(iters) i = 0; i < iters; i++) {
for (PartitionID partId = 0; partId < FLAGS_part_number; partId++) {
for (int32_t vertexId = partId * FLAGS_vertex_per_part;
vertexId < (partId + 1) * FLAGS_vertex_per_part; vertexId++) {
auto prefix = NebulaKeyUtils::vertexPrefix(partId, vertexId);
std::unique_ptr<kvstore::KVIterator> iter;
kv->prefix(0, partId, prefix, &iter);
iter->next();
}
}
}
}

BENCHMARK(PrefixWithFilterOff, n) {
folly::BenchmarkSuspender braces;
FLAGS_rocksdb_column_family_options = R"({
"level0_file_num_compaction_trigger":"100"
})";
FLAGS_rocksdb_block_cache = 0;
fs::TempDir rootPath("/tmp/PrefixBloomFilterBenchmark.XXXXXX");
std::unique_ptr<kvstore::KVStore> kv = TestUtils::initKV(rootPath.path(),
FLAGS_part_number);
mockData(kv.get());
braces.dismiss();
testPrefixSeek(kv.get(), n);
}

BENCHMARK_RELATIVE(PrefixWithFilterOn, n) {
folly::BenchmarkSuspender braces;
FLAGS_rocksdb_column_family_options = R"({
"level0_file_num_compaction_trigger":"100"
})";
FLAGS_enable_rocksdb_prefix_filtering = true;
FLAGS_rocksdb_block_cache = 0;
fs::TempDir rootPath("/tmp/PrefixBloomFilterBenchmark.XXXXXX");
std::unique_ptr<kvstore::KVStore> kv = TestUtils::initKV(rootPath.path(),
FLAGS_part_number);
mockData(kv.get());
braces.dismiss();
testPrefixSeek(kv.get(), n);
}

} // namespace storage
} // namespace nebula

int main(int argc, char **argv) {
folly::init(&argc, &argv, true);
folly::runBenchmarks();
return 0;
}

/*
40 processors, Intel(R) Xeon(R) CPU E5-2650 v4 @ 2.20GHz
--part_number=20 --vertex_per_part=100
============================================================================
PrefixBloomFilterBenchmark.cpprelative time/iter iters/s
PrefixWithFilterOff 4.07s 245.88m
PrefixWithFilterOn 121.40% 3.35s 298.51m
============================================================================
--part_number=40 --vertex_per_part=100
============================================================================
PrefixBloomFilterBenchmark.cpprelative time/iter iters/s
PrefixWithFilterOff 15.37s 65.07m
PrefixWithFilterOn 137.75% 11.16s 89.63m
============================================================================
--part_number=40 --vertex_per_part=200
============================================================================
PrefixBloomFilterBenchmark.cpprelative time/iter iters/s
PrefixWithFilterOff 35.29s 28.34m
PrefixWithFilterOn 148.22% 23.81s 42.01m
============================================================================
*/
22 changes: 22 additions & 0 deletions src/storage/test/QueryVertexPropsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <gtest/gtest.h>
#include <rocksdb/db.h>
#include "fs/TempDir.h"
#include "kvstore/RocksEngineConfig.h"
#include "storage/test/TestUtils.h"
#include "storage/query/QueryVertexPropsProcessor.h"
#include "dataman/RowSetReader.h"
Expand Down Expand Up @@ -48,6 +49,10 @@ void mockData(kvstore::KVStore* kv,
baton.post();
});
baton.wait();
if (FLAGS_enable_rocksdb_prefix_filtering) {
kvstore::ResultCode code = kv->flush(0); // flush per partition
EXPECT_EQ(code, kvstore::ResultCode::SUCCEEDED);
}
}
}

Expand Down Expand Up @@ -181,6 +186,23 @@ TEST(QueryVertexPropsTest, SimpleTest) {
testWithVersion(kv.get(), schemaMng.get(), executor.get(), version);
}

TEST(QueryVertexPropsTest, PrefixBloomFilterTest) {
FLAGS_enable_rocksdb_statistics = true;
FLAGS_enable_rocksdb_prefix_filtering = true;
fs::TempDir rootPath("/tmp/QueryVertexPropsTest.XXXXXX");
std::unique_ptr<kvstore::KVStore> kv = TestUtils::initKV(rootPath.path());

LOG(INFO) << "Prepare meta...";
auto schemaMng = TestUtils::mockSchemaMan();
auto executor = std::make_unique<folly::CPUThreadPoolExecutor>(3);
TagVersion version = 1;
mockData(kv.get(), schemaMng.get(), version);
testWithVersion(kv.get(), schemaMng.get(), executor.get(), version);
std::shared_ptr<rocksdb::Statistics> statistics = kvstore::getDBStatistics();
ASSERT_GT(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_PREFIX_CHECKED), 0);
ASSERT_GT(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_PREFIX_USEFUL), 0);
}

TEST(QueryVertexPropsTest, QueryAfterTagAltered) {
fs::TempDir rootPath("/tmp/QueryVertexPropsTest.XXXXXX");
std::unique_ptr<kvstore::KVStore> kv = TestUtils::initKV(rootPath.path());
Expand Down

0 comments on commit 12a54dc

Please sign in to comment.