Skip to content

Commit

Permalink
Support stats of all executor in graphd (#1451)
Browse files Browse the repository at this point in the history
* Support all stats in graphd

* Modify log level

* address yixinglu's comment

Co-authored-by: dutor <440396+dutor@users.noreply.github.com>
  • Loading branch information
laura-ding and dutor committed Dec 23, 2019
1 parent f3f5cbd commit e574db3
Show file tree
Hide file tree
Showing 57 changed files with 370 additions and 619 deletions.
11 changes: 7 additions & 4 deletions src/common/stats/StatsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ int32_t StatsManager::registerStats(folly::StringPiece counterName) {
auto& sm = get();

std::string name = counterName.toString();
folly::RWSpinLock::WriteHolder wh(sm.nameMapLock_);
auto it = sm.nameMap_.find(name);
if (it != sm.nameMap_.end()) {
LOG(INFO) << "The counter \"" << name << "\" already exists";
VLOG(2) << "The counter \"" << name << "\" already exists";
return it->second;
}

Expand All @@ -66,15 +67,14 @@ int32_t StatsManager::registerHisto(folly::StringPiece counterName,
StatsManager::VT bucketSize,
StatsManager::VT min,
StatsManager::VT max) {
LOG(INFO) << "registerHisto, bucketSize: " << bucketSize
<< ", min: " << min << ", max: " << max;
using std::chrono::seconds;

auto& sm = get();
std::string name = counterName.toString();
folly::RWSpinLock::WriteHolder wh(sm.nameMapLock_);
auto it = sm.nameMap_.find(name);
if (it != sm.nameMap_.end()) {
LOG(ERROR) << "The counter \"" << name << "\" already exists";
VLOG(2) << "The counter \"" << name << "\" already exists";
return it->second;
}

Expand All @@ -89,6 +89,9 @@ int32_t StatsManager::registerHisto(folly::StringPiece counterName,
StatsType(60, {seconds(60), seconds(600), seconds(3600)}))));
int32_t index = - sm.histograms_.size();
sm.nameMap_[name] = index;

LOG(INFO) << "registerHisto, bucketSize: " << bucketSize
<< ", min: " << min << ", max: " << max;
return index;
}

