Skip to content

Commit

Permalink
Try to use whole key bloom filter when insert data with index (vesoft…
Browse files Browse the repository at this point in the history
…-inc#2377)

* use bloom filter if available

* add ut
  • Loading branch information
critical27 authored Oct 27, 2020
1 parent 5593705 commit b4ace3f
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 18 deletions.
7 changes: 7 additions & 0 deletions src/storage/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "stats/StatsManager.h"
#include "stats/Stats.h"

// Makes FLAGS_enable_multi_versions visible to the processors; it is read by
// BaseProcessor::doGetFirstRecord to choose between a point get and a prefix scan.
// NOTE(review): presumably the matching DEFINE_bool lives in another translation unit - confirm.
DECLARE_bool(enable_multi_versions);

namespace nebula {
namespace storage {

Expand Down Expand Up @@ -57,6 +59,11 @@ class BaseProcessor {
delete this;
}

// Looks up the first record stored under `key` in the given space/partition and
// stores its value in `*value`.
//
// When FLAGS_enable_multi_versions is false, an 8-byte zero version is appended
// to `key` (the caller's string is modified) and a point get is issued on the
// resulting full key. Otherwise `key` is used as a prefix for a scan and
// `*value` is only written when the scan succeeds and yields a valid entry.
//
// Returns the result code of the underlying kvstore call.
kvstore::ResultCode doGetFirstRecord(GraphSpaceID spaceId,
PartitionID partId,
std::string& key,
std::string* value);

void doPut(GraphSpaceID spaceId, PartitionID partId, std::vector<kvstore::KV> data);

kvstore::ResultCode doSyncPut(GraphSpaceID spaceId,
Expand Down
19 changes: 19 additions & 0 deletions src/storage/BaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,25 @@ void BaseProcessor<RESP>::handleAsync(GraphSpaceID spaceId,
}
}

/**
 * Fetch the value of the first record stored under `key`.
 *
 * Multi-version mode (FLAGS_enable_multi_versions == true):
 *   `key` is treated as a prefix; the first entry of the prefix scan, if any,
 *   is copied into `*value`. The scan's result code is returned as-is, so a
 *   successful scan with no entries still returns that code and leaves
 *   `*value` untouched.
 *
 * Single-version mode (FLAGS_enable_multi_versions == false):
 *   an 8-byte zero version is appended to `key` (the caller's string is
 *   modified) and the resulting full key is fetched with a point get, whose
 *   result code is returned.
 *
 * @param spaceId  graph space to read from
 * @param partId   partition to read from
 * @param key      key prefix; extended in place in single-version mode
 * @param value    output buffer for the record's value
 * @return the result code reported by the underlying kvstore call
 */
template <typename RESP>
kvstore::ResultCode BaseProcessor<RESP>::doGetFirstRecord(GraphSpaceID spaceId,
                                                          PartitionID partId,
                                                          std::string& key,
                                                          std::string* value) {
    if (FLAGS_enable_multi_versions) {
        // Several versions may share the prefix: scan and take the first hit.
        std::unique_ptr<kvstore::KVIterator> cursor;
        auto code = kvstore_->prefix(spaceId, partId, key, &cursor);
        const bool found = code == kvstore::ResultCode::SUCCEEDED && cursor && cursor->valid();
        if (found) {
            *value = cursor->val().str();
        }
        return code;
    }

    // Only one version exists: complete the key with the fixed zero version so
    // that a point lookup on the whole key can be used instead of a scan.
    int64_t zeroVersion = folly::Endian::big(0L);
    key.append(reinterpret_cast<const char*>(&zeroVersion), sizeof(zeroVersion));
    return kvstore_->get(spaceId, partId, key, value);
}

template <typename RESP>
void BaseProcessor<RESP>::doPut(GraphSpaceID spaceId,
PartitionID partId,
Expand Down
13 changes: 4 additions & 9 deletions src/storage/mutate/AddEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,10 @@ std::string AddEdgesProcessor::findObsoleteIndex(PartitionID partId,
NebulaKeyUtils::getEdgeType(rawKey),
NebulaKeyUtils::getRank(rawKey),
NebulaKeyUtils::getDstId(rawKey));
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kvstore_->prefix(this->spaceId_, partId, prefix, &iter);
if (ret != kvstore::ResultCode::SUCCEEDED) {
LOG(ERROR) << "Error! ret = " << static_cast<int32_t>(ret)
<< ", spaceId " << this->spaceId_;
return "";
}
if (iter && iter->valid()) {
return iter->val().str();
std::string value;
auto ret = doGetFirstRecord(spaceId_, partId, prefix, &value);
if (ret == kvstore::ResultCode::SUCCEEDED) {
return value;
}
return "";
}
Expand Down
13 changes: 4 additions & 9 deletions src/storage/mutate/AddVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,10 @@ std::string AddVerticesProcessor::findObsoleteIndex(PartitionID partId,
VertexID vId,
TagID tagId) {
auto prefix = NebulaKeyUtils::vertexPrefix(partId, vId, tagId);
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kvstore_->prefix(this->spaceId_, partId, prefix, &iter);
if (ret != kvstore::ResultCode::SUCCEEDED) {
LOG(ERROR) << "Error! ret = " << static_cast<int32_t>(ret)
<< ", spaceId " << this->spaceId_;
return "";
}
if (iter && iter->valid()) {
return iter->val().str();
std::string value;
auto ret = doGetFirstRecord(spaceId_, partId, prefix, &value);
if (ret == kvstore::ResultCode::SUCCEEDED) {
return value;
}
return "";
}
Expand Down
110 changes: 110 additions & 0 deletions src/storage/test/IndexTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <gtest/gtest.h>
#include <rocksdb/db.h>
#include "fs/TempDir.h"
#include "kvstore/RocksEngineConfig.h"
#include "storage/test/TestUtils.h"
#include "storage/mutate/AddVerticesProcessor.h"
#include "storage/mutate/AddEdgesProcessor.h"
Expand Down Expand Up @@ -630,6 +631,115 @@ TEST(IndexTest, RebulidEdgeIndexWithOfflineTest) {
}
}

// Checks that inserting vertices with an index consults the whole-key bloom
// filter when enable_multi_versions is false, and that the bloom filter counter
// stops growing once enable_multi_versions is switched on.
TEST(IndexTest, VertexBloomFilterTest) {
    // vertex bloom filter should be used when enable_multi_versions is false
    FLAGS_enable_rocksdb_statistics = true;
    fs::TempDir rootPath("/tmp/InsertVerticesTest.XXXXXX");
    std::unique_ptr<kvstore::KVStore> kv = TestUtils::initKV(rootPath.path());
    auto schemaMan = TestUtils::mockSchemaMan();
    auto indexMan = TestUtils::mockIndexMan();

    // Inserts (overwriting) the vertices in [vIdFrom, vIdTo) into partition 0 of
    // space 0 and expects no partition to fail.
    auto writeData = [&] (VertexID vIdFrom, VertexID vIdTo) {
        cpp2::AddVerticesRequest req;
        req.space_id = 0;
        req.overwritable = true;
        PartitionID partId = 0;
        auto vertices = TestUtils::setupVertices(partId, vIdFrom, vIdTo, 3001, 3010);
        req.parts.emplace(partId, std::move(vertices));
        auto* processor = AddVerticesProcessor::instance(kv.get(),
                                                         schemaMan.get(),
                                                         indexMan.get(),
                                                         nullptr);
        auto fut = processor->getFuture();
        processor->process(req);
        auto resp = std::move(fut).get();
        EXPECT_EQ(0, resp.result.failed_codes.size());
    };

    // reset stat count, as EdgeBloomFilterTest does, so the absolute checks below
    // do not depend on this test being the first one to touch the statistics
    // NOTE(review): assumes the statistics object may be shared by tests in this binary
    auto statistics = kvstore::getDBStatistics();
    statistics->getAndResetTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL);
    EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);

    // write initial data
    writeData(0, 100);
    EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);
    EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, kv->flush(0));

    // overwrite existing data: the keys are present, so no lookup is expected
    // to be short-circuited by the bloom filter
    writeData(0, 100);
    EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);
    EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, kv->flush(0));

    // write data that does not exist yet: the bloom filter should be useful now
    writeData(100, 200);
    EXPECT_GT(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), 0);
    auto count = statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL);
    EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, kv->flush(0));

    // when enable_multi_versions is true, whole key bloom filter won't be used anymore
    FLAGS_enable_multi_versions = true;
    writeData(200, 300);
    EXPECT_EQ(statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL), count);
    FLAGS_enable_multi_versions = false;

    FLAGS_enable_rocksdb_statistics = false;
}

