forked from vesoft-inc/nebula
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathExecutor.h
126 lines (101 loc) · 3.79 KB
/
Executor.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#ifndef GRAPH_EXECUTOR_H_
#define GRAPH_EXECUTOR_H_
#include "base/Base.h"
#include "base/Status.h"
#include "cpp/helpers.h"
#include "graph/ExecutionContext.h"
#include "gen-cpp2/common_types.h"
#include "gen-cpp2/storage_types.h"
#include "dataman/RowWriter.h"
#include "meta/SchemaManager.h"
#include "time/Duration.h"
#include "stats/Stats.h"
/**
* Executor is the interface of kinds of specific executors that do the actual execution.
*/
namespace nebula {
namespace graph {
class Executor : public cpp::NonCopyable, public cpp::NonMovable {
public:
explicit Executor(ExecutionContext *ectx, const std::string &statsName = "") {
ectx_ = ectx;
if (!statsName.empty()) {
stats_ = std::make_unique<stats::Stats>("graph", statsName);
}
}
virtual ~Executor() {}
/**
* Do some preparatory works, such as sanitize checking, dependency setup, etc.
*
* `prepare' succeeds only if all its sub-executors are prepared.
* `prepare' works in a synchronous way, once the executor is prepared, it will
* be executed.
*/
virtual Status MUST_USE_RESULT prepare() = 0;
virtual void execute() = 0;
virtual const char* name() const = 0;
enum ProcessControl : uint8_t {
kNext = 0,
kReturn,
};
/**
* Set callback to be invoked when this executor is finished(normally).
*/
void setOnFinish(std::function<void(ProcessControl)> onFinish) {
onFinish_ = onFinish;
}
/**
* When some error happens during an executor's execution, it should invoke its
* `onError_' with a Status that indicates the reason.
*
* An executor terminates its execution via invoking either `onFinish_' or `onError_',
* but should never call them both.
*/
void setOnError(std::function<void(Status)> onError) {
onError_ = onError;
}
/**
* Upon finished successfully, `setupResponse' would be invoked on the last executor.
* Any Executor implementation, which wants to send its meaningful result to the client,
* should override this method.
*/
virtual void setupResponse(cpp2::ExecutionResponse &resp) {
resp.set_error_code(cpp2::ErrorCode::SUCCEEDED);
}
ExecutionContext* ectx() const {
return ectx_;
}
const time::Duration& duration() const {
return duration_;
}
protected:
std::unique_ptr<Executor> makeExecutor(Sentence *sentence);
std::string valueTypeToString(nebula::cpp2::ValueType type);
Status writeVariantType(RowWriter &writer, const VariantType &value);
bool checkValueType(const nebula::cpp2::ValueType &type, const VariantType &value);
StatusOr<cpp2::ColumnValue> toColumnValue(const VariantType& value,
cpp2::ColumnValue::Type type) const;
OptVariantType toVariantType(const cpp2::ColumnValue& value) const;
Status checkIfGraphSpaceChosen() const {
if (ectx()->rctx()->session()->space() == -1) {
return Status::Error("Please choose a graph space with `USE spaceName' firstly");
}
return Status::OK();
}
void doError(Status status, uint32_t count = 1) const;
void doFinish(ProcessControl pro, uint32_t count = 1) const;
protected:
ExecutionContext *ectx_;
std::function<void(ProcessControl)> onFinish_;
std::function<void(Status)> onError_;
time::Duration duration_;
std::unique_ptr<stats::Stats> stats_;
};
} // namespace graph
} // namespace nebula
#endif // GRAPH_EXECUTOR_H_