forked from dbolya/yolact
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathyolact.py
462 lines (355 loc) · 19.1 KB
/
yolact.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
import torch, torchvision
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models.resnet import Bottleneck
import numpy as np
from itertools import product
from math import sqrt
from data.config import cfg, mask_type
from layers import Detect
from layers.interpolate import InterpolateModule
from backbone import construct_backbone
import torch.backends.cudnn as cudnn
from utils import timer
from utils.functions import MovingAverage
class PredictionModule(nn.Module):
    """
    The (c) prediction module adapted from DSSD:
    https://arxiv.org/pdf/1701.06659.pdf

    Note that this is slightly different to the module in the paper
    because the Bottleneck block has a 3x3 convolution in the middle
    instead of a 1x1 convolution.

    Args:
        - in_channels:   The input feature size.
        - out_channels:  The output feature size (must be a multiple of 4).
        - aspect_ratios: A list of lists of priorbox aspect ratios (one list per scale).
                         Defaults to [[1]] when None.
        - scales:        A list of priorbox scales relative to this layer's convsize.
                         For instance: If this layer has convouts of size 30x30 for
                                       an image of size 600x600, the 'default' (scale
                                       of 1) for this layer would produce bounding
                                       boxes with an area of 20x20px. If the scale is
                                       .5 on the other hand, this layer would consider
                                       bounding boxes with area 10x10px, etc.
                         Defaults to [1] when None.
        - parent:        If parent is a PredictionModule, this module will use all the layers
                         from parent instead of from this module.
    """

    def __init__(self, in_channels, out_channels=1024, aspect_ratios=None, scales=None, parent=None):
        super().__init__()

        # Build the defaults per instance rather than sharing one mutable
        # default list object between every module.
        if aspect_ratios is None:
            aspect_ratios = [[1]]
        if scales is None:
            scales = [1]

        self.num_classes = cfg.num_classes
        self.mask_dim = cfg.mask_dim
        self.num_priors = sum(len(x) for x in aspect_ratios)
        # Held inside a plain list so the parent is not included in the state dict.
        self.parent = [parent]

        # NOTE(review): only out_channels is widened here, while block/conv below
        # are still built from in_channels. Looks like use_prediction_module and
        # mask_proto_prototypes_as_features cannot both be enabled -- confirm.
        if cfg.mask_proto_prototypes_as_features:
            out_channels += self.mask_dim

        # A module with a parent borrows every layer from it (see forward),
        # so layers are only created when there is no parent.
        if parent is None:
            if cfg.use_prediction_module:
                self.block = Bottleneck(in_channels, out_channels // 4)
                self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=True)
                self.bn = nn.BatchNorm2d(out_channels)

            self.bbox_layer = nn.Conv2d(out_channels, self.num_priors * 4, kernel_size=3, padding=1)
            self.conf_layer = nn.Conv2d(out_channels, self.num_priors * self.num_classes, kernel_size=3, padding=1)
            self.mask_layer = nn.Conv2d(out_channels, self.num_priors * self.mask_dim, kernel_size=3, padding=1)

            def make_extra(num_layers):
                """ Returns num_layers conv-relu pairs, or an identity function for 0 layers. """
                if num_layers == 0:
                    return lambda x: x
                else:
                    # Flatten a list of [conv, relu] pairs into one Sequential
                    return nn.Sequential(*sum([[
                        nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
                        nn.ReLU(inplace=True)
                    ] for _ in range(num_layers)], []))

            # cfg.extra_layers supplies one layer count per head, in (bbox, conf, mask) order
            self.bbox_extra, self.conf_extra, self.mask_extra = [make_extra(x) for x in cfg.extra_layers]

            if cfg.mask_type == mask_type.lincomb and cfg.mask_proto_coeff_gate:
                self.gate_layer = nn.Conv2d(out_channels, self.num_priors * self.mask_dim, kernel_size=3, padding=1)

        self.aspect_ratios = aspect_ratios
        self.scales = scales

        # Prior boxes are cached and only rebuilt when the conv size changes (see make_priors)
        self.priors = None
        self.last_conv_size = None

    def forward(self, x):
        """
        Args:
            - x: The convOut from a layer in the backbone network
                 Size: [batch_size, in_channels, conv_h, conv_w])

        Returns a tuple (bbox_coords, class_confs, mask_output, prior_boxes) with sizes
            - bbox_coords: [batch_size, conv_h*conv_w*num_priors, 4]
            - class_confs: [batch_size, conv_h*conv_w*num_priors, num_classes]
            - mask_output: [batch_size, conv_h*conv_w*num_priors, mask_dim]
            - prior_boxes: [conv_h*conv_w*num_priors, 4]
        """
        # In case we want to use another module's layers
        src = self if self.parent[0] is None else self.parent[0]

        conv_h = x.size(2)
        conv_w = x.size(3)
        # Remembered so the prior boxes can be placed on the same device as the input
        device = x.device

        if cfg.use_prediction_module:
            # The two branches of PM design (c)
            a = src.block(x)

            b = src.conv(x)
            b = src.bn(b)
            b = F.relu(b)

            # TODO: Possibly switch this out for a product
            x = a + b

        bbox_x = src.bbox_extra(x)
        conf_x = src.conf_extra(x)
        mask_x = src.mask_extra(x)

        # Move channels last, then flatten every (position, prior) pair into one axis
        bbox = src.bbox_layer(bbox_x).permute(0, 2, 3, 1).contiguous().view(x.size(0), -1, 4)
        conf = src.conf_layer(conf_x).permute(0, 2, 3, 1).contiguous().view(x.size(0), -1, self.num_classes)
        mask = src.mask_layer(mask_x).permute(0, 2, 3, 1).contiguous().view(x.size(0), -1, self.mask_dim)

        # See box_utils.decode for an explanation of this
        if cfg.use_yolo_regressors:
            bbox[:, :, :2] = torch.sigmoid(bbox[:, :, :2]) - 0.5
            bbox[:, :, 0] /= conv_w
            bbox[:, :, 1] /= conv_h

        if cfg.mask_type == mask_type.direct:
            mask = torch.sigmoid(mask)
        elif cfg.mask_type == mask_type.lincomb:
            mask = cfg.mask_proto_coeff_activation(mask)

            if cfg.mask_proto_coeff_gate:
                gate = src.gate_layer(x).permute(0, 2, 3, 1).contiguous().view(x.size(0), -1, self.mask_dim)
                mask = mask * torch.sigmoid(gate)

        priors = self.make_priors(conv_h, conv_w, device=device)

        return (bbox, conf, mask, priors)

    def make_priors(self, conv_h, conv_w, device=None):
        """
        Builds (or returns the cached) prior boxes for a conv_h x conv_w feature map.

        Note that priors are [x,y,width,height] where (x,y) is the center of the box.

        Args:
            - conv_h: Height of the feature map.
            - conv_w: Width of the feature map.
            - device: Optional device to place the priors on. When None the priors
                      stay wherever torch.Tensor(...) created them (the process
                      default tensor type).

        Returns a tensor of size [conv_h*conv_w*num_priors, 4].
        """
        with timer.env('makepriors'):
            if self.last_conv_size != (conv_w, conv_h):
                prior_data = []

                # Iteration order is important (it has to sync up with the convout)
                for j, i in product(range(conv_h), range(conv_w)):
                    # +0.5 because priors are in center-size notation
                    x = (i + 0.5) / conv_w
                    y = (j + 0.5) / conv_h

                    for scale, ars in zip(self.scales, self.aspect_ratios):
                        for ar in ars:
                            w = scale * ar / conv_w
                            h = scale / ar / conv_h

                            prior_data += [x, y, w, h]

                self.priors = torch.Tensor(prior_data).view(-1, 4)
                self.last_conv_size = (conv_w, conv_h)

            # The cache may have been built on the default device or for an input that
            # lived on another device; keep it alongside the data it will be used with.
            if device is not None and self.priors.device != torch.device(device):
                self.priors = self.priors.to(device)

        return self.priors