Expand Down
1 change: 1 addition & 0 deletions src/common/stats/StatsManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class StatsManager final {
// <counter_name> => index
// when index > 0, (index - 1) is the index of stats_ list
// when index < 0, [- (index + 1)] is the index of histograms_ list
folly::RWSpinLock nameMapLock_;
std::unordered_map<std::string, int32_t> nameMap_;

// All time series stats
Expand Down
14 changes: 6 additions & 8 deletions src/graph/AlterEdgeExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ namespace nebula {
namespace graph {

AlterEdgeExecutor::AlterEdgeExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
ExecutionContext *ectx)
: Executor(ectx, "alter_edge") {
sentence_ = static_cast<AlterEdgeSentence*>(sentence);
}

Expand All @@ -39,8 +40,7 @@ Status AlterEdgeExecutor::getSchema() {
void AlterEdgeExecutor::execute() {
auto status = getSchema();
if (!status.ok()) {
DCHECK(onError_);
onError_(std::move(status));
doError(std::move(status));
return;
}
auto *mc = ectx()->getMetaClient();
Expand All @@ -51,18 +51,16 @@ void AlterEdgeExecutor::execute() {
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(resp.status());
doError(resp.status());
return;
}

DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
doFinish(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
onError_(Status::Error("Internal error"));
doError(Status::Error("Internal error"));
};

std::move(future).via(runner).thenValue(cb).thenError(error);
Expand Down
14 changes: 6 additions & 8 deletions src/graph/AlterTagExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ namespace nebula {
namespace graph {

AlterTagExecutor::AlterTagExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
ExecutionContext *ectx)
: Executor(ectx, "alter_tag") {
sentence_ = static_cast<AlterTagSentence*>(sentence);
}

Expand All @@ -39,8 +40,7 @@ Status AlterTagExecutor::getSchema() {
void AlterTagExecutor::execute() {
auto status = getSchema();
if (!status.ok()) {
DCHECK(onError_);
onError_(std::move(status));
doError(std::move(status));
return;
}

Expand All @@ -52,18 +52,16 @@ void AlterTagExecutor::execute() {
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(resp.status());
doError(resp.status());
return;
}

DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
doFinish(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
onError_(Status::Error("Internal error"));
doError(Status::Error("Internal error"));
};

std::move(future).via(runner).thenValue(cb).thenError(error);
Expand Down
12 changes: 5 additions & 7 deletions src/graph/AssignmentExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ namespace graph {


AssignmentExecutor::AssignmentExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
ExecutionContext *ectx)
: Executor(ectx, "assignment") {
sentence_ = static_cast<AssignmentSentence*>(sentence);
}

Expand All @@ -24,12 +25,10 @@ Status AssignmentExecutor::prepare() {
executor_ = TraverseExecutor::makeTraverseExecutor(sentence_->sentence(), ectx());

auto onError = [this] (Status s) {
DCHECK(onError_);
onError_(std::move(s));
doError(std::move(s));
};
auto onFinish = [this] (Executor::ProcessControl ctr) {
DCHECK(onFinish_);
onFinish_(ctr);
doFinish(ctr);
};
auto onResult = [this] (std::unique_ptr<InterimResult> result) {
ectx()->variableHolder()->add(*var_, std::move(result));
Expand All @@ -52,8 +51,7 @@ Status AssignmentExecutor::prepare() {
void AssignmentExecutor::execute() {
auto status = checkIfGraphSpaceChosen();
if (!status.ok()) {
DCHECK(onError_);
onError_(std::move(status));
doError(std::move(status));
return;
}
executor_->execute();
Expand Down
35 changes: 13 additions & 22 deletions src/graph/BalanceExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ namespace nebula {
namespace graph {

BalanceExecutor::BalanceExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
ExecutionContext *ectx)
: Executor(ectx, "balance") {
sentence_ = static_cast<BalanceSentence*>(sentence);
}

Expand All @@ -34,7 +35,7 @@ void BalanceExecutor::execute() {
showBalancePlan();
break;
case BalanceSentence::SubType::kUnknown:
onError_(Status::Error("Type unknown"));
doError(Status::Error("Type unknown"));
break;
}
}
Expand All @@ -45,24 +46,20 @@ void BalanceExecutor::balanceLeader() {

auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(std::move(resp).status());
doError(std::move(resp).status());
return;
}
auto ret = std::move(resp).value();
if (!ret) {
DCHECK(onError_);
onError_(Status::Error("Balance leader failed"));
doError(Status::Error("Balance leader failed"));
return;
}
DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
doFinish(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error("Internal error"));
doError(Status::Error("Internal error"));
return;
};

Expand All @@ -81,8 +78,7 @@ void BalanceExecutor::balanceData(bool isStop) {

auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(std::move(resp).status());
doError(std::move(resp).status());
return;
}
auto balanceId = std::move(resp).value();
Expand All @@ -100,14 +96,12 @@ void BalanceExecutor::balanceData(bool isStop) {

resp_->set_rows(std::move(rows));

DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
doFinish(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error("Internal error"));
doError(Status::Error("Internal error"));
return;
};

Expand All @@ -120,8 +114,7 @@ void BalanceExecutor::showBalancePlan() {
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(std::move(resp).status());
doError(std::move(resp).status());
return;
}
auto tasks = std::move(resp).value();
Expand Down Expand Up @@ -171,14 +164,12 @@ void BalanceExecutor::showBalancePlan() {
rows.emplace_back();
rows.back().set_columns(std::move(row));
resp_->set_rows(std::move(rows));
DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
doFinish(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error("Internal error"));
doError(Status::Error("Internal error"));
return;
};

Expand Down
Loading

0 comments on commit e574db3

Please sign in to comment.