Isolated benchmarking support (apache#7662)
* Change namespace and make logging functionality changes

* Help comment changes

* Isolated benchmarking support for IO Compute and Communication

* Change logging statements and use asserts

* Change flag check

* Count number of cpu cores

* Logging only for cpu workers

* Add rank to final logging

* Use dummy iter when measuring comm only, use measure_only for all COMM, COMP and IO

* Change COMPUTE to COMP

* Change to .format
anirudh2290 authored and piiswrong committed Sep 4, 2017
1 parent 842c096 commit cdfe8a7
Showing 1 changed file with 72 additions and 12 deletions.
84 changes: 72 additions & 12 deletions benchmark/python/sparse/sparse_end2end.py
@@ -19,6 +19,12 @@
import time
import argparse
import os
import multiprocessing

MAX_NUM_BATCH = 99999999
COMP = "compute"
COMM = "communication"
IO = "io"

parser = argparse.ArgumentParser(description="Run sparse linear regression " \
                                 "with distributed kvstore",
@@ -29,13 +35,13 @@
                    help='number of epochs to train')
parser.add_argument('--batch-size', type=int, default=512,
                    help='number of examples per batch')
parser.add_argument('--num-batch', type=int, default=99999999,
parser.add_argument('--num-batch', type=int, default=MAX_NUM_BATCH,
                    help='number of batches per epoch')
parser.add_argument('--dummy-iter', type=int, default=0,
                    help='whether to use dummy iterator to exclude io cost')
parser.add_argument('--kvstore', type=str, default='local',
parser.add_argument('--kvstore', type=str, default=None,
                    help='what kvstore to use [local, dist_sync, etc]')
parser.add_argument('--sparse-log-level', type=str, default='INFO',
parser.add_argument('--sparse-log-level', type=str, default='DEBUG',
                    help='logging level [DEBUG, INFO, ERROR]')
parser.add_argument('--dataset', type=str, default='avazu',
                    help='what test dataset to use')
@@ -48,6 +54,9 @@
                    help='whether to call update_metric')
parser.add_argument('--enable-logging-for', default="0",
                    help="Enable logging for the specified list of workers")
parser.add_argument('--measure-only', default=None,
                    help="Measure only",
                    choices=[IO, COMP, COMM])


def get_libsvm_data(data_dir, data_name, url, data_origin_name):
@@ -87,16 +96,26 @@ def next(self):
    'data_origin_name': 'avazu-app.t.bz2',
    'url': "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/avazu-app.t.bz2",
    'feature_dim': 1000000,
    'lc': 1719304,
}

kdda = {
    'data_name': 'kdda.t',
    'data_origin_name': 'kdda.t.bz2',
    'url': "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.t.bz2",
    'feature_dim': 20216830,
    'lc': 510302,
}

criteo = {
    'data_name': 'criteo.t',
    'data_origin_name': 'criteo.t.bz2',
    'url': "https://s3-us-west-2.amazonaws.com/sparse-dataset/criteo.t.bz2",
    'feature_dim': 8388621,
    'lc': 548787,
}

datasets = { 'kdda' : kdda, 'avazu' : avazu }
datasets = { 'kdda' : kdda, 'avazu' : avazu , 'criteo': criteo }


def get_sym(feature_dim):
@@ -109,6 +128,15 @@ def get_sym(feature_dim):
    return model


def row_sparse_push(kv, param_arrays, grad_arrays, param_names):
    for index, pair in enumerate(zip(param_arrays, grad_arrays)):
        arg_list, grad_list = pair
        if grad_list[0] is None:
            continue
        name = param_names[index]
        kv.push(name, grad_list, priority=-index)


def row_sparse_pull(kv, key, data, slices, weight_array, priority):
    # if have kvstore, need to pull corresponding rows of
    # the weights to each context
@@ -140,11 +168,21 @@ def row_sparse_pull(kv, key, data, slices, weight_array, priority):
dummy_iter = args.dummy_iter
dataset = args.dataset
log_level = args.sparse_log_level
measure_only = args.measure_only
num_cores = multiprocessing.cpu_count()
if measure_only == COMP or measure_only == IO:
    assert not kvstore, "when compute_only or io_only is set, kvstore should be None"
    num_batch = datasets[dataset]['lc'] / batch_size if num_batch == MAX_NUM_BATCH else num_batch
if measure_only == COMM:
    assert (kvstore == "dist_async"), "when communication_only is set kvstore should be dist_async"
    num_batch = datasets[dataset]['lc'] / batch_size if num_batch == MAX_NUM_BATCH else num_batch


contexts = mx.context.cpu(0) if args.num_gpu < 1\
           else [mx.context.gpu(i) for i in range(args.num_gpu)]

# create kvstore when there are gpus
kv = mx.kvstore.create(kvstore) if args.num_gpu >= 1 else None
kv = mx.kvstore.create(kvstore) if kvstore else None
rank = kv.rank if kv is not None else 0
num_worker = kv.num_workers if kv is not None else 1

@@ -181,7 +219,7 @@ def row_sparse_pull(kv, key, data, slices, weight_array, priority):
train_data = mx.io.LibSVMIter(data_libsvm=path, data_shape=(feature_dim,),
                              batch_size=batch_size, num_parts=num_worker,
                              part_index=rank)
if dummy_iter:
if dummy_iter or measure_only == COMP or measure_only == COMM:
    train_data = DummyIter(train_data)

# model
@@ -216,7 +254,12 @@ def row_sparse_pull(kv, key, data, slices, weight_array, priority):
logging.debug('start training ...')
start = time.time()
data_iter = iter(train_data)
time_cost_epoch = 0.
sum_cost_epoch = 0.
average_cost_epoch = 0.

for epoch in range(num_epoch):
    start_time_epoch = time.time()
    nbatch = 0
    end_of_batch = False
    data_iter.reset()
@@ -228,9 +271,17 @@ def row_sparse_pull(kv, key, data, slices, weight_array, priority):
        nbatch += 1
        batch = next_batch

        mod.forward_backward(batch)
        # update parameters
        mod.update()
        if measure_only != IO and measure_only != COMM:
            mod.forward_backward(batch)
            # update parameters
            mod.update()
        if measure_only == COMM:
            if nbatch == 1:
                mod.forward_backward(batch)
                mod.update()
            else:
                row_sparse_push(kv, mod._exec_group.param_arrays, mod._exec_group.grad_arrays, mod._exec_group.param_names)


        try:
            # pre fetch next batch
@@ -246,11 +297,20 @@ def row_sparse_pull(kv, key, data, slices, weight_array, priority):
            mod.update_metric(metric, batch.label)
        else: # call waitall to replace update_metric as sync point
            mx.nd.waitall() # sync point for the current minibatch
    logging.info('epoch %d, %s' % (epoch, metric.get()))
    logging.info('epoch {}, {}'.format(epoch, metric.get()))
    end_time_epoch = time.time()
    if epoch == 0:
        print "num_batches = ", nbatch
        logging.debug("num_batches = {}".format(nbatch))
        logging.info('|device|num_worker|average_cost_epoch|rank|')
    time_cost_epoch = end_time_epoch - start_time_epoch
    if epoch > 0:
        sum_cost_epoch = sum_cost_epoch + time_cost_epoch
        average_cost_epoch = float(sum_cost_epoch) / epoch
    logging.info('num_worker = {}, time cost per epoch = {}'.format(str(num_worker), str(time_cost_epoch)))
    if args.num_gpu < 1:
        logging.info('|cpu/{} cores| {} | {} | {} |'.format(str(num_cores), str(num_worker), str(average_cost_epoch), rank))
if profiler:
    mx.profiler.profiler_set_state('stop')
end = time.time()
time_cost = end - start
logging.info('num_worker = ' + str(num_worker) + ', time cost = ' + str(time_cost))
logging.info('num_worker = {}, rank = {}, time cost = {}'.format(str(num_worker), str(rank), str(time_cost)))
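
With this patch the three isolated modes are selected through --measure-only. The invocations below are a rough sketch of how they might be run, not commands taken from the patch: they assume a single local worker for the compute and io cases, and a dist_async kvstore brought up through whatever distributed launcher the cluster uses for the communication case.

# compute only: batches come from DummyIter, no kvstore is created
python benchmark/python/sparse/sparse_end2end.py --measure-only compute --dataset avazu

# io only: the LibSVM iterator runs, forward/backward and update are skipped
python benchmark/python/sparse/sparse_end2end.py --measure-only io --dataset avazu

# communication only: requires kvstore=dist_async; after one warm-up batch only
# row_sparse_push of the gradients is timed
python benchmark/python/sparse/sparse_end2end.py --measure-only communication --kvstore dist_async --dataset avazu

For the compute and io cases the script asserts that no kvstore was passed, and num-batch defaults to the dataset's line count ('lc') divided by the batch size when left at MAX_NUM_BATCH.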
