MyModel.py
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.preprocessing.image import ImageDataGenerator, NumpyArrayIterator
from tensorflow.keras.initializers import VarianceScaling
import numpy as np


def autoencoder(dims, act='relu'):
    """
    Fully connected, symmetric auto-encoder model.

    Arguments:
        dims: list of the number of units in each encoder layer. dims[0] is the input
            dimension; dims[-1] is the number of units in the hidden (embedding) layer.
            The decoder mirrors the encoder, so the auto-encoder has 2*len(dims)-1 layers.
        act: activation for the internal layers; not applied to the input, the hidden
            embedding layer, or the output layer.

    Returns:
        (ae_model, encoder_model): the auto-encoder model and the encoder sub-model.
    """
    n_stacks = len(dims) - 1
    init = VarianceScaling(scale=1. / 3., mode='fan_in', distribution='uniform')

    # input
    x = Input(shape=(dims[0],), name='input')
    h = x

    # internal layers of the encoder
    for i in range(n_stacks - 1):
        h = Dense(dims[i + 1], activation=act, kernel_initializer=init, name='encoder_%d' % i)(h)

    # hidden (embedding) layer: linear activation
    h = Dense(dims[-1], kernel_initializer=init, name='encoder_%d' % (n_stacks - 1))(h)
    y = h

    # internal layers of the decoder
    for i in range(n_stacks - 1, 0, -1):
        y = Dense(dims[i], activation=act, kernel_initializer=init, name='decoder_%d' % i)(y)

    # output layer: linear activation, reconstructs the input
    y = Dense(dims[0], kernel_initializer=init, name='decoder_0')(y)

    return Model(inputs=x, outputs=y, name='AE'), Model(inputs=x, outputs=h, name='encoder')
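

# A minimal usage sketch (illustrative, not part of the original file): build the
# auto-encoder and encoder and check their output shapes. The dims values below
# follow the common 784-500-500-2000-10 stack used for MNIST-style data; they are
# an assumption, not something this file prescribes.
if __name__ == '__main__':
    _ae, _encoder = autoencoder([784, 500, 500, 2000, 10])
    print(_ae.output_shape)       # (None, 784): reconstruction of the input
    print(_encoder.output_shape)  # (None, 10): the embedding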


class MyIterator(NumpyArrayIterator):
    """
    Identical to NumpyArrayIterator except that next() returns (samples, index_array)
    instead of just samples, so callers can tell which samples are in the batch.
    """
    def next(self):
        with self.lock:
            index_array = next(self.index_generator)
        # The transformation of images is not under the thread lock,
        # so it can be done in parallel.
        return self._get_batches_of_transformed_samples(index_array), index_array


class MyImageGenerator(ImageDataGenerator):
    """
    Identical to ImageDataGenerator except that flow(...).next() returns
    (samples, index_array) instead of just samples (see MyIterator above).
    """
    def flow(self, x, y=None, batch_size=32, shuffle=True, seed=None,
             save_to_dir=None, save_prefix='', save_format='png'):
        return MyIterator(
            x, y, self,
            batch_size=batch_size,
            shuffle=shuffle,
            seed=seed,
            data_format=self.data_format,
            save_to_dir=save_to_dir,
            save_prefix=save_prefix,
            save_format=save_format)
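

# A small check (sketch, not from the original file): with MyImageGenerator,
# flow().next() yields the batch *and* the indices of the source samples, whereas
# a stock ImageDataGenerator iterator yields only the batch. The random data
# below is purely illustrative.
if __name__ == '__main__':
    _gen = MyImageGenerator(rotation_range=10)
    _x = np.random.rand(8, 28, 28, 1)
    _batch, _idx = _gen.flow(_x, batch_size=4).next()
    print(_batch.shape, _idx)  # (4, 28, 28, 1) plus the 4 sample indices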


def generator(image_generator, x, y=None, sample_weight=None, batch_size=32, shuffle=True):
    """
    Data generator that supplies training batches for Model().fit_generator.

    :param image_generator: MyImageGenerator; defines and applies the transformations
        for the input images
    :param x: input image data; supports shape=[n_samples, width, height, channels]
        and flattened shape=[n_samples, n_features]
    :param y: the target of the network's output
    :param sample_weight: per-sample weights for x, shape=[n_samples]
    :param batch_size: batch size
    :param shuffle: whether to shuffle the data
    :return: an iterator that yields (batch_x, batch_x), (batch_x, batch_y), or
        (batch_x, batch_y, batch_sample_weight) each time
    """
    if len(x.shape) > 2:  # image
        # flow() returns a single MyIterator; its next() yields (samples, index_array)
        gen0 = image_generator.flow(x, shuffle=shuffle, batch_size=batch_size)
        while True:
            batch_x, idx = gen0.next()
            result = [batch_x] + \
                     [batch_x if y is None else y[idx]] + \
                     ([] if sample_weight is None else [sample_weight[idx]])
            yield tuple(result)
    else:  # the samples are flattened vectors: reshape to images, transform, then flatten back
        width = int(np.sqrt(x.shape[-1]))
        if width * width == x.shape[-1]:  # gray-scale
            im_shape = [-1, width, width, 1]
        else:  # RGB
            width = int(np.sqrt(x.shape[-1] / 3.0))
            im_shape = [-1, width, width, 3]
        gen0 = image_generator.flow(np.reshape(x, im_shape), shuffle=shuffle, batch_size=batch_size)
        while True:
            batch_x, idx = gen0.next()
            batch_x = np.reshape(batch_x, [batch_x.shape[0], x.shape[-1]])
            result = [batch_x] + \
                     [batch_x if y is None else y[idx]] + \
                     ([] if sample_weight is None else [sample_weight[idx]])
            yield tuple(result)
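

# A one-batch smoke test (sketch, not from the original file): feed flattened
# 28x28 "images" through generator() and pull a single (batch_x, batch_y) pair.
# All data here is random and illustrative.
if __name__ == '__main__':
    _datagen = MyImageGenerator(width_shift_range=0.1)
    _x_flat = np.random.rand(16, 784)
    _g = generator(_datagen, _x_flat, y=_x_flat, batch_size=8)
    _bx, _by = next(_g)
    print(_bx.shape, _by.shape)  # (8, 784) (8, 784)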


def random_transform(x, datagen):
    """
    Apply one pass of random augmentation to x and return it in x's original shape.
    Note: datagen is expected to be a plain ImageDataGenerator, whose iterator's
    next() returns only the samples; with MyImageGenerator it would return
    (samples, index_array) instead.
    """
    if len(x.shape) > 2:  # image
        return datagen.flow(x, shuffle=False, batch_size=x.shape[0]).next()

    # if the input is a flattened vector, reshape it to an image before transforming
    width = int(np.sqrt(x.shape[-1]))
    if width * width == x.shape[-1]:  # gray-scale
        im_shape = [-1, width, width, 1]
    else:  # RGB
        width = int(np.sqrt(x.shape[-1] / 3.0))
        im_shape = [-1, width, width, 3]
    gen = datagen.flow(np.reshape(x, im_shape), shuffle=False, batch_size=x.shape[0])
    return np.reshape(gen.next(), x.shape)
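

# A round-trip check (sketch, not from the original file): random_transform()
# reshapes flat vectors to images, augments them, and flattens them back. A plain
# ImageDataGenerator is used here, matching the assumption documented in the
# docstring above.
if __name__ == '__main__':
    _plain_gen = ImageDataGenerator(rotation_range=5)
    _x_flat = np.random.rand(4, 784)
    _x_aug = random_transform(_x_flat, _plain_gen)
    print(_x_aug.shape)  # (4, 784): same shape as the input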