
LazyInterpret for UserOpExpr #5544

Merged · 11 commits · Jul 20, 2021
100 changes: 64 additions & 36 deletions oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp
@@ -92,11 +92,8 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedInputOpExpr& op_expr, const Ten
auto infer_ctx = JUST(GetCurInferCtx());
OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(op_conf));

const std::string& op_name = op_conf.name();

// temp debug log
std::cout << "cclog: Lazy nn.Graph AddOpName: " << op_name << std::endl
<< " and the origin op_conf is :" << op_conf.DebugString();
std::cout << "cclog: Lazy nn.Graph AddOp: " << op_conf.DebugString() << std::endl;

int64_t parallel_desc_sym_id = JUST(scope->GetParallelDescSymbolId(op_conf));
const std::shared_ptr<ParallelDesc>& blob_parallel_desc_sym =
@@ -113,7 +110,7 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedInputOpExpr& op_expr, const Ten

CHECK_OR_RETURN(!outputs->at(0).get());
(*outputs)[0] = JUST(OpInterpUtil::BuildTensor(blob_attr, parallel_attr, /*is_lazy=*/true));
TensorNameScope::Global()->Record(outputs->at(0), op_name + "/" + obn);
TensorNameScope::Global()->Record(outputs->at(0), GenLogicalBlobName(op_conf.name(), obn));
return Maybe<void>::Ok();
}

@@ -149,11 +146,8 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedVariableOpExpr& op_expr, const
auto infer_ctx = JUST(GetCurInferCtx());
OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(op_conf));

const std::string& op_name = op_conf.name();

// temp debug log
std::cout << "cclog: Lazy nn.Graph AddOpName: " << op_name << std::endl
<< " and the origin op_conf is :" << op_conf.DebugString();
std::cout << "cclog: Lazy nn.Graph AddOp: " << op_conf.DebugString() << std::endl;

int64_t parallel_desc_sym_id = JUST(scope->GetParallelDescSymbolId(op_conf));
const std::shared_ptr<ParallelDesc>& blob_parallel_desc_sym =
@@ -171,9 +165,9 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedVariableOpExpr& op_expr, const
CHECK_OR_RETURN(!outputs->at(0).get());
(*outputs)[0] = JUST(OpInterpUtil::BuildTensor(blob_attr, parallel_attr, /*is_lazy=*/true));
// NOTE(chengcheng): Record the variable op's output LazyTensor
TensorNameScope::Global()->Record(outputs->at(0), op_name + "/" + obn);
TensorNameScope::Global()->Record(outputs->at(0), GenLogicalBlobName(op_conf.name(), obn));
// NOTE(chengcheng): Record EagerTensor as variable tensor name
TensorNameScope::Global()->Record(input_tensor, op_name + "/" + obn);
TensorNameScope::Global()->Record(input_tensor, GenLogicalBlobName(op_conf.name(), obn));
return Maybe<void>::Ok();
}

@@ -211,11 +205,8 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FetchOutputOpExpr& op_expr, const T
auto infer_ctx = JUST(GetCurInferCtx());
OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(op_conf));

const std::string& op_name = op_conf.name();

// temp debug log
std::cout << "cclog: Lazy nn.Graph AddOpName: " << op_name << std::endl
<< " and the origin op_conf is :" << op_conf.DebugString();
std::cout << "cclog: Lazy nn.Graph AddOp: " << op_conf.DebugString() << std::endl;

int64_t parallel_desc_sym_id = JUST(scope->GetParallelDescSymbolId(op_conf));
const std::shared_ptr<ParallelDesc>& blob_parallel_desc_sym =
@@ -239,26 +230,63 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FetchOutputOpExpr& op_expr, const T
Maybe<void> LazyInterpreter::ApplyImpl(const UserOpExpr& op_expr, const TensorTuple& inputs,
TensorTuple* outputs, const OpExprInterpContext& ctx) const {
CHECK_EQ_OR_RETURN(inputs.size(), op_expr.input_size());
const auto& scope = JUST(GetCurrentScope());
auto op_conf = JUST(OpInterpUtil::GenBuiltinOpConf(op_expr, ctx.attrs));
int64_t symbol_id = JUST(scope->symbol_id());
op_conf->set_scope_symbol_id(symbol_id);
if (!op_conf->has_device_tag()) {
op_conf->set_device_tag(scope->device_parallel_desc_symbol()->device_tag());
}
// TODO(chengcheng): Handle special UserOps such as:
// 1. [Source UserOp] : OFRecordReader, CoinFlip
// 2. [Change Placement/ParallelDesc UserOp] : to/to_consistent/parallel_cast
// 3. [Multi-input UserOp with a different ParallelDesc for each input] : e.g., an op with 2
// inputs, one on CPU and the other on GPU.
// ..., etc.

const auto& scope = JUST(GetCurrentScope());
int64_t old_scope_symbol_id = JUST(scope->symbol_id());
// TODO(chengcheng): New parallel desc scope from all inputs tensors.
op_conf->set_scope_symbol_id(old_scope_symbol_id);

// NOTE(chengcheng):
// A normal UserOp has inputs.size() >= 1, and the inputs are used to infer the parallel_desc.
// Ops with zero inputs must be handled by a dedicated SourceUserOp implementation.
CHECK_GE_OR_RETURN(inputs.size(), 1);
const std::string device_tag = GetDeviceTagOfTensor(inputs.at(0));
op_conf->set_device_tag(device_tag);
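// The device tag of the first input is applied to the whole op; the loop below checks that every
// other input carries the same tag.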
for (int i = 0; i < inputs.size(); ++i) {
const auto& input_tensor = inputs.at(i);
CHECK_OR_RETURN(device_tag == GetDeviceTagOfTensor(input_tensor));
const std::string& ibn = op_expr.indexed_ibns().at(i);
const std::string& tensor_name = TensorNameScope::Global()->Lookup(inputs[i]);
ReplaceInputLbnInOpCustomizedConf(op_conf.get(), ibn, tensor_name);
// TODO(chengcheng): check that all input tensors have equal placement, and decide whether to
// create the parallel scope here or set it in Python.
const std::string& lbn = TensorNameScope::Global()->Lookup(inputs[i]);
if (lbn.empty()) {
CHECK_OR_RETURN(input_tensor->is_eager());  // NOTE(chengcheng): a lazy tensor MUST have an lbn.

// TODO(chengcheng):
// This is a free EagerTensor that is NOT captured by nn.Graph (as an input or parameter).
// A VariableOpConf needs to be created for this tensor and its name recorded.
UNIMPLEMENTED();
}
CHECK_OR_RETURN(!lbn.empty());  // NOTE(chengcheng): the lbn must not be empty at this point.
ReplaceInputLbnInOpCustomizedConf(op_conf.get(), ibn, lbn);
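// Illustration (hypothetical names): if this input was produced by a feed op recorded under the
// lbn "cc_Input_0/out_0", the user_conf entry for an ibn such as "in_0" is rewritten to that lbn.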
}
const auto& session = JUST(GetDefaultSession());
bool is_mirrored_strategy_enabled = JUST(session->IsMirroredStrategyEnabled());
const auto& op_attribute =
JUST(OpInterpUtil::AddOpAndInferOpAttribute(*op_conf, is_mirrored_strategy_enabled));
OpAttribute proto_op_attribute;
op_attribute->ToProto(&proto_op_attribute);

auto infer_ctx = JUST(GetCurInferCtx());
// NOTE(chengcheng): a unique op name MUST be assigned before InferCtx::AddOp.
const std::string new_op_name = *JUST(infer_ctx->NewUniqueOpNameByFunctionalOpConf(*op_conf));

// NOTE(chengcheng): for a UserOp, not only the op_name but also the output lbns must be reset.
op_conf->set_name(new_op_name);
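// Illustration (hypothetical names): an output lbn "matmul_op/out_0" in the functional op_conf
// becomes "<new_op_name>/out_0" after the rewrite below.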
for (auto& pair : *(op_conf->mutable_user_conf()->mutable_output())) {
auto& list_s = pair.second;
for (int i = 0; i < list_s.s_size(); ++i) {
std::string old_lbn = list_s.s(i);
LogicalBlobId old_lbi = GenLogicalBlobId(old_lbn);
// NOTE(chengcheng): the old lbn MUST be rewritten to use the new op name.
std::string new_lbn = GenLogicalBlobName(new_op_name, old_lbi.blob_name());
list_s.set_s(i, new_lbn);
}
}

// temp debug log
std::cout << "cclog: Lazy nn.Graph add UserOp: " << op_conf->DebugString() << std::endl;

OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(*op_conf));

int64_t parallel_desc_sym_id = JUST(scope->GetParallelDescSymbolId(*op_conf));
const std::shared_ptr<ParallelDesc>& blob_parallel_desc_sym =
@@ -268,16 +296,16 @@ Maybe<void> LazyInterpreter::ApplyImpl(const UserOpExpr& op_expr, const TensorTu
CHECK_EQ_OR_RETURN(outputs->size(), op_expr.output_size());
for (int i = 0; i < op_expr.output_size(); ++i) {
const std::string& obn = op_expr.indexed_obns().at(i);
const auto& parallel_attr = JUST(
compatible_py::GetOpArgParallelAttribute(blob_parallel_desc_sym, proto_op_attribute, obn));
const auto& blob_attr = JUST(compatible_py::GetOpArgBlobAttribute(proto_op_attribute, obn));
const auto& parallel_attr =
JUST(compatible_py::GetOpArgParallelAttribute(blob_parallel_desc_sym, op_attr, obn));
const auto& blob_attr = JUST(compatible_py::GetOpArgBlobAttribute(op_attr, obn));
if (!(outputs->at(i).get())) {
(*outputs)[i] = JUST(OpInterpUtil::BuildTensor(blob_attr, parallel_attr, /*is_lazy=*/true));
} else {
// TODO(hjchen2) Reset shape, dtype and so on.
// TODO(chengcheng, hjchen2) Reset shape, dtype and so on for InplaceUserOp.
UNIMPLEMENTED();
}
TensorNameScope::Global()->Record(outputs->at(i), op_expr.op_name() + "/" + obn);
TensorNameScope::Global()->Record(outputs->at(i), GenLogicalBlobName(new_op_name, obn));
}
return Maybe<void>::Ok();
}
5 changes: 1 addition & 4 deletions oneflow/core/job/job_build_and_infer_ctx.cpp
@@ -1319,12 +1319,9 @@ Maybe<std::string> JobBuildAndInferCtx::NewUniqueOpNameByFunctionalOpConf(
} else {
op_type_name = "SystemOp";
}
std::string op_name = op_name_prefix + op_type_name + "-" + std::to_string(unique_op_name_index_);
std::string op_name = op_name_prefix + op_type_name + "_" + std::to_string(unique_op_name_index_);
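// Illustration (hypothetical prefix): with op_name_prefix "model.fc1-", op_type_name "matmul",
// and unique_op_name_index_ == 3, the generated name is "model.fc1-matmul_3".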
++unique_op_name_index_;

// temp debug log
std::cout << "cclog: Lazy nn.Graph AddOpName: " << op_name << std::endl
<< " and the origin op_conf is :" << op_conf.DebugString();
return op_name;
}

146 changes: 146 additions & 0 deletions oneflow/python/test/graph/test_user_op_expr.py
@@ -0,0 +1,146 @@
"""
Copyright 2020 The OneFlow Authors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import unittest

import numpy as np
import os

os.environ["MASTER_ADDR"] = "127.0.0.1"
os.environ["MASTER_PORT"] = "12139"
os.environ["WORLD_SIZE"] = "1"
os.environ["RANK"] = "0"
os.environ["LOCAL_RANK"] = "0"

import oneflow
import oneflow.experimental as flow
import oneflow.python.framework.session_context as session_ctx
import oneflow._oneflow_internal
from oneflow.python.framework.multi_client_session import MultiClientSession
import oneflow.python.framework.c_api_util as c_api_util


@flow.unittest.skip_unless_1n1d()
class TestUserOpGraph(unittest.TestCase):
def test_user_op_graph(test_case):
test_case.assertTrue(oneflow.distributed.is_multi_client())
test_case.assertTrue(
oneflow.python.framework.env_util.HasAllMultiClientEnvVars()
)

x0 = flow.Tensor(20, 30)
weight0 = flow.Tensor(30, 50)
x1 = flow.Tensor(50, 70)

# NOTE(chengcheng): this tiny net is:
# x0 * weight0 -> out0
# relu(out0) -> y0
# y0 * x1 -> out1
# relu(out1) -> y1
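# Equivalent eager/NumPy reference of the same net (illustration only):
#   y1 = np.maximum(np.maximum(x0 @ weight0, 0.0) @ x1, 0.0)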

flow.nn.init.uniform_(x0, a=-1.0, b=1.0)
flow.nn.init.uniform_(x1, a=-1.0, b=1.0)
flow.nn.init.uniform_(weight0, a=-1.0, b=1.0)

session = session_ctx.GetDefaultSession()
test_case.assertTrue(isinstance(session, MultiClientSession))
session.TryInit()

with oneflow._oneflow_internal.lazy_mode.gard(True):

oneflow._oneflow_internal.JobBuildAndInferCtx_Open(
"cc_test_user_op_expr_job"
)
job_conf = (
oneflow._oneflow_internal.oneflow.core.job.job_conf.JobConfigProto()
)
job_conf.set_job_name("cc_test_user_op_expr_job")
job_conf.mutable_predict_conf()
c_api_util.CurJobBuildAndInferCtx_SetJobConf(job_conf)

# input_conf.set_in_0("EagerTensorInput")
# input_conf.set_out_0("out_0")

x0_conf = (
oneflow._oneflow_internal.oneflow.core.operator.op_conf.FeedInputOpConf()
)
x0_op = oneflow._oneflow_internal.one.FeedInputOpExpr(
"cc_Input_0", x0_conf, ["in_0"], ["out_0"]
)
x1_conf = (
oneflow._oneflow_internal.oneflow.core.operator.op_conf.FeedInputOpConf()
)
x1_op = oneflow._oneflow_internal.one.FeedInputOpExpr(
"cc_Input_1", x1_conf, ["in_0"], ["out_0"]
)
weight0_conf = (
oneflow._oneflow_internal.oneflow.core.operator.op_conf.FeedVariableOpConf()
)
weight0_op = oneflow._oneflow_internal.one.FeedVariableOpExpr(
"cc_Variable_0", weight0_conf, ["in_0"], ["out_0"]
)
output_conf = (
oneflow._oneflow_internal.oneflow.core.operator.op_conf.FetchOutputOpConf()
)
output_op = oneflow._oneflow_internal.one.FetchOutputOpExpr(
"cc_Output_0", output_conf, ["in_0"], ["out_0"]
)

attrs = oneflow._oneflow_internal.MutableCfgAttrMap()

if not x0.is_determined:
x0.determine()
x0_tensor_in_c = x0._local_or_consistent_tensor
if not x1.is_determined:
x1.determine()
x1_tensor_in_c = x1._local_or_consistent_tensor
if not weight0.is_determined:
weight0.determine()
weight0_tensor_in_c = weight0._local_or_consistent_tensor

x0_lazy_tensor = x0_op.apply([x0_tensor_in_c], attrs)[0]
x1_lazy_tensor = x1_op.apply([x1_tensor_in_c], attrs)[0]
weight0_lazy_tensor = weight0_op.apply([weight0_tensor_in_c], attrs)[0]

test_case.assertEqual(x0_lazy_tensor.shape, (20, 30))
test_case.assertTrue(x0_lazy_tensor.is_lazy)
test_case.assertEqual(weight0_lazy_tensor.shape, (30, 50))
test_case.assertTrue(weight0_lazy_tensor.is_lazy)
test_case.assertEqual(x1_lazy_tensor.shape, (50, 70))
test_case.assertTrue(x1_lazy_tensor.is_lazy)

out0 = flow.F.matmul(x0_lazy_tensor, weight0_lazy_tensor)
test_case.assertEqual(out0.shape, (20, 50))
test_case.assertTrue(out0.is_lazy)

y0 = flow.F.relu(out0)
test_case.assertEqual(y0.shape, (20, 50))
test_case.assertTrue(y0.is_lazy)

out1 = flow.F.matmul(y0, x1_lazy_tensor)
test_case.assertEqual(out1.shape, (20, 70))
test_case.assertTrue(out1.is_lazy)

y1 = flow.F.relu(out1)
test_case.assertEqual(y1.shape, (20, 70))
test_case.assertTrue(y1.is_lazy)

eager_output = output_op.apply([y1], attrs)[0]
test_case.assertEqual(eager_output.shape, (20, 70))
test_case.assertTrue(not eager_output.is_lazy)


if __name__ == "__main__":
unittest.main()