Skip to content

Commit 10e3d0c

Browse files
committed
optimizer multi-shortest path
1 parent 2e3b01a commit 10e3d0c

File tree

3 files changed

+85
-49
lines changed

3 files changed

+85
-49
lines changed

src/graph/executor/algo/MultiShortestPathExecutor.cpp

+82-46
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ void MultiShortestPathExecutor::init() {
7070
for (const auto& leftVid : leftVids) {
7171
for (const auto& rightVid : rightVids) {
7272
if (leftVid != rightVid) {
73-
terminationMap_.insert({leftVid, {rightVid, true}});
73+
terminationMap_.emplace(std::make_pair(leftVid, rightVid), true);
7474
}
7575
}
7676
}
@@ -124,35 +124,48 @@ Status MultiShortestPathExecutor::buildPath(bool reverse) {
124124
return Status::OK();
125125
}
126126

127-
DataSet MultiShortestPathExecutor::doConjunct(Interims::iterator startIter,
128-
Interims::iterator endIter,
129-
bool oddStep) {
130-
auto& rightPaths = oddStep ? preRightPaths_ : rightPaths_;
127+
DataSet MultiShortestPathExecutor::doConjunct(
128+
const std::vector<std::pair<Interims::iterator, Interims::iterator>>& iters) {
131129
DataSet ds;
132-
for (; startIter != endIter; ++startIter) {
133-
auto found = rightPaths.find(startIter->first);
134-
if (found == rightPaths.end()) {
135-
continue;
136-
}
137-
for (const auto& lPath : startIter->second) {
138-
const auto& srcVid = lPath.src.vid;
139-
auto range = terminationMap_.equal_range(srcVid);
140-
for (const auto& rPath : found->second) {
141-
const auto& dstVid = rPath.src.vid;
142-
for (auto iter = range.first; iter != range.second; ++iter) {
143-
if (iter->second.first == dstVid) {
144-
auto forwardPath = lPath;
145-
auto backwardPath = rPath;
146-
backwardPath.reverse();
147-
forwardPath.append(std::move(backwardPath));
148-
if (forwardPath.hasDuplicateVertices()) {
149-
continue;
150-
}
151-
Row row;
152-
row.values.emplace_back(std::move(forwardPath));
153-
ds.rows.emplace_back(std::move(row));
154-
iter->second.second = false;
130+
for (const auto& iter : iters) {
131+
const auto& leftPaths = iter.first->second;
132+
const auto& rightPaths = iter.second->second;
133+
if (leftPaths.size() < rightPaths.size()) {
134+
for (const auto& leftPath : leftPaths) {
135+
const auto& srcVid = leftPath.src.vid;
136+
for (const auto& rightPath : rightPaths) {
137+
const auto& dstVid = rightPath.src.vid;
138+
auto found = terminationMap_.find({srcVid, dstVid});
139+
if (found == terminationMap_.end()) {
140+
continue;
141+
}
142+
auto forwardPath = leftPath;
143+
auto backwardPath = rightPath;
144+
backwardPath.reverse();
145+
forwardPath.append(std::move(backwardPath));
146+
Row row;
147+
row.values.emplace_back(std::move(forwardPath));
148+
ds.rows.emplace_back(std::move(row));
149+
found->second = false;
150+
}
151+
}
152+
} else {
153+
for (const auto& rightPath : rightPaths) {
154+
const auto& dstVid = rightPath.src.vid;
155+
for (const auto& leftPath : leftPaths) {
156+
const auto& srcVid = leftPath.src.vid;
157+
auto found = terminationMap_.find({srcVid, dstVid});
158+
if (found == terminationMap_.end()) {
159+
continue;
155160
}
161+
auto forwardPath = leftPath;
162+
auto backwardPath = rightPath;
163+
backwardPath.reverse();
164+
forwardPath.append(std::move(backwardPath));
165+
Row row;
166+
row.values.emplace_back(std::move(forwardPath));
167+
ds.rows.emplace_back(std::move(row));
168+
found->second = false;
156169
}
157170
}
158171
}
@@ -161,28 +174,51 @@ DataSet MultiShortestPathExecutor::doConjunct(Interims::iterator startIter,
161174
}
162175

163176
folly::Future<bool> MultiShortestPathExecutor::conjunctPath(bool oddStep) {
164-
size_t batchSize = leftPaths_.size() / static_cast<size_t>(FLAGS_num_operator_threads);
177+
auto& rightPaths = oddStep ? preRightPaths_ : rightPaths_;
178+
size_t leftPathSize = leftPaths_.size();
179+
size_t rightPathSize = rightPaths.size();
165180
std::vector<folly::Future<DataSet>> futures;
166-
size_t i = 0;
181+
std::vector<std::pair<Interims::iterator, Interims::iterator>> pathIters;
167182

168-
auto startIter = leftPaths_.begin();
169-
for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) {
170-
if (i++ == batchSize) {
171-
auto endIter = leftIter;
172-
endIter++;
173-
auto future = folly::via(runner(), [this, startIter, endIter, oddStep]() {
174-
return doConjunct(startIter, endIter, oddStep);
175-
});
176-
futures.emplace_back(std::move(future));
177-
i = 0;
178-
startIter = endIter;
183+
size_t i = 0;
184+
if (leftPathSize > rightPathSize) {
185+
size_t batchSize = leftPathSize / static_cast<size_t>(FLAGS_num_operator_threads);
186+
pathIters.reserve(batchSize);
187+
for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) {
188+
auto rightIter = rightPaths.find(leftIter->first);
189+
if (rightIter == rightPaths.end()) {
190+
continue;
191+
}
192+
pathIters.emplace_back(leftIter, rightIter);
193+
if (++i == batchSize) {
194+
auto future = folly::via(
195+
runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
196+
futures.emplace_back(std::move(future));
197+
pathIters.reserve(batchSize);
198+
i = 0;
199+
}
200+
}
201+
} else {
202+
size_t batchSize = rightPathSize / static_cast<size_t>(FLAGS_num_operator_threads);
203+
pathIters.reserve(batchSize);
204+
for (auto rightIter = rightPaths.begin(); rightIter != rightPaths.end(); ++rightIter) {
205+
auto leftIter = leftPaths_.find(rightIter->first);
206+
if (leftIter == leftPaths_.end()) {
207+
continue;
208+
}
209+
pathIters.emplace_back(leftIter, rightIter);
210+
if (++i == batchSize) {
211+
auto future = folly::via(
212+
runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
213+
futures.emplace_back(std::move(future));
214+
pathIters.reserve(batchSize);
215+
i = 0;
216+
}
179217
}
180218
}
181219
if (i != 0) {
182-
auto endIter = leftPaths_.end();
183-
auto future = folly::via(runner(), [this, startIter, endIter, oddStep]() {
184-
return doConjunct(startIter, endIter, oddStep);
185-
});
220+
auto future =
221+
folly::via(runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
186222
futures.emplace_back(std::move(future));
187223
}
188224

@@ -192,7 +228,7 @@ folly::Future<bool> MultiShortestPathExecutor::conjunctPath(bool oddStep) {
192228
}
193229

194230
for (auto iter = terminationMap_.begin(); iter != terminationMap_.end();) {
195-
if (!iter->second.second) {
231+
if (!iter->second) {
196232
iter = terminationMap_.erase(iter);
197233
} else {
198234
++iter;

src/graph/executor/algo/MultiShortestPathExecutor.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,14 @@ class MultiShortestPathExecutor final : public Executor {
6363
void init();
6464
Status buildPath(bool reverse);
6565
folly::Future<bool> conjunctPath(bool oddStep);
66-
DataSet doConjunct(Interims::iterator startIter, Interims::iterator endIter, bool oddStep);
66+
DataSet doConjunct(const std::vector<std::pair<Interims::iterator, Interims::iterator>>& iters);
6767
void setNextStepVid(const Interims& paths, const string& var);
6868

6969
private:
7070
const MultiShortestPath* pathNode_{nullptr};
7171
size_t step_{1};
7272
std::string terminationVar_;
73-
std::unordered_multimap<Value, std::pair<Value, bool>> terminationMap_;
73+
std::unordered_map<std::pair<Value, Value>, bool> terminationMap_;
7474
Interims leftPaths_;
7575
Interims preLeftPaths_;
7676
Interims preRightPaths_;

src/graph/service/GraphFlags.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ DEFINE_int32(num_netio_threads,
1818
"The number of networking threads, 0 for number of physical CPU cores");
1919
DEFINE_int32(num_accept_threads, 1, "Number of threads to accept incoming connections");
2020
DEFINE_int32(num_worker_threads, 0, "Number of threads to execute user queries");
21-
DEFINE_int32(num_operator_threads, 5, "Number of threads to execute a single operator");
21+
DEFINE_int32(num_operator_threads, 2, "Number of threads to execute a single operator");
2222
DEFINE_bool(reuse_port, true, "Whether to turn on the SO_REUSEPORT option");
2323
DEFINE_int32(listen_backlog, 1024, "Backlog of the listen socket");
2424
DEFINE_string(listen_netdev, "any", "The network device to listen on");

0 commit comments

Comments
 (0)