include mutable inputs in storage fallback. refactor executor
add fallback test for rms prop and adam

fix lint

fix lint

fix test in optimizer
eric-haibin-lin committed Aug 5, 2017
1 parent d511938 commit 4702073
Showing 8 changed files with 193 additions and 119 deletions.
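The commit message mentions a fallback test for RMSProp and Adam; that test lives in the changed test files, which are not shown in this excerpt. As a rough, hypothetical sketch of what such a test checks, the code below runs the same Adam update on a dense weight and on a row_sparse copy and expects identical results, since the executor now falls back to the dense kernel and writes the result back into the mutable weight/state inputs. It assumes mx.nd.cast_storage(data, stype=...) and that asnumpy() densifies a row_sparse array; treat the exact calls as illustrative, not as the committed test.

import mxnet as mx
import numpy as np

# Hypothetical fallback check (the commit's real test is in the changed test
# files, not shown here): the fused Adam kernel runs on dense temporaries and
# the results must land back in the row_sparse weight and state.
shape = (4, 2)
dense_w = mx.nd.ones(shape)
sparse_w = mx.nd.cast_storage(dense_w, stype='row_sparse')  # assumed signature
grad = mx.nd.ones(shape) * 0.5

opt = mx.optimizer.Adam(learning_rate=0.1)
dense_state = opt.create_state(0, dense_w)    # dense mean/variance
sparse_state = opt.create_state(0, sparse_w)  # row_sparse mean/variance (per this diff)

opt.update(0, dense_w, grad, dense_state)
opt.update(0, sparse_w, grad, sparse_state)

np.testing.assert_almost_equal(dense_w.asnumpy(), sparse_w.asnumpy(), decimal=6)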
14 changes: 8 additions & 6 deletions python/mxnet/optimizer.py
@@ -543,8 +543,10 @@ def __init__(self, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8,
self.epsilon = epsilon

def create_state(self, index, weight):
return (zeros(weight.shape, weight.context, dtype=weight.dtype), # mean
zeros(weight.shape, weight.context, dtype=weight.dtype)) # variance
return (zeros(weight.shape, weight.context, dtype=weight.dtype,
stype=weight.stype), # mean
zeros(weight.shape, weight.context, dtype=weight.dtype,
stype=weight.stype)) # variance

def update(self, index, weight, grad, state):
assert(isinstance(weight, NDArray))
@@ -649,11 +651,11 @@ def __init__(self, learning_rate=0.001, gamma1=0.9, gamma2=0.9,
def create_state(self, index, weight):
if self.centered:
return (
zeros(weight.shape, weight.context), # n
zeros(weight.shape, weight.context), # g
zeros(weight.shape, weight.context)) # delta
zeros(weight.shape, weight.context, stype=weight.stype), # n
zeros(weight.shape, weight.context, stype=weight.stype), # g
zeros(weight.shape, weight.context, stype=weight.stype)) # delta
else:
return (zeros(weight.shape, weight.context),) # n
return (zeros(weight.shape, weight.context, stype=weight.stype),) # n

def update(self, index, weight, grad, state):
assert(isinstance(weight, NDArray))
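A small sketch of what this create_state change does in practice, assuming mx.nd.zeros accepts the stype keyword used above and that NDArray.stype reports the storage type: the optimizer state now inherits the weight's storage type instead of always being dense.

import mxnet as mx

# Illustration of the create_state change above: state arrays follow weight.stype.
w_dense = mx.nd.zeros((5, 3))
w_sparse = mx.nd.zeros((5, 3), stype='row_sparse')   # stype kwarg as used above

adam = mx.optimizer.Adam()
mean, var = adam.create_state(0, w_dense)
print(mean.stype, var.stype)    # default default

mean, var = adam.create_state(0, w_sparse)
print(mean.stype, var.stype)    # row_sparse row_sparse (after this change)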
79 changes: 52 additions & 27 deletions src/c_api/c_api_ndarray.cc
@@ -223,7 +223,7 @@ void SetShapeType(const nnvm::Op* op,
void SetDependency(std::vector<engine::VarHandle> *p_read_vars,
std::vector<engine::VarHandle> *p_write_vars,
std::vector<Resource> *p_requested,
std::vector<uint32_t> *p_auxidx,
std::vector<uint32_t> *p_mutate_idx,
const nnvm::Op* op,
const nnvm::NodeAttrs& attrs,
const Context& ctx,
@@ -235,7 +235,7 @@ void SetDependency(std::vector<engine::VarHandle> *p_read_vars,
std::vector<engine::VarHandle>& read_vars = *p_read_vars;
std::vector<engine::VarHandle>& write_vars = *p_write_vars;
std::vector<Resource>& requested = *p_requested;
std::vector<uint32_t>& auxidx = *p_auxidx;
std::vector<uint32_t>& mutate_idx = *p_mutate_idx;

if (tmp_resource.count(op)) {
int ntmp = 0;
@@ -261,9 +261,9 @@ void SetDependency(std::vector<engine::VarHandle> *p_read_vars,
write_vars.push_back(i.var());
}
if (mutate.count(op)) {
auxidx = mutate[op](attrs);
std::sort(auxidx.begin(), auxidx.end());
for (auto & i : auxidx) {
mutate_idx = mutate[op](attrs);
std::sort(mutate_idx.begin(), mutate_idx.end());
for (auto & i : mutate_idx) {
write_vars.push_back(ndinputs[i].var());
}
}
@@ -293,36 +293,48 @@ void PushFCompute(const FCompute& fn,
const std::vector<engine::VarHandle>& write_vars,
const std::vector<Resource>& requested,
const std::vector<NDArray>& ndinputs,
const std::vector<NDArray>& ndoutputs) {
const std::vector<NDArray>& ndoutputs,
const std::vector<uint32_t>& mutate_idx) {
using namespace common;
bool is_train = AutogradRuntime::Get()->IsTraining();
Engine::Get()->PushAsync(
[ctx, attrs, fn, ndinputs, ndoutputs, requested, is_train](
[ctx, attrs, fn, ndinputs, ndoutputs, requested, is_train, mutate_idx](
RunContext rctx,
engine::CallbackOnComplete on_complete) {
std::vector<TBlob> input_blobs, output_blobs;
std::vector<NDArray> temp_in_src, temp_in_dst, temp_out_src, temp_out_dst;
// pre-fcompute and post-fcompute storage fallback src NDArrays and dst NDArrays
std::vector<NDArray> pre_temp_src, pre_temp_dst, post_temp_dst, post_temp_src;
// mapping from index in input_blobs to index in pre_temp_dst
std::unordered_map<uint32_t, uint32_t> in_temp_idx_map;
// populate input blobs and output blobs
SetupDefaultBlobs(ndinputs, &input_blobs, &pre_temp_src, &pre_temp_dst, &in_temp_idx_map);
SetupDefaultBlobs(ndoutputs, &output_blobs, &post_temp_dst, &post_temp_src);
// add mutable inputs to post temp list
for (const auto idx : mutate_idx) {
if (in_temp_idx_map.find(idx) != in_temp_idx_map.end()) {
post_temp_src.push_back(pre_temp_dst[in_temp_idx_map[idx]]);
post_temp_dst.push_back(ndinputs[idx]);
}
}
OpContext opctx{is_train, rctx,
engine::CallbackOnComplete(),
requested};
GetDefaultBlobs(ndinputs, &input_blobs, &temp_in_src, &temp_in_dst);
GetDefaultBlobs(ndoutputs, &output_blobs, &temp_out_src, &temp_out_dst);
std::vector<OpReqType> req(output_blobs.size(), kWriteTo);
if (ctx.dev_mask() == gpu::kDevMask) {
#if MXNET_USE_CUDA
CastNonDefaultStorage<gpu>(temp_in_src, temp_in_dst, opctx);
CastNonDefaultStorage<gpu>(pre_temp_src, pre_temp_dst, opctx);
fn(attrs, opctx, input_blobs, req, output_blobs);
// cast to original storage type, if necessary
CastNonDefaultStorage<gpu>(temp_out_dst, temp_out_src, opctx);
CastNonDefaultStorage<gpu>(post_temp_src, post_temp_dst, opctx);
rctx.get_stream<gpu>()->Wait();
#else
LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
#endif
} else {
CastNonDefaultStorage<cpu>(temp_in_src, temp_in_dst, opctx);
CastNonDefaultStorage<cpu>(pre_temp_src, pre_temp_dst, opctx);
fn(attrs, opctx, input_blobs, req, output_blobs);
// cast to original storage type, if necessary
CastNonDefaultStorage<cpu>(temp_out_dst, temp_out_src, opctx);
CastNonDefaultStorage<cpu>(post_temp_src, post_temp_dst, opctx);
}
on_complete();
}, ctx, read_vars, write_vars, FnProperty::kNormal,
@@ -365,7 +377,8 @@ void PushOperator(const OpStatePtr& state,
const std::vector<engine::VarHandle>& write_vars,
const std::vector<Resource>& requested,
const std::vector<NDArray>& ndinputs,
const std::vector<NDArray>& ndoutputs) {
const std::vector<NDArray>& ndoutputs,
const std::vector<uint32_t>& mutate_idx) {
using namespace common;
static auto& fexec_type = nnvm::Op::GetAttr<FExecType>("FExecType");

@@ -379,28 +392,40 @@
if (fcompute != nullptr) {
CHECK(exec_type == ExecType::kSync || exec_type == ExecType::kAsync);
Engine::Get()->PushAsync(
[state, fcompute, ndinputs, ndoutputs, requested, is_train, exec_type](
[state, fcompute, ndinputs, ndoutputs, requested, is_train, exec_type, mutate_idx](
RunContext rctx,
engine::CallbackOnComplete on_complete) {
OpContext opctx{is_train, rctx, on_complete, requested};

std::vector<TBlob> input_blobs, output_blobs;
std::vector<NDArray> temp_in_src, temp_in_dst, temp_out_src, temp_out_dst;
GetDefaultBlobs(ndinputs, &input_blobs, &temp_in_src, &temp_in_dst);
GetDefaultBlobs(ndoutputs, &output_blobs, &temp_out_src, &temp_out_dst);
// pre-fcompute and post-fcompute storage fallback src NDArrays and dst NDArrays
std::vector<NDArray> pre_temp_src, pre_temp_dst, post_temp_dst, post_temp_src;
// mapping from index in input_blobs to index in pre_temp_dst
std::unordered_map<uint32_t, uint32_t> in_temp_idx_map;
// populate input blobs and output blobs
SetupDefaultBlobs(ndinputs, &input_blobs, &pre_temp_src, &pre_temp_dst, &in_temp_idx_map);
SetupDefaultBlobs(ndoutputs, &output_blobs, &post_temp_dst, &post_temp_src);
// add mutable inputs to post temp list
for (const auto idx : mutate_idx) {
if (in_temp_idx_map.find(idx) != in_temp_idx_map.end()) {
post_temp_src.push_back(pre_temp_dst[in_temp_idx_map[idx]]);
post_temp_dst.push_back(ndinputs[idx]);
}
}
std::vector<OpReqType> req(output_blobs.size(), kWriteTo);
if (rctx.get_ctx().dev_mask() == gpu::kDevMask) {
#if MXNET_USE_CUDA
CastNonDefaultStorage<gpu>(temp_in_src, temp_in_dst, opctx);
CastNonDefaultStorage<gpu>(pre_temp_src, pre_temp_dst, opctx);
fcompute(state, opctx, input_blobs, req, output_blobs);
CastNonDefaultStorage<gpu>(temp_out_dst, temp_out_src, opctx);
CastNonDefaultStorage<gpu>(post_temp_src, post_temp_dst, opctx);
#else
LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
#endif
} else {
CastNonDefaultStorage<cpu>(temp_in_src, temp_in_dst, opctx);
CastNonDefaultStorage<cpu>(pre_temp_src, pre_temp_dst, opctx);
fcompute(state, opctx, input_blobs, req, output_blobs);
CastNonDefaultStorage<cpu>(temp_out_dst, temp_out_src, opctx);
CastNonDefaultStorage<cpu>(post_temp_src, post_temp_dst, opctx);
}
if (exec_type == ExecType::kSync) {
if (rctx.get_ctx().dev_mask() == gpu::kDevMask) {
@@ -463,8 +488,8 @@ void ImperativeInvokeImpl(const Context& default_ctx,

std::vector<engine::VarHandle> read_vars, write_vars;
std::vector<Resource> requested;
std::vector<uint32_t> auxidx;
SetDependency(&read_vars, &write_vars, &requested, &auxidx,
std::vector<uint32_t> mutate_idx;
SetDependency(&read_vars, &write_vars, &requested, &mutate_idx,
op, attrs, ctx, ndinputs, ndoutputs);

FCompute fn = common::GetFCompute<FCompute>(op, "FCompute", ctx);
@@ -482,7 +507,7 @@ void ImperativeInvokeImpl(const Context& default_ctx,
attrs, &ndinputs, &ndoutputs);
}
PushFCompute(fn, op, attrs, ctx, read_vars, write_vars,
requested, ndinputs, ndoutputs);
requested, ndinputs, ndoutputs, mutate_idx);
} else if (createop.count(op)) {
auto state =
createop[op](attrs, ctx, ret->arg_shapes, ret->arg_types);
@@ -492,7 +517,7 @@ }
}
write_vars.push_back(state.get_var());
PushOperator(state, op, attrs, ctx, read_vars, write_vars,
requested, ndinputs, ndoutputs);
requested, ndinputs, ndoutputs, mutate_idx);
} else {
LOG(FATAL)
<< "Operator " << op->name << " is not implemented for "
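To summarize the fallback mechanics above in one place, here is a schematic pure-Python stand-in (not MXNet code) for the bookkeeping done by SetupDefaultBlobs and the new mutate_idx loop: non-default-storage inputs get default-storage temporaries before fcompute runs, and any mutable input that was swapped for a temporary is copied back afterwards so its in-place update is not lost. Output handling follows the same pattern and is omitted here.

# Schematic stand-in for the C++ fallback bookkeeping above (not MXNet code).
def run_with_fallback(inputs, mutate_idx, fcompute, to_default, cast_copy):
    """inputs: objects with a 'stype' attribute; mutate_idx: indices of
    inputs the operator mutates in place; cast_copy(src, dst) copies data
    from src into dst, converting storage type as needed."""
    pre_temp_src, pre_temp_dst, in_temp_idx_map = [], [], {}
    blobs = []
    for i, nd in enumerate(inputs):
        if nd.stype != 'default':
            temp = to_default(nd)                    # default-storage temporary
            in_temp_idx_map[i] = len(pre_temp_dst)   # input index -> temp index
            pre_temp_src.append(nd)
            pre_temp_dst.append(temp)
            blobs.append(temp)
        else:
            blobs.append(nd)

    # the fix in this commit: mutable inputs join the post-compute copy list,
    # so results written into their temporaries are not silently dropped
    post_temp_src, post_temp_dst = [], []
    for idx in mutate_idx:
        if idx in in_temp_idx_map:
            post_temp_src.append(pre_temp_dst[in_temp_idx_map[idx]])
            post_temp_dst.append(inputs[idx])

    for src, dst in zip(pre_temp_src, pre_temp_dst):
        cast_copy(src, dst)                          # cast inputs to default storage
    fcompute(blobs)                                  # dense kernel runs on the temps
    for src, dst in zip(post_temp_src, post_temp_dst):
        cast_copy(src, dst)                          # write mutated temps back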
36 changes: 27 additions & 9 deletions src/common/utils.h
@@ -31,18 +31,30 @@ template<typename xpu>
void CastStorageDispatch(const OpContext& ctx, const NDArray& input, const NDArray& output);

/*
* \brief get the corresponding tensor blobs from default storage NDArrays.
* If any NDArray is of non-default storage, it will be added to `temp_src`
* \return true if any input storage needs to be casted
* \brief setup default-storage tblobs from source NDArrays. If any source NDArray has non-default
* storage, it creates a temp NDArray with default storage and uses the temp tblob. The
* function also records, in `idx_map`, the index of each non-default source NDArray and
* the index of its corresponding temporary NDArray in `temp_dst`.
* \param src list of source NDArray
* \param blobs list of tblobs to return
* \param temp_src list of source NDArrays which requires temporary default storage representation
* \param temp_dst list of temporary destination NDArrays for default storage representation
* \param idx_map mapping from indices in source NDArrays to indices in temp_dst. When not set,
indices are not recorded
* \return true if any source NDArray needs a storage cast
*/
inline bool GetDefaultBlobs(const std::vector<NDArray>& src,
std::vector<TBlob> *blobs,
std::vector<NDArray> *temp_src,
std::vector<NDArray> *temp_dst) {
inline bool SetupDefaultBlobs(const std::vector<NDArray>& src,
std::vector<TBlob> *blobs,
std::vector<NDArray> *temp_src,
std::vector<NDArray> *temp_dst,
std::unordered_map<uint32_t, uint32_t> *idx_map = nullptr) {
bool require_cast = false;
for (size_t i = 0; i < src.size(); i++) {
auto& nd = src[i];
if (nd.storage_type() != kDefaultStorage) {
if (idx_map != nullptr) {
(*idx_map)[i] = temp_dst->size();
}
NDArray temp(nd.shape(), nd.ctx(), false);
temp_src->emplace_back(nd);
temp_dst->emplace_back(temp);
@@ -56,10 +68,15 @@ inline bool GetDefaultBlobs(const std::vector<NDArray>& src,
}

/*
* \brief cast the NDArrays in `src` to NDArrays in `dst`. This is only used
* for storage fallback mechanism in executor.
* \brief cast the NDArrays in `src` and store the result in NDArrays in `dst`.
* This is only used for storage fallback in the executor.
* When storage_fallback is false and `MXNET_EXEC_STORAGE_FALLBACK` == 0,
* storage fallback is disallowed.
* \param src list of source NDArray to cast
* \param dst list of destination NDArrays which hold the result of the cast_storage operation
* \param ctx operator context for cast_storage operation
* \param storage_fallback whether storage fallback is allowed; when set to false,
* the `MXNET_EXEC_STORAGE_FALLBACK` environment variable decides whether fallback is allowed
*/
template <typename xpu>
inline void CastNonDefaultStorage(const std::vector<NDArray>& src,
Expand Down Expand Up @@ -89,6 +106,7 @@ inline bool ContainsNonDefaultStorage(const StorageTypeVector& vstorage) {
return false;
}

// Check if any NDArray in the list has default storage
inline bool ContainsDefaultStorage(const std::vector<NDArray>& ndarrays) {
for (const auto &nd : ndarrays) {
if (nd.storage_type() == kDefaultStorage) {