Skip to content

Commit

Permalink
Support build index offline (vesoft-inc#1566)
Browse files Browse the repository at this point in the history
Because we are only support offline mode, when you want to rebuild index after create it, you should make sure it's no writing to the nebula. The online mode will coming soon.

The command as following:

REBUILD TAG/EDGE INDEX ${INDEX_NAME} OFFLINE;
  • Loading branch information
yaphet authored Mar 9, 2020
1 parent fdeed62 commit ba8b3f6
Show file tree
Hide file tree
Showing 96 changed files with 1,977 additions and 461 deletions.
3 changes: 2 additions & 1 deletion src/common/base/NebulaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
#include "interface/gen-cpp2/common_types.h"
#include "common/filter/Expressions.h"

namespace nebula {
using IndexValues = std::vector<std::pair<nebula::cpp2::SupportedType, std::string>>;

namespace nebula {

/**
* VertexKeyUtils:
* type(1) + partId(3) + vertexId(8) + tagId(4) + version(8)
Expand Down
4 changes: 2 additions & 2 deletions src/graph/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ nebula_add_library(
CreateEdgeIndexExecutor.cpp
DropEdgeIndexExecutor.cpp
DescribeEdgeIndexExecutor.cpp
BuildEdgeIndexExecutor.cpp
RebuildEdgeIndexExecutor.cpp
CreateTagIndexExecutor.cpp
DropTagIndexExecutor.cpp
DescribeTagIndexExecutor.cpp
BuildTagIndexExecutor.cpp
RebuildTagIndexExecutor.cpp
UpdateEdgeExecutor.cpp
AssignmentExecutor.cpp
InterimResult.cpp
Expand Down
12 changes: 6 additions & 6 deletions src/graph/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
#include "graph/DropEdgeIndexExecutor.h"
#include "graph/DescribeTagIndexExecutor.h"
#include "graph/DescribeEdgeIndexExecutor.h"
#include "graph/BuildTagIndexExecutor.h"
#include "graph/BuildEdgeIndexExecutor.h"
#include "graph/RebuildTagIndexExecutor.h"
#include "graph/RebuildEdgeIndexExecutor.h"
#include "graph/InsertVertexExecutor.h"
#include "graph/InsertEdgeExecutor.h"
#include "graph/AssignmentExecutor.h"
Expand Down Expand Up @@ -118,11 +118,11 @@ std::unique_ptr<Executor> Executor::makeExecutor(Sentence *sentence) {
case Sentence::Kind::kDropEdgeIndex:
executor = std::make_unique<DropEdgeIndexExecutor>(sentence, ectx());
break;
case Sentence::Kind::kBuildTagIndex:
executor = std::make_unique<BuildTagIndexExecutor>(sentence, ectx());
case Sentence::Kind::kRebuildTagIndex:
executor = std::make_unique<RebuildTagIndexExecutor>(sentence, ectx());
break;
case Sentence::Kind::kBuildEdgeIndex:
executor = std::make_unique<BuildEdgeIndexExecutor>(sentence, ectx());
case Sentence::Kind::kRebuildEdgeIndex:
executor = std::make_unique<RebuildEdgeIndexExecutor>(sentence, ectx());
break;
case Sentence::Kind::kInsertVertex:
executor = std::make_unique<InsertVertexExecutor>(sentence, ectx());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "graph/BuildEdgeIndexExecutor.h"
#include "graph/RebuildEdgeIndexExecutor.h"

namespace nebula {
namespace graph {

BuildEdgeIndexExecutor::BuildEdgeIndexExecutor(Sentence *sentence,
RebuildEdgeIndexExecutor::RebuildEdgeIndexExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
sentence_ = static_cast<BuildEdgeIndexSentence*>(sentence);
sentence_ = static_cast<RebuildEdgeIndexSentence*>(sentence);
}

Status BuildEdgeIndexExecutor::prepare() {
Status RebuildEdgeIndexExecutor::prepare() {
return Status::OK();
}

void BuildEdgeIndexExecutor::execute() {
void RebuildEdgeIndexExecutor::execute() {
auto status = checkIfGraphSpaceChosen();
if (!status.ok()) {
DCHECK(onError_);
Expand All @@ -28,9 +28,10 @@ void BuildEdgeIndexExecutor::execute() {

auto *mc = ectx()->getMetaClient();
auto *name = sentence_->indexName();
auto isOffline = sentence_->isOffline();
auto spaceId = ectx()->rctx()->session()->space();

auto future = mc->buildEdgeIndex(spaceId, *name);
auto future = mc->rebuildEdgeIndex(spaceId, *name, isOffline);
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@
namespace nebula {
namespace graph {

class BuildEdgeIndexExecutor final : public Executor {
class RebuildEdgeIndexExecutor final : public Executor {
public:
BuildEdgeIndexExecutor(Sentence *sentence, ExecutionContext *ectx);
RebuildEdgeIndexExecutor(Sentence *sentence, ExecutionContext *ectx);

const char* name() const override {
return "BuildEdgeIndexExecutor";
return "RebuildEdgeIndexExecutor";
}

Status MUST_USE_RESULT prepare() override;

void execute() override;

private:
BuildEdgeIndexSentence *sentence_{nullptr};
RebuildEdgeIndexSentence *sentence_{nullptr};
};

} // namespace graph
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "graph/BuildTagIndexExecutor.h"
#include "graph/RebuildTagIndexExecutor.h"

namespace nebula {
namespace graph {

BuildTagIndexExecutor::BuildTagIndexExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
sentence_ = static_cast<BuildTagIndexSentence*>(sentence);
RebuildTagIndexExecutor::RebuildTagIndexExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
sentence_ = static_cast<RebuildTagIndexSentence*>(sentence);
}

Status BuildTagIndexExecutor::prepare() {
Status RebuildTagIndexExecutor::prepare() {
return Status::OK();
}

void BuildTagIndexExecutor::execute() {
void RebuildTagIndexExecutor::execute() {
auto status = checkIfGraphSpaceChosen();
if (!status.ok()) {
DCHECK(onError_);
Expand All @@ -28,9 +28,10 @@ void BuildTagIndexExecutor::execute() {

auto *mc = ectx()->getMetaClient();
auto *name = sentence_->indexName();
auto isOffline = sentence_->isOffline();
auto spaceId = ectx()->rctx()->session()->space();

auto future = mc->buildTagIndex(spaceId, *name);
auto future = mc->rebuildTagIndex(spaceId, *name, isOffline);
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,33 @@
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef GRAPH_BUILDTAGINDEXEXECUTOR_H_
#define GRAPH_BUILDTAGINDEXEXECUTOR_H_
#ifndef GRAPH_REBUILDTAGINDEXEXECUTOR_H_
#define GRAPH_REBUILDTAGINDEXEXECUTOR_H_

#include "base/Base.h"
#include "graph/Executor.h"

namespace nebula {
namespace graph {

class BuildTagIndexExecutor final : public Executor {
class RebuildTagIndexExecutor final : public Executor {
public:
BuildTagIndexExecutor(Sentence *sentence, ExecutionContext *ectx);
RebuildTagIndexExecutor(Sentence *sentence, ExecutionContext *ectx);

const char* name() const override {
return "BuildTagIndexExecutor";
return "RebuildTagIndexExecutor";
}

Status MUST_USE_RESULT prepare() override;

void execute() override;

private:
BuildTagIndexSentence *sentence_{nullptr};
RebuildTagIndexSentence *sentence_{nullptr};
};

} // namespace graph
} // namespace nebula

#endif // GRAPH_BUILDTAGINDEXEXECUTOR_H_
#endif // GRAPH_REBUILDTAGINDEXEXECUTOR_H_

116 changes: 101 additions & 15 deletions src/graph/ShowExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ void ShowExecutor::execute() {
case ShowSentence::ShowType::kShowCreateEdgeIndex:
showCreateEdgeIndex();
break;
case ShowSentence::ShowType::kShowTagIndexStatus:
showTagIndexStatus();
break;
case ShowSentence::ShowType::kShowEdgeIndexStatus:
showEdgeIndexStatus();
break;
case ShowSentence::ShowType::kShowSnapshots:
showSnapshots();
break;
Expand Down Expand Up @@ -342,22 +348,25 @@ void ShowExecutor::showTags() {
return;
}

std::unordered_set<TagID> tags;
auto value = std::move(resp).value();
resp_ = std::make_unique<cpp2::ExecutionResponse>();
std::vector<cpp2::RowValue> rows;
std::vector<std::string> header{"ID", "Name"};
resp_->set_column_names(std::move(header));

std::map<nebula::cpp2::TagID, std::string> tagItems;
for (auto &tag : value) {
tagItems.emplace(tag.get_tag_id(), tag.get_tag_name());
}
auto tagID = tag.get_tag_id();
auto iter = tags.find(tagID);
if (iter != tags.end()) {
continue;
}

for (auto &item : tagItems) {
tags.emplace(tagID);
std::vector<cpp2::ColumnValue> row;
row.resize(2);
row[0].set_integer(item.first);
row[1].set_str(item.second);
row[0].set_integer(tagID);
row[1].set_str(std::move(tag.get_tag_name()));
rows.emplace_back();
rows.back().set_columns(std::move(row));
}
Expand Down Expand Up @@ -386,22 +395,25 @@ void ShowExecutor::showEdges() {
return;
}

std::unordered_set<TagID> edges;
auto value = std::move(resp).value();
resp_ = std::make_unique<cpp2::ExecutionResponse>();
std::vector<cpp2::RowValue> rows;
std::vector<std::string> header{"ID", "Name"};
resp_->set_column_names(std::move(header));

std::map<nebula::cpp2::EdgeType, std::string> edgeItems;
for (auto &edge : value) {
edgeItems.emplace(edge.get_edge_type(), edge.get_edge_name());
}
auto edgeType = edge.get_edge_type();
auto iter = edges.find(edgeType);
if (iter != edges.end()) {
continue;
}

for (auto &item : edgeItems) {
edges.emplace(edgeType);
std::vector<cpp2::ColumnValue> row;
row.resize(2);
row[0].set_integer(item.first);
row[1].set_str(item.second);
row[0].set_integer(edge.get_edge_type());
row[1].set_str(std::move(edge.get_edge_name()));
rows.emplace_back();
rows.back().set_columns(std::move(row));
}
Expand Down Expand Up @@ -787,7 +799,7 @@ void ShowExecutor::showCreateTagIndex() {
auto& fields = indexItems.get_fields();
buf += indexItems.get_schema_name();
buf += "(";
for (auto column : fields) {
for (auto &column : fields) {
buf += column.name;
buf += ", ";
}
Expand Down Expand Up @@ -846,7 +858,7 @@ void ShowExecutor::showCreateEdgeIndex() {
auto& fields = indexItems.get_fields();
buf += indexItems.get_schema_name();
buf += "(";
for (auto column : fields) {
for (auto &column : fields) {
buf += column.name;
buf += ", ";
}
Expand All @@ -872,6 +884,80 @@ void ShowExecutor::showCreateEdgeIndex() {
std::move(future).via(runner).thenValue(cb).thenError(error);
}

void ShowExecutor::showTagIndexStatus() {
auto spaceId = ectx()->rctx()->session()->space();
auto future = ectx()->getMetaClient()->listTagIndexStatus(spaceId);
auto *runner = ectx()->rctx()->runner();

auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
doError(std::move(resp).status());
return;
}

resp_ = std::make_unique<cpp2::ExecutionResponse>();
std::vector<std::string> header{"Name", "Tag Index Status"};
resp_->set_column_names(std::move(header));

std::vector<cpp2::RowValue> rows;
auto value = std::move(resp).value();
for (auto &status : value) {
std::vector<cpp2::ColumnValue> row;
row.resize(2);
row[0].set_str(std::move(status.get_name()));
row[1].set_str(std::move(status.get_status()));
rows.emplace_back();
rows.back().set_columns(std::move(row));
}
resp_->set_rows(std::move(rows));
doFinish(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
doError(Status::Error(folly::stringPrintf("Show tag index status exception : %s",
e.what().c_str())));
};
std::move(future).via(runner).thenValue(cb).thenError(error);
}

void ShowExecutor::showEdgeIndexStatus() {
auto spaceId = ectx()->rctx()->session()->space();
auto future = ectx()->getMetaClient()->listEdgeIndexStatus(spaceId);
auto *runner = ectx()->rctx()->runner();

auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
doError(std::move(resp).status());
return;
}

resp_ = std::make_unique<cpp2::ExecutionResponse>();
std::vector<std::string> header{"Name", "Edge Index Status"};
resp_->set_column_names(std::move(header));

std::vector<cpp2::RowValue> rows;
auto value = std::move(resp).value();
for (auto &status : value) {
std::vector<cpp2::ColumnValue> row;
row.resize(2);
row[0].set_str(std::move(status.get_name()));
row[1].set_str(std::move(status.get_status()));
rows.emplace_back();
rows.back().set_columns(std::move(row));
}
resp_->set_rows(std::move(rows));
doFinish(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
doError(Status::Error(folly::stringPrintf("Show edge index status exception : %s",
e.what().c_str())));
};
std::move(future).via(runner).thenValue(cb).thenError(error);
}

void ShowExecutor::showSnapshots() {
auto future = ectx()->getMetaClient()->listSnapshots();
auto *runner = ectx()->rctx()->runner();
Expand Down Expand Up @@ -917,7 +1003,7 @@ void ShowExecutor::showSnapshots() {
auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
doError(Status::Error(folly::stringPrintf("Show snapshots exception : %s",
e.what().c_str())));
e.what().c_str())));
return;
};
std::move(future).via(runner).thenValue(cb).thenError(error);
Expand Down
3 changes: 3 additions & 0 deletions src/graph/ShowExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "base/Base.h"
#include "graph/Executor.h"
#include <unordered_set>

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -36,6 +37,8 @@ class ShowExecutor final : public Executor {
void showCreateEdge();
void showCreateTagIndex();
void showCreateEdgeIndex();
void showTagIndexStatus();
void showEdgeIndexStatus();
void showSnapshots();
void showCharset();
void showCollation();
Expand Down
Loading

0 comments on commit ba8b3f6

Please sign in to comment.