class FPN(nn.Module):
    """
    A general Feature Pyramid Network, as introduced in
    https://arxiv.org/pdf/1612.03144.pdf

    Parameters (in cfg.fpn):
        - num_features (int): The number of output features in the fpn layers.
        - interpolation_mode (str): The mode to pass to F.interpolate.
        - num_downsample (int): The number of downsampled layers to add onto the selected layers.
                                These extra layers are downsampled from the last selected layer.

    Args:
        - in_channels (list): For each conv layer you supply in the forward pass,
                              how many features will it have?
    """

    def __init__(self, in_channels):
        super().__init__()

        num_features = cfg.fpn.num_features

        # 1x1 lateral convs, stored deepest-layer-first because forward walks
        # the convouts from last to first.
        laterals = []
        for channels in reversed(in_channels):
            laterals.append(nn.Conv2d(channels, num_features, kernel_size=1))
        self.lat_layers = nn.ModuleList(laterals)

        # One 3x3 conv per level, applied to the merged features.
        # No padding argument is passed to these convs.
        smoothers = []
        for _ in in_channels:
            smoothers.append(nn.Conv2d(num_features, num_features, kernel_size=3))
        self.pred_layers = nn.ModuleList(smoothers)

    def forward(self, convouts):
        """
        Args:
            - convouts (list): A list of convouts for the corresponding layers in in_channels.

        Returns:
            - A list of FPN convouts in the same order as convouts, followed by the
              extra downsample layers if requested.
        """
        # Collected deepest-first; flipped back into input order afterwards
        pyramid = []
        merged = 0

        levels = zip(reversed(convouts), self.lat_layers, self.pred_layers)
        for depth, (feature, lateral, smooth) in enumerate(levels):
            if depth > 0:
                # Resize the running top-down features to this level's spatial size
                _, _, rows, cols = feature.size()
                merged = F.interpolate(merged, size=(rows, cols),
                                       mode=cfg.fpn.interpolation_mode, align_corners=False)

            merged = merged + lateral(feature)
            pyramid.append(F.relu(smooth(merged)))

        pyramid = pyramid[::-1]

        # In the original paper, this takes care of P6.
        # The stride is applied here rather than inside a conv layer because the
        # prediction module conv layers are shared between levels.
        for _ in range(cfg.fpn.num_downsample):
            coarsest = pyramid[-1]
            # A stride 2 view on the last level along both height and width
            pyramid.append(coarsest[:, :, ::2, ::2])

        return pyramid
