[test] Sparse mega pr #168

Merged (66 commits, Aug 16, 2017)
Changes from 1 commit

Commits (66)
c5f6648
[WIP] Sparse Tensor (#5800)
eric-haibin-lin Jun 26, 2017
db65770
move storage type vector from nnvm to mxnet (#7054)
eric-haibin-lin Jul 15, 2017
e2607da
fix failed tests. add back 64bit support for dot
eric-haibin-lin Jul 17, 2017
978748e
Improve copy sparse tensors (#7003)
reminisce Jul 15, 2017
ce0fec8
bug fix for IdentityComputeRsp
eric-haibin-lin Jul 18, 2017
a2b3d3e
fix lint
eric-haibin-lin Jul 18, 2017
27c9ac0
add data partition for libsvm iter (#7027)
eric-haibin-lin Jul 21, 2017
3a394ea
fix ndarray namespace
eric-haibin-lin Jul 22, 2017
cf61a9e
remove sparse embedding (#7165)
eric-haibin-lin Jul 23, 2017
fe62976
remove untested gpu operators (#7172)
eric-haibin-lin Jul 24, 2017
4de0fdd
Fix ndarray aux data issue (#7098)
reminisce Jul 25, 2017
a472b61
Support K-dimensional row-sparse tensor (#7179)
eric-haibin-lin Jul 25, 2017
6a01b6e
Improve sparse ndarray error message (#7181)
eric-haibin-lin Jul 25, 2017
05ddf38
construct row_sparse ndarray for dist-async
eric-haibin-lin Jun 26, 2017
f57fc3c
Merge remote-tracking branch 'upstream/master' into dmlc-sparse-squash
eric-haibin-lin Jul 26, 2017
0ed14d1
fix DotCsrRspRspImpl error message (#7191)
stefanhenneking Jul 26, 2017
f0af872
GPU implementation of cast_storage (dense to csr) (#7081)
stefanhenneking Jul 27, 2017
6f0719f
Sparse square sum (#7206)
reminisce Jul 27, 2017
ec2c4bf
Modify and Add documentation for mx.nd.zeros (#7197)
anirudh2290 Jul 27, 2017
88eaac6
Merge remote-tracking branch 'upstream/master' into dmlc-sparse-squash
eric-haibin-lin Jul 27, 2017
3b94a3c
Expose kWriteInplace for imperative execution (fcompute_ex and fstate…
eric-haibin-lin Jul 28, 2017
55e4763
Operator add_n for row sparse ndarrays (#7244)
reminisce Aug 1, 2017
7e1647c
GPU implementation of cast_storage (dense to rsp) (#7223)
stefanhenneking Aug 1, 2017
5905ddc
merge with dmlc/master
eric-haibin-lin Aug 2, 2017
d8a9aba
resolve merge conflict in ndarray.load
eric-haibin-lin Aug 2, 2017
f686174
Improve StatefulOp/FCompute storage fallback (#134)
eric-haibin-lin Aug 2, 2017
d0579c4
update sparse ndarray api (#139)
eric-haibin-lin Aug 3, 2017
56b5a63
Merge remote-tracking branch 'upstream/master' into sparse
eric-haibin-lin Aug 3, 2017
325f4db
Handle ograd_stype='row_sparse' for square_sum backward (#143)
reminisce Aug 3, 2017
5866b2b
Sparse retain improvement (#138)
reminisce Aug 5, 2017
9298bfa
ignoring variables in SimpleBind that is used on python's sparse bran…
sergeykolychev Aug 5, 2017
1f07771
add bias term to fm test (#145)
eric-haibin-lin Aug 5, 2017
d511938
merge with upstream/master. resolve conflict in c_api_ndarray.cc
eric-haibin-lin Aug 5, 2017
6956431
update ndarray.nd, remove `invoke` from excluded members (#137)
eric-haibin-lin Aug 6, 2017
6c9a350
Merge remote-tracking branch 'upstream/master' into sparse
eric-haibin-lin Aug 6, 2017
66b7b8a
support storage fallback with mutable inputs (#147)
eric-haibin-lin Aug 6, 2017
cf8ddcf
Merge branch 'sparse' of https://github.com/eric-haibin-lin/mxnet int…
eric-haibin-lin Aug 6, 2017
0396c9a
Merge remote-tracking branch 'upstream/master' into sparse
eric-haibin-lin Aug 7, 2017
2dc7dc9
Code changes based on reviews (#144)
eric-haibin-lin Aug 8, 2017
f318c9d
small edits according to reviews (#151)
eric-haibin-lin Aug 8, 2017
85cbc60
Merge remote-tracking branch 'upstream/master' into sparse
eric-haibin-lin Aug 8, 2017
fc1aa6e
fix lint (#152)
eric-haibin-lin Aug 8, 2017
9ba96b9
resolve conflict in ndarray.py and capi
eric-haibin-lin Aug 8, 2017
6cbdf98
resolve conflicts in license header
eric-haibin-lin Aug 8, 2017
253ae57
add license to all new files in sparse branch (#154)
eric-haibin-lin Aug 9, 2017
b2ad302
Allocate temp data on the fly for some casting operations (#149)
cjolivier01 Aug 9, 2017
129148c
Merge remote-tracking branch 'upstream/master' into sparse
eric-haibin-lin Aug 9, 2017
d6f987d
fix utf8 encoding in sparse ndarray
eric-haibin-lin Aug 9, 2017
955e97f
Merge branch 'sparse' of https://github.com/eric-haibin-lin/mxnet int…
eric-haibin-lin Aug 9, 2017
bc33101
Extending the GPU dot operator (#7226)
stefanhenneking Aug 9, 2017
8040953
Merge remote-tracking branch 'upstream/master' into sparse
eric-haibin-lin Aug 9, 2017
2d93d72
Add get_synthetic_dataset function to util (#146)
anirudh2290 Aug 10, 2017
80a590d
temporary fix for batch norm storage fallback (#156)
eric-haibin-lin Aug 10, 2017
92f54d2
support random_uniform/normal/gamma with row_sparse output (#155)
eric-haibin-lin Aug 10, 2017
17bfa4e
Merge remote-tracking branch 'upstream/master' into sparse
eric-haibin-lin Aug 10, 2017
ef3b442
Merge remote-tracking branch 'upstream/master' into sparse
eric-haibin-lin Aug 10, 2017
a44afed
Square sum backward support one more case (#161)
reminisce Aug 10, 2017
ceca9b6
Add documentation for sparse ops (#148)
eric-haibin-lin Aug 11, 2017
1c60a05
A few fixes (#163)
eric-haibin-lin Aug 11, 2017
04e9129
Merge branch 'sparse' of https://github.com/eric-haibin-lin/mxnet int…
eric-haibin-lin Aug 12, 2017
8ebc012
merge with upstream/master
eric-haibin-lin Aug 12, 2017
889a09e
Minor fixes sparse ops (#160)
stefanhenneking Aug 14, 2017
6b0cac1
sparse Adam optimizer (#164)
eric-haibin-lin Aug 14, 2017
eeff444
kvstore.row_sparse_pull for GPU and end-to-end benchmark: CPU vs. mul…
reminisce Aug 15, 2017
54f698b
fix bug in adam update (#167)
eric-haibin-lin Aug 15, 2017
6fa078e
change sparse example from regression to classification (#165)
eric-haibin-lin Aug 15, 2017
construct row_sparse ndarray for dist-async
fix bug in rsp add

rsp sync push

race condition for push

fix bug in rsp pull. refactor test

cleanup comments

refactor dist server

fix lint

fix storage shape issue with the new ndarray constructor

data sharding draft;

fix lint. add comment

add support for zeros gradients

use std::upper_bound/lower_bound

remove special init function for rowsparse dist kvstore

temporary support for inplace operators for sparse

add test. fix return type

store kRowSparseNDArray in kv server

remove fcomp_ex sgd with dns weight and rsp gradient

bug fix in sparse retain

sparse pull c_api

revise rowsparse pull api

use engine to compute unique to ensure thread safety

add rowsparse pull to dist-kv

fix lint

add example for rsp_pull

remove name2idx;

add sparse_pull_dict param to module

fix unit test and c rowid conversion

support str key type in kvstore (apache#6765)

* update kvstore unit test

* update model/module.py

* fix lint

* remove int keys in kvstore

* update cast to str function

* remove _cast_to_str_keys

* fix lint

* always cast to str

Conflicts:
	include/mxnet/c_api.h
	include/mxnet/kvstore.h
	python/mxnet/kvstore.py
	python/mxnet/model.py
	python/mxnet/module/module.py
	src/c_api/c_api.cc
	src/kvstore/kvstore_local.h
	tests/python/unittest/test_kvstore.py

update module API for other submodules

update stypes in kvstore after refactoring

change type of size from size_t to int64_t

add sparse linear regression example

remove sparse_pull_dict from module

fix init_optim for seq_module. update sparse example

resolve conflict for binary add rsp rsp

Conflicts:
	python/mxnet/kvstore.py
	tests/python/unittest/test_kvstore.py
eric-haibin-lin committed Jul 26, 2017
commit 05ddf38b185dffe43d7a4a2644d771eeb67536e0
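
The centerpiece of this commit is a kvstore pull that materializes only the requested rows of a row_sparse value. As a quick orientation before the diff, here is a minimal sketch of the call pattern, distilled from the example code below; the stype argument to mx.nd.zeros and the exact row_sparse_pull signature are assumptions about this branch rather than a settled API:

```python
import mxnet as mx

kv = mx.kvstore.create('local')
shape = (100, 8)
# string keys work after the kvstore refactor (apache#6765)
kv.init('w', mx.nd.zeros(shape, stype='row_sparse'))

# pull only rows 0, 2 and 7; all other rows of `out` stay zero
out = mx.nd.zeros(shape, stype='row_sparse')
row_ids = mx.nd.array([0, 2, 7])
kv.row_sparse_pull('w', out, row_ids=[row_ids])
```

Pulling by explicit row ids is what lets each distributed worker fetch just the weight rows touched by its current mini-batch instead of the full embedding-sized matrix.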
15 changes: 15 additions & 0 deletions example/sparse/get_data.py
@@ -0,0 +1,15 @@
# pylint: skip-file
import os, gzip
import pickle as pickle
import sys

def get_libsvm_data(data_dir, data_name, url, data_origin_name):
    if not os.path.isdir(data_dir):
        os.system("mkdir " + data_dir)
    os.chdir(data_dir)
    if (not os.path.exists(data_name)):
        import urllib
        zippath = os.path.join(data_dir, data_origin_name)
        urllib.urlretrieve(url, zippath)
        os.system("bzip2 -d %r" % data_origin_name)
    os.chdir("..")
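
For reference, a hypothetical call to the helper above, using the avazu dataset entry that linear_regression.py defines further down; the helper shells out to mkdir and bzip2 and uses Python 2's urllib, so a Unix-like environment is assumed:

```python
url = "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/avazu-app.t.bz2"
# downloads avazu-app.t.bz2 into ./data, decompresses it to avazu-app.t,
# then returns to the parent directory
get_libsvm_data('data', 'avazu-app.t', url, 'avazu-app.t.bz2')
```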
178 changes: 178 additions & 0 deletions example/sparse/linear_regression.py
@@ -0,0 +1,178 @@
import mxnet as mx
from mxnet.test_utils import *
from get_data import get_libsvm_data
import time
import argparse
import os

parser = argparse.ArgumentParser(description="Run sparse linear regression " \
                                 "with distributed kvstore",
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--profiler', type=int, default=0,
                    help='whether to use profiler')
parser.add_argument('--num-epoch', type=int, default=1,
                    help='number of epochs to train')
parser.add_argument('--batch-size', type=int, default=512,
                    help='number of examples per batch')
parser.add_argument('--num-batch', type=int, default=99999999,
                    help='number of batches per epoch')
parser.add_argument('--dummy-iter', type=int, default=0,
                    help='whether to use dummy iterator to exclude io cost')
parser.add_argument('--kvstore', type=str, default='dist_sync',
                    help='what kvstore to use [local, dist_sync, etc]')
parser.add_argument('--log-level', type=str, default='debug',
                    help='logging level [debug, info, error]')
parser.add_argument('--dataset', type=str, default='avazu',
                    help='what test dataset to use')

class DummyIter(mx.io.DataIter):
    "A dummy iterator that always returns the same batch, used for speed testing"
    def __init__(self, real_iter):
        super(DummyIter, self).__init__()
        self.real_iter = real_iter
        self.provide_data = real_iter.provide_data
        self.provide_label = real_iter.provide_label
        self.batch_size = real_iter.batch_size

        for batch in real_iter:
            self.the_batch = batch
            break

    def __iter__(self):
        return self

    def next(self):
        return self.the_batch

# testing dataset sources
avazu = {
    'data_name': 'avazu-app.t',
    'data_origin_name': 'avazu-app.t.bz2',
    'url': "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/avazu-app.t.bz2",
    'feature_dim': 1000000,
}

kdda = {
    'data_name': 'kdda.t',
    'data_origin_name': 'kdda.t.bz2',
    'url': "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.t.bz2",
    'feature_dim': 20216830,
}

datasets = { 'kdda' : kdda, 'avazu' : avazu }

def regression_model(feature_dim):
    initializer = mx.initializer.Normal()
    x = mx.symbol.Variable("data", stype='csr')
    norm_init = mx.initializer.Normal(sigma=0.01)
    v = mx.symbol.Variable("v", shape=(feature_dim, 1), init=norm_init, stype='row_sparse')
    embed = mx.symbol.dot(x, v)
    y = mx.symbol.Variable("softmax_label")
    model = mx.symbol.LinearRegressionOutput(data=embed, label=y, name="out")
    return model

if __name__ == '__main__':

    # arg parser
    args = parser.parse_args()
    num_epoch = args.num_epoch
    num_batch = args.num_batch
    kvstore = args.kvstore
    profiler = args.profiler > 0
    batch_size = args.batch_size
    dummy_iter = args.dummy_iter
    dataset = args.dataset
    log_level = args.log_level

    # create kvstore
    kv = mx.kvstore.create(kvstore)
    rank = kv.rank
    num_worker = kv.num_workers

    # only print log for rank 0 worker
    import logging
    if rank != 0:
        log_level = logging.ERROR
    elif log_level.lower() == 'debug':  # accept the lowercase default from argparse
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    head = '%(asctime)-15s %(message)s'
    logging.basicConfig(level=log_level, format=head)

    # dataset
    assert(dataset in datasets), "unknown dataset " + dataset
    metadata = datasets[dataset]
    feature_dim = metadata['feature_dim']
    if logging:
        logging.debug('preparing data ... ')
    data_dir = os.path.join(os.getcwd(), 'data')
    path = os.path.join(data_dir, metadata['data_name'])
    if not os.path.exists(path):
        get_libsvm_data(data_dir, metadata['data_name'], metadata['url'],
                        metadata['data_origin_name'])
    assert os.path.exists(path)

    # data iterator
    train_data = mx.io.LibSVMIter(data_libsvm=path, data_shape=(feature_dim,),
                                  batch_size=batch_size, num_parts=num_worker,
                                  part_index=rank)
    if dummy_iter:
        train_data = DummyIter(train_data)

    # model
    model = regression_model(feature_dim)

    # module
    mod = mx.mod.Module(symbol=model, data_names=['data'], label_names=['softmax_label'])
    mod.bind(data_shapes=train_data.provide_data, label_shapes=train_data.provide_label)
    mod.init_params(initializer=mx.init.Uniform(scale=.1))
    sgd = mx.optimizer.SGD(momentum=0.0, clip_gradient=5.0,
                           learning_rate=0.1, rescale_grad=1.0/batch_size/num_worker)
    mod.init_optimizer(optimizer=sgd, kvstore=kv)
    # use MSE as the evaluation metric
    metric = mx.metric.create('MSE')

    # start profiler
    if profiler:
        import random
        name = 'profile_output_' + str(num_worker) + '.json'
        mx.profiler.profiler_set_config(mode='all', filename=name)
        mx.profiler.profiler_set_state('run')

    logging.debug('start training ...')
    start = time.time()
    data_iter = iter(train_data)
    for epoch in range(num_epoch):
        nbatch = 0
        end_of_batch = False
        data_iter.reset()
        metric.reset()
        next_batch = next(data_iter)
        while not end_of_batch:
            nbatch += 1
            batch = next_batch
            # TODO(haibin) remove extra copy after Jun's change
            row_ids = batch.data[0].indices.copyto(mx.cpu())
            # pull sparse weight
            index = mod._exec_group.param_names.index('v')
            kv.row_sparse_pull('v', mod._exec_group.param_arrays[index],
                               priority=-index, row_ids=[row_ids])
            mod.forward_backward(batch)
            # update parameters
            mod.update()
            try:
                # pre fetch next batch
                next_batch = next(data_iter)
                if nbatch == num_batch:
                    raise StopIteration
            except StopIteration:
                end_of_batch = True
            # update the evaluation metric
            mod.update_metric(metric, batch.label)
        logging.info('epoch %d, %s' % (epoch, metric.get()))
    if profiler:
        mx.profiler.profiler_set_state('stop')
    end = time.time()
    time_cost = end - start
    logging.info('num_worker = ' + str(num_worker) + ', time cost = ' + str(time_cost))
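
Since --kvstore defaults to dist_sync, the quickest single-machine smoke test is to pass a local store; a distributed run would go through the usual dmlc launcher (the launch.py path and flags below are assumptions about the repo layout, not commands from this PR):

```
python linear_regression.py --kvstore local --num-batch 10
python ../../tools/launch.py -n 2 python linear_regression.py --kvstore dist_sync
```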
20 changes: 20 additions & 0 deletions include/mxnet/c_api.h
@@ -1494,6 +1494,26 @@ MXNET_DLL int MXKVStorePullEx(KVStoreHandle handle,
                              const char** keys,
                              NDArrayHandle* vals,
                              int priority);
+
+/*!
+ * \brief pull a list of (key, value) pairs from the kvstore, where each key is a string.
+ *        The NDArray pulled back will be in row_sparse storage, with only the rows
+ *        specified by row_ids present (other rows are zeros).
+ * \param handle handle to the kvstore
+ * \param num the number of key-value pairs
+ * \param keys the list of keys
+ * \param vals the list of values
+ * \param row_ids the list of row_id NDArrays
+ * \param priority the priority of the action
+ * \return 0 when success, -1 when failure happens
+ */
+MXNET_DLL int MXKVStorePullRowSparse(KVStoreHandle handle,
+                                     mx_uint num,
+                                     const char** keys,
+                                     NDArrayHandle* vals,
+                                     const NDArrayHandle* row_ids,
+                                     int priority);

/*!
 * \brief user-defined updater for the kvstore
 *        It's this updater's responsibility to delete \a recv and \a local
24 changes: 24 additions & 0 deletions include/mxnet/kvstore.h
@@ -7,6 +7,7 @@
#define MXNET_KVSTORE_H_
#include <dmlc/io.h>
#include <vector>
+#include <utility>
#include <unordered_map>
#include <string>
#include <functional>
@@ -155,6 +156,29 @@ class KVStore {
                    const std::vector<NDArray*>& values,
                    int priority = 0) = 0;
+
+  /*!
+   * \brief pull a list of key-value pairs from the store.
+   *        The NDArray pulled back will be in row_sparse storage, with only the
+   *        specified row_ids present (other rows are zeros).
+   * \param str_keys the list of keys
+   * \param val_rowids the list of buffer - row_id pairs
+   * \param priority the priority of the action.
+   */
+  virtual void PullRowSparse(const std::vector<int>& str_keys,
+                             const std::vector<std::pair<NDArray*, NDArray>>& val_rowids,
+                             const int priority = 0) = 0;
+
+  /*!
+   * \brief pull a list of key-value pairs from the store, where each key is a string.
+   *        The NDArray pulled back will be in row_sparse storage, with only the
+   *        specified row_ids present (other rows are zeros).
+   * \param str_keys the list of keys in string format
+   * \param val_rowids the list of buffer - row_id pairs
+   * \param priority the priority of the action.
+   */
+  virtual void PullRowSparse(const std::vector<std::string>& str_keys,
+                             const std::vector<std::pair<NDArray*, NDArray>>& val_rowids,
+                             const int priority = 0) = 0;
+
  /**
   * \brief the prototype of user-defined updater
11 changes: 8 additions & 3 deletions include/mxnet/ndarray.h
@@ -182,12 +182,13 @@ class NDArray {
    return shape_;
  }
  /*!
-  * \return the shape of underlying chunk which stores the NDArray values.
-  * For default storage, it is the same as shape(). For row-sparse storage, it is the shape of
+  * \return the shape of underlying chunk which stores the NDArray data/value.
+  *  It is only intended for non-default storage. For row-sparse storage, it is the shape of
   * the tensor which stores the non-zero values.
   */
  inline const TShape &storage_shape() const {
    CHECK(ptr_ != nullptr);
+   CHECK_NE(storage_type(), kDefaultStorage);
    return ptr_->storage_shape;
  }
@@ -276,7 +277,11 @@ class NDArray {
    if (is_none()) return false;
    auto stype = storage_type();
    CHECK_NE(stype, kDefaultStorage);
-   if (stype == kRowSparseStorage || stype == kCSRStorage) {
+   if (stype == kRowSparseStorage) {
+     CHECK_EQ(aux_shape(rowsparse::kIdx)[0], storage_shape()[0]);
+     return aux_shape(0).Size() != 0;
+   } else if (stype == kCSRStorage) {
+     CHECK_EQ(aux_shape(csr::kIdx)[0], storage_shape()[0]);
      return aux_shape(0).Size() != 0;
    } else {
      LOG(FATAL) << "Unknown storage type";
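
The storage_shape/aux_shape distinction enforced above is easiest to see from the Python side. A small sketch; the cast_storage call and the .data/.indices accessors mirror usage elsewhere in this PR, but treat them as assumptions about this branch:

```python
import mxnet as mx
import numpy as np

dense = np.zeros((6, 4))
dense[1, :] = 1.0
dense[4, :] = 2.0
rsp = mx.nd.cast_storage(mx.nd.array(dense), stype='row_sparse')

print(rsp.shape)              # (6, 4): the logical shape
print(rsp.data.shape)         # (2, 4): storage_shape, only the non-zero rows
print(rsp.indices.asnumpy())  # [1 4]: aux data, row ids of the stored rows
```

storage_initialized() then just checks that the row-id aux array and the stored value chunk agree on the number of rows, and reports whether any rows are actually stored.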