nn.Graph optimizer part 2: add L2, pass job complete, refactor #5604

Merged: 21 commits, Jul 28, 2021
@@ -180,7 +180,10 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedVariableOpExpr& op_expr, const
var_conf->mutable_initializer()->mutable_empty_conf();
JUST(GenVariableOpConfParallelDistributionStringByTensor(var_conf, input_tensor));
if (!input_tensor->requires_grad()) { var_conf->set_trainable(false); }
// TODO(chengcheng, xuxiaoyu): Set L1/L2 RegularizerConf by nn.Graph Optimizer
if (input_tensor->requires_grad()) {
double l2 = JUST(ctx.attrs.GetAttr<double>("l2"));
var_conf->mutable_regularizer()->mutable_l1_l2_conf()->set_l2(l2);
}

auto infer_ctx = JUST(GetCurInferCtx());
OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(op_conf));
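The `l2` attribute consumed by the branch above is supplied from the Python side when each parameter is fed into the lazy graph. A minimal sketch of that hand-off, using the `MutableCfgAttrMap` path introduced in graph_build_util.py below (the 1e-4 coefficient is illustrative only):

```python
import oneflow._oneflow_internal
from oneflow._oneflow_internal.oneflow.core.framework import (
    user_op_attr as user_op_attr_cfg,
)

# Build the attr map that LazyInterpreter::ApplyImpl(FeedVariableOpExpr) reads.
attrs = oneflow._oneflow_internal.MutableCfgAttrMap()
attr_l2 = user_op_attr_cfg.AttrValue()
attr_l2.set_at_double(1e-4)  # illustrative L2 coefficient
attrs["l2"] = attr_l2

# var_op.apply([state_tensor], attrs) then reaches the C++ branch above, which
# copies attrs["l2"] into variable_conf.regularizer.l1_l2_conf.l2.
```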
37 changes: 31 additions & 6 deletions python/oneflow/framework/graph_build_util.py
@@ -18,13 +18,15 @@
from google.protobuf import text_format

import oneflow._oneflow_internal
from oneflow._oneflow_internal.oneflow.core.framework import (
user_op_attr as user_op_attr_cfg,
)
import oneflow.core.job.scope_pb2 as scope_pb2_util
import oneflow.framework.attr_util as attr_util
import oneflow.framework.c_api_util as c_api_util
import oneflow.framework.placement_util as placement_util
import oneflow.framework.scope_util as scope_util
import oneflow.framework.session_context as session_context
from oneflow._oneflow_internal import Tensor as InternalTensor
from oneflow.framework.tensor import Tensor

lazy_mode = oneflow._oneflow_internal.lazy_mode
@@ -33,7 +35,14 @@
@contextmanager
def graph_build_context(config_proto, session):
prev_scope = oneflow._oneflow_internal.GetCurrentScope()
new_scope = scope_util.MakeInitialScope(config_proto, "cpu", ["0:0"], None, False)
new_scope = scope_util.MakeInitialScope(
config_proto,
"cpu", # NOTE(chengcheng): graph init scope is useless, just set cpu 0:0 for test.
["0:0"],
None, # TODO(): set hierarchy from user graph config
False, # is_mirrored
)

with lazy_mode.gard(True):
with JobBuildAndInferCtx(config_proto):
with BlockScopeContext(prev_scope, new_scope):
@@ -65,6 +74,7 @@ def __init__(self, prev_scope, new_scope):
self._new_scope = new_scope

def __enter__(self):
assert oneflow._oneflow_internal.GetCurrentScope() is self._prev_scope
oneflow._oneflow_internal.GlobalScopeStackPush(self._new_scope)

def __exit__(self, exc_type, exc_val, exc_tb):
@@ -80,21 +90,25 @@ def __exit__(self, exc_type, exc_val, exc_tb):
def make_new_block_scope(prev_scope, block):
assert prev_scope is not None
assert block is not None

attr_dict = dict()
if block.config.stage_id is not None:
attr_dict["pipeline_stage_id_hint"] = block.config.stage_id
if block.config.activation_checkpointing is not None:
attr_dict["checkpointing"] = block.config.activation_checkpointing

name2default = session_context.GetDefaultSession().scope_attr_name2default_val

def scope_proto_setter(scope_proto):
for (attr_name, py_value) in attr_dict.items():
# set attr
for attr_name, py_value in attr_dict.items():
assert attr_name in name2default
attr_util.SetAttrValue(
scope_proto.mutable_attr_name2attr_value()[attr_name],
py_value,
name2default[attr_name],
)
# append name prefix
scope_proto.clear_scope_op_name_prefixes()
scope_proto.add_scope_op_name_prefixes(block.name_prefix + block.name)

@@ -114,10 +128,11 @@ def scope_to_proto(scope):


def build_graph_input_arg(op_name, arg):
assert isinstance(arg, (Tensor, InternalTensor))
assert isinstance(arg, Tensor)
input_conf = (
oneflow._oneflow_internal.oneflow.core.operator.op_conf.FeedInputOpConf()
)

input_op = oneflow._oneflow_internal.one.FeedInputOpExpr(
op_name, input_conf, ["in_0"], ["out_0"]
)
@@ -126,28 +141,38 @@ def build_graph_input_arg(op_name, arg):
return lazy_arg


def build_graph_state(op_name, state_tensor):
def build_graph_state(op_name, state_tensor, state_config):
var_conf = (
oneflow._oneflow_internal.oneflow.core.operator.op_conf.FeedVariableOpConf()
)

var_op = oneflow._oneflow_internal.one.FeedVariableOpExpr(
op_name, var_conf, ["in_0"], ["out_0"]
)

attrs = oneflow._oneflow_internal.MutableCfgAttrMap()
if state_config is not None:
attr_l2 = user_op_attr_cfg.AttrValue()
attr_l2.set_at_double(state_config.l2)
attrs["l2"] = attr_l2

assert isinstance(state_tensor, Tensor)
lazy_tensor = var_op.apply([state_tensor], attrs)[0]
return lazy_tensor


def build_graph_output(op_name, out):
assert isinstance(out, InternalTensor)
assert isinstance(out, Tensor)
assert out.is_lazy

output_conf = (
oneflow._oneflow_internal.oneflow.core.operator.op_conf.FetchOutputOpConf()
)

output_op = oneflow._oneflow_internal.one.FetchOutputOpExpr(
op_name, output_conf, ["in_0"], ["out_0"]
)
attrs = oneflow._oneflow_internal.MutableCfgAttrMap()

eager_out = output_op.apply([out], attrs)[0]
return eager_out
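For context, a hedged usage sketch of the new `build_graph_state(op_name, state_tensor, state_config)` signature. `VariableConfig` is the class this PR adds in oneflow/nn/graph_optimizer.py; its constructor takes the variable op name (as used in nn/graph.py below), and setting `.l2` directly is an assumption made for brevity — in the real flow the optimizer config fills it in. The call only works inside `graph_build_context`:

```python
import oneflow as flow
import oneflow.framework.graph_build_util as graph_build_util
from oneflow.nn.graph_optimizer import VariableConfig

param = flow.nn.Parameter(flow.Tensor(4, 4))

var_conf = VariableConfig("m.weight")  # variable op name, as in nn/graph.py
var_conf.l2 = 1e-4                     # assumption: l2 set directly for this sketch

# Applies FeedVariableOpExpr with attrs["l2"] taken from var_conf and returns
# the lazy counterpart of `param`; must run inside graph_build_context(...).
lazy_param = graph_build_util.build_graph_state("m.weight", param, var_conf)
```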
9 changes: 8 additions & 1 deletion python/oneflow/framework/tensor.py
@@ -18,6 +18,7 @@
import oneflow.framework.check_point_v2 as check_point_v2
import oneflow.framework.tensor_str as tensor_str_util
import oneflow.ops.initializer_util as initializer_util
import oneflow._oneflow_internal.lazy_mode as lazy_mode

import numpy as np
from typing import Union
@@ -70,7 +71,13 @@ def _element_size(self):


def _backward(self, gradient=None, retain_graph=False, create_graph=False):
flow.autograd.backward(self, gradient, retain_graph, create_graph)
if not lazy_mode.is_enabled():
flow.autograd.backward(self, gradient, retain_graph, create_graph)
else:
assert (
self.is_lazy
), "nn.Graph only accept lazy tensor to call backward() in lazy mode."
flow._oneflow_internal.nn.graph.AddTensorAsGraphLoss(self)


def _getitem(self, key):
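The practical effect of the lazy-mode branch above: inside `nn.Graph.build()`, calling `backward()` on a lazy loss tensor registers it as the graph loss via `AddTensorAsGraphLoss` instead of running eager autograd. A hedged sketch, assuming the `add_optimizer(name, optimizer, ...)` signature shown in nn/graph.py below and the standard `flow.nn` / `flow.optim` modules:

```python
import oneflow as flow

model = flow.nn.Linear(4, 1)

class TrainGraph(flow.nn.Graph):
    def __init__(self):
        super().__init__()
        self.model = model
        # add_optimizer(name, optimizer, ...) as defined in nn/graph.py below.
        self.add_optimizer("sgd", flow.optim.SGD(model.parameters(), lr=0.1))

    def build(self, x):
        loss = self.model(x).sum()
        loss.backward()  # lazy tensor -> AddTensorAsGraphLoss(loss)
        return loss
```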
86 changes: 57 additions & 29 deletions python/oneflow/nn/graph.py
@@ -21,12 +21,12 @@
import oneflow.framework.c_api_util as c_api_util
import oneflow.framework.graph_build_util as graph_build_util
import oneflow.framework.session_context as session_ctx
from oneflow._oneflow_internal import Tensor as InternalTensor
from oneflow.framework.tensor import Tensor
from oneflow.framework.function_util import FunctionConfig
from oneflow.framework.multi_client_session import MultiClientSession
from oneflow.framework.tensor_tuple_util import convert_to_tensor_tuple
from oneflow.nn.graph_block import Block, BlockType
from oneflow.nn.graph_optimizer import OptimizerConfig
from oneflow.nn.graph_optimizer import OptimizerConfig, VariableConfig
from oneflow.nn.module import Module
from oneflow.nn.optimizer.optimizer import Optimizer
from oneflow.nn.util import add_indent
@@ -41,9 +41,9 @@ def __init__(self):
self.config.proto.set_job_name(self._name)
self._c_nn_graph = oneflow._oneflow_internal.nn.graph.CNNGraph(self._name)
self._blocks = OrderedDict()
self._optimizers = OrderedDict()
self._optimizers_conf = OrderedDict()
self._variables_conf = OrderedDict()
self._is_compiled = False
self._var2var_op_name = dict()
self._job_proto = None

@property
@@ -75,7 +75,7 @@ def add_optimizer(
assert isinstance(
optimizer, Optimizer
), "optimizer must be an instance of Optimizer"
self._optimizers[name] = OptimizerConfig(
self._optimizers_conf[name] = OptimizerConfig(
name, optimizer, lr_scheduler, grad_clipping_conf, weight_decay_conf
)

@@ -87,66 +87,82 @@ def _generate_name(self):
Graph._child_init_cnt[child_name] += 1

def _state(self):
for (_, b) in self._blocks.items():
for _, b in self._blocks.items():
pa_gen = b.parameters(recurse=True)
for pa in pa_gen:
yield pa
bu_gen = b.buffers(recurse=True)
for bu in bu_gen:
yield bu

def _preprocess_state(self):
state_list = list()
def _generate_optimizer_and_variable_configs(self):
if len(self._optimizers_conf) > 0:
self.config._train(True)
for state_block in self._state():
state_list.append(state_block.origin)
if state_block.type == BlockType.PARAMETER:
self._var2var_op_name[state_block.origin] = (
self._variables_conf[state_block.origin] = VariableConfig(
state_block.name_prefix + state_block.name
)
Review comment (Contributor Author): Construct a VariableConf for each parameter here.


def _complete_graph_config(self):
if len(self._optimizers):
self.config._train(True)
for (name, opt_config) in self._optimizers.items():
self.config.add_optimizer_config(opt_config, self._var2var_op_name)
for name, opt_config in self._optimizers_conf.items():
self.config._generate_optimizer_and_variable_configs(
opt_config, self._variables_conf
)

def _compile(self, *args):
assert not self._is_compiled, (
"nn.Graph " + self._name + " has already been compiled."
)
self._preprocess_state()
self._complete_graph_config()

self._generate_optimizer_and_variable_configs()

session = session_ctx.GetDefaultSession()
assert type(session) is MultiClientSession
session.TryInit()
with graph_build_util.graph_build_context(self.config.proto, session):
# Deal with input
lazy_args = []
lazy_arg_op_names = []
for (idx, arg) in enumerate(args):
for idx, arg in enumerate(args):
op_name = "_" + self.name + "-input_" + str(idx)
lazy_args.append(graph_build_util.build_graph_input_arg(op_name, arg))
lazy_arg_op_names.append(op_name)

# Deal with parameter and buffer
state_op_names = []
state_tensors = []
for state_block in self._state():
op_name = state_block.name_prefix + state_block.name
state_tensor = state_block.origin
state_op_names.append(op_name)
state_tensors.append(state_tensor)
if state_block.type == BlockType.PARAMETER:
state_config = self._variables_conf[state_block.origin]
else:
state_config = None
state_block.set_lazy_origin_builder(
partial(graph_build_util.build_graph_state, op_name, state_tensor)
partial(
graph_build_util.build_graph_state,
op_name,
state_tensor,
state_config,
)
)

self._variables = convert_to_tensor_tuple(state_tensors)

# Deal with module in self.build(*args)
outputs = self.build(*lazy_args)

# Deal with outputs
if not (type(outputs) is tuple or type(outputs) is list):
if outputs is None:
outputs = ()
else:
assert type(outputs) is InternalTensor
assert type(outputs) is Tensor
outputs = (outputs,)
eager_outputs = []
eager_output_op_names = []
for (idx, out) in enumerate(outputs):
for idx, out in enumerate(outputs):
op_name = "_" + self.name + "-output_" + str(idx)
eager_outputs.append(graph_build_util.build_graph_output(op_name, out))
eager_output_op_names.append(op_name)
@@ -156,19 +172,27 @@ def _compile(self, *args):
eager_outputs = eager_outputs[0]
else:
eager_outputs = tuple(eager_outputs)

self._outputs = convert_to_tensor_tuple(eager_outputs)
self._eager_outputs = eager_outputs

# Register input/output/variable to _c_nn_graph
self._c_nn_graph.register_input_op_names(lazy_arg_op_names)
self._c_nn_graph.register_output_op_names(eager_output_op_names)
self._c_nn_graph.register_variable_op_names_and_tensors(
state_op_names, self._variables
)

# Save job proto for debug
self._job_proto = c_api_util.GetCurrentJob()

# Complie and init Runtime
self._c_nn_graph.complie_and_init_runtime()
self._is_compiled = True
return eager_outputs

def _launch(self, *args):
# oneflow._oneflow_internal.eager.multi_client.Sync() NOTE(chengcheng): Need Sync?
oneflow._oneflow_internal.nn.graph.RunLazyNNGraph(
convert_to_tensor_tuple(args),
self._outputs,
@@ -183,7 +207,7 @@ def __call__(self, *args):
return self._launch(*args)

def _add_block(self, name: str, module: Module = None) -> None:
"""Adds a module to the current graph as a block.
r"""Adds a module to the current graph as a block.

The block can be accessed as an attribute using the given name.

@@ -209,7 +233,8 @@ def __setattr__(self, name: str, value=None):
self._add_block(name, value)
elif isinstance(value, Optimizer):
raise AttributeError(
"'{}' object are not allowed to set Optimizer attribute named '{}', please use add_optimizer(...) instead.".format(
"'{}' object are not allowed to set Optimizer attribute named '{}', "
"please use add_optimizer(...) instead.".format(
type(self).__name__, name
)
)
@@ -230,11 +255,12 @@ def __repr__(self):
lines = None
if len(self._blocks) > 0:
child_lines = []
for (n, m) in self._blocks.items():
for n, m in self._blocks.items():
mod_str = repr(m)
mod_str = add_indent(mod_str, 2)
child_lines.append(mod_str)
lines = child_lines

main_str = "(" + self._name + ":" + self.__class__.__name__ + ":GRAPH): ("
if lines is not None:
main_str += "\n " + "\n ".join(lines) + "\n"
@@ -266,11 +292,13 @@ def _train(self, mode: bool = True):
else:
self.proto.mutable_predict_conf()

def add_optimizer_config(
self, optimizer_config: OptimizerConfig = None, var2var_op_name: Dict = None
def _generate_optimizer_and_variable_configs(
self,
optimizer_config: OptimizerConfig = None,
variables_conf: OrderedDict = None,
):
optimizer_config.optimizer.add_to_graph_train_config(
self.proto.mutable_train_conf(), var2var_op_name
optimizer_config.generate_optimizer_and_variable_configs(
self.proto.mutable_train_conf(), variables_conf
)


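Taken together, the `_compile` / `_launch` path is driven by the first call to the graph object: inputs, states, and outputs are traced, optimizer and variable configs are generated, op names are registered with the C++ `CNNGraph`, and the runtime is initialized; later calls only launch the compiled job. A hedged end-to-end sketch reusing the illustrative `TrainGraph` class from the earlier sketch:

```python
import oneflow as flow

graph = TrainGraph()   # illustrative class from the sketch above
x = flow.Tensor(8, 4)  # illustrative input; matches Linear(4, 1)

out = graph(x)         # first call: __call__ -> _compile(x), then _launch(x)
out = graph(x)         # later calls: _launch(x) only, reusing the runtime
```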