Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AutoParallel mainstem algorithm add mutable_op_ctrl_edge #8033

Merged
merged 2 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 48 additions & 4 deletions oneflow/core/auto_parallel/sbp_constructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ Maybe<void> SbpConstructor::DumpNdSbpSignatureForJob(const OpGraph& op_graph, Jo
op_graph.ForEachNode([&](const OpNode* node) -> void {
SbpNode<NdSbpSignature>* sbp_node = op_name2sbp_node_[node->op().op_name()];
// Update NdSbpSignature
// sbp_node->FinalSbpSignature()->ToProto(
// &(*job->mutable_job_parallel_view_conf()
// ->mutable_op_name2nd_sbp_signature_conf())[node->op().op_name()]);
(*job->mutable_job_parallel_view_conf()
->mutable_op_name2nd_sbp_signature_conf())[node->op().op_name()]
.CopyFrom(*sbp_node->FinalSbpSignature());
Expand Down Expand Up @@ -291,8 +288,9 @@ Maybe<void> SbpConstructor::InitCopyCost(const OpGraph& op_graph) {
}

Maybe<void> SbpConstructor::ApplyMainstemAlgo() {
auto op_node2mutable_op_ctrl_deps = JUST(GetMutableOpCtrlDeps(*op_graph_));
// Compute layer number for each node
int32_t max_MinLayer = sbp_graph_.ComputeLayer(op_name2sbp_node_);
int32_t max_MinLayer = sbp_graph_.ComputeLayer(op_name2sbp_node_, *op_node2mutable_op_ctrl_deps);
// Accumulate cost on the mainstem after initializing computation cost
sbp_graph_.FindMainstem(max_MinLayer, op_name2sbp_node_);
return Maybe<void>::Ok();
Expand Down Expand Up @@ -351,6 +349,52 @@ Maybe<void> SbpConstructor::CheckSbpAgreement(const Job& job) {
return Maybe<void>::Ok();
}

// Builds extra control dependencies for ops that consume a variable's output mutably.
// For every variable op with more than one out edge and a mutable consumer, the names of the
// remaining consumers are recorded against that mutable consumer. Names for which the
// data-or-ctrl reachability predicate (name -> mutable consumer) already holds are dropped.
// Fails through CHECK_OR_RETURN when a variable has more than one mutable consumer.
Maybe<HashMap<const OpNode*, HashSet<std::string>>> SbpConstructor::GetMutableOpCtrlDeps(
    const OpGraph& op_graph) {
  // True when `op` reads `lbi` through at least one input blob name whose modifier is mutable.
  const auto ReadsLbiMutably = [](const Operator& op, const LogicalBlobId& lbi) -> bool {
    for (const std::string& ibn : op.input_bns()) {
      if (!(op.BnInOp2Lbi(ibn) == lbi)) { continue; }
      if (op.InputBlobModifier4Ibn(ibn).is_mutable()) { return true; }
    }
    return false;
  };
  auto is_reachable = op_graph.MakePredicatorIsOpNameDataOrCtrlReachable();

  // Pass 1: mutable consumer -> names of every other consumer of the same variable.
  HashMap<const OpNode*, HashSet<std::string>> candidate_deps;
  JUST(op_graph.MaybeForEachNode([&](OpNode* op_node) -> Maybe<void> {
    // Only variable ops with more than one out edge can produce a dependency.
    if (!op_node->op().op_conf().has_variable_conf() || op_node->out_edges().size() <= 1) {
      return Maybe<void>::Ok();
    }
    const Operator& variable_op = op_node->op();
    const LogicalBlobId& variable_lbi = variable_op.BnInOp2Lbi(variable_op.SoleObn());

    const OpNode* mutable_consumer = nullptr;
    std::vector<const OperatorConf*> plain_consumers;
    plain_consumers.reserve(op_node->out_edges().size());
    for (OpEdge* out_edge : op_node->out_edges()) {
      const OpNode* consumer = out_edge->dst_node();
      if (!ReadsLbiMutably(consumer->op(), variable_lbi)) {
        plain_consumers.emplace_back(&consumer->op().op_conf());
        continue;
      }
      // At most one mutable consumer per variable is allowed.
      CHECK_OR_RETURN(mutable_consumer == nullptr);
      mutable_consumer = consumer;
    }

    if (mutable_consumer != nullptr) {
      for (const OperatorConf* consumer_conf : plain_consumers) {
        candidate_deps[mutable_consumer].insert(consumer_conf->name());
      }
    }
    return Maybe<void>::Ok();
  }));

  // Pass 2: keep only the dependencies that are not already reachable.
  HashMap<const OpNode*, HashSet<std::string>> unreachable_deps;
  for (const auto& entry : candidate_deps) {
    const OpNode* consumer_node = entry.first;
    for (const auto& dep_op_name : entry.second) {
      if (is_reachable(dep_op_name, consumer_node->op().op_name())) { continue; }
      unreachable_deps[consumer_node].insert(dep_op_name);
    }
  }
  return unreachable_deps;
}

// Print the graph with SBP in order
void SbpConstructor::PrintSBPGraphDebugInfo() {
// sbp constructor information
Expand Down
5 changes: 4 additions & 1 deletion oneflow/core/auto_parallel/sbp_constructor.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class SbpConstructor final {
use_sbp_collector_(!Global<ResourceDesc, ForSession>::Get()
->resource()
.disable_group_boxing_by_dst_parallel()
&& job->job_conf().enable_auto_parallel_sbp_collector()) {
&& job->job_conf().enable_auto_parallel_sbp_collector()),
op_graph_(&op_graph) {
sbp_graph_.SetWaitTime(job->job_conf().auto_parallel_wait_time());
sbp_graph_.SetTransferCost(job->job_conf().auto_parallel_transfer_cost());
CHECK_JUST(Init(op_graph, job));
Expand All @@ -61,13 +62,15 @@ class SbpConstructor final {
Maybe<void> InitComputationCost(const OpGraph& op_graph);
Maybe<void> InitCopyCost(const OpGraph& op_graph);
Maybe<void> ApplyMainstemAlgo();
Maybe<HashMap<const OpNode*, HashSet<std::string>>> GetMutableOpCtrlDeps(const OpGraph& op_graph);
// Load logical blob ids onto sbp edges
void LoadLbi2SbpEdge(const OpGraph& op_graph);

double cost_ratio_;
bool enable_mainstem_algo_;
bool use_sbp_collector_;
SbpGraph<NdSbpSignature> sbp_graph_;
const OpGraph* op_graph_;
HashMap<std::string, SbpNode<NdSbpSignature>*> op_name2sbp_node_;
};

Expand Down
16 changes: 12 additions & 4 deletions oneflow/core/auto_parallel/sbp_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ class SbpGraph {
void DetectAdjustOverlap(double CostRatio);

// Compute the minimum and maximum layer of each node in the graph
int32_t ComputeLayer(oneflow::HashMap<std::string, SbpNode<SbpSignature>*>& op_name2sbp_node);
int32_t ComputeLayer(oneflow::HashMap<std::string, SbpNode<SbpSignature>*>& op_name2sbp_node,
const oneflow::HashMap<const OpNode*, oneflow::HashSet<std::string>>&
op_node2mutable_op_ctrl_deps);

// Find the mainstem of the sbp graph, then reduce the wait time for tributaries
void FindMainstem(int32_t max_MinLayer,
Expand Down Expand Up @@ -912,16 +914,22 @@ void SbpGraph<SbpSignature>::DetectAdjustOverlap(double CostRatio) {
// Compute the minimum and maximum layer of each node in the graph.
// Walks NodeList several times: first GetMinLayer on every node, then a scan for the largest
// MinLayer, then SpreadMaxLayer on every node, and finally LiftMaxLayer(max_MinLayer).
// Both maps are forwarded unchanged to GetMinLayer and SpreadMaxLayer.
// Returns the largest MinLayer seen, or -1 when no node's MinLayer exceeds -1.
// NOTE(review): this text comes from a diff view, so the previous and the new form of the
// signature and of two loops appear back to back below - confirm against the merged file.
template<class SbpSignature>
int32_t SbpGraph<SbpSignature>::ComputeLayer(
oneflow::HashMap<std::string, SbpNode<SbpSignature>*>& op_name2sbp_node) {
oneflow::HashMap<std::string, SbpNode<SbpSignature>*>& op_name2sbp_node,
const oneflow::HashMap<const OpNode*, oneflow::HashSet<std::string>>&
op_node2mutable_op_ctrl_deps) {
// Compute minimum layer
for (SbpNode<SbpSignature>* this_node : NodeList) { this_node->GetMinLayer(op_name2sbp_node); }
for (SbpNode<SbpSignature>* this_node : NodeList) {
this_node->GetMinLayer(op_name2sbp_node, op_node2mutable_op_ctrl_deps);
}
// Find the largest minimum layer
int32_t max_MinLayer = -1;
for (SbpNode<SbpSignature>* this_node : NodeList) {
if (max_MinLayer < this_node->MinLayer) { max_MinLayer = this_node->MinLayer; }
}
// Compute maximum layer
for (SbpNode<SbpSignature>* this_node : NodeList) { this_node->SpreadMaxLayer(op_name2sbp_node); }
for (SbpNode<SbpSignature>* this_node : NodeList) {
this_node->SpreadMaxLayer(op_name2sbp_node, op_node2mutable_op_ctrl_deps);
}
// Pass the largest minimum layer to every node's LiftMaxLayer
for (SbpNode<SbpSignature>* this_node : NodeList) { this_node->LiftMaxLayer(max_MinLayer); }
return max_MinLayer;
}
Expand Down
38 changes: 32 additions & 6 deletions oneflow/core/auto_parallel/sbp_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,13 @@ class SbpNode {
void DetectSpreadOverlap(double overlap_ratio);

// Get or compute the minimum layer of this node
int32_t GetMinLayer(oneflow::HashMap<std::string, SbpNode<SbpSignature>*>& op_name2sbp_node);
int32_t GetMinLayer(oneflow::HashMap<std::string, SbpNode<SbpSignature>*>& op_name2sbp_node,
const oneflow::HashMap<const OpNode*, oneflow::HashSet<std::string>>&
op_node2mutable_op_ctrl_deps);
// Spread the minimum layer to compute the maximum layer of producers
void SpreadMaxLayer(oneflow::HashMap<std::string, SbpNode<SbpSignature>*>& op_name2sbp_node);
void SpreadMaxLayer(oneflow::HashMap<std::string, SbpNode<SbpSignature>*>& op_name2sbp_node,
const oneflow::HashMap<const OpNode*, oneflow::HashSet<std::string>>&
op_node2mutable_op_ctrl_deps);
// Drop down the maximum layer with the minimum layer from consumer
void DropMaxLayer(int32_t upper_bound);
// Set MaxLayer = MinLayer if this node does not have any consumer
Expand Down Expand Up @@ -681,27 +685,43 @@ void SbpNode<SbpSignature>::DetectSpreadOverlap(double overlap_ratio) {
// Get or compute the minimum layer of this node.
// Returns MinLayer unchanged when it is already non-negative or when op_node is null.
// Otherwise MinLayer is raised to the largest GetMinLayer() result among:
//   - the StartNode of every edge in EdgesIn,
//   - every node found in op_name2sbp_node for a name in the op's ctrl_in_op_name list,
//   - every node found in op_name2sbp_node for a name stored under op_node in
//     op_node2mutable_op_ctrl_deps,
// and the function returns that value plus one, which is also stored in MinLayer.
// Names that are missing from op_name2sbp_node are ignored.
// NOTE(review): this text comes from a diff view, so the previous and the new form of the
// signature and of the recursive calls appear back to back below - confirm against the merged
// file.
template<class SbpSignature>
int32_t SbpNode<SbpSignature>::GetMinLayer(
oneflow::HashMap<std::string, SbpNode<SbpSignature>*>& op_name2sbp_node) {
oneflow::HashMap<std::string, SbpNode<SbpSignature>*>& op_name2sbp_node,
const oneflow::HashMap<const OpNode*, oneflow::HashSet<std::string>>&
op_node2mutable_op_ctrl_deps) {
// A non-negative MinLayer is treated as already computed
if (MinLayer >= 0) { return MinLayer; }
// Without an op_node there is nothing to look up; MinLayer is returned as is
if (!op_node) { return MinLayer; }
// Producers connected through incoming sbp edges
for (SbpEdge<SbpSignature>* this_edge : EdgesIn) {
int32_t producer_min_layer = this_edge->StartNode->GetMinLayer(op_name2sbp_node);
int32_t producer_min_layer =
this_edge->StartNode->GetMinLayer(op_name2sbp_node, op_node2mutable_op_ctrl_deps);
if (producer_min_layer > MinLayer) { MinLayer = producer_min_layer; }
}
// Ops named in the op conf's ctrl_in_op_name list
for (const auto& ctrl_in_op_name : op_node->op().op_conf().ctrl_in_op_name()) {
auto it = op_name2sbp_node.find(ctrl_in_op_name);
if (it != op_name2sbp_node.end()) {
int32_t producer_min_layer = it->second->GetMinLayer(op_name2sbp_node);
int32_t producer_min_layer =
it->second->GetMinLayer(op_name2sbp_node, op_node2mutable_op_ctrl_deps);
if (producer_min_layer > MinLayer) { MinLayer = producer_min_layer; }
}
}
// Op names recorded for this op_node in op_node2mutable_op_ctrl_deps, handled the same way
if (op_node2mutable_op_ctrl_deps.find(op_node) != op_node2mutable_op_ctrl_deps.end()) {
for (const auto& ctrl_in_op_name : op_node2mutable_op_ctrl_deps.at(op_node)) {
auto it = op_name2sbp_node.find(ctrl_in_op_name);
if (it != op_name2sbp_node.end()) {
int32_t producer_min_layer =
it->second->GetMinLayer(op_name2sbp_node, op_node2mutable_op_ctrl_deps);
if (producer_min_layer > MinLayer) { MinLayer = producer_min_layer; }
}
}
}
// This node sits one layer above the largest value collected above
return ++MinLayer;
}

// Spread the minimum layer to compute the maximum layer of producers
template<class SbpSignature>
void SbpNode<SbpSignature>::SpreadMaxLayer(
oneflow::HashMap<std::string, SbpNode<SbpSignature>*>& op_name2sbp_node) {
oneflow::HashMap<std::string, SbpNode<SbpSignature>*>& op_name2sbp_node,
const oneflow::HashMap<const OpNode*, oneflow::HashSet<std::string>>&
op_node2mutable_op_ctrl_deps) {
if (MinLayer <= 0) { return; }
int32_t producer_max_lay = MinLayer - 1;
for (SbpEdge<SbpSignature>* this_edge : EdgesIn) {
Expand All @@ -711,6 +731,12 @@ void SbpNode<SbpSignature>::SpreadMaxLayer(
auto it = op_name2sbp_node.find(ctrl_in_op_name);
if (it != op_name2sbp_node.end()) { it->second->DropMaxLayer(producer_max_lay); }
}
if (op_node2mutable_op_ctrl_deps.find(op_node) != op_node2mutable_op_ctrl_deps.end()) {
for (const auto& ctrl_in_op_name : op_node2mutable_op_ctrl_deps.at(op_node)) {
auto it = op_name2sbp_node.find(ctrl_in_op_name);
if (it != op_name2sbp_node.end()) { it->second->DropMaxLayer(producer_max_lay); }
}
}
}

// Drop down the maximum layer with the minimum layer from consumer
Expand Down