class Yolact(nn.Module):
    """
    ██╗   ██╗ ██████╗ ██╗      █████╗  ██████╗████████╗
    ╚██╗ ██╔╝██╔═══██╗██║     ██╔══██╗██╔════╝╚══██╔══╝
     ╚████╔╝ ██║   ██║██║     ███████║██║        ██║
      ╚██╔╝  ██║   ██║██║     ██╔══██║██║        ██║
       ██║   ╚██████╔╝███████╗██║  ██║╚██████╗   ██║
       ╚═╝    ╚═════╝ ╚══════╝╚═╝  ╚═╝ ╚═════╝   ╚═╝

    You can set the arguments by changing them in the backbone config object in config.py.

    Parameters (in cfg.backbone):
        - selected_layers: The indices of the conv layers to use for prediction.
        - pred_scales: A list with len(selected_layers) containing tuples of scales (see PredictionModule)
        - pred_aspect_ratios: A list of lists of aspect ratios with len(selected_layers) (see PredictionModule)
    """

    def __init__(self):
        super().__init__()

        self.backbone = construct_backbone(cfg.backbone)

        # Compute mask_dim here and add it back to the config. Make sure Yolact's constructor is called early!
        if cfg.mask_type == mask_type.direct:
            cfg.mask_dim = cfg.mask_size**2
        elif cfg.mask_type == mask_type.lincomb:
            if cfg.mask_proto_use_grid:
                self.grid = torch.Tensor(np.load(cfg.mask_proto_grid_file))
                self.num_grids = self.grid.size(0)
            else:
                self.num_grids = 0

            self.proto_src = cfg.mask_proto_src

            if self.proto_src is None:
                # The proto net reads the raw 3-channel image
                in_channels = 3
            elif cfg.fpn is not None:
                # With an FPN, forward() replaces the backbone outputs by the FPN outputs
                # before indexing them with proto_src, so the proto net input has the FPN
                # feature width rather than the backbone layer's.
                in_channels = cfg.fpn.num_features
            else:
                in_channels = self.backbone.channels[self.proto_src]
            in_channels += self.num_grids

            def make_layer(layer_cfg):
                """ Builds [layer, relu] from one (num_channels, kernel_size, kwargs) entry. """
                nonlocal in_channels
                num_channels = layer_cfg[0]
                kernel_size = layer_cfg[1]

                # Possible patterns:
                # ( 256, 3, {}) -> conv
                # ( 256,-2, {}) -> deconv
                # (None,-2, {}) -> bilinear interpolate
                if kernel_size > 0:
                    layer = nn.Conv2d(in_channels, num_channels, kernel_size, **layer_cfg[2])
                else:
                    if num_channels is None:
                        layer = InterpolateModule(scale_factor=-kernel_size, mode='bilinear', align_corners=False, **layer_cfg[2])
                    else:
                        layer = nn.ConvTranspose2d(in_channels, num_channels, -kernel_size, **layer_cfg[2])

                # Interpolation keeps the channel count; conv/deconv set the next layer's input width
                in_channels = num_channels if num_channels is not None else in_channels

                return [layer, nn.ReLU(inplace=True)]

            # The -1 here is to remove the last relu because we might want to change it to another function
            self.proto_net = nn.Sequential(*(sum([make_layer(x) for x in cfg.mask_proto_net], [])[:-1]))

            # After the loop above, in_channels is the proto net's output width
            cfg.mask_dim = in_channels

            if cfg.mask_proto_bias:
                cfg.mask_dim += 1

        self.selected_layers = cfg.backbone.selected_layers
        self.prediction_layers = nn.ModuleList()
        src_channels = self.backbone.channels

        if cfg.fpn is not None:
            # Some hacky rewiring to accommodate the FPN: from here on selected_layers
            # indexes the FPN outputs (including the extra downsampled ones).
            self.fpn = FPN([src_channels[i] for i in self.selected_layers])
            self.selected_layers = list(range(len(self.selected_layers) + cfg.fpn.num_downsample))
            src_channels = [cfg.fpn.num_features] * len(self.selected_layers)

        for idx, layer_idx in enumerate(self.selected_layers):
            # If we're sharing prediction module weights, have every module's parent be the first one
            parent = None
            if cfg.share_prediction_module and idx > 0:
                parent = self.prediction_layers[0]

            pred = PredictionModule(src_channels[layer_idx], src_channels[layer_idx],
                                    aspect_ratios=cfg.backbone.pred_aspect_ratios[idx],
                                    scales=cfg.backbone.pred_scales[idx],
                                    parent=parent)
            self.prediction_layers.append(pred)

        # For use in evaluation
        self.detect = Detect(cfg.num_classes, bkg_label=0, top_k=200, conf_thresh=0.01, nms_thresh=0.45)

    def save_weights(self, path):
        """ Saves the model's state dict to path with torch.save. """
        torch.save(self.state_dict(), path)

    def load_weights(self, path):
        """ Loads a state dict previously written by save_weights into this model. """
        # NOTE: torch.load unpickles the file, so only load checkpoints from a trusted source.
        state_dict = torch.load(path)

        # For backward compatibility, remove these (the new variable is called layers)
        keys = list(state_dict.keys())
        for key in keys:
            if key.startswith('backbone.layer') and not key.startswith('backbone.layers'):
                del state_dict[key]

        self.load_state_dict(state_dict)

    def init_weights(self, backbone_path):
        """ Initialize weights for training. """
        # Initialize the backbone with the pretrained weights.
        self.backbone.init_backbone(backbone_path)

        # Initialize the rest of the conv layers with xavier
        for module in self.modules():
            if isinstance(module, nn.Conv2d) and module not in self.backbone.backbone_modules:
                nn.init.xavier_uniform_(module.weight.data)

                if module.bias is not None:
                    module.bias.data.zero_()

    def forward(self, x):
        """
        The input should be of size [batch_size, 3, img_h, img_w]

        Returns the list of concatenated prediction outputs (plus the prototypes
        for lincomb masks) in training mode, and the result of self.detect on
        those outputs otherwise.
        """
        with timer.env('pass1'):
            outs = self.backbone(x)

        if cfg.fpn is not None:
            with timer.env('fpn'):
                # Use backbone.selected_layers because we overwrote self.selected_layers
                outs = [outs[i] for i in cfg.backbone.selected_layers]
                outs = self.fpn(outs)

        if cfg.mask_type == mask_type.lincomb:
            with timer.env('proto'):
                proto_x = x if self.proto_src is None else outs[self.proto_src]

                if self.num_grids > 0:
                    # self.grid is a plain attribute (not a parameter or buffer), so it does
                    # not follow the module across devices; align it with the input here.
                    grid = self.grid.to(device=proto_x.device, dtype=proto_x.dtype)
                    grids = grid.repeat(proto_x.size(0), 1, 1, 1)
                    proto_x = torch.cat([proto_x, grids], dim=1)

                proto_out = self.proto_net(proto_x)
                proto_out = cfg.mask_proto_prototype_activation(proto_out)

                if cfg.mask_proto_prototypes_as_features:
                    # Keep a channels-first copy, since proto_out itself is permuted below
                    proto_downsampled = proto_out.clone()

                    if cfg.mask_proto_prototypes_as_features_no_grad:
                        proto_downsampled = proto_out.detach()

                # Move the features last so the multiplication is easy
                proto_out = proto_out.permute(0, 2, 3, 1).contiguous()

                if cfg.mask_proto_bias:
                    # Append a constant channel of ones, created with proto_out's own
                    # dtype and device so the concatenation cannot mismatch.
                    bias_shape = list(proto_out.size())
                    bias_shape[-1] = 1
                    proto_out = torch.cat([proto_out, proto_out.new_ones(bias_shape)], -1)

        with timer.env('pass2'):
            # One list per PredictionModule output: (bbox, conf, mask, priors)
            pred_outs = ([], [], [], [])

            for idx, pred_layer in zip(self.selected_layers, self.prediction_layers):
                pred_x = outs[idx]

                if cfg.mask_type == mask_type.lincomb and cfg.mask_proto_prototypes_as_features:
                    # Scale the prototypes down to the current prediction layer's size and add it as inputs.
                    # The result is carried over, so each level resizes the previous level's copy.
                    proto_downsampled = F.interpolate(proto_downsampled, size=outs[idx].size()[2:], mode='bilinear', align_corners=False)
                    pred_x = torch.cat([pred_x, proto_downsampled], dim=1)

                p = pred_layer(pred_x)
                for out, pred in zip(pred_outs, p):
                    out.append(pred)

        # Join the per-layer outputs along the prior axis
        pred_outs = [torch.cat(x, -2) for x in pred_outs]

        if cfg.mask_type == mask_type.lincomb:
            pred_outs.append(proto_out)

        if self.training:
            return pred_outs
        else:
            pred_outs[1] = F.softmax(pred_outs[1], -1)  # Softmax the conf output
            return self.detect(*pred_outs)
