rsp push and rsp pull for comm device, used in kvstore('device') #8732
Changes from 5 commits
The first file in the diff is the `CommDevice` communicator:

```diff
@@ -489,11 +489,7 @@ class CommDevice : public Comm {
   void Init(int key, const NDArrayStorageType stype, const TShape& shape,
             int dtype = mshadow::kFloat32) override {
-    if (stype == kDefaultStorage) {
-      sorted_key_attrs_.push_back(std::make_tuple(key, shape, dtype));
-    } else {
-      LOG(FATAL) << "storage type " << stype << " not implemented for device yet";
-    }
+    sorted_key_attrs_.push_back(std::make_tuple(key, shape, dtype, stype));
   }

   void InitBuffersAndComm(const std::vector<NDArray>& src) {
```
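The user-facing effect of this change is that a row_sparse value can now be registered with a `device` KVStore instead of hitting the `LOG(FATAL)` branch. Below is a minimal Python sketch of that path; the key name and shape are arbitrary choices for illustration and mirror the updated test at the bottom of this diff.

```python
import mxnet as mx

shape = (4, 5)                   # arbitrary illustration shape
kv = mx.kv.create('device')      # aggregation on GPU via CommDevice
# Previously this fell into the "not implemented for device yet" branch;
# now the storage type is recorded along with the key, shape and dtype.
kv.init('w', mx.nd.ones(shape).tostype('row_sparse'))
```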
```diff
@@ -526,26 +522,66 @@ class CommDevice : public Comm {
     InitBuffersAndComm(src);
     auto& buf = merge_buf_[key];
     std::vector<NDArray> reduce(src.size());
-    CopyFromTo(src[0], &(buf.merged), priority);
-    reduce[0] = buf.merged;
-
-    if (buf.copy_buf.empty()) {
-      // TODO(mli) this results in large device memory usage for huge ndarray,
-      // such as the largest fullc in VGG. consider to do segment reduce with
-      // NDArray.Slice or gpu direct memory access. for the latter, we need to
-      // remove some ctx check, and also it reduces 20% perf
-      buf.copy_buf.resize(src.size()-1);
-      for (size_t i = 0; i < src.size()-1; ++i) {
-        buf.copy_buf[i] = NDArray(
-          buf.merged.shape(), buf.merged.ctx(), false, buf.merged.dtype());
-      }
-    }
-    for (size_t i = 0; i < src.size()-1; ++i) {
-      CopyFromTo(src[i+1], &(buf.copy_buf[i]), priority);
-      reduce[i+1] = buf.copy_buf[i];
-    }
-
-    ElementwiseSum(reduce, &buf.merged);
+    if (buf.merged.storage_type() == kDefaultStorage) {
+      CopyFromTo(src[0], &(buf.merged), priority);
+      reduce[0] = buf.merged;
+
+      if (buf.copy_buf.empty()) {
+        // TODO(mli) this results in large device memory usage for huge ndarray,
+        // such as the largest fullc in VGG. consider to do segment reduce with
+        // NDArray.Slice or gpu direct memory access. for the latter, we need to
+        // remove some ctx check, and also it reduces 20% perf
+        buf.copy_buf.resize(src.size()-1);
+        for (size_t i = 0; i < src.size()-1; ++i) {
+          buf.copy_buf[i] = NDArray(
+            buf.merged.shape(), buf.merged.ctx(), false, buf.merged.dtype());
+        }
+      }
+      for (size_t i = 0; i < src.size()-1; ++i) {
+        CopyFromTo(src[i+1], &(buf.copy_buf[i]), priority);
+        reduce[i+1] = buf.copy_buf[i];
+      }
+      ElementwiseSum(reduce, &buf.merged);
+    } else {
+      std::vector<Engine::VarHandle> const_vars(src.size());
+      if (buf.copy_buf.empty()) {
+        buf.copy_buf.resize(src.size());
+        for (size_t j = 0; j < src.size(); ++j) {
+          buf.copy_buf[j] = NDArray(
+            buf.merged.storage_type(), buf.merged.shape(), buf.merged.ctx(),
+            true, buf.merged.dtype());
+        }
+      }
+      for (size_t i = 0; i < src.size(); ++i) {
+        CopyFromTo(src[i], &(buf.copy_buf[i]), priority);
+        reduce[i] = buf.copy_buf[i];
+        const_vars[i] = reduce[i].var();
+      }
+      auto result = buf.merged;
+      Engine::Get()->PushAsync(
+        [reduce, result, this](RunContext rctx, Engine::CallbackOnComplete on_complete) {
+          NDArray out = result;
+          Resource rsc = ResourceManager::Get()->Request(rctx.ctx,
+            ResourceRequest(ResourceRequest::kTempSpace));
+          switch (result.ctx().dev_mask()) {
+            case cpu::kDevMask: {
+              mxnet::ndarray::ElementwiseSum(rctx.get_stream<cpu>(), rsc, reduce, &out);
+              break;
+            }
+#if MXNET_USE_CUDA
+            case gpu::kDevMask: {
+              mxnet::ndarray::ElementwiseSum(rctx.get_stream<gpu>(), rsc, reduce, &out);
+              break;
+            }
+#endif
+            default: LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
+          }
+          on_complete();
+        }, result.ctx(), const_vars, {result.var()},
+        FnProperty::kNormal, priority, PROFILER_MESSAGE("KVStoreReduce"));
+    }
     return buf.merged;
   }
```

Review comment (on the `Engine::Get()->PushAsync(...)` call above): Should this be moved to ndarray.cc instead?
Review comment: Why this should move into …
Review comment: Can we extend the …
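To see the new sparse `Reduce` branch from the frontend, a push of row_sparse gradients from several contexts lands in the `else` path above: each source is copied into a row_sparse copy buffer, and the buffers are summed by the asynchronously scheduled `ElementwiseSum`. A hedged sketch, assuming two GPUs are available (the updated test below does the same thing from `mx.cpu(i)` contexts):

```python
import mxnet as mx

shape = (4, 5)
kv = mx.kv.create('device')
kv.init('w', mx.nd.zeros(shape).tostype('row_sparse'))

# One row_sparse gradient per device; CommDevice::Reduce merges them.
grads = [mx.nd.ones(shape, ctx=mx.gpu(i)).tostype('row_sparse') for i in range(2)]
kv.push('w', grads)
```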
```diff
@@ -619,7 +655,62 @@ class CommDevice : public Comm {
                           const std::vector<std::pair<NDArray*, NDArray>>& dst,
                           const bool use_copy,
                           const int priority) override {
-    LOG(FATAL) << "Not implemented yet";
+    using namespace mshadow;
+    CHECK_EQ(src.storage_type(), kRowSparseStorage)
+      << "BroadcastRowSparse expects row-sparse src NDArray";
+
+    bool is_same_rowid = true;
+    for (size_t i = 1; i < dst.size(); ++i) {
+      if (dst[i].second.var() != dst[0].second.var()) {
+        is_same_rowid = false;
+      }
+    }
+
+    for (size_t i = 0; i < dst.size(); ++i) {
+      if (is_same_rowid && i != 0) {
+        CopyFromTo(*dst[0].first, dst[i].first, priority);
+        continue;
+      }
+
+      NDArray* out = dst[i].first;
+      NDArray row_id = dst[i].second;
+      if (use_copy) {
+        CopyFromTo(src, out, priority);
+      } else {
+        CHECK_EQ(out->storage_type(), kRowSparseStorage)
+          << "BroadcastRowSparse expects row_sparse dst NDArray";
+        const bool is_diff_ctx = out->ctx() != src.ctx();
+        NDArray src_gpu = is_diff_ctx? NDArray(kRowSparseStorage, src.shape(),
+            out->ctx(), true, src.dtype(), src.aux_types()) : src;
+        if (is_diff_ctx) {
+          CopyFromTo(src, &src_gpu, priority);
+        }
+        NDArray row_id_gpu = NDArray(row_id.shape(), out->ctx(), false, kInt64);
+        const TBlob& indices = row_id_gpu.data();
+        CopyFromTo(row_id, &row_id_gpu, priority);
+
+        Engine::Get()->PushAsync([=](RunContext rctx, Engine::CallbackOnComplete on_complete) {
+            NDArray temp = *out;
+            switch (temp.ctx().dev_mask()) {
+              case cpu::kDevMask: {
+                mxnet::common::SparseRetainOpForwardRspWrapper<cpu>(rctx.get_stream<cpu>(),
+                    src_gpu, indices, kWriteTo, &temp);
+                break;
+              }
+#if MXNET_USE_CUDA
+              case gpu::kDevMask: {
+                mxnet::common::SparseRetainOpForwardRspWrapper<gpu>(rctx.get_stream<gpu>(),
+                    src_gpu, indices, kWriteTo, &temp);
+                break;
+              }
+#endif
+              default: LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
+            }
+            on_complete();
+          }, out->ctx(), {src_gpu.var(), row_id_gpu.var()}, {out->var()},
+          FnProperty::kNormal, priority, PROFILER_MESSAGE("KVStoreSparseRetain"));
+      }
+    }
+  }

  private:
```

Review comment (on the `is_same_rowid` check): Please add some brief description explaining the optimization.

Review discussion (on `const bool is_diff_ctx = out->ctx() != src.ctx();`):
Review comment: Are we assuming src is always on GPU?
Reply: src is not assumed to be on gpu. Actually src is always on cpu. As you can see in https://github.com/apache/incubator-mxnet/blob/master/src/kvstore/kvstore_local.h#L233, src is …
Reply: That's true at the beginning. But as soon as you push some gradients on GPU, it copies the weight from pinned_ctx to GPU. See …
Reply: Nonetheless, I think performing sparse retain before the copy makes more sense, since the source array is usually very large.
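`BroadcastRowSparse` is what serves `row_sparse_pull` on a `device` KVStore: the requested row ids are copied to the destination context and a sparse-retain kernel extracts just those rows, and when every destination passes the same row-id NDArray the retain runs only once and the result is copied to the remaining devices. A sketch of that fast path, assuming two GPUs are available; the key name and row ids are illustrative:

```python
import mxnet as mx

shape = (4, 5)
kv = mx.kv.create('device')
kv.init('w', mx.nd.ones(shape).tostype('row_sparse'))

row_ids = mx.nd.array([0, 2], dtype='int64')
outs = [mx.nd.zeros(shape, ctx=mx.gpu(i), stype='row_sparse') for i in range(2)]
# The same row_ids NDArray is passed for both outputs, so the is_same_rowid
# path above retains once and copies the result to the second GPU.
kv.row_sparse_pull('w', out=outs, row_ids=[row_ids, row_ids])
```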
```diff
@@ -665,7 +756,7 @@ class CommDevice : public Comm {
 #endif
   }

-  using KeyAttrs = std::tuple<int, TShape, int>;
+  using KeyAttrs = std::tuple<int, TShape, int, NDArrayStorageType>;
   // try to allocate buff on device evenly
   void InitMergeBuffer(const std::vector<Context>& devs) {
     std::sort(sorted_key_attrs_.begin(), sorted_key_attrs_.end(), [](
```

```diff
@@ -679,8 +770,9 @@ class CommDevice : public Comm {
     }
     for (size_t i = 0; i < sorted_key_attrs_.size(); ++i) {
       int key = std::get<0>(sorted_key_attrs_[i]);
-      TShape s = std::get<1>(sorted_key_attrs_[i]);
+      TShape shape = std::get<1>(sorted_key_attrs_[i]);
       int type = std::get<2>(sorted_key_attrs_[i]);
+      NDArrayStorageType stype = std::get<3>(sorted_key_attrs_[i]);
       auto& buf = merge_buf_[key];
       Context ctx;
       size_t min_size = std::numeric_limits<size_t>::max();
```

Review comment (on `int key = ...`): const int
Review comment (on `TShape shape = ...`): const TShape&
Review comment (on `int type = ...`): const int
Review comment (on `NDArrayStorageType stype = ...`): nit: const?

```diff
@@ -691,8 +783,12 @@ class CommDevice : public Comm {
           min_size = size;
         }
       }
-      buf.merged = NDArray(s, ctx, false, type);
-      ctx_info[ctx.dev_id].second += s.Size();
+      if (stype == kDefaultStorage) {
+        buf.merged = NDArray(shape, ctx, false, type);
+      } else {
+        buf.merged = NDArray(stype, shape, ctx, true, type);
+      }
+      ctx_info[ctx.dev_id].second += shape.Size();
     }
     inited_ = true;
   }
```
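The surrounding `InitMergeBuffer` logic (only partially visible in the hunks above) spreads the per-key merge buffers across devices by always picking the device that currently holds the least data. A plain-Python sketch of that balancing rule, written only to illustrate the idea; the function name and argument layout are not part of MXNet:

```python
import numpy as np

def assign_merge_buffers(sorted_key_attrs, dev_ids):
    """Greedy balancing: place each key's merge buffer on the device with the
    smallest running total, then grow that total by the key's element count."""
    totals = {dev: 0 for dev in dev_ids}
    placement = {}
    for key, shape, dtype, stype in sorted_key_attrs:   # mirrors the 4-tuple KeyAttrs
        dev = min(totals, key=totals.get)               # device with the least data so far
        placement[key] = (dev, stype)                   # stype decides dense vs row_sparse buffer
        totals[dev] += int(np.prod(shape))
    return placement

# Example: assign_merge_buffers([(0, (1024, 512), 'float32', 'default'),
#                                (1, (1024,), 'float32', 'row_sparse')], dev_ids=[0, 1])
```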
The second file in the diff is the Python KVStore GPU test:

```diff
@@ -26,21 +26,20 @@
 str_keys = ['b', 'c', 'd']


-def init_kv_with_str(stype='default'):
+def init_kv_with_str(stype='default', kv_type='local'):
     """init kv """
-    kv = mx.kv.create()
+    kv = mx.kv.create(kv_type)
     # single
     kv.init('a', mx.nd.zeros(shape, stype=stype))
     # list
     kv.init(str_keys, [mx.nd.zeros(shape=shape, stype=stype)] * len(keys))
     return kv


-@unittest.skip("Test fails intermittently. Temporarily disabled until fixed. Tracked at https://github.com/apache/incubator-mxnet/issues/8262")
-def test_row_sparse_pull():
-    kv = init_kv_with_str('row_sparse')
+def test_row_sparse_pull(kv_type='device'):
+    kv = init_kv_with_str('row_sparse', kv_type)
     kv.init('e', mx.nd.ones(shape).tostype('row_sparse'))
+    kv.push('e', [mx.nd.ones(shape, ctx=mx.cpu(i)).tostype('row_sparse') for i in range(2)])

     def check_row_sparse_pull(kv, count, ctx=default_context()):
         num_rows = shape[0]
         vals = []
```

Review comment (on the removed `@unittest.skip` line): The test should be fixed in #8838

```diff
@@ -59,7 +58,7 @@ def check_row_sparse_pull(kv, count, ctx=default_context()):
             excluded_row_ids = np.setdiff1d(all_row_ids, row_id.asnumpy())
             for row in range(num_rows):
                 expected_val = np.zeros_like(retained[row])
-                expected_val += 0 if row in excluded_row_ids else 1
+                expected_val += 0 if row in excluded_row_ids else 2
                 assert_almost_equal(retained[row], expected_val)

     check_row_sparse_pull(kv, 1, mx.gpu(0))
```
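The expected value changes from 1 to 2 because the test now pushes ones from two contexts before pulling, and the device reduce sums the pushed values, so every retained row reads 2. A small numpy sketch of that arithmetic, matching the test's shapes only loosely:

```python
import numpy as np

num_pushers = 2                                  # the new kv.push uses mx.cpu(0) and mx.cpu(1)
pushed = [np.ones((4, 5)) for _ in range(num_pushers)]
merged = sum(pushed)                             # what CommDevice::Reduce computes
assert (merged == 2).all()                       # hence "else 2" in the assertion above
```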
Review comment: Let's make sure functions in the .h are documented. Should add some description for CastStorageDispatch too...
too...