forked from iotb415/DDP
-
Notifications
You must be signed in to change notification settings - Fork 0
/
aws.py
248 lines (189 loc) · 9.25 KB
/
aws.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
import time
import sys
import torch
if __name__ == '__main__':
torch.multiprocessing.set_start_method('spawn')
import torch.nn as nn
import torch.nn.parallel
import torch.distributed as dist
import torch.optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models
from torch.multiprocessing import Pool, Process
class AverageMeter(object):
"""Computes and stores the average and current value"""
def __init__(self):
self.reset()
def reset(self):
self.val = 0
self.avg = 0
self.sum = 0
self.count = 0
def update(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
def accuracy(output, target, topk=(1,)):
"""Computes the precision@k for the specified values of k"""
with torch.no_grad():
maxk = max(topk)
batch_size = target.size(0)
_, pred = output.topk(maxk, 1, True, True)
pred = pred.t()
correct = pred.eq(target.view(1, -1).expand_as(pred))
res = []
for k in topk:
correct_k = correct[:k].view(-1).float().sum(0, keepdim=True)
res.append(correct_k.mul_(100.0 / batch_size))
return res
def train(train_loader, model, criterion, optimizer, epoch):
batch_time = AverageMeter()
data_time = AverageMeter()
losses = AverageMeter()
top1 = AverageMeter()
top5 = AverageMeter()
# switch to train mode 转到训练模式
model.train()
end = time.time()
for i, (input, target) in enumerate(train_loader):
# measure data loading time 计算加载数据的时间
data_time.update(time.time() - end)
# Create non_blocking tensors for distributed training 为分布式训练创建 non_blocking 张量
input = input.cuda(non_blocking=True)
target = target.cuda(non_blocking=True)
# compute output 计算输出
output = model(input)
loss = criterion(output, target)
# measure accuracy and record loss 计算准确率并记录 loss
prec1, prec5 = accuracy(output, target, topk=(1, 5))
losses.update(loss.item(), input.size(0))
top1.update(prec1[0], input.size(0))
top5.update(prec5[0], input.size(0))
# compute gradients in a backward pass 在反向传播中计算梯度
optimizer.zero_grad()
loss.backward()
# Call step of optimizer to update model params 调用一个 optimizer 步骤来更新模型参数
optimizer.step()
# measure elapsed time 计算花费的时间
batch_time.update(time.time() - end)
end = time.time()
if i % 10 == 0:
print('Epoch: [{0}][{1}/{2}]\t'
'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
'Data {data_time.val:.3f} ({data_time.avg:.3f})\t'
'Loss {loss.val:.4f} ({loss.avg:.4f})\t'
'Prec@1 {top1.val:.3f} ({top1.avg:.3f})\t'
'Prec@5 {top5.val:.3f} ({top5.avg:.3f})'.format(
epoch, i, len(train_loader), batch_time=batch_time,
data_time=data_time, loss=losses, top1=top1, top5=top5))
def adjust_learning_rate(initial_lr, optimizer, epoch):
"""Sets the learning rate to the initial LR decayed by 10 every 30 epochs"""
lr = initial_lr * (0.1 ** (epoch // 30))
for param_group in optimizer.param_groups:
param_group['lr'] = lr
def validate(val_loader, model, criterion):
batch_time = AverageMeter()
losses = AverageMeter()
top1 = AverageMeter()
top5 = AverageMeter()
# switch to evaluate mode 转到验证模式
model.eval()
with torch.no_grad():
end = time.time()
for i, (input, target) in enumerate(val_loader):
input = input.cuda(non_blocking=True)
target = target.cuda(non_blocking=True)
# compute output 计算输出
output = model(input)
loss = criterion(output, target)
# measure accuracy and record loss 计算准确率并记录 loss
prec1, prec5 = accuracy(output, target, topk=(1, 5))
losses.update(loss.item(), input.size(0))
top1.update(prec1[0], input.size(0))
top5.update(prec5[0], input.size(0))
# measure elapsed time 计算花费时间
batch_time.update(time.time() - end)
end = time.time()
if i % 100 == 0:
print('Test: [{0}/{1}]\t'
'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
'Loss {loss.val:.4f} ({loss.avg:.4f})\t'
'Prec@1 {top1.val:.3f} ({top1.avg:.3f})\t'
'Prec@5 {top5.val:.3f} ({top5.avg:.3f})'.format(
i, len(val_loader), batch_time=batch_time, loss=losses,
top1=top1, top5=top5))
print(' * Prec@1 {top1.avg:.3f} Prec@5 {top5.avg:.3f}'
.format(top1=top1, top5=top5))
return top1.avg
print("Collect Inputs...")
# Batch Size for training and testing 训练和测试的 batch size
batch_size = 32
# Number of additional worker processes for dataloading 数据加载的额外工作进程数
workers = 2
# Number of epochs to train for 训练的 epoch 数
num_epochs = 2
# Starting Learning Rate 初始学习率
starting_lr = 0.1
# Number of distributed processes 分布式进程数
world_size = 4
# Distributed backend type 分布式后端类型
dist_backend = 'nccl'
# Url used to setup distributed training 设置分布式训练的 url
dist_url = "tcp://172.31.22.234:23456"
print("Initialize Process Group...")
# Initialize Process Group 初始化进程组
# v1 - init with url 使用 url 初始化
dist.init_process_group(backend=dist_backend, init_method=dist_url, rank=int(sys.argv[1]), world_size=world_size)
# v2 - init with file 使用文件初始化
# dist.init_process_group(backend="nccl", init_method="file:///home/ubuntu/pt-distributed-tutorial/trainfile", rank=int(sys.argv[1]), world_size=world_size)
# v3 - init with environment variables 使用环境变量初始化
# dist.init_process_group(backend="nccl", init_method="env://", rank=int(sys.argv[1]), world_size=world_size)
# Establish Local Rank and set device on this node 设置节点的本地化编号和设备
local_rank = int(sys.argv[2])
dp_device_ids = [local_rank]
torch.cuda.set_device(local_rank)
print("Initialize Model...")
# Construct Model 构建模型
model = models.resnet18(pretrained=False).cuda()
# Make model DistributedDataParallel
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=dp_device_ids, output_device=local_rank)
# define loss function (criterion) and optimizer 定义 loss 函数和 optimizer
criterion = nn.CrossEntropyLoss().cuda()
optimizer = torch.optim.SGD(model.parameters(), starting_lr, momentum=0.9, weight_decay=1e-4)
print("Initialize Dataloaders...")
# Define the transform for the data. Notice, we must resize to 224x224 with this dataset and model. 定义数据的变换。尺寸转为224x224
transform = transforms.Compose(
[transforms.Resize(224),
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
# Initialize Datasets. STL10 will automatically download if not present 初始化数据集。如果没有STL10数据集则会自动下载
trainset = datasets.STL10(root='./data', split='train', download=True, transform=transform)
valset = datasets.STL10(root='./data', split='test', download=True, transform=transform)
# Create DistributedSampler to handle distributing the dataset across nodes when training 创建分布式采样器来控制训练中节点间的数据分发
# This can only be called after torch.distributed.init_process_group is called 这个只能在 torch.distributed.init_process_group 被调用后调用
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)
# Create the Dataloaders to feed data to the training and validation steps 创建数据加载器,在训练和验证步骤中喂数据
train_loader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=(train_sampler is None), num_workers=workers, pin_memory=False, sampler=train_sampler)
val_loader = torch.utils.data.DataLoader(valset, batch_size=batch_size, shuffle=False, num_workers=workers, pin_memory=False)
best_prec1 = 0
for epoch in range(num_epochs):
# Set epoch count for DistributedSampler 为分布式采样器设置 epoch 数
train_sampler.set_epoch(epoch)
# Adjust learning rate according to schedule 调整学习率
adjust_learning_rate(starting_lr, optimizer, epoch)
# train for one epoch 训练1个 epoch
print("\nBegin Training Epoch {}".format(epoch+1))
train(train_loader, model, criterion, optimizer, epoch)
# evaluate on validation set 在验证集上验证
print("Begin Validation @ Epoch {}".format(epoch+1))
prec1 = validate(val_loader, model, criterion)
# remember best prec@1 and save checkpoint if desired 保存最佳的prec@1,如果需要的话保存检查点
# is_best = prec1 > best_prec1
best_prec1 = max(prec1, best_prec1)
print("Epoch Summary: ")
print("\tEpoch Accuracy: {}".format(prec1))
print("\tBest Accuracy: {}".format(best_prec1))