-
Notifications
You must be signed in to change notification settings - Fork 48
/
train.py
400 lines (291 loc) · 16.3 KB
/
train.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
import torch
from torch import nn
import argparse
import os
import pathlib
import importlib
import ssl
import time
import copy
import sys
from datasets import utils as ds_utils
from networks import utils as nt_utils
from runners import utils as rn_utils
from logger import Logger
class TrainingWrapper(object):
@staticmethod
def get_args(parser):
# General options
parser.add('--project_dir', default='.', type=str,
help='root directory of the code')
parser.add('--torch_home', default='', type=str,
help='directory used for storage of the checkpoints')
parser.add('--experiment_name', default='test', type=str,
help='name of the experiment used for logging')
parser.add('--dataloader_name', default='voxceleb2', type=str,
help='name of the file in dataset directory which is used for data loading')
parser.add('--dataset_name', default='voxceleb2_512px', type=str,
help='name of the dataset in the data root folder')
parser.add('--data_root', default=".", type=str,
help='root directory of the data')
parser.add('--debug', action='store_true',
help='turn on the debug mode: fast epoch, useful for testing')
parser.add('--runner_name', default='default', type=str,
help='class that wraps the models and performs training and inference steps')
parser.add('--no_disk_write_ops', action='store_true',
help='avoid doing write operations to disk')
parser.add('--redirect_print_to_file', action='store_true',
help='redirect stdout and stderr to file')
parser.add('--random_seed', default=0, type=int,
help='used for initialization of pytorch and numpy seeds')
# Initialization options
parser.add('--init_experiment_dir', default='', type=str,
help='directory of the experiment used for the initialization of the networks')
parser.add('--init_networks', default='', type=str,
help='list of networks to intialize')
parser.add('--init_which_epoch', default='none', type=str,
help='epoch to initialize from')
parser.add('--which_epoch', default='none', type=str,
help='epoch to continue training from')
# Distributed options
parser.add('--num_gpus', default=1, type=int,
help='>1 enables DDP')
# Training options
parser.add('--num_epochs', default=1000, type=int,
help='number of epochs for training')
parser.add('--checkpoint_freq', default=25, type=int,
help='frequency of checkpoints creation in epochs')
parser.add('--test_freq', default=5, type=int,
help='frequency of testing in epochs')
parser.add('--batch_size', default=1, type=int,
help='batch size across all GPUs')
parser.add('--num_workers_per_process', default=20, type=int,
help='number of workers used for data loading in each process')
parser.add('--skip_test', action='store_true',
help='do not perform testing')
parser.add('--calc_stats', action='store_true',
help='calculate batch norm standing stats')
parser.add('--visual_freq', default=-1, type=int,
help='in iterations, -1 -- output logs every epoch')
# Mixed precision options
parser.add('--use_half', action='store_true',
help='enable half precision calculation')
parser.add('--use_closure', action='store_true',
help='use closure function during optimization (required by LBFGS)')
parser.add('--use_apex', action='store_true',
help='enable apex')
parser.add('--amp_opt_level', default='O0', type=str,
help='full/mixed/half precision, refer to apex.amp docs')
parser.add('--amp_loss_scale', default='dynamic', type=str,
help='fixed or dynamic loss scale')
# Technical options that are set automatically
parser.add('--local_rank', default=0, type=int)
parser.add('--rank', default=0, type=int)
parser.add('--world_size', default=1, type=int)
parser.add('--train_size', default=1, type=int)
# Dataset options
args, _ = parser.parse_known_args()
os.environ['TORCH_HOME'] = args.torch_home
importlib.import_module(f'datasets.{args.dataloader_name}').DatasetWrapper.get_args(parser)
# runner options
importlib.import_module(f'runners.{args.runner_name}').RunnerWrapper.get_args(parser)
return parser
def __init__(self, args, runner=None):
super(TrainingWrapper, self).__init__()
# Initialize and apply general options
ssl._create_default_https_context = ssl._create_unverified_context
torch.backends.cudnn.benchmark = True
torch.manual_seed(args.random_seed)
torch.cuda.manual_seed_all(args.random_seed)
# Set distributed training options
if args.num_gpus > 1 and args.num_gpus <= 8:
args.rank = args.local_rank
args.world_size = args.num_gpus
torch.cuda.set_device(args.local_rank)
torch.distributed.init_process_group(backend='nccl', init_method='env://')
elif args.num_gpus > 8:
raise # Not supported
# Prepare experiment directories and save options
project_dir = pathlib.Path(args.project_dir)
self.checkpoints_dir = project_dir / 'runs' / args.experiment_name / 'checkpoints'
# Store options
if not args.no_disk_write_ops:
os.makedirs(self.checkpoints_dir, exist_ok=True)
self.experiment_dir = project_dir / 'runs' / args.experiment_name
if not args.no_disk_write_ops:
# Redirect stdout
if args.redirect_print_to_file:
logs_dir = self.experiment_dir / 'logs'
os.makedirs(logs_dir, exist_ok=True)
sys.stdout = open(os.path.join(logs_dir, f'stdout_{args.rank}.txt'), 'w')
sys.stderr = open(os.path.join(logs_dir, f'stderr_{args.rank}.txt'), 'w')
if args.rank == 0:
print(args)
with open(self.experiment_dir / 'args.txt', 'wt') as args_file:
for k, v in sorted(vars(args).items()):
args_file.write('%s: %s\n' % (str(k), str(v)))
# Initialize model
self.runner = runner
if self.runner is None:
self.runner = importlib.import_module(f'runners.{args.runner_name}').RunnerWrapper(args)
# Load pre-trained weights (if needed)
init_networks = rn_utils.parse_str_to_list(args.init_networks) if args.init_networks else {}
networks_to_train = self.runner.nets_names_to_train
if args.init_which_epoch != 'none' and args.init_experiment_dir:
for net_name in init_networks:
self.runner.nets[net_name].load_state_dict(torch.load(pathlib.Path(args.init_experiment_dir) / 'checkpoints' / f'{args.init_which_epoch}_{net_name}.pth', map_location='cpu'))
if args.which_epoch != 'none':
for net_name in networks_to_train:
if net_name not in init_networks:
self.runner.nets[net_name].load_state_dict(torch.load(self.checkpoints_dir / f'{args.which_epoch}_{net_name}.pth', map_location='cpu'))
if args.num_gpus > 0:
self.runner.cuda()
if args.rank == 0:
print(self.runner)
def train(self, args):
# Reset amp
if args.use_apex:
from apex import amp
amp.init(False)
# Get dataloaders
train_dataloader = ds_utils.get_dataloader(args, 'train')
if not args.skip_test:
test_dataloader = ds_utils.get_dataloader(args, 'test')
model = runner = self.runner
if args.use_half:
runner.half()
# Initialize optimizers, schedulers and apex
opts = runner.get_optimizers(args)
# Load pre-trained params for optimizers and schedulers (if needed)
if args.which_epoch != 'none' and not args.init_experiment_dir:
for net_name, opt in opts.items():
opt.load_state_dict(torch.load(self.checkpoints_dir / f'{args.which_epoch}_opt_{net_name}.pth', map_location='cpu'))
if args.use_apex and args.num_gpus > 0 and args.num_gpus <= 8:
# Enfornce apex mixed precision settings
nets_list, opts_list = [], []
for net_name in sorted(opts.keys()):
nets_list.append(runner.nets[net_name])
opts_list.append(opts[net_name])
loss_scale = float(args.amp_loss_scale) if args.amp_loss_scale != 'dynamic' else args.amp_loss_scale
nets_list, opts_list = amp.initialize(nets_list, opts_list, opt_level=args.amp_opt_level, num_losses=1, loss_scale=loss_scale)
# Unpack opts_list into optimizers
for net_name, net, opt in zip(sorted(opts.keys()), nets_list, opts_list):
runner.nets[net_name] = net
opts[net_name] = opt
if args.which_epoch != 'none' and not args.init_experiment_dir and os.path.exists(self.checkpoints_dir / f'{args.which_epoch}_amp.pth'):
amp.load_state_dict(torch.load(self.checkpoints_dir / f'{args.which_epoch}_amp.pth', map_location='cpu'))
# Initialize apex distributed data parallel wrapper
if args.num_gpus > 1 and args.num_gpus <= 8:
from apex import parallel
model = parallel.DistributedDataParallel(runner, delay_allreduce=True)
epoch_start = 1 if args.which_epoch == 'none' else int(args.which_epoch) + 1
# Initialize logging
train_iter = epoch_start - 1
if args.visual_freq != -1:
train_iter /= args.visual_freq
logger = Logger(args, self.experiment_dir)
logger.set_num_iter(
train_iter=train_iter,
test_iter=(epoch_start - 1) // args.test_freq)
if args.debug and not args.use_apex:
torch.autograd.set_detect_anomaly(True)
total_iters = 1
for epoch in range(epoch_start, args.num_epochs + 1):
if args.rank == 0:
print('epoch %d' % epoch)
# Train for one epoch
model.train()
time_start = time.time()
# Shuffle the dataset before the epoch
train_dataloader.dataset.shuffle()
for i, data_dict in enumerate(train_dataloader, 1):
# Prepare input data
if args.num_gpus > 0 and args.num_gpus > 0:
for key, value in data_dict.items():
data_dict[key] = value.cuda()
# Convert inputs to FP16
if args.use_half:
for key, value in data_dict.items():
data_dict[key] = value.half()
output_logs = i == len(train_dataloader)
if args.visual_freq != -1:
output_logs = not (total_iters % args.visual_freq)
output_visuals = output_logs and not args.no_disk_write_ops
# Accumulate list of optimizers that will perform opt step
for opt in opts.values():
opt.zero_grad()
# Perform a forward pass
if not args.use_closure:
loss = model(data_dict)
closure = None
if args.use_apex and args.num_gpus > 0 and args.num_gpus <= 8:
# Mixed precision requires a special wrapper for the loss
with amp.scale_loss(loss, opts.values()) as scaled_loss:
scaled_loss.backward()
elif not args.use_closure:
loss.backward()
else:
def closure():
loss = model(data_dict)
loss.backward()
return loss
# Perform steps for all optimizers
for opt in opts.values():
opt.step(closure)
if output_logs:
logger.output_logs('train', runner.output_visuals(), runner.output_losses(), time.time() - time_start)
if args.debug:
break
if args.visual_freq != -1:
total_iters += 1
total_iters %= args.visual_freq
# Increment the epoch counter in the training dataset
train_dataloader.dataset.epoch += 1
# If testing is not required -- continue
if epoch % args.test_freq:
continue
# If skip test flag is set -- only check if a checkpoint if required
if not args.skip_test:
# Calculate "standing" stats for the batch normalization
if args.calc_stats:
runner.calculate_batchnorm_stats(train_dataloader, args.debug)
# Test
time_start = time.time()
model.eval()
for data_dict in test_dataloader:
# Prepare input data
if args.num_gpus > 0:
for key, value in data_dict.items():
data_dict[key] = value.cuda()
# Forward pass
with torch.no_grad():
model(data_dict)
if args.debug:
break
# Output logs
logger.output_logs('test', runner.output_visuals(), runner.output_losses(), time.time() - time_start)
# If creation of checkpoint is not required -- continue
if epoch % args.checkpoint_freq and not args.debug:
continue
# Create or load a checkpoint
if args.rank == 0 and not args.no_disk_write_ops:
with torch.no_grad():
for net_name in runner.nets_names_to_train:
# Save a network
torch.save(runner.nets[net_name].state_dict(), self.checkpoints_dir / f'{epoch}_{net_name}.pth')
# Save an optimizer
torch.save(opts[net_name].state_dict(), self.checkpoints_dir / f'{epoch}_opt_{net_name}.pth')
# Save amp
if args.use_apex:
torch.save(amp.state_dict(), self.checkpoints_dir / f'{epoch}_amp.pth')
return runner
if __name__ == "__main__":
## Parse options ##
parser = argparse.ArgumentParser(conflict_handler='resolve')
parser.add = parser.add_argument
TrainingWrapper.get_args(parser)
args, _ = parser.parse_known_args()
## Initialize the model ##
m = TrainingWrapper(args)
## Perform training ##
nets = m.train(args)