WIP multigpu
vince62s committed May 21, 2018
1 parent 641f71b commit dce9b26
Showing 11 changed files with 83 additions and 67 deletions.
2 changes: 1 addition & 1 deletion onmt/inputters/inputter.py
@@ -485,7 +485,7 @@ def batch_size_fn(new, count, sofar):
return max(src_elements, tgt_elements)
else:
batch_size_fn = None
device = opt.gpuid[0] if opt.gpuid else -1
device = opt.device_id if opt.gpuid else -1

return DatasetLazyIter(datasets, fields, batch_size, batch_size_fn,
device, is_train)
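This one-line change is what lets each spawned worker build its data iterator on its own GPU: the device handed to DatasetLazyIter is now the per-process device_id rather than the first entry of -gpuid. A minimal sketch of the selection logic, with opt standing in for the parsed training options (an illustration, not a copy of inputter.py):

def pick_iterator_device(opt):
    # -1 keeps batches on the CPU (the torchtext convention of this era);
    # otherwise the worker uses the GPU assigned to it by train_multi.py.
    return opt.device_id if opt.gpuid else -1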
3 changes: 0 additions & 3 deletions onmt/model_builder.py
@@ -239,9 +239,6 @@ def build_model(model_opt, opt, fields, checkpoint):
print('Building model...')
model = build_base_model(model_opt, fields,
use_gpu(opt), checkpoint)
if len(opt.gpuid) > 1:
print('Multi gpu training: ', opt.gpuid)
model = nn.DataParallel(model, device_ids=opt.gpuid, dim=1)
print(model)

return model
2 changes: 1 addition & 1 deletion onmt/models/SRU.py
@@ -34,7 +34,7 @@ def check_sru_requirement(abort=False):
Return True if check pass; if check fails and abort is True,
raise an Exception, otherwise return False.
"""
return True

# Check 1.
try:
if platform.system() == 'Windows':
6 changes: 6 additions & 0 deletions onmt/opts.py
@@ -228,6 +228,12 @@ def train_opts(parser):
# GPU
group.add_argument('-gpuid', default=[], nargs='+', type=int,
help="Use CUDA on the listed devices.")
group.add_argument('-gpu_rank', default=0, nargs='+', type=int,
help="Rank the current gpu device.")
group.add_argument('-device_id', default=0, nargs='+', type=int,
help="Rank the current gpu device.")
group.add_argument('-gpu_backend', default='nccl', nargs='+', type=str,
help="Type of torch distributed backend")

group.add_argument('-seed', type=int, default=-1,
help="""Random seed used for the experiments
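The three new flags carry the distributed setup into the option namespace: -gpu_rank is the worker's rank in the process group, -device_id is the local GPU it binds to, and -gpu_backend selects the torch.distributed backend. Note that the patch declares nargs='+' on these scalar flags even though the rest of the code reads them as single values; the standalone sketch below (an illustration, not a copy of opts.py) treats them as scalars:

import argparse

parser = argparse.ArgumentParser(description="train.py options (sketch)")
group = parser.add_argument_group("GPU")
group.add_argument("-gpuid", default=[], nargs="+", type=int,
                   help="Use CUDA on the listed devices.")
group.add_argument("-gpu_rank", default=0, type=int,
                   help="Rank of this worker in the process group.")
group.add_argument("-device_id", default=0, type=int,
                   help="Local GPU index this worker should use.")
group.add_argument("-gpu_backend", default="nccl", type=str,
                   help="torch.distributed backend (nccl, gloo, ...).")

# A two-GPU launch as train_multi.py would drive it:
opt = parser.parse_args(["-gpuid", "0", "1"])
print(opt.gpuid, opt.gpu_rank, opt.device_id, opt.gpu_backend)  # [0, 1] 0 0 nccl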
17 changes: 8 additions & 9 deletions onmt/tests/rebuild_test_models.sh
@@ -1,9 +1,9 @@
# # Retrain the models used for CI.
# # Should be done rarely, indicates a major breaking change.
my_python=python3.6

export CUDA_VISIBLE_DEVICES=0,1
############### TEST regular RNN choose either -rnn_type LSTM / GRU / SRU and set input_feed 0 for SRU
if true; then
if false; then
rm data/*.pt
$my_python preprocess.py -train_src data/src-train.txt -train_tgt data/tgt-train.txt -valid_src data/src-val.txt -valid_tgt data/tgt-val.txt -save_data data/data -src_vocab_size 1000 -tgt_vocab_size 1000

@@ -45,16 +45,15 @@ mv /tmp/tmp*e8.pt onmt/tests/test_model2.pt
rm /tmp/tmp*.pt
fi
############### TEST TRANSFORMER
if false; then
if true; then
rm data/*.pt
$my_python preprocess.py -train_src data/src-train.txt -train_tgt data/tgt-train.txt -valid_src data/src-val.txt -valid_tgt data/tgt-val.txt -save_data data/data -src_vocab_size 1000 -tgt_vocab_size 1000 -share_vocab


$my_python train.py -data data/data -save_model /tmp/tmp -batch_type tokens -batch_size 1024 -accum_count 4 \
-layers 4 -rnn_size 256 -word_vec_size 256 -encoder_type transformer -decoder_type transformer -share_embedding \
-epochs 10 -gpuid 0 -max_generator_batches 4 -dropout 0.1 -normalization tokens \
-max_grad_norm 0 -optim sparseadam -decay_method noam -learning_rate 2 -label_smoothing 0.1 \
-position_encoding -param_init 0 -warmup_steps 100 -param_init_glorot -adam_beta2 0.998
$my_python train.py -data data/data -save_model /tmp/tmp -batch_type tokens -batch_size 1024 -accum_count 1 \
-layers 2 -rnn_size 256 -word_vec_size 256 -encoder_type transformer -decoder_type transformer -share_embedding \
-epochs 10 -gpuid 0 1 -max_generator_batches 4 -dropout 0.1 -normalization tokens \
-max_grad_norm 0 -optim adam -decay_method noam -learning_rate 2 -label_smoothing 0.1 \
-position_encoding -param_init 0 -warmup_steps 100 -param_init_glorot -adam_beta2 0.998 -seed 1111
#
mv /tmp/tmp*e10.pt onmt/tests/test_model.pt
rm /tmp/tmp*.pt
83 changes: 53 additions & 30 deletions onmt/trainer.py
@@ -11,6 +11,8 @@
from __future__ import division
from __future__ import print_function

import math

import torch
import torch.nn as nn

@@ -40,11 +42,13 @@ def build_trainer(opt, model, fields, optim, data_type, model_saver=None):
shard_size = opt.max_generator_batches
norm_method = opt.normalization
grad_accum_count = opt.accum_count
nb_gpu = len(opt.gpuid)
gpu_rank = opt.gpu_rank

report_manager = onmt.utils.build_report_manager(opt)
trainer = onmt.Trainer(model, train_loss, valid_loss, optim,
trunc_size, shard_size, data_type,
norm_method, grad_accum_count, report_manager,
norm_method, grad_accum_count, nb_gpu, gpu_rank, report_manager,
model_saver=model_saver)
return trainer

@@ -76,7 +80,7 @@ class Trainer(object):

def __init__(self, model, train_loss, valid_loss, optim,
trunc_size=0, shard_size=32, data_type='text',
norm_method="sents", grad_accum_count=1, report_manager=None,
norm_method="sents", grad_accum_count=1, nb_gpu=1, gpu_rank=1, report_manager=None,
model_saver=None):
# Basic attributes.
self.model = model
@@ -88,6 +92,8 @@ def __init__(self, model, train_loss, valid_loss, optim,
self.data_type = data_type
self.norm_method = norm_method
self.grad_accum_count = grad_accum_count
self.nb_gpu = nb_gpu
self.gpu_rank = gpu_rank
self.report_manager = report_manager
self.model_saver = model_saver

@@ -100,6 +106,7 @@ def __init__(self, model, train_loss, valid_loss, optim,
# Set model in training mode.
self.model.train()


def train(self, train_iter_fct, valid_iter_fct, start_epoch, end_epoch):
"""
The main training loops.
@@ -143,7 +150,9 @@ def train(self, train_iter_fct, valid_iter_fct, start_epoch, end_epoch):
self.epoch_step(valid_stats.ppl(), epoch)

# 4. Drop a checkpoint if needed.
self.maybe_drop_checkpoint(epoch, valid_stats)
if self.gpu_rank == 0:
self.maybe_drop_checkpoint(epoch, valid_stats)


def train_epoch(self, train_iter, epoch):
""" Train next epoch.
@@ -171,40 +180,44 @@ def train_epoch(self, train_iter, epoch):
# Dynamic batching
num_batches = -1

for _, batch in enumerate(train_iter):
cur_dataset = train_iter.get_cur_dataset()
self.train_loss.cur_dataset = cur_dataset

true_batchs.append(batch)
accum += 1
if self.norm_method == "tokens":
num_tokens = batch.tgt[1:].data.view(-1) \
.ne(self.train_loss.padding_idx).sum()
normalization += num_tokens
else:
normalization += batch.batch_size

if accum == self.grad_accum_count:
self._gradient_accumulation(
true_batchs, total_stats,
report_stats, normalization)

report_stats = self.report_training(
epoch, idx, num_batches,
self.optim.learning_rate,
report_stats)

true_batchs = []
accum = 0
normalization = 0
idx += 1
for i, batch in enumerate(train_iter):
if ( i % self.nb_gpu == self.gpu_rank ) and \
(i < (len(train_iter) - len(train_iter) % self.nb_gpu)):
cur_dataset = train_iter.get_cur_dataset()
self.train_loss.cur_dataset = cur_dataset

true_batchs.append(batch)
accum += 1
if self.norm_method == "tokens":
num_tokens = batch.tgt[1:].data.view(-1) \
.ne(self.train_loss.padding_idx).sum()
normalization += num_tokens
else:
normalization += batch.batch_size

if accum == self.grad_accum_count:
self._gradient_accumulation(
true_batchs, total_stats,
report_stats, normalization)

report_stats = self.report_training(
epoch, idx, num_batches,
self.optim.learning_rate,
report_stats)

true_batchs = []
accum = 0
normalization = 0
idx += 1

if true_batchs:
self._gradient_accumulation(
true_batchs, total_stats,
report_stats, normalization)
true_batchs = []

print("there was %d batches" % i)

return total_stats

def validate(self, valid_iter):
@@ -320,6 +333,16 @@ def _gradient_accumulation(self, true_batchs, total_stats,
batch, outputs, attns, j,
trunc_size, self.shard_size, normalization)

# 3.bis Multi GPU gradient gather
if self.nb_gpu > 1:
grads = [p.grad.data for p in self.model.parameters() if p.requires_grad]
onmt.utils.multi_utils.all_reduce_and_rescale_tensors(grads, float(self.nb_gpu))
else:
for p in self.model.parameters():
if p.requires_grad:
p.grad.data.div_(float(1))


# 4. Update the parameters and statistics.
if self.grad_accum_count == 1:
self.optim.step()
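Two pieces carry the multi-GPU logic in this file: train_epoch dispatches batches round-robin, so the worker with rank gpu_rank only trains on batches where i % nb_gpu == gpu_rank, and _gradient_accumulation then synchronizes the workers through onmt.utils.multi_utils.all_reduce_and_rescale_tensors, whose body is not shown in this diff. The sketch below is an assumption about what that gather step does (sum every gradient across processes, then rescale by the number of GPUs), written with plain torch.distributed calls:

import torch.distributed as dist

def all_reduce_and_rescale_tensors(tensors, rescale_denom):
    # Assumed behaviour of the helper called above: all_reduce defaults to
    # a sum, so after the loop every worker holds the gradients averaged
    # over rescale_denom (here the number of GPUs). The real helper may
    # additionally bucket small tensors to cut down communication calls.
    for t in tensors:
        dist.all_reduce(t)
        t.div_(rescale_denom)

# As used in _gradient_accumulation above:
#   grads = [p.grad.data for p in self.model.parameters() if p.requires_grad]
#   all_reduce_and_rescale_tensors(grads, float(self.nb_gpu))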
1 change: 1 addition & 0 deletions onmt/utils/__init__.py
@@ -5,3 +5,4 @@
from onmt.utils.rnn_factory import rnn_factory
from onmt.utils.cnn_factory import StackedCNN
from onmt.utils.loss import build_loss_compute
from onmt.utils.multi_utils import is_master, multi_init, all_reduce_and_rescale_tensors
11 changes: 5 additions & 6 deletions onmt/utils/multi_utils.py
@@ -1,6 +1,6 @@
import math
import pickle

import random
import torch.distributed


@@ -12,12 +12,11 @@ def is_master(opt):
def multi_init(opt):
if len(opt.gpuid) == 1:
raise ValueError('Cannot initialize multiprocess with one gpu only')

dist_init_method = 'tcp://localhost:10000'
dist_world_size = len(opt.gpuid)
torch.distributed.init_process_group(
backend=opt.gpu_backend, init_method='tcp://localhost:{port}'.format(
port=random.randint(10000, 20000)),
world_size=len(opt.gpuid), rank=opt.gpu_rank)

backend=opt.gpu_backend, init_method=dist_init_method,
world_size=dist_world_size, rank=opt.gpu_rank)
opt.gpu_rank = torch.distributed.get_rank()
if not is_master(opt):
suppress_output()
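The rewritten multi_init pins the rendezvous to a fixed localhost address instead of a random port and takes the rank straight from -gpu_rank. A minimal standalone sketch of the same initialization pattern, assuming one process per GPU on a single machine (names here are illustrative):

import torch.distributed as dist

def multi_init_sketch(gpuid, gpu_rank, backend="nccl"):
    # Every worker calls this with the same world size and init address
    # and its own rank, which is all init_process_group needs to form
    # the process group.
    if len(gpuid) <= 1:
        raise ValueError("Cannot initialize multiprocess with one gpu only")
    dist.init_process_group(backend=backend,
                            init_method="tcp://localhost:10000",
                            world_size=len(gpuid),
                            rank=gpu_rank)
    return dist.get_rank()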
5 changes: 0 additions & 5 deletions train.py
@@ -22,11 +22,6 @@ def main(opt):
if torch.cuda.is_available() and not opt.gpuid:
print("WARNING: You have a CUDA device, should run with -gpuid 0")

if opt.gpuid:
cuda.set_device(opt.gpuid[0])
if opt.seed > 0:
torch.cuda.manual_seed(opt.seed)

if len(opt.gpuid) > 1:
multi_main(opt)
else:
11 changes: 6 additions & 5 deletions train_multi.py
@@ -3,9 +3,9 @@
from __future__ import print_function
from __future__ import division

import argparse
import os

import random
import signal
import torch

import onmt.opts as opts
@@ -15,7 +15,6 @@
def main(opt):
""" Spawns 1 process per GPU """
nb_gpu = len(opt.gpuid)

mp = torch.multiprocessing.get_context('spawn')

# Create a thread to listen for errors in the child processes.
@@ -27,8 +26,10 @@ def main(opt):
for i in range(nb_gpu):
opt.gpu_rank = i
opt.device_id = i
procs.append(mp.Process(target=run, opt=(opt, error_queue, ), daemon=True))

procs.append(mp.Process(target=run, args=(opt, error_queue, ), daemon=True))
procs[i].start()
print(" Starting process pid: %d " % procs[i].pid)
error_handler.add_child(procs[i].pid)
for p in procs:
p.join()
@@ -44,7 +45,7 @@ def run(opt, error_queue):
except Exception:
# propagate exception to parent process, keeping original traceback
import traceback
error_queue.put((opt.gpu, traceback.format_exc()))
error_queue.put((opt.gpu_rank, traceback.format_exc()))


class ErrorHandler(object):
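train_multi.py now spawns one process per listed GPU, hands each its gpu_rank and device_id, and routes child tracebacks back through a queue. A reduced sketch of that pattern (the ErrorHandler thread from the real file is omitted; process_fn stands in for the per-worker entry point and must be a top-level, picklable function):

import traceback
import torch.multiprocessing as torch_mp

def run(process_fn, opt, error_queue):
    # Child-side wrapper: run training and report any failure to the parent.
    try:
        process_fn(opt)
    except Exception:
        error_queue.put((opt.gpu_rank, traceback.format_exc()))

def spawn_per_gpu(opt, process_fn):
    # Parent side: one training process per GPU listed in opt.gpuid.
    mp = torch_mp.get_context("spawn")
    error_queue = mp.SimpleQueue()
    procs = []
    for i in range(len(opt.gpuid)):
        opt.gpu_rank = i   # rank inside the process group
        opt.device_id = i  # physical GPU this worker binds to
        p = mp.Process(target=run, args=(process_fn, opt, error_queue),
                       daemon=True)
        p.start()
        print(" Starting process pid: %d " % p.pid)
        procs.append(p)
    for p in procs:
        p.join()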
9 changes: 2 additions & 7 deletions train_single.py
@@ -56,9 +56,6 @@ def training_opt_postprocessing(opt):
opt.dec_layers = opt.layers

opt.brnn = (opt.encoder_type == "brnn")
if opt.seed > 0:
random.seed(opt.seed)
torch.manual_seed(opt.seed)

if opt.rnn_type == "SRU" and not opt.gpuid:
raise AssertionError("Using SRU requires -gpuid set.")
@@ -67,17 +64,15 @@ def training_opt_postprocessing(opt):
print("WARNING: You have a CUDA device, should run with -gpuid 0")

if opt.gpuid:
cuda.set_device(opt.gpuid[0])
torch.cuda.set_device(opt.device_id)
if opt.seed > 0:
torch.cuda.manual_seed(opt.seed)

if len(opt.gpuid) > 1:
sys.stderr.write("Sorry, multigpu isn't supported yet, coming soon!\n")
sys.exit(1)
return opt


def main(opt):
print(" main de train_single ")
opt = training_opt_postprocessing(opt)

# Load checkpoint if we resume from a previous training.
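On the single-process side, the helper now binds CUDA to the per-worker device_id set by train_multi.py; host RNG seeding was dropped from this function by the patch (presumably handled elsewhere). A minimal sketch of the GPU setup that remains, assuming opt is the per-process option namespace:

import torch

def bind_gpu(opt):
    # Pin CUDA to this worker's device before the model is built, so
    # allocations and kernels land on the right GPU; then seed that
    # device's CUDA RNG if a seed was requested.
    if opt.gpuid:
        torch.cuda.set_device(opt.device_id)
        if opt.seed > 0:
            torch.cuda.manual_seed(opt.seed)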
