Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add_eager_boxing_and_op_interpreter_dispatch_error_info #5819

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
88b8538
add_eager_boxing_and_op_interpreter_dispatch_error_info
clackhan Aug 10, 2021
aadfbb8
Merge branch 'master' into add_eager_boxing_and_op_interpreter_dispat…
clackhan Aug 10, 2021
b92e393
refine
clackhan Aug 10, 2021
ddd9a9e
refine
clackhan Aug 10, 2021
ae0cfaf
refine
clackhan Aug 10, 2021
53e321a
Merge branch 'add_eager_boxing_and_op_interpreter_dispatch_error_info…
clackhan Aug 10, 2021
0266945
use ErrorString4Inputs
clackhan Aug 10, 2021
82ee3b0
Merge branch 'master' into add_eager_boxing_and_op_interpreter_dispat…
clackhan Aug 10, 2021
59e78f4
auto format by CI
oneflow-ci-bot Aug 10, 2021
3861c37
Merge branch 'master' of https://github.com/Oneflow-Inc/oneflow into …
clackhan Aug 12, 2021
2fc0ad8
Merge branch 'master' into add_eager_boxing_and_op_interpreter_dispat…
oneflow-ci-bot Aug 12, 2021
5322db7
Merge branch 'master' into add_eager_boxing_and_op_interpreter_dispat…
oneflow-ci-bot Aug 12, 2021
306f62b
Merge branch 'master' into add_eager_boxing_and_op_interpreter_dispat…
oneflow-ci-bot Aug 12, 2021
e9bd68f
Merge branch 'master' into add_eager_boxing_and_op_interpreter_dispat…
oneflow-ci-bot Aug 12, 2021
4fbd737
Merge branch 'master' into add_eager_boxing_and_op_interpreter_dispat…
oneflow-ci-bot Aug 12, 2021
90dfeac
Merge branch 'master' into add_eager_boxing_and_op_interpreter_dispat…
oneflow-ci-bot Aug 12, 2021
ef0a7b2
Merge branch 'master' into add_eager_boxing_and_op_interpreter_dispat…
oneflow-ci-bot Aug 12, 2021
8d00cfb
Merge branch 'master' into add_eager_boxing_and_op_interpreter_dispat…
oneflow-ci-bot Aug 12, 2021
2878c22
Merge branch 'master' into add_eager_boxing_and_op_interpreter_dispat…
oneflow-ci-bot Aug 12, 2021
401b415
Merge branch 'master' into add_eager_boxing_and_op_interpreter_dispat…
oneflow-ci-bot Aug 13, 2021
6af5f18
Merge branch 'master' into add_eager_boxing_and_op_interpreter_dispat…
oneflow-ci-bot Aug 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
#include "oneflow/core/common/constant.h"
#include "oneflow/core/common/decorator.h"
#include "oneflow/core/common/container_util.h"
#include "oneflow/core/job/sbp_parallel.h"
#include "oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_mgr.h"
#include "oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_util.h"
#include "oneflow/core/framework/op_interpreter/boxing/collective_boxing_interpreter.h"
Expand Down Expand Up @@ -49,6 +50,18 @@ Maybe<Symbol<cfg::SbpParallel>> MakePartialSumSbpParallel() {
return SymbolOf(partial_sum_sbp);
}

std::string GetSupportedBoxingTypeInfo() {
static std::string supported_boxing_type_info =
"============ Supported eager boxing type============\n"
"\'[S(0)] -> [B]\' on GPU\n"
"\'[S(0)] -> [P]\' on GPU\n"
"\'[P] -> [B]\' on GPU\n"
"\'[P] -> [S(0)]\' on GPU\n"
"\'[B] -> [S(0)]\' on GPU\n"
"\'[B] -> [P]\' on GPU or CPU";
return supported_boxing_type_info;
}

