Skip to content

Commit

Permalink
LookUpIndexProcessor traverse all partitions concurrently (vesoft-inc…
Browse files Browse the repository at this point in the history
…#2417)

Co-authored-by: cpw <13495049+CPWstatic@users.noreply.github.com>
  • Loading branch information
MMyheart and CPWstatic authored Jan 8, 2021
1 parent 385400c commit 4ebcec1
Show file tree
Hide file tree
Showing 9 changed files with 1,013 additions and 92 deletions.
7 changes: 7 additions & 0 deletions src/storage/StorageServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ DEFINE_int32(vertex_cache_num, 16 * 1000 * 1000, "Total keys inside the cache");
DEFINE_int32(vertex_cache_bucket_exp, 4, "Total buckets number is 1 << cache_bucket_exp");
DEFINE_int32(reader_handlers, 32, "Total reader handlers");
DEFINE_string(reader_handlers_type, "cpu", "Type of reader handlers, options: cpu,io");
DEFINE_bool(lookup_concurrently, false,
"whether to traversal partitions concurrently in lookup processor");

namespace nebula {
namespace storage {
Expand Down Expand Up @@ -249,10 +251,15 @@ StorageServiceHandler::future_rebuildEdgeIndex(const cpp2::RebuildIndexRequest&

folly::Future<cpp2::LookUpIndexResp>
StorageServiceHandler::future_lookUpIndex(const cpp2::LookUpIndexRequest& req) {
folly::Executor* executor = nullptr;
if (FLAGS_lookup_concurrently) {
executor = readerPool_.get();
}
auto* processor = LookUpIndexProcessor::instance(kvstore_,
schemaMan_,
indexMan_,
&lookupVerticesQpsStat_,
executor,
&vertexCache_);
RETURN_FUTURE(processor);
}
Expand Down
25 changes: 25 additions & 0 deletions src/storage/index/IndexExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
namespace nebula {
namespace storage {

using PartKeys = ErrorOr<kvstore::ResultCode, std::vector<std::string>>;
using EdgeRows = ErrorOr<kvstore::ResultCode, std::vector<cpp2::Edge>>;
using VertexRows = ErrorOr<kvstore::ResultCode, std::vector<cpp2::VertexIndexData>>;

template<typename RESP>
class IndexExecutor : public BaseProcessor<RESP>
, public IndexPolicyMaker {
Expand All @@ -25,9 +29,11 @@ class IndexExecutor : public BaseProcessor<RESP>
meta::IndexManager* indexMan,
stats::Stats* stats,
VertexCache* cache,
folly::Executor* executor,
bool isEdgeIndex = false)
: BaseProcessor<RESP>(kvstore, schemaMan, stats)
, IndexPolicyMaker(schemaMan, indexMan)
, executor_(executor)
, vertexCache_(cache)
, isEdgeIndex_(isEdgeIndex) {}

Expand Down Expand Up @@ -58,7 +64,19 @@ class IndexExecutor : public BaseProcessor<RESP>
**/
kvstore::ResultCode executeExecutionPlan(PartitionID part);

/**
* Details Scan index parts
**/
folly::Future<std::unordered_map<PartitionID, kvstore::ResultCode>>
executeExecutionPlanConcurrently(const std::vector<PartitionID>& parts);

private:
folly::Future<std::unordered_map<PartitionID, kvstore::ResultCode>>
executeExecutionPlanForEdge(const std::vector<PartitionID>& parts);

folly::Future<std::unordered_map<PartitionID, kvstore::ResultCode>>
executeExecutionPlanForVertex(const std::vector<PartitionID>& parts);

cpp2::ErrorCode checkIndex(IndexID indexId);

cpp2::ErrorCode checkReturnColumns(const std::vector<std::string> &cols);
Expand All @@ -84,8 +102,15 @@ class IndexExecutor : public BaseProcessor<RESP>
folly::StringPiece getIndexVal(const folly::StringPiece& key,
const folly::StringPiece& prop);

/**
* Details Scan index key
**/
kvstore::ResultCode getIndexKey(PartitionID part,
std::vector<std::string>& keys);

protected:
GraphSpaceID spaceId_;
folly::Executor* executor_{nullptr};
VertexCache* vertexCache_{nullptr};
std::shared_ptr<SchemaWriter> schema_{nullptr};
std::vector<cpp2::VertexIndexData> vertexRows_;
Expand Down
180 changes: 173 additions & 7 deletions src/storage/index/IndexExecutor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,180 @@ IndexExecutor<RESP>::normalizeScanPair(const nebula::cpp2::ColumnDef& field,

template <typename RESP>
kvstore::ResultCode IndexExecutor<RESP>::executeExecutionPlan(PartitionID part) {
std::unique_ptr<kvstore::KVIterator> iter;
std::vector<std::string> keys;
auto ret = getIndexKey(part, keys);
if (ret != nebula::kvstore::SUCCEEDED) {
return ret;
}
for (auto& item : keys) {
ret = getDataRow(part, item);
if (ret != kvstore::ResultCode::SUCCEEDED) {
return ret;
}
}
return ret;
}

template <typename RESP>
folly::Future<std::unordered_map<PartitionID, kvstore::ResultCode>>
IndexExecutor<RESP>::executeExecutionPlanConcurrently(
const std::vector<PartitionID>& parts) {
if (isEdgeIndex_) {
return executeExecutionPlanForEdge(parts);
}
return executeExecutionPlanForVertex(parts);
}

template <typename RESP>
folly::Future<std::unordered_map<PartitionID, kvstore::ResultCode>>
IndexExecutor<RESP>::executeExecutionPlanForEdge(
const std::vector<PartitionID>& parts) {
std::vector<folly::Future<std::tuple<PartitionID, EdgeRows>>> results;
for (auto& part : parts) {
folly::Promise<std::tuple<PartitionID, EdgeRows>> pro;
auto f = pro.getFuture();
executor_->add([this, p = std::move(pro), &part] () mutable {
std::tuple<PartitionID, EdgeRows> partEdgeRows;
std::vector<std::string> keys;
auto code = getIndexKey(part, keys);
if (code != kvstore::ResultCode::SUCCEEDED) {
LOG(ERROR) << "Execute execution plan for edge concurrently! "
<< "getIndexKey error, ret = "
<< static_cast<int32_t>(code)
<< ", spaceId = " << spaceId_
<< ", partId = " << part;
partEdgeRows = std::make_tuple(part, code);
p.setValue(partEdgeRows);
return;
}

std::vector<cpp2::Edge> edgeDatas;
for (auto& key : keys) {
cpp2::Edge data;
code = getEdgeRow(part, key, &data);
if (code != kvstore::ResultCode::SUCCEEDED) {
LOG(ERROR) << "execute execution plan for edge concurrently! "
<< "getDataRow error, ret = "
<< static_cast<int32_t>(code)
<< ", spaceId = " << spaceId_
<< ", partId = " << part;
partEdgeRows = std::make_tuple(part, code);
p.setValue(partEdgeRows);
return;
}
edgeDatas.emplace_back(std::move(data));
}

partEdgeRows = std::make_tuple(part, edgeDatas);
p.setValue(std::move(partEdgeRows));
});
results.emplace_back(std::move(f));
}
folly::Promise<std::unordered_map<PartitionID, kvstore::ResultCode>> resultPro;
auto result = resultPro.getFuture();
folly::collect(results).via(executor_).then([this, pro = std::move(resultPro)](
const std::vector<std::tuple<PartitionID, EdgeRows>>& partEdgeRows) mutable {
std::unordered_map<PartitionID, kvstore::ResultCode> res;
for (const auto& partEdgeRow : partEdgeRows) {
auto part = std::get<0>(partEdgeRow);
auto keysOr = std::get<1>(partEdgeRow);
if (!nebula::ok(keysOr)) {
auto code = nebula::error(keysOr);
res.emplace(part, code);
}
}
if (!res.empty()) {
pro.setValue(res);
return;
}

for (const auto& partEdgeRow : partEdgeRows) {
auto keysOr = std::get<1>(partEdgeRow);
std::vector<cpp2::Edge> edgeRows = nebula::value(keysOr);
edgeRows_.insert(edgeRows_.end(), edgeRows.begin(), edgeRows.end());
}
pro.setValue(res);
});
return result;
}

template <typename RESP>
folly::Future<std::unordered_map<PartitionID, kvstore::ResultCode>>
IndexExecutor<RESP>::executeExecutionPlanForVertex(
const std::vector<PartitionID>& parts) {
std::vector<folly::Future<std::tuple<PartitionID, VertexRows>>> results;
for (auto& part : parts) {
folly::Promise<std::tuple<PartitionID, VertexRows>> pro;
auto f = pro.getFuture();
executor_->add([this, p = std::move(pro), &part] () mutable {
std::tuple<PartitionID, VertexRows> partVertexRows;
std::vector<std::string> keys;
auto code = getIndexKey(part, keys);
if (code != kvstore::ResultCode::SUCCEEDED) {
LOG(ERROR) << "execute execution plan for vertex concurrently! "
<< "getIndexKey error, ret = "
<< static_cast<int32_t>(code)
<< ", spaceId = " << spaceId_
<< ", partId = " << part;
partVertexRows = std::make_tuple(part, code);
p.setValue(partVertexRows);
return;
}

std::vector<cpp2::VertexIndexData> vertexDatas;
for (auto& key : keys) {
cpp2::VertexIndexData data;
code = getVertexRow(part, key, &data);
if (code != kvstore::ResultCode::SUCCEEDED) {
LOG(ERROR) << "execute execution plan for vertex concurrently! "
<< "getDataRow error, ret = "
<< static_cast<int32_t>(code)
<< ", spaceId = " << spaceId_
<< ", partId = " << part;
partVertexRows = std::make_tuple(part, code);
p.setValue(partVertexRows);
return;
}
vertexDatas.emplace_back(std::move(data));
}

partVertexRows = std::make_tuple(part, vertexDatas);
p.setValue(std::move(partVertexRows));
});
results.emplace_back(std::move(f));
}
folly::Promise<std::unordered_map<PartitionID, kvstore::ResultCode>> resultPro;
auto result = resultPro.getFuture();
folly::collect(results).via(executor_).then([this, pro = std::move(resultPro)](
const std::vector<std::tuple<PartitionID, VertexRows>>& partVertexRows) mutable {
std::unordered_map<PartitionID, kvstore::ResultCode> res;
for (const auto& partVertexRow : partVertexRows) {
auto part = std::get<0>(partVertexRow);
auto keysOr = std::get<1>(partVertexRow);
if (!nebula::ok(keysOr)) {
auto code = nebula::error(keysOr);
res.emplace(part, code);
}
}
if (!res.empty()) {
pro.setValue(res);
return;
}

for (const auto& partVertexRow : partVertexRows) {
auto keysOr = std::get<1>(partVertexRow);
std::vector<cpp2::VertexIndexData> vertexDatas = nebula::value(keysOr);
vertexRows_.insert(vertexRows_.end(), vertexDatas.begin(), vertexDatas.end());
}
pro.setValue(res);
});
return result;
}

template <typename RESP>
kvstore::ResultCode IndexExecutor<RESP>::getIndexKey(PartitionID part,
std::vector<std::string>& keys) {
std::unique_ptr<kvstore::KVIterator> iter;
auto pair = makeScanPair(part, index_->get_index_id());
if (pair.first.empty() || pair.second.empty()) {
return kvstore::ResultCode::ERR_KEY_NOT_FOUND;
Expand All @@ -235,12 +407,6 @@ kvstore::ResultCode IndexExecutor<RESP>::executeExecutionPlan(PartitionID part)
keys.emplace_back(key);
iter->next();
}
for (auto& item : keys) {
ret = getDataRow(part, item);
if (ret != kvstore::ResultCode::SUCCEEDED) {
return ret;
}
}
return ret;
}

Expand Down
37 changes: 27 additions & 10 deletions src/storage/index/LookUpIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,39 @@ void LookUpIndexProcessor::process(const cpp2::LookUpIndexRequest& req) {
/**
* step 3 : execute index scan.
*/
for (auto partId : req.get_parts()) {
auto code = executeExecutionPlan(partId);
if (code != kvstore::ResultCode::SUCCEEDED) {
LOG(ERROR) << "Execute Execution Plan! ret = " << static_cast<int32_t>(code)
<< ", spaceId = " << spaceId_
<< ", partId = " << partId;
if (code == kvstore::ResultCode::ERR_LEADER_CHANGED) {
this->handleLeaderChanged(spaceId_, partId);
} else {
this->pushResultCode(this->to(code), partId);
if (executor_ == nullptr) {
for (auto partId : req.get_parts()) {
auto code = executeExecutionPlan(partId);
if (code != kvstore::ResultCode::SUCCEEDED) {
LOG(ERROR) << "Execute Execution Plan! ret = " << static_cast<int32_t>(code)
<< ", spaceId = " << spaceId_
<< ", partId = " << partId;
if (code == kvstore::ResultCode::ERR_LEADER_CHANGED) {
this->handleLeaderChanged(spaceId_, partId);
} else {
this->pushResultCode(this->to(code), partId);
}
this->onFinished();
return;
}
}
} else {
auto future = executeExecutionPlanConcurrently(req.get_parts());
auto map = std::move(future).get();
if (!map.empty()) {
for (auto& errorPart : map) {
if (errorPart.second == kvstore::ResultCode::ERR_LEADER_CHANGED) {
this->handleLeaderChanged(spaceId_, errorPart.first);
} else {
this->pushResultCode(this->to(errorPart.second), errorPart.first);
}
}
this->onFinished();
return;
}
}


/**
* step 4 : collect result.
*/
Expand Down
7 changes: 5 additions & 2 deletions src/storage/index/LookUpIndexProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ class LookUpIndexProcessor: public IndexExecutor<cpp2::LookUpIndexResp> {
meta::SchemaManager* schemaMan,
meta::IndexManager* indexMan,
stats::Stats* stats,
folly::Executor* executor,
VertexCache* cache = nullptr) {
return new LookUpIndexProcessor(kvstore, schemaMan, indexMan, stats, cache);
return new LookUpIndexProcessor(kvstore, schemaMan, indexMan, stats, executor, cache);
}

void process(const cpp2::LookUpIndexRequest& req);
Expand All @@ -29,8 +30,10 @@ class LookUpIndexProcessor: public IndexExecutor<cpp2::LookUpIndexResp> {
meta::SchemaManager* schemaMan,
meta::IndexManager* indexMan,
stats::Stats* stats,
folly::Executor* executor,
VertexCache* cache = nullptr)
: IndexExecutor<cpp2::LookUpIndexResp>(kvstore, schemaMan, indexMan, stats, cache) {}
: IndexExecutor<cpp2::LookUpIndexResp>(kvstore, schemaMan,
indexMan, stats, cache, executor) {}
};

} // namespace storage
Expand Down
16 changes: 16 additions & 0 deletions src/storage/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,22 @@ nebula_add_executable(
gtest
)

nebula_add_executable(
NAME
storage_lookup_concurrently_bm
SOURCES
StorageLookupConcurrentlyBenchmark.cpp
OBJECTS
${storage_test_deps}
LIBRARIES
${ROCKSDB_LIBRARIES}
${THRIFT_LIBRARIES}
follybenchmark
boost_regex
wangle
gtest
)

nebula_add_test(
NAME
kv_test
Expand Down
Loading

0 comments on commit 4ebcec1

Please sign in to comment.