// Checks that inserting edges with an index consults the whole-key bloom filter
// when enable_multi_versions is false, and that the bloom filter counter stops
// growing once enable_multi_versions is switched on.
TEST(IndexTest, EdgeBloomFilterTest) {
    FLAGS_enable_rocksdb_statistics = true;
    fs::TempDir rootPath("/tmp/InsertEdgesTest.XXXXXX");
    std::unique_ptr<kvstore::KVStore> kv = TestUtils::initKV(rootPath.path());
    auto schemaMan = TestUtils::mockSchemaMan();
    auto indexMan = TestUtils::mockIndexMan();

    // Inserts (overwriting) the edges whose source ids lie in [srcBegin, srcEnd)
    // into partition 0 of space 0 and expects no partition to fail.
    auto insertEdges = [&] (VertexID srcBegin, VertexID srcEnd) {
        auto* processor = AddEdgesProcessor::instance(kv.get(),
                                                      schemaMan.get(),
                                                      indexMan.get(),
                                                      nullptr);
        const PartitionID partId = 0;
        cpp2::AddEdgesRequest req;
        req.space_id = 0;
        req.overwritable = true;
        req.parts.emplace(partId, TestUtils::setupEdges(partId, srcBegin, srcEnd, 101));
        auto fut = processor->getFuture();
        processor->process(req);
        auto resp = std::move(fut).get();
        EXPECT_EQ(0, resp.result.failed_codes.size());
    };

    auto statistics = kvstore::getDBStatistics();
    // Reads the bloom-filter-useful ticker from whatever `statistics` currently holds.
    auto bloomUseful = [&statistics] () {
        return statistics->getTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL);
    };
    // Flushes space 0 and expects the flush to succeed.
    auto flushSpace = [&kv] () {
        EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, kv->flush(0));
    };

    // start from a clean counter
    statistics->getAndResetTickerCount(rocksdb::Tickers::BLOOM_FILTER_USEFUL);
    EXPECT_EQ(bloomUseful(), 0);

    // first batch of edges
    insertEdges(0, 100);
    EXPECT_EQ(bloomUseful(), 0);
    flushSpace();

    // same edges again: every key already exists
    insertEdges(0, 100);
    statistics = kvstore::getDBStatistics();
    EXPECT_EQ(bloomUseful(), 0);
    flushSpace();

    // brand-new edges: the bloom filter is expected to be useful now
    insertEdges(100, 200);
    statistics = kvstore::getDBStatistics();
    EXPECT_GT(bloomUseful(), 0);
    auto count = bloomUseful();
    flushSpace();

    // with enable_multi_versions on, the whole key bloom filter is not used,
    // so the counter must stay where it was
    FLAGS_enable_multi_versions = true;
    insertEdges(200, 300);
    statistics = kvstore::getDBStatistics();
    EXPECT_EQ(bloomUseful(), count);
    FLAGS_enable_multi_versions = false;

    FLAGS_enable_rocksdb_statistics = false;
}

} // namespace storage
} // namespace nebula

Expand Down

0 comments on commit b4ace3f

Please sign in to comment.