Maybe<EagerBoxingInterpreter> GetOneDimNcclCollectiveEagerBoxingInterpreter(
Symbol<cfg::ParallelDistribution> in_nd_sbp, Symbol<cfg::ParallelDistribution> out_nd_sbp) {
static SbpPair2EagerBoxingInterpreter sbp_pair2eager_boxing_interpreter = {
Expand All @@ -63,8 +76,15 @@ Maybe<EagerBoxingInterpreter> GetOneDimNcclCollectiveEagerBoxingInterpreter(
{{*JUST(GetSplitSbpParallel(0)), *JUST(MakePartialSumSbpParallel())}, // S(0) -> P
std::make_shared<NcclS2PBoxingInterpreter>()},
};
return JUST(MapAt(sbp_pair2eager_boxing_interpreter,
std::make_pair(in_nd_sbp->sbp_parallel(0), out_nd_sbp->sbp_parallel(0))));
const auto& key = std::make_pair(in_nd_sbp->sbp_parallel(0), out_nd_sbp->sbp_parallel(0));
CHECK_OR_RETURN(sbp_pair2eager_boxing_interpreter.find(key)
!= sbp_pair2eager_boxing_interpreter.end())
<< "Eager boxing type \'" << ParallelDistributionToString(in_nd_sbp) << " -> "
<< ParallelDistributionToString(out_nd_sbp) << "\'"
<< " not support yet\n"
<< GetSupportedBoxingTypeInfo();

return JUST(MapAt(sbp_pair2eager_boxing_interpreter, key));
}

Maybe<EagerBoxingInterpreter> GetBoxingInterpreter(Symbol<cfg::ParallelDistribution> in_nd_sbp,
Expand All @@ -86,13 +106,22 @@ Maybe<EagerBoxingInterpreter> GetBoxingInterpreter(Symbol<cfg::ParallelDistribut
} else if (in_parallel_desc->device_type() == DeviceType::kGPU) {
return GetOneDimNcclCollectiveEagerBoxingInterpreter(in_nd_sbp, out_nd_sbp);
} else {
UNIMPLEMENTED_THEN_RETURN();
UNIMPLEMENTED_THEN_RETURN()
<< "Eager boxing type \'" << ParallelDistributionToString(in_nd_sbp) << " -> "
<< ParallelDistributionToString(out_nd_sbp) << "\'"
<< " not support yet\n"
<< GetSupportedBoxingTypeInfo();
}
} else {
UNIMPLEMENTED_THEN_RETURN();
UNIMPLEMENTED_THEN_RETURN() << "Eager boxing with different placement not support yet\n"
<< GetSupportedBoxingTypeInfo();
}
} else {
UNIMPLEMENTED_THEN_RETURN();
UNIMPLEMENTED_THEN_RETURN() << "N-dim eager boxing type \'"
<< ParallelDistributionToString(in_nd_sbp) << " -> "
<< ParallelDistributionToString(out_nd_sbp) << "\'"
<< " not support yet\n"
<< GetSupportedBoxingTypeInfo();
}
}

Expand Down
49 changes: 38 additions & 11 deletions oneflow/core/framework/op_interpreter/op_interpreter_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,25 @@ std::shared_ptr<AutogradInterpreter> BuildLazyInterpreter() {
return std::make_shared<AutogradInterpreter>(internal);
}

