Skip to content

Commit 7fd1d38

Browse files
committed
Move input rows of Traverse and AppendVertices.
1 parent d6df28c commit 7fd1d38

File tree

5 files changed

+44
-5
lines changed

5 files changed

+44
-5
lines changed

src/graph/context/Iterator.h

+15-3
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ class Iterator {
7373

7474
virtual const Row* row() const = 0;
7575

76+
virtual Row moveRow() = 0;
77+
7678
// erase range, no include last position, if last > size(), erase to the end
7779
// position
7880
virtual void eraseRange(size_t first, size_t last) = 0;
@@ -228,6 +230,11 @@ class DefaultIter final : public Iterator {
228230
return nullptr;
229231
}
230232

233+
Row moveRow() override {
234+
DLOG(FATAL) << "This method should not be invoked";
235+
return Row{};
236+
}
237+
231238
private:
232239
void doReset(size_t pos) override {
233240
DCHECK((pos == 0 && size() == 0) || (pos < size()));
@@ -318,6 +325,10 @@ class GetNeighborsIter final : public Iterator {
318325
return currentEdge_;
319326
}
320327

328+
Row moveRow() override {
329+
return std::move(*currentEdge_);
330+
}
331+
321332
private:
322333
void doReset(size_t pos) override {
323334
UNUSED(pos);
@@ -475,6 +486,10 @@ class SequentialIter : public Iterator {
475486

476487
Value getEdge() const override;
477488

489+
Row moveRow() override {
490+
return std::move(*iter_);
491+
}
492+
478493
protected:
479494
const Row* row() const override {
480495
return &*iter_;
@@ -484,9 +499,6 @@ class SequentialIter : public Iterator {
484499
friend class DataCollectExecutor;
485500
friend class AppendVerticesExecutor;
486501
friend class TraverseExecutor;
487-
Row&& moveRow() {
488-
return std::move(*iter_);
489-
}
490502

491503
void doReset(size_t pos) override;
492504

src/graph/executor/Executor.cpp

+21
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,27 @@ void Executor::dropBody(const PlanNode *body) {
682682
}
683683
}
684684

685+
bool Executor::movable(const Variable *var) {
686+
// Only support input variables of current executor
687+
DCHECK(std::find(node_->inputVars().begin(), node_->inputVars().end(), DCHECK_NOTNULL(var)) !=
688+
node_->inputVars().end());
689+
// TODO support executor in loop
690+
if (node()->kind() == PlanNode::Kind::kLoop) {
691+
return false;
692+
}
693+
if (node()->loopLayers() != 0) {
694+
// The lifetime of loop body is managed by Loop node
695+
return false;
696+
}
697+
698+
if (node()->kind() == PlanNode::Kind::kSelect) {
699+
return false;
700+
}
701+
// Normal node
702+
// Make sure drop happened-after count decrement
703+
return var->userCount.load(std::memory_order_acquire) == 1;
704+
}
705+
685706
Status Executor::finish(Result &&result) {
686707
if (!FLAGS_enable_lifetime_optimize ||
687708
node()->outputVarPtr()->userCount.load(std::memory_order_relaxed) != 0) {

src/graph/executor/Executor.h

+4
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ class Executor : private boost::noncopyable, private cpp::NonMovable {
9494
void drop(const PlanNode *node);
9595
void dropBody(const PlanNode *body);
9696

97+
// Check whether the variable is movable, it's movable when reach end of lifetime
98+
// This method shouldn't call after `finish` method!
99+
bool movable(const Variable *var);
100+
97101
// Store the result of this executor to execution context
98102
Status finish(Result &&result);
99103
// Store the default result which not used for later executor

src/graph/executor/query/AppendVerticesExecutor.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,13 @@ Status AppendVerticesExecutor::handleResp(
101101
}
102102

103103
auto *src = av->src();
104+
bool mv = movable(av->inputVars().front());
104105
for (; inputIter->valid(); inputIter->next()) {
105106
auto dstFound = map.find(src->eval(ctx(inputIter.get())));
106107
if (dstFound == map.end()) {
107108
continue;
108109
}
109-
Row row = *inputIter->row();
110+
Row row = mv ? inputIter->moveRow() : *inputIter->row();
110111
row.values.emplace_back(dstFound->second);
111112
ds.rows.emplace_back(std::move(row));
112113
}

src/graph/executor/query/TraverseExecutor.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ Status TraverseExecutor::buildRequestDataSet() {
4848
auto* src = traverse_->src();
4949
QueryExpressionContext ctx(ectx_);
5050

51+
bool mv = movable(traverse_->inputVars().front());
5152
for (; iter->valid(); iter->next()) {
5253
auto vid = src->eval(ctx(iter));
5354
if (!SchemaUtil::isValidVid(vid, vidType)) {
@@ -56,7 +57,7 @@ Status TraverseExecutor::buildRequestDataSet() {
5657
continue;
5758
}
5859
// Need copy here, Argument executor may depends on this variable.
59-
auto prePath = *iter->row();
60+
auto prePath = mv ? iter->moveRow() : *iter->row();
6061
buildPath(prev, vid, std::move(prePath));
6162
if (!uniqueSet.emplace(vid).second) {
6263
continue;

0 commit comments

Comments
 (0)