Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimizer path #4162

Merged
merged 3 commits into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/graph/executor/algo/BFSShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,12 @@ folly::Future<Status> BFSShortestPathExecutor::conjunctPath() {
std::vector<folly::Future<DataSet>> futures;
for (auto& vid : meetVids) {
batchVids.push_back(vid);
if (i == totalSize - 1 || batchVids.size() == batchSize) {
if (++i == totalSize || batchVids.size() == batchSize) {
auto future = folly::via(runner(), [this, vids = std::move(batchVids), oddStep]() {
return doConjunct(vids, oddStep);
});
futures.emplace_back(std::move(future));
}
i++;
}

return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) {
Expand Down
208 changes: 156 additions & 52 deletions src/graph/executor/algo/MultiShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,19 @@ folly::Future<Status> MultiShortestPathExecutor::execute() {
})
.thenValue([this](auto&& resp) {
UNUSED(resp);
preLeftPaths_.swap(leftPaths_);
preRightPaths_.swap(rightPaths_);
preRightPaths_ = rightPaths_;
// update history
for (auto& iter : leftPaths_) {
historyLeftPaths_[iter.first].insert(std::make_move_iterator(iter.second.begin()),
std::make_move_iterator(iter.second.end()));
}
for (auto& iter : rightPaths_) {
historyRightPaths_[iter.first].insert(std::make_move_iterator(iter.second.begin()),
std::make_move_iterator(iter.second.end()));
}
leftPaths_.clear();
rightPaths_.clear();

step_++;
DataSet ds;
ds.colNames = pathNode_->colNames();
Expand All @@ -58,24 +67,40 @@ void MultiShortestPathExecutor::init() {
for (; rIter->valid(); rIter->next()) {
auto& vid = rIter->getColumn(0);
if (rightVids.emplace(vid).second) {
preRightPaths_[vid].push_back({Path(Vertex(vid, {}), {})});
std::vector<Path> tmp({Path(Vertex(vid, {}), {})});
historyRightPaths_[vid].emplace(vid, tmp);
preRightPaths_[vid].emplace(vid, std::move(tmp));
}
}

std::set<Value> leftVids;
for (; lIter->valid(); lIter->next()) {
auto& vid = lIter->getColumn(0);
std::vector<Path> tmp({Path(Vertex(vid, {}), {})});
historyLeftPaths_[vid].emplace(vid, std::move(tmp));
leftVids.emplace(vid);
}
for (const auto& leftVid : leftVids) {
for (const auto& rightVid : rightVids) {
if (leftVid != rightVid) {
terminationMap_.insert({leftVid, {rightVid, true}});
terminationMap_.emplace(leftVid, std::make_pair(rightVid, true));
}
}
}
}

std::vector<Path> MultiShortestPathExecutor::createPaths(const std::vector<Path>& paths,
const Edge& edge) {
std::vector<Path> newPaths;
newPaths.reserve(paths.size());
for (const auto& p : paths) {
Path path = p;
path.steps.emplace_back(Step(Vertex(edge.dst, {}), edge.type, edge.name, edge.ranking, {}));
newPaths.emplace_back(std::move(path));
}
return newPaths;
}

Status MultiShortestPathExecutor::buildPath(bool reverse) {
auto iter = reverse ? ectx_->getResult(pathNode_->rightInputVar()).iter()
: ectx_->getResult(pathNode_->leftInputVar()).iter();
Expand All @@ -96,10 +121,23 @@ Status MultiShortestPathExecutor::buildPath(bool reverse) {
Path path;
path.src = Vertex(src, {});
path.steps.emplace_back(Step(Vertex(dst, {}), edge.type, edge.name, edge.ranking, {}));
currentPaths[dst].emplace_back(std::move(path));
auto foundDst = currentPaths.find(dst);
if (foundDst != currentPaths.end()) {
auto foundSrc = foundDst->second.find(src);
if (foundSrc != foundDst->second.end()) {
// same <src, dst>, different edge type or rank
foundSrc->second.emplace_back(std::move(path));
} else {
std::vector<Path> tmp({std::move(path)});
foundDst->second.emplace(src, std::move(tmp));
}
} else {
std::vector<Path> tmp({std::move(path)});
currentPaths[dst].emplace(src, std::move(tmp));
}
}
} else {
auto& historyPaths = reverse ? preRightPaths_ : preLeftPaths_;
auto& historyPaths = reverse ? historyRightPaths_ : historyLeftPaths_;
for (; iter->valid(); iter->next()) {
auto edgeVal = iter->getEdge();
if (UNLIKELY(!edgeVal.isEdge())) {
Expand All @@ -108,50 +146,93 @@ Status MultiShortestPathExecutor::buildPath(bool reverse) {
auto& edge = edgeVal.getEdge();
auto& src = edge.src;
auto& dst = edge.dst;
for (const auto& histPath : historyPaths[src]) {
Path path = histPath;
path.steps.emplace_back(Step(Vertex(dst, {}), edge.type, edge.name, edge.ranking, {}));
if (path.hasDuplicateVertices()) {
continue;
auto& prePaths = historyPaths[src];

auto foundHistDst = historyPaths.find(dst);
if (foundHistDst == historyPaths.end()) {
// dst not in history
auto foundDst = currentPaths.find(dst);
if (foundDst == currentPaths.end()) {
// dst not in current, new edge
for (const auto& prePath : prePaths) {
currentPaths[dst].emplace(prePath.first, createPaths(prePath.second, edge));
}
} else {
// dst in current
for (const auto& prePath : prePaths) {
auto newPaths = createPaths(prePath.second, edge);
auto foundSrc = foundDst->second.find(prePath.first);
if (foundSrc == foundDst->second.end()) {
foundDst->second.emplace(prePath.first, std::move(newPaths));
} else {
foundSrc->second.insert(foundSrc->second.begin(),
std::make_move_iterator(newPaths.begin()),
std::make_move_iterator(newPaths.end()));
}
}
}
} else {
// dst in history
auto& historyDstPaths = foundHistDst->second;
for (const auto& prePath : prePaths) {
if (historyDstPaths.find(prePath.first) != historyDstPaths.end()) {
// Loop such as a->b->c->a or a->b->c->b:
// skip paths that would contain a duplicate vertex or that have already been found.
continue;
}
auto foundDst = currentPaths.find(dst);
if (foundDst == currentPaths.end()) {
currentPaths[dst].emplace(prePath.first, createPaths(prePath.second, edge));
} else {
auto newPaths = createPaths(prePath.second, edge);
auto foundSrc = foundDst->second.find(prePath.first);
if (foundSrc == foundDst->second.end()) {
foundDst->second.emplace(prePath.first, std::move(newPaths));
} else {
foundSrc->second.insert(foundSrc->second.begin(),
std::make_move_iterator(newPaths.begin()),
std::make_move_iterator(newPaths.end()));
}
}
}
currentPaths[dst].emplace_back(std::move(path));
}
}
}

// set nextVid
const auto& nextVidVar = reverse ? pathNode_->rightVidVar() : pathNode_->leftVidVar();
setNextStepVid(currentPaths, nextVidVar);
return Status::OK();
}

DataSet MultiShortestPathExecutor::doConjunct(Interims::iterator startIter,
Interims::iterator endIter,
bool oddStep) {
auto& rightPaths = oddStep ? preRightPaths_ : rightPaths_;
DataSet ds;
for (; startIter != endIter; ++startIter) {
auto found = rightPaths.find(startIter->first);
if (found == rightPaths.end()) {
continue;
}
for (const auto& lPath : startIter->second) {
const auto& srcVid = lPath.src.vid;
auto range = terminationMap_.equal_range(srcVid);
for (const auto& rPath : found->second) {
const auto& dstVid = rPath.src.vid;
for (auto iter = range.first; iter != range.second; ++iter) {
if (iter->second.first == dstVid) {
auto forwardPath = lPath;
auto backwardPath = rPath;
DataSet MultiShortestPathExecutor::doConjunct(
const std::vector<std::pair<Interims::iterator, Interims::iterator>>& iters) {
auto buildPaths =
[](const std::vector<Path>& leftPaths, const std::vector<Path>& rightPaths, DataSet& ds) {
for (const auto& leftPath : leftPaths) {
for (const auto& rightPath : rightPaths) {
auto forwardPath = leftPath;
auto backwardPath = rightPath;
backwardPath.reverse();
forwardPath.append(std::move(backwardPath));
if (forwardPath.hasDuplicateVertices()) {
continue;
}
Row row;
row.values.emplace_back(std::move(forwardPath));
ds.rows.emplace_back(std::move(row));
iter->second.second = false;
}
}
};

DataSet ds;
for (const auto& iter : iters) {
const auto& leftPaths = iter.first->second;
const auto& rightPaths = iter.second->second;
for (const auto& leftPath : leftPaths) {
auto range = terminationMap_.equal_range(leftPath.first);
for (const auto& rightPath : rightPaths) {
for (auto found = range.first; found != range.second; ++found) {
if (found->second.first == rightPath.first) {
buildPaths(leftPath.second, rightPath.second, ds);
found->second.second = false;
}
}
}
Expand All @@ -161,28 +242,51 @@ DataSet MultiShortestPathExecutor::doConjunct(Interims::iterator startIter,
}

folly::Future<bool> MultiShortestPathExecutor::conjunctPath(bool oddStep) {
size_t batchSize = leftPaths_.size() / static_cast<size_t>(FLAGS_num_operator_threads);
auto& rightPaths = oddStep ? preRightPaths_ : rightPaths_;
size_t leftPathSize = leftPaths_.size();
size_t rightPathSize = rightPaths.size();
std::vector<folly::Future<DataSet>> futures;
size_t i = 0;
std::vector<std::pair<Interims::iterator, Interims::iterator>> pathIters;

auto startIter = leftPaths_.begin();
for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) {
if (i++ == batchSize) {
auto endIter = leftIter;
endIter++;
auto future = folly::via(runner(), [this, startIter, endIter, oddStep]() {
return doConjunct(startIter, endIter, oddStep);
});
futures.emplace_back(std::move(future));
i = 0;
startIter = endIter;
size_t i = 0;
if (leftPathSize > rightPathSize) {
size_t batchSize = leftPathSize / static_cast<size_t>(FLAGS_num_operator_threads);
pathIters.reserve(batchSize);
for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) {
auto rightIter = rightPaths.find(leftIter->first);
if (rightIter == rightPaths.end()) {
continue;
}
pathIters.emplace_back(leftIter, rightIter);
if (++i == batchSize) {
auto future = folly::via(
runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
futures.emplace_back(std::move(future));
pathIters.reserve(batchSize);
i = 0;
}
}
} else {
size_t batchSize = rightPathSize / static_cast<size_t>(FLAGS_num_operator_threads);
pathIters.reserve(batchSize);
for (auto rightIter = rightPaths.begin(); rightIter != rightPaths.end(); ++rightIter) {
auto leftIter = leftPaths_.find(rightIter->first);
if (leftIter == leftPaths_.end()) {
continue;
}
pathIters.emplace_back(leftIter, rightIter);
if (++i == batchSize) {
auto future = folly::via(
runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
futures.emplace_back(std::move(future));
pathIters.reserve(batchSize);
i = 0;
}
}
}
if (i != 0) {
auto endIter = leftPaths_.end();
auto future = folly::via(runner(), [this, startIter, endIter, oddStep]() {
return doConjunct(startIter, endIter, oddStep);
});
auto future =
folly::via(runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
futures.emplace_back(std::move(future));
}

Expand Down
13 changes: 8 additions & 5 deletions src/graph/executor/algo/MultiShortestPathExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,27 @@ class MultiShortestPathExecutor final : public Executor {
folly::Future<Status> execute() override;

private:
// k: dst, v: paths to dst
using Interims = std::unordered_map<Value, std::vector<Path>>;
// key: dst, value: {key : src, value: paths}
using Interims = std::unordered_map<Value, std::unordered_map<Value, std::vector<Path>>>;

void init();
std::vector<Path> createPaths(const std::vector<Path>& paths, const Edge& edge);
Status buildPath(bool reverse);
folly::Future<bool> conjunctPath(bool oddStep);
DataSet doConjunct(Interims::iterator startIter, Interims::iterator endIter, bool oddStep);
DataSet doConjunct(const std::vector<std::pair<Interims::iterator, Interims::iterator>>& iters);
void setNextStepVid(const Interims& paths, const string& var);

private:
const MultiShortestPath* pathNode_{nullptr};
size_t step_{1};
std::string terminationVar_;
// {src, <dst, true>}
std::unordered_multimap<Value, std::pair<Value, bool>> terminationMap_;
Interims leftPaths_;
Interims preLeftPaths_;
Interims preRightPaths_;
Interims rightPaths_;
Interims preRightPaths_;
Interims historyLeftPaths_;
Interims historyRightPaths_;
DataSet currentDs_;
};

Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/algo/ProduceAllPathsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ folly::Future<Status> ProduceAllPathsExecutor::conjunctPath() {

auto startIter = leftPaths_.begin();
for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) {
if (i++ == batchSize) {
if (++i == batchSize) {
auto endIter = leftIter;
endIter++;
auto oddStepFuture = folly::via(
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/test/FindPathTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ TEST_F(FindPathTest, multiSourceShortestPath) {
{
DataSet expectLeftVid;
expectLeftVid.colNames = {nebula::kVid};
for (const auto& vid : {"a", "b", "c", "f", "g"}) {
for (const auto& vid : {"b", "f", "g"}) {
Row row;
row.values.emplace_back(vid);
expectLeftVid.rows.emplace_back(std::move(row));
Expand Down
2 changes: 1 addition & 1 deletion src/graph/service/GraphFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ DEFINE_int32(num_netio_threads,
"The number of networking threads, 0 for number of physical CPU cores");
DEFINE_int32(num_accept_threads, 1, "Number of threads to accept incoming connections");
DEFINE_int32(num_worker_threads, 0, "Number of threads to execute user queries");
DEFINE_int32(num_operator_threads, 5, "Number of threads to execute a single operator");
DEFINE_int32(num_operator_threads, 2, "Number of threads to execute a single operator");
DEFINE_bool(reuse_port, true, "Whether to turn on the SO_REUSEPORT option");
DEFINE_int32(listen_backlog, 1024, "Backlog of the listen socket");
DEFINE_string(listen_netdev, "any", "The network device to listen on");
Expand Down