Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimizer path #4162

Merged
merged 3 commits into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
optimizer multi-shortest path
  • Loading branch information
nevermore3 committed Apr 18, 2022
commit 026844692774c58d1e65dcced24303ca87f9f267
128 changes: 82 additions & 46 deletions src/graph/executor/algo/MultiShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void MultiShortestPathExecutor::init() {
for (const auto& leftVid : leftVids) {
for (const auto& rightVid : rightVids) {
if (leftVid != rightVid) {
terminationMap_.insert({leftVid, {rightVid, true}});
terminationMap_.emplace(std::make_pair(leftVid, rightVid), true);
}
}
}
Expand Down Expand Up @@ -124,35 +124,48 @@ Status MultiShortestPathExecutor::buildPath(bool reverse) {
return Status::OK();
}

DataSet MultiShortestPathExecutor::doConjunct(Interims::iterator startIter,
Interims::iterator endIter,
bool oddStep) {
auto& rightPaths = oddStep ? preRightPaths_ : rightPaths_;
DataSet MultiShortestPathExecutor::doConjunct(
const std::vector<std::pair<Interims::iterator, Interims::iterator>>& iters) {
DataSet ds;
for (; startIter != endIter; ++startIter) {
auto found = rightPaths.find(startIter->first);
if (found == rightPaths.end()) {
continue;
}
for (const auto& lPath : startIter->second) {
const auto& srcVid = lPath.src.vid;
auto range = terminationMap_.equal_range(srcVid);
for (const auto& rPath : found->second) {
const auto& dstVid = rPath.src.vid;
for (auto iter = range.first; iter != range.second; ++iter) {
if (iter->second.first == dstVid) {
auto forwardPath = lPath;
auto backwardPath = rPath;
backwardPath.reverse();
forwardPath.append(std::move(backwardPath));
if (forwardPath.hasDuplicateVertices()) {
continue;
}
Row row;
row.values.emplace_back(std::move(forwardPath));
ds.rows.emplace_back(std::move(row));
iter->second.second = false;
for (const auto& iter : iters) {
const auto& leftPaths = iter.first->second;
const auto& rightPaths = iter.second->second;
if (leftPaths.size() < rightPaths.size()) {
for (const auto& leftPath : leftPaths) {
const auto& srcVid = leftPath.src.vid;
for (const auto& rightPath : rightPaths) {
const auto& dstVid = rightPath.src.vid;
auto found = terminationMap_.find({srcVid, dstVid});
if (found == terminationMap_.end()) {
continue;
}
auto forwardPath = leftPath;
auto backwardPath = rightPath;
backwardPath.reverse();
forwardPath.append(std::move(backwardPath));
Row row;
row.values.emplace_back(std::move(forwardPath));
ds.rows.emplace_back(std::move(row));
found->second = false;
}
}
} else {
for (const auto& rightPath : rightPaths) {
const auto& dstVid = rightPath.src.vid;
for (const auto& leftPath : leftPaths) {
const auto& srcVid = leftPath.src.vid;
auto found = terminationMap_.find({srcVid, dstVid});
if (found == terminationMap_.end()) {
continue;
}
auto forwardPath = leftPath;
auto backwardPath = rightPath;
backwardPath.reverse();
forwardPath.append(std::move(backwardPath));
Row row;
row.values.emplace_back(std::move(forwardPath));
ds.rows.emplace_back(std::move(row));
found->second = false;
}
}
}
Expand All @@ -161,28 +174,51 @@ DataSet MultiShortestPathExecutor::doConjunct(Interims::iterator startIter,
}

folly::Future<bool> MultiShortestPathExecutor::conjunctPath(bool oddStep) {
size_t batchSize = leftPaths_.size() / static_cast<size_t>(FLAGS_num_operator_threads);
auto& rightPaths = oddStep ? preRightPaths_ : rightPaths_;
size_t leftPathSize = leftPaths_.size();
size_t rightPathSize = rightPaths.size();
std::vector<folly::Future<DataSet>> futures;
size_t i = 0;
std::vector<std::pair<Interims::iterator, Interims::iterator>> pathIters;

auto startIter = leftPaths_.begin();
for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) {
if (i++ == batchSize) {
auto endIter = leftIter;
endIter++;
auto future = folly::via(runner(), [this, startIter, endIter, oddStep]() {
return doConjunct(startIter, endIter, oddStep);
});
futures.emplace_back(std::move(future));
i = 0;
startIter = endIter;
size_t i = 0;
if (leftPathSize > rightPathSize) {
size_t batchSize = leftPathSize / static_cast<size_t>(FLAGS_num_operator_threads);
pathIters.reserve(batchSize);
for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) {
auto rightIter = rightPaths.find(leftIter->first);
if (rightIter == rightPaths.end()) {
continue;
}
pathIters.emplace_back(leftIter, rightIter);
if (++i == batchSize) {
auto future = folly::via(
runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
futures.emplace_back(std::move(future));
pathIters.reserve(batchSize);
i = 0;
}
}
} else {
size_t batchSize = rightPathSize / static_cast<size_t>(FLAGS_num_operator_threads);
pathIters.reserve(batchSize);
for (auto rightIter = rightPaths.begin(); rightIter != rightPaths.end(); ++rightIter) {
auto leftIter = leftPaths_.find(rightIter->first);
if (leftIter == leftPaths_.end()) {
continue;
}
pathIters.emplace_back(leftIter, rightIter);
if (++i == batchSize) {
auto future = folly::via(
runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
futures.emplace_back(std::move(future));
pathIters.reserve(batchSize);
i = 0;
}
}
}
if (i != 0) {
auto endIter = leftPaths_.end();
auto future = folly::via(runner(), [this, startIter, endIter, oddStep]() {
return doConjunct(startIter, endIter, oddStep);
});
auto future =
folly::via(runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
futures.emplace_back(std::move(future));
}

Expand All @@ -192,7 +228,7 @@ folly::Future<bool> MultiShortestPathExecutor::conjunctPath(bool oddStep) {
}

for (auto iter = terminationMap_.begin(); iter != terminationMap_.end();) {
if (!iter->second.second) {
if (!iter->second) {
iter = terminationMap_.erase(iter);
} else {
++iter;
Expand Down
4 changes: 2 additions & 2 deletions src/graph/executor/algo/MultiShortestPathExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ class MultiShortestPathExecutor final : public Executor {
void init();
Status buildPath(bool reverse);
folly::Future<bool> conjunctPath(bool oddStep);
DataSet doConjunct(Interims::iterator startIter, Interims::iterator endIter, bool oddStep);
DataSet doConjunct(const std::vector<std::pair<Interims::iterator, Interims::iterator>>& iters);
void setNextStepVid(const Interims& paths, const string& var);

private:
const MultiShortestPath* pathNode_{nullptr};
size_t step_{1};
std::string terminationVar_;
std::unordered_multimap<Value, std::pair<Value, bool>> terminationMap_;
std::unordered_map<std::pair<Value, Value>, bool> terminationMap_;
Interims leftPaths_;
Interims preLeftPaths_;
Interims preRightPaths_;
Expand Down
2 changes: 1 addition & 1 deletion src/graph/service/GraphFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ DEFINE_int32(num_netio_threads,
"The number of networking threads, 0 for number of physical CPU cores");
DEFINE_int32(num_accept_threads, 1, "Number of threads to accept incoming connections");
DEFINE_int32(num_worker_threads, 0, "Number of threads to execute user queries");
DEFINE_int32(num_operator_threads, 5, "Number of threads to execute a single operator");
DEFINE_int32(num_operator_threads, 2, "Number of threads to execute a single operator");
DEFINE_bool(reuse_port, true, "Whether to turn on the SO_REUSEPORT option");
DEFINE_int32(listen_backlog, 1024, "Backlog of the listen socket");
DEFINE_string(listen_netdev, "any", "The network device to listen on");
Expand Down