Maybe<AutogradInterpreter> GetInterpreter(const TensorTuple& inputs,
const OpExprInterpContext& ctx) {
std::string ErrorString4Inputs(const TensorTuple& inputs, const OpExpr& op_expr) {
std::stringstream error_str;
error_str << "Got input tensors with inconsistent attributes!\n"
<< "op_type_name: " << op_expr.op_type_name() << "\n"
<< "attributes of inputs is:\n";
int32_t idx = 0;
for (const auto& tensor : inputs) {
if (tensor->is_local()) {
error_str << "local";
} else {
error_str << "consistent";
}
if (++idx != inputs.size()) { error_str << ", "; }
}
return error_str.str();
}

Maybe<AutogradInterpreter> GetInterpreter(const TensorTuple& inputs, const OpExprInterpContext& ctx,
const OpExpr& op_expr) {
static const auto& g_lazy_interpreter = BuildLazyInterpreter();
static const auto& g_eager_consistent_interpreter = BuildEagerInterpreter(/*is_mirrored=*/false);
static const auto& g_eager_mirrored_interpreter = BuildEagerInterpreter(/*is_mirrored=*/true);
Expand All @@ -68,24 +85,34 @@ Maybe<AutogradInterpreter> GetInterpreter(const TensorTuple& inputs,
if (inputs.size() == 1) {
// do nothing
} else if (inputs.size() == 2) {
CHECK_OR_RETURN(inputs.at(1)->is_consistent()); // unroll loop for efficiency
CHECK_OR_RETURN(inputs.at(1)->is_consistent())
<< ErrorString4Inputs(inputs, op_expr); // unroll loop for efficiency
} else if (inputs.size() == 3) {
CHECK_OR_RETURN(inputs.at(1)->is_consistent()); // unroll loop for efficiency
CHECK_OR_RETURN(inputs.at(2)->is_consistent()); // unroll loop for efficiency
CHECK_OR_RETURN(inputs.at(1)->is_consistent())
<< ErrorString4Inputs(inputs, op_expr); // unroll loop for efficiency
CHECK_OR_RETURN(inputs.at(2)->is_consistent())
<< ErrorString4Inputs(inputs, op_expr); // unroll loop for efficiency
} else {
for (const auto& tensor : inputs) { CHECK_OR_RETURN(tensor->is_consistent()); }
for (const auto& tensor : inputs) {
CHECK_OR_RETURN(tensor->is_consistent()) << ErrorString4Inputs(inputs, op_expr);
}
}
return g_eager_consistent_interpreter;
} else {
if (inputs.size() == 1) {
// do nothing
} else if (inputs.size() == 2) {
CHECK_OR_RETURN(inputs.at(1)->is_local()); // unroll loop for efficiency
CHECK_OR_RETURN(inputs.at(1)->is_local())
<< ErrorString4Inputs(inputs, op_expr); // unroll loop for efficiency
} else if (inputs.size() == 3) {
CHECK_OR_RETURN(inputs.at(1)->is_local()); // unroll loop for efficiency
CHECK_OR_RETURN(inputs.at(2)->is_local()); // unroll loop for efficiency
CHECK_OR_RETURN(inputs.at(1)->is_local())
<< ErrorString4Inputs(inputs, op_expr); // unroll loop for efficiency
CHECK_OR_RETURN(inputs.at(2)->is_local())
<< ErrorString4Inputs(inputs, op_expr); // unroll loop for efficiency
} else {
for (const auto& tensor : inputs) { CHECK_OR_RETURN(tensor->is_local()); }
for (const auto& tensor : inputs) {
CHECK_OR_RETURN(tensor->is_local()) << ErrorString4Inputs(inputs, op_expr);
}
}
return g_eager_mirrored_interpreter;
}
Expand Down Expand Up @@ -115,7 +142,7 @@ template<>
/* static */ Maybe<void> OpInterpUtil::Dispatch(const OpExpr& op_expr, const TensorTuple& inputs,
TensorTuple* outputs,
const OpExprInterpContext& ctx) {
return JUST(GetInterpreter(inputs, ctx))->Apply(op_expr, inputs, outputs, ctx);
return JUST(GetInterpreter(inputs, ctx, op_expr))->Apply(op_expr, inputs, outputs, ctx);
}

/* static */ Maybe<cfg::OpAttribute> OpInterpUtil::AddOpAndInferOpAttribute(
Expand Down
18 changes: 18 additions & 0 deletions oneflow/core/job/sbp_parallel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,24 @@ std::string SbpParallelToString(const cfg::SbpParallel& sbp_parallel) {
return sbp_str;
}

std::string ParallelDistributionToString(const Symbol<cfg::ParallelDistribution> nd_sbp) {
static HashMap<Symbol<cfg::ParallelDistribution>, std::string>* nd_sbp2str =
new HashMap<Symbol<cfg::ParallelDistribution>, std::string>();
auto iter = nd_sbp2str->find(nd_sbp);
if (iter == nd_sbp2str->end()) {
std::stringstream nd_sbp_str;
nd_sbp_str << "[";
int32_t idx = 0;
for (const auto& sbp_parallel : nd_sbp->sbp_parallel()) {
nd_sbp_str << SbpParallelToString(sbp_parallel);
if (++idx != nd_sbp->sbp_parallel_size()) { nd_sbp_str << ", "; }
}
nd_sbp_str << "]";
iter = nd_sbp2str->emplace(nd_sbp, nd_sbp_str.str()).first;
}
return iter->second;
}

void SbpSignatureToParallelDistributionSignature(
const cfg::SbpSignature& sbp_signature, cfg::ParallelDistributionSignature* nd_sbp_signature) {
for (const auto& pair : sbp_signature.bn_in_op2sbp_parallel()) {
Expand Down
1 change: 1 addition & 0 deletions oneflow/core/job/sbp_parallel.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ void SortSbpSignatureListByCopyCost(
bool IsValidSbpParallelString(const std::string& sbp_str);
bool ParseSbpParallelFromString(const std::string& sbp_str, cfg::SbpParallel* sbp_parallel);
std::string SbpParallelToString(const cfg::SbpParallel& sbp_parallel);
std::string ParallelDistributionToString(const Symbol<cfg::ParallelDistribution> nd_sbp);

void SbpSignatureToParallelDistributionSignature(
const cfg::SbpSignature& sbp_signature, cfg::ParallelDistributionSignature* nd_sbp_signature);
Expand Down