Skip to content

Commit 02bdc67

Browse files
authored
optimizer path (#4162)
* optimizer multi-shortest path * new algorithm * fix error
1 parent d6df28c commit 02bdc67

6 files changed

+168
-62
lines changed

src/graph/executor/algo/BFSShortestPathExecutor.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,12 @@ folly::Future<Status> BFSShortestPathExecutor::conjunctPath() {
135135
std::vector<folly::Future<DataSet>> futures;
136136
for (auto& vid : meetVids) {
137137
batchVids.push_back(vid);
138-
if (i == totalSize - 1 || batchVids.size() == batchSize) {
138+
if (++i == totalSize || batchVids.size() == batchSize) {
139139
auto future = folly::via(runner(), [this, vids = std::move(batchVids), oddStep]() {
140140
return doConjunct(vids, oddStep);
141141
});
142142
futures.emplace_back(std::move(future));
143143
}
144-
i++;
145144
}
146145

147146
return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) {

src/graph/executor/algo/MultiShortestPathExecutor.cpp

+156-52
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,19 @@ folly::Future<Status> MultiShortestPathExecutor::execute() {
3939
})
4040
.thenValue([this](auto&& resp) {
4141
UNUSED(resp);
42-
preLeftPaths_.swap(leftPaths_);
43-
preRightPaths_.swap(rightPaths_);
42+
preRightPaths_ = rightPaths_;
43+
// update history
44+
for (auto& iter : leftPaths_) {
45+
historyLeftPaths_[iter.first].insert(std::make_move_iterator(iter.second.begin()),
46+
std::make_move_iterator(iter.second.end()));
47+
}
48+
for (auto& iter : rightPaths_) {
49+
historyRightPaths_[iter.first].insert(std::make_move_iterator(iter.second.begin()),
50+
std::make_move_iterator(iter.second.end()));
51+
}
4452
leftPaths_.clear();
4553
rightPaths_.clear();
54+
4655
step_++;
4756
DataSet ds;
4857
ds.colNames = pathNode_->colNames();
@@ -58,24 +67,40 @@ void MultiShortestPathExecutor::init() {
5867
for (; rIter->valid(); rIter->next()) {
5968
auto& vid = rIter->getColumn(0);
6069
if (rightVids.emplace(vid).second) {
61-
preRightPaths_[vid].push_back({Path(Vertex(vid, {}), {})});
70+
std::vector<Path> tmp({Path(Vertex(vid, {}), {})});
71+
historyRightPaths_[vid].emplace(vid, tmp);
72+
preRightPaths_[vid].emplace(vid, std::move(tmp));
6273
}
6374
}
6475

6576
std::set<Value> leftVids;
6677
for (; lIter->valid(); lIter->next()) {
6778
auto& vid = lIter->getColumn(0);
79+
std::vector<Path> tmp({Path(Vertex(vid, {}), {})});
80+
historyLeftPaths_[vid].emplace(vid, std::move(tmp));
6881
leftVids.emplace(vid);
6982
}
7083
for (const auto& leftVid : leftVids) {
7184
for (const auto& rightVid : rightVids) {
7285
if (leftVid != rightVid) {
73-
terminationMap_.insert({leftVid, {rightVid, true}});
86+
terminationMap_.emplace(leftVid, std::make_pair(rightVid, true));
7487
}
7588
}
7689
}
7790
}
7891

92+
std::vector<Path> MultiShortestPathExecutor::createPaths(const std::vector<Path>& paths,
93+
const Edge& edge) {
94+
std::vector<Path> newPaths;
95+
newPaths.reserve(paths.size());
96+
for (const auto& p : paths) {
97+
Path path = p;
98+
path.steps.emplace_back(Step(Vertex(edge.dst, {}), edge.type, edge.name, edge.ranking, {}));
99+
newPaths.emplace_back(std::move(path));
100+
}
101+
return newPaths;
102+
}
103+
79104
Status MultiShortestPathExecutor::buildPath(bool reverse) {
80105
auto iter = reverse ? ectx_->getResult(pathNode_->rightInputVar()).iter()
81106
: ectx_->getResult(pathNode_->leftInputVar()).iter();
@@ -96,10 +121,23 @@ Status MultiShortestPathExecutor::buildPath(bool reverse) {
96121
Path path;
97122
path.src = Vertex(src, {});
98123
path.steps.emplace_back(Step(Vertex(dst, {}), edge.type, edge.name, edge.ranking, {}));
99-
currentPaths[dst].emplace_back(std::move(path));
124+
auto foundDst = currentPaths.find(dst);
125+
if (foundDst != currentPaths.end()) {
126+
auto foundSrc = foundDst->second.find(src);
127+
if (foundSrc != foundDst->second.end()) {
128+
// same <src, dst>, different edge type or rank
129+
foundSrc->second.emplace_back(std::move(path));
130+
} else {
131+
std::vector<Path> tmp({std::move(path)});
132+
foundDst->second.emplace(src, std::move(tmp));
133+
}
134+
} else {
135+
std::vector<Path> tmp({std::move(path)});
136+
currentPaths[dst].emplace(src, std::move(tmp));
137+
}
100138
}
101139
} else {
102-
auto& historyPaths = reverse ? preRightPaths_ : preLeftPaths_;
140+
auto& historyPaths = reverse ? historyRightPaths_ : historyLeftPaths_;
103141
for (; iter->valid(); iter->next()) {
104142
auto edgeVal = iter->getEdge();
105143
if (UNLIKELY(!edgeVal.isEdge())) {
@@ -108,50 +146,93 @@ Status MultiShortestPathExecutor::buildPath(bool reverse) {
108146
auto& edge = edgeVal.getEdge();
109147
auto& src = edge.src;
110148
auto& dst = edge.dst;
111-
for (const auto& histPath : historyPaths[src]) {
112-
Path path = histPath;
113-
path.steps.emplace_back(Step(Vertex(dst, {}), edge.type, edge.name, edge.ranking, {}));
114-
if (path.hasDuplicateVertices()) {
115-
continue;
149+
auto& prePaths = historyPaths[src];
150+
151+
auto foundHistDst = historyPaths.find(dst);
152+
if (foundHistDst == historyPaths.end()) {
153+
// dst not in history
154+
auto foundDst = currentPaths.find(dst);
155+
if (foundDst == currentPaths.end()) {
156+
// dst not in current, new edge
157+
for (const auto& prePath : prePaths) {
158+
currentPaths[dst].emplace(prePath.first, createPaths(prePath.second, edge));
159+
}
160+
} else {
161+
// dst in current
162+
for (const auto& prePath : prePaths) {
163+
auto newPaths = createPaths(prePath.second, edge);
164+
auto foundSrc = foundDst->second.find(prePath.first);
165+
if (foundSrc == foundDst->second.end()) {
166+
foundDst->second.emplace(prePath.first, std::move(newPaths));
167+
} else {
168+
foundSrc->second.insert(foundSrc->second.begin(),
169+
std::make_move_iterator(newPaths.begin()),
170+
std::make_move_iterator(newPaths.end()));
171+
}
172+
}
173+
}
174+
} else {
175+
// dst in history
176+
auto& historyDstPaths = foundHistDst->second;
177+
for (const auto& prePath : prePaths) {
178+
if (historyDstPaths.find(prePath.first) != historyDstPaths.end()) {
179+
// loop: a->b->c->a or a->b->c->b,
180+
// filter out path that with duplicate vertex or have already been found before
181+
continue;
182+
}
183+
auto foundDst = currentPaths.find(dst);
184+
if (foundDst == currentPaths.end()) {
185+
currentPaths[dst].emplace(prePath.first, createPaths(prePath.second, edge));
186+
} else {
187+
auto newPaths = createPaths(prePath.second, edge);
188+
auto foundSrc = foundDst->second.find(prePath.first);
189+
if (foundSrc == foundDst->second.end()) {
190+
foundDst->second.emplace(prePath.first, std::move(newPaths));
191+
} else {
192+
foundSrc->second.insert(foundSrc->second.begin(),
193+
std::make_move_iterator(newPaths.begin()),
194+
std::make_move_iterator(newPaths.end()));
195+
}
196+
}
116197
}
117-
currentPaths[dst].emplace_back(std::move(path));
118198
}
119199
}
120200
}
201+
121202
// set nextVid
122203
const auto& nextVidVar = reverse ? pathNode_->rightVidVar() : pathNode_->leftVidVar();
123204
setNextStepVid(currentPaths, nextVidVar);
124205
return Status::OK();
125206
}
126207

127-
DataSet MultiShortestPathExecutor::doConjunct(Interims::iterator startIter,
128-
Interims::iterator endIter,
129-
bool oddStep) {
130-
auto& rightPaths = oddStep ? preRightPaths_ : rightPaths_;
131-
DataSet ds;
132-
for (; startIter != endIter; ++startIter) {
133-
auto found = rightPaths.find(startIter->first);
134-
if (found == rightPaths.end()) {
135-
continue;
136-
}
137-
for (const auto& lPath : startIter->second) {
138-
const auto& srcVid = lPath.src.vid;
139-
auto range = terminationMap_.equal_range(srcVid);
140-
for (const auto& rPath : found->second) {
141-
const auto& dstVid = rPath.src.vid;
142-
for (auto iter = range.first; iter != range.second; ++iter) {
143-
if (iter->second.first == dstVid) {
144-
auto forwardPath = lPath;
145-
auto backwardPath = rPath;
208+
DataSet MultiShortestPathExecutor::doConjunct(
209+
const std::vector<std::pair<Interims::iterator, Interims::iterator>>& iters) {
210+
auto buildPaths =
211+
[](const std::vector<Path>& leftPaths, const std::vector<Path>& rightPaths, DataSet& ds) {
212+
for (const auto& leftPath : leftPaths) {
213+
for (const auto& rightPath : rightPaths) {
214+
auto forwardPath = leftPath;
215+
auto backwardPath = rightPath;
146216
backwardPath.reverse();
147217
forwardPath.append(std::move(backwardPath));
148-
if (forwardPath.hasDuplicateVertices()) {
149-
continue;
150-
}
151218
Row row;
152219
row.values.emplace_back(std::move(forwardPath));
153220
ds.rows.emplace_back(std::move(row));
154-
iter->second.second = false;
221+
}
222+
}
223+
};
224+
225+
DataSet ds;
226+
for (const auto& iter : iters) {
227+
const auto& leftPaths = iter.first->second;
228+
const auto& rightPaths = iter.second->second;
229+
for (const auto& leftPath : leftPaths) {
230+
auto range = terminationMap_.equal_range(leftPath.first);
231+
for (const auto& rightPath : rightPaths) {
232+
for (auto found = range.first; found != range.second; ++found) {
233+
if (found->second.first == rightPath.first) {
234+
buildPaths(leftPath.second, rightPath.second, ds);
235+
found->second.second = false;
155236
}
156237
}
157238
}
@@ -161,28 +242,51 @@ DataSet MultiShortestPathExecutor::doConjunct(Interims::iterator startIter,
161242
}
162243

163244
folly::Future<bool> MultiShortestPathExecutor::conjunctPath(bool oddStep) {
164-
size_t batchSize = leftPaths_.size() / static_cast<size_t>(FLAGS_num_operator_threads);
245+
auto& rightPaths = oddStep ? preRightPaths_ : rightPaths_;
246+
size_t leftPathSize = leftPaths_.size();
247+
size_t rightPathSize = rightPaths.size();
165248
std::vector<folly::Future<DataSet>> futures;
166-
size_t i = 0;
249+
std::vector<std::pair<Interims::iterator, Interims::iterator>> pathIters;
167250

168-
auto startIter = leftPaths_.begin();
169-
for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) {
170-
if (i++ == batchSize) {
171-
auto endIter = leftIter;
172-
endIter++;
173-
auto future = folly::via(runner(), [this, startIter, endIter, oddStep]() {
174-
return doConjunct(startIter, endIter, oddStep);
175-
});
176-
futures.emplace_back(std::move(future));
177-
i = 0;
178-
startIter = endIter;
251+
size_t i = 0;
252+
if (leftPathSize > rightPathSize) {
253+
size_t batchSize = leftPathSize / static_cast<size_t>(FLAGS_num_operator_threads);
254+
pathIters.reserve(batchSize);
255+
for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) {
256+
auto rightIter = rightPaths.find(leftIter->first);
257+
if (rightIter == rightPaths.end()) {
258+
continue;
259+
}
260+
pathIters.emplace_back(leftIter, rightIter);
261+
if (++i == batchSize) {
262+
auto future = folly::via(
263+
runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
264+
futures.emplace_back(std::move(future));
265+
pathIters.reserve(batchSize);
266+
i = 0;
267+
}
268+
}
269+
} else {
270+
size_t batchSize = rightPathSize / static_cast<size_t>(FLAGS_num_operator_threads);
271+
pathIters.reserve(batchSize);
272+
for (auto rightIter = rightPaths.begin(); rightIter != rightPaths.end(); ++rightIter) {
273+
auto leftIter = leftPaths_.find(rightIter->first);
274+
if (leftIter == leftPaths_.end()) {
275+
continue;
276+
}
277+
pathIters.emplace_back(leftIter, rightIter);
278+
if (++i == batchSize) {
279+
auto future = folly::via(
280+
runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
281+
futures.emplace_back(std::move(future));
282+
pathIters.reserve(batchSize);
283+
i = 0;
284+
}
179285
}
180286
}
181287
if (i != 0) {
182-
auto endIter = leftPaths_.end();
183-
auto future = folly::via(runner(), [this, startIter, endIter, oddStep]() {
184-
return doConjunct(startIter, endIter, oddStep);
185-
});
288+
auto future =
289+
folly::via(runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
186290
futures.emplace_back(std::move(future));
187291
}
188292

src/graph/executor/algo/MultiShortestPathExecutor.h

+8-5
Original file line numberDiff line numberDiff line change
@@ -57,24 +57,27 @@ class MultiShortestPathExecutor final : public Executor {
5757
folly::Future<Status> execute() override;
5858

5959
private:
60-
// k: dst, v: paths to dst
61-
using Interims = std::unordered_map<Value, std::vector<Path>>;
60+
// key: dst, value: {key : src, value: paths}
61+
using Interims = std::unordered_map<Value, std::unordered_map<Value, std::vector<Path>>>;
6262

6363
void init();
64+
std::vector<Path> createPaths(const std::vector<Path>& paths, const Edge& edge);
6465
Status buildPath(bool reverse);
6566
folly::Future<bool> conjunctPath(bool oddStep);
66-
DataSet doConjunct(Interims::iterator startIter, Interims::iterator endIter, bool oddStep);
67+
DataSet doConjunct(const std::vector<std::pair<Interims::iterator, Interims::iterator>>& iters);
6768
void setNextStepVid(const Interims& paths, const string& var);
6869

6970
private:
7071
const MultiShortestPath* pathNode_{nullptr};
7172
size_t step_{1};
7273
std::string terminationVar_;
74+
// {src, <dst, true>}
7375
std::unordered_multimap<Value, std::pair<Value, bool>> terminationMap_;
7476
Interims leftPaths_;
75-
Interims preLeftPaths_;
76-
Interims preRightPaths_;
7777
Interims rightPaths_;
78+
Interims preRightPaths_;
79+
Interims historyLeftPaths_;
80+
Interims historyRightPaths_;
7881
DataSet currentDs_;
7982
};
8083

src/graph/executor/algo/ProduceAllPathsExecutor.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ folly::Future<Status> ProduceAllPathsExecutor::conjunctPath() {
132132

133133
auto startIter = leftPaths_.begin();
134134
for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) {
135-
if (i++ == batchSize) {
135+
if (++i == batchSize) {
136136
auto endIter = leftIter;
137137
endIter++;
138138
auto oddStepFuture = folly::via(

src/graph/executor/test/FindPathTest.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -710,7 +710,7 @@ TEST_F(FindPathTest, multiSourceShortestPath) {
710710
{
711711
DataSet expectLeftVid;
712712
expectLeftVid.colNames = {nebula::kVid};
713-
for (const auto& vid : {"a", "b", "c", "f", "g"}) {
713+
for (const auto& vid : {"b", "f", "g"}) {
714714
Row row;
715715
row.values.emplace_back(vid);
716716
expectLeftVid.rows.emplace_back(std::move(row));

src/graph/service/GraphFlags.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ DEFINE_int32(num_netio_threads,
1818
"The number of networking threads, 0 for number of physical CPU cores");
1919
DEFINE_int32(num_accept_threads, 1, "Number of threads to accept incoming connections");
2020
DEFINE_int32(num_worker_threads, 0, "Number of threads to execute user queries");
21-
DEFINE_int32(num_operator_threads, 5, "Number of threads to execute a single operator");
21+
DEFINE_int32(num_operator_threads, 2, "Number of threads to execute a single operator");
2222
DEFINE_bool(reuse_port, true, "Whether to turn on the SO_REUSEPORT option");
2323
DEFINE_int32(listen_backlog, 1024, "Backlog of the listen socket");
2424
DEFINE_string(listen_netdev, "any", "The network device to listen on");

0 commit comments

Comments
 (0)