# Some testing code: builds the network, runs one forward pass on a blank
# image and prints the size and sum of every output.
if __name__ == '__main__':
    from utils.functions import init_console

    init_console()

    net = Yolact()
    net.train()
    net.init_weights(backbone_path='weights/' + cfg.backbone.path)

    # GPU
    net = net.cuda()
    cudnn.benchmark = True
    torch.set_default_tensor_type('torch.cuda.FloatTensor')

    side = cfg.max_size
    blank_image = torch.zeros((1, 3, side, side))
    outputs = net(blank_image)

    for pred_module in net.prediction_layers:
        print(pred_module.last_conv_size)

    print()
    for tensor in outputs:
        print(tensor.size(), tensor.sum())

    # The script stops here; the timing loop below only runs if this exit() is removed.
    exit()

    # Warm-up pass before timing
    net(blank_image)
    # timer.disable('pass2')
    avg = MovingAverage()
    try:
        while True:
            timer.reset()
            with timer.env('everything else'):
                net(blank_image)

            avg.add(timer.total_time())

            print('\033[2J')  # ANSI escape sequence: clear the console
            timer.print_stats()

            fps = 1 / avg.get_avg()
            millis = avg.get_avg() * 1000
            print('Avg fps: %.2f\tAvg ms: %.2f ' % (fps, millis))
    except KeyboardInterrupt:
        # Ctrl-C is the intended way to stop the benchmark
        pass