forked from vesoft-inc/nebula
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathShortestPathExecutor.cpp
62 lines (56 loc) · 2.19 KB
/
ShortestPathExecutor.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// Copyright (c) 2022 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.
#include "graph/executor/algo/ShortestPathExecutor.h"
#include "graph/executor/algo/BatchShortestPath.h"
#include "graph/executor/algo/SingleShortestPath.h"
#include "graph/service/GraphFlags.h"
#include "graph/util/SchemaUtil.h"
#include "sys/sysinfo.h"
using nebula::storage::StorageClient;
// Thread-count flag for shortest-path execution. The default of 0 is treated
// as "unset": ShortestPathExecutor::execute() overwrites it with the value of
// get_nprocs_conf() on first use. execute() also uses it as the threshold on
// startVids.size() * endVids.size() when choosing between SingleShortestPath
// and BatchShortestPath.
DEFINE_uint32(num_path_thread, 0, "number of concurrent threads when do shortest path");
namespace nebula {
namespace graph {
// Runs the shortest-path plan node.
//
// Collects the start/end vertex ids from the input via checkInput(), picks
// SingleShortestPath or BatchShortestPath depending on how the returned size
// compares with FLAGS_num_path_thread, waits for the chosen algorithm to
// complete, and publishes the resulting DataSet through finish().
//
// Returns the algorithm's error status if it fails, otherwise whatever
// finish() returns.
folly::Future<Status> ShortestPathExecutor::execute() {
  SCOPED_TIMER(&execTime_);

  // A flag value of 0 means "not configured": replace it with the value
  // reported by get_nprocs_conf(). The result is written back to the flag
  // itself, so the substitution only happens while the flag is still 0.
  if (FLAGS_num_path_thread == 0) {
    FLAGS_num_path_thread = get_nprocs_conf();
  }

  DataSet pathData;
  pathData.colNames = pathNode_->colNames();

  HashSet sources;
  HashSet targets;
  const size_t pairCount = checkInput(sources, targets);

  // More (start, end) combinations than the thread flag -> batch algorithm;
  // otherwise the single-path algorithm is used.
  std::unique_ptr<ShortestPathBase> algorithm;
  if (pairCount > FLAGS_num_path_thread) {
    algorithm = std::make_unique<BatchShortestPath>(pathNode_, qctx_, &otherStats_);
  } else {
    algorithm = std::make_unique<SingleShortestPath>(pathNode_, qctx_, &otherStats_);
  }

  // get() waits for the algorithm's future here, so `pathData` and the vid
  // sets stay alive for the whole computation.
  auto runStatus = algorithm->execute(sources, targets, &pathData).get();
  NG_RETURN_IF_ERROR(runStatus);

  return finish(ResultBuilder().value(Value(std::move(pathData))).build());
}
// Reads the input rows of the path node and fills the start/end vid sets.
//
// For every input row, column 0 is taken as the start vid and column 1 as the
// end vid. A row is dropped when either vid fails SchemaUtil::isValidVid()
// against the space's vid type (an error is logged), or when start and end
// are equal.
//
// @param startVids  receives the start vid of every accepted row
// @param endVids    receives the end vid of every accepted row
// @return startVids.size() * endVids.size() after all rows were processed
size_t ShortestPathExecutor::checkInput(HashSet& startVids, HashSet& endVids) {
  auto rowIter = ectx_->getResult(pathNode_->inputVar()).iter();
  const auto& spaceVidType = *(qctx()->rctx()->session()->space().spaceDesc.vid_type_ref());

  for (; rowIter->valid(); rowIter->next()) {
    // Copies are taken on purpose: accepted vids are moved into the sets below.
    auto src = rowIter->getColumn(0);
    auto dst = rowIter->getColumn(1);

    const bool bothVidsValid =
        SchemaUtil::isValidVid(src, spaceVidType) && SchemaUtil::isValidVid(dst, spaceVidType);

    if (!bothVidsValid) {
      LOG(ERROR) << "Mismatched shortestPath vid type. start type : " << src.type()
                 << ", end type: " << dst.type()
                 << ", space vid type: " << SchemaUtil::typeToString(spaceVidType);
    } else if (!(src == dst)) {
      // Rows whose start equals their end are silently skipped; whether this
      // should instead be reported as an error is an open question.
      startVids.emplace(std::move(src));
      endVids.emplace(std::move(dst));
    }
  }

  return startVids.size() * endVids.size();
}
} // namespace graph
} // namespace nebula