forked from PacktPublishing/Deep-Learning-with-Keras
Commit c1b0367 (1 parent: 5ddebdf)
dinesh-packt authored Apr 25, 2017
Showing 4 changed files with 497 additions and 0 deletions.
@@ -0,0 +1,169 @@
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Reshape
from keras.layers.core import Activation
from keras.layers.normalization import BatchNormalization
from keras.layers.convolutional import UpSampling2D
from keras.layers.convolutional import Convolution2D, MaxPooling2D
from keras.layers.core import Flatten
from keras.optimizers import SGD
from keras.datasets import mnist
import numpy as np
from PIL import Image
import argparse
import math


def generator_model():
    # Generator: maps a 100-dim uniform noise vector to a 1x28x28 image
    # (Keras 1 API, Theano "channels first" dim ordering).
    model = Sequential()
    model.add(Dense(input_dim=100, output_dim=1024))
    model.add(Activation('tanh'))
    model.add(Dense(128*7*7))
    model.add(BatchNormalization())
    model.add(Activation('tanh'))
    model.add(Reshape((128, 7, 7), input_shape=(128*7*7,)))
    # Two rounds of 2x upsampling take the feature maps from 7x7 to 14x14 to 28x28.
    model.add(UpSampling2D(size=(2, 2)))
    model.add(Convolution2D(64, 5, 5, border_mode='same'))
    model.add(Activation('tanh'))
    model.add(UpSampling2D(size=(2, 2)))
    model.add(Convolution2D(1, 5, 5, border_mode='same'))
    # The tanh output matches the [-1, 1] scaling applied to the training images.
    model.add(Activation('tanh'))
    return model


def discriminator_model():
    # Discriminator: binary classifier mapping a 1x28x28 image to the
    # probability that it is a real MNIST digit.
    model = Sequential()
    model.add(Convolution2D(
        64, 5, 5,
        border_mode='same',
        input_shape=(1, 28, 28)))
    model.add(Activation('tanh'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Convolution2D(128, 5, 5))
    model.add(Activation('tanh'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Flatten())
    model.add(Dense(1024))
    model.add(Activation('tanh'))
    model.add(Dense(1))
    model.add(Activation('sigmoid'))
    return model


def generator_containing_discriminator(generator, discriminator):
    # Stacks G and D into one model with D frozen, so that compiling and
    # training this model updates the generator weights only.
    model = Sequential()
    model.add(generator)
    discriminator.trainable = False
    model.add(discriminator)
    return model


def combine_images(generated_images):
    # Tiles a batch of single-channel images into one roughly square montage.
    num = generated_images.shape[0]
    width = int(math.sqrt(num))
    height = int(math.ceil(float(num)/width))
    shape = generated_images.shape[2:]
    image = np.zeros((height*shape[0], width*shape[1]),
                     dtype=generated_images.dtype)
    for index, img in enumerate(generated_images):
        i = int(index/width)
        j = index % width
        image[i*shape[0]:(i+1)*shape[0], j*shape[1]:(j+1)*shape[1]] = \
            img[0, :, :]
    return image
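
# Worked example (using the script's default batch size of 128): width =
# int(sqrt(128)) = 11 and height = ceil(128/11) = 12, so the montage is a
# 336x308 array (12 rows x 11 columns of 28x28 tiles; the last 4 slots stay empty).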

def train(BATCH_SIZE):
    # Scale MNIST pixels to [-1, 1] to match the generator's tanh output,
    # and add a channel axis: (60000, 28, 28) -> (60000, 1, 28, 28).
    (X_train, y_train), (X_test, y_test) = mnist.load_data()
    X_train = (X_train.astype(np.float32) - 127.5)/127.5
    X_train = X_train.reshape((X_train.shape[0], 1) + X_train.shape[1:])
    discriminator = discriminator_model()
    generator = generator_model()
    discriminator_on_generator = \
        generator_containing_discriminator(generator, discriminator)
    d_optim = SGD(lr=0.0005, momentum=0.9, nesterov=True)
    g_optim = SGD(lr=0.0005, momentum=0.9, nesterov=True)
    generator.compile(loss='binary_crossentropy', optimizer="SGD")
    # The stacked model is compiled while the discriminator is frozen, so its
    # updates train only the generator; the discriminator is then compiled
    # separately with its weights trainable.
    discriminator_on_generator.compile(
        loss='binary_crossentropy', optimizer=g_optim)
    discriminator.trainable = True
    discriminator.compile(loss='binary_crossentropy', optimizer=d_optim)
    noise = np.zeros((BATCH_SIZE, 100))
    for epoch in range(100):
        print("Epoch is", epoch)
        print("Number of batches", int(X_train.shape[0]/BATCH_SIZE))
        for index in range(int(X_train.shape[0]/BATCH_SIZE)):
            for i in range(BATCH_SIZE):
                noise[i, :] = np.random.uniform(-1, 1, 100)
            image_batch = X_train[index*BATCH_SIZE:(index+1)*BATCH_SIZE]
            generated_images = generator.predict(noise, verbose=0)
            # Save a montage of the current samples every 20 batches.
            if index % 20 == 0:
                image = combine_images(generated_images)
                image = image*127.5+127.5
                Image.fromarray(image.astype(np.uint8)).save(
                    str(epoch)+"_"+str(index)+".png")
            # Discriminator step: real images labelled 1, generated images 0.
            X = np.concatenate((image_batch, generated_images))
            y = [1] * BATCH_SIZE + [0] * BATCH_SIZE
            d_loss = discriminator.train_on_batch(X, y)
            print("batch %d d_loss : %f" % (index, d_loss))
            # Generator step: train G (through the frozen D) to get the
            # generated images labelled as real (1).
            for i in range(BATCH_SIZE):
                noise[i, :] = np.random.uniform(-1, 1, 100)
            discriminator.trainable = False
            g_loss = discriminator_on_generator.train_on_batch(
                noise, [1] * BATCH_SIZE)
            discriminator.trainable = True
            print("batch %d g_loss : %f" % (index, g_loss))
            # Checkpoint the weights every 10 batches (overwrite=True).
            if index % 10 == 9:
                generator.save_weights('generator', True)
                discriminator.save_weights('discriminator', True)

def generate(BATCH_SIZE, nice=False):
    generator = generator_model()
    generator.compile(loss='binary_crossentropy', optimizer="SGD")
    generator.load_weights('generator')
    if nice:
        # "Nice" mode: generate 20x the requested number of images, score
        # them with the trained discriminator, and keep only the BATCH_SIZE
        # highest-scoring ones.
        discriminator = discriminator_model()
        discriminator.compile(loss='binary_crossentropy', optimizer="SGD")
        discriminator.load_weights('discriminator')
        noise = np.zeros((BATCH_SIZE*20, 100))
        for i in range(BATCH_SIZE*20):
            noise[i, :] = np.random.uniform(-1, 1, 100)
        generated_images = generator.predict(noise, verbose=1)
        d_pret = discriminator.predict(generated_images, verbose=1)
        index = np.arange(0, BATCH_SIZE*20)
        index.resize((BATCH_SIZE*20, 1))
        pre_with_index = list(np.append(d_pret, index, axis=1))
        pre_with_index.sort(key=lambda x: x[0], reverse=True)
        nice_images = np.zeros((BATCH_SIZE, 1) +
                               (generated_images.shape[2:]), dtype=np.float32)
        for i in range(int(BATCH_SIZE)):
            idx = int(pre_with_index[i][1])
            nice_images[i, 0, :, :] = generated_images[idx, 0, :, :]
        image = combine_images(nice_images)
    else:
        noise = np.zeros((BATCH_SIZE, 100))
        for i in range(BATCH_SIZE):
            noise[i, :] = np.random.uniform(-1, 1, 100)
        generated_images = generator.predict(noise, verbose=1)
        image = combine_images(generated_images)
    # Rescale from [-1, 1] back to [0, 255] and save the montage.
    image = image*127.5+127.5
    Image.fromarray(image.astype(np.uint8)).save(
        "generated_image.png")


def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", type=str)
    parser.add_argument("--batch_size", type=int, default=128)
    parser.add_argument("--nice", dest="nice", action="store_true")
    parser.set_defaults(nice=False)
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    args = get_args()
    if args.mode == "train":
        train(BATCH_SIZE=args.batch_size)
    elif args.mode == "generate":
        generate(BATCH_SIZE=args.batch_size, nice=args.nice)
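Usage note (a sketch; the file name is not shown in this diff, so "dcgan.py" is an assumption): run "python dcgan.py --mode train --batch_size 128" to train, then "python dcgan.py --mode generate --batch_size 128" to sample, adding "--nice" to keep only the images the trained discriminator scores highest. Training writes a sample montage every 20 batches and checkpoints the weight files "generator" and "discriminator" every 10 batches in the working directory.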
@@ -0,0 +1,147 @@
import matplotlib as mpl

# This line allows mpl to run with no DISPLAY defined
mpl.use('Agg')

import pandas as pd
import numpy as np
import os
from keras.layers import Reshape, Flatten, LeakyReLU, Activation
from keras.layers.convolutional import UpSampling2D, MaxPooling2D
from keras.models import Sequential
from keras.optimizers import Adam
from keras.callbacks import TensorBoard
from keras_adversarial.image_grid_callback import ImageGridCallback

from keras_adversarial import AdversarialModel, simple_gan, gan_targets
from keras_adversarial import AdversarialOptimizerSimultaneous, normal_latent_sampling
from keras_adversarial.legacy import Dense, BatchNormalization, fit, l1l2, Convolution2D, AveragePooling2D
import keras.backend as K
from cifar10_utils import cifar10_data
from image_utils import dim_ordering_fix, dim_ordering_unfix, dim_ordering_shape

def model_generator():
    # Generator: 100-dim latent vector -> 3x32x32 image,
    # upsampling 4x4 -> 8x8 -> 16x16 -> 32x32.
    model = Sequential()
    nch = 256
    reg = lambda: l1l2(l1=1e-7, l2=1e-7)
    h = 5
    model.add(Dense(nch * 4 * 4, input_dim=100, W_regularizer=reg()))
    model.add(BatchNormalization(mode=0))
    model.add(Reshape(dim_ordering_shape((nch, 4, 4))))
    # Integer division (//) keeps the filter counts ints under Python 3 as well.
    model.add(Convolution2D(nch // 2, h, h, border_mode='same', W_regularizer=reg()))
    model.add(BatchNormalization(mode=0, axis=1))
    model.add(LeakyReLU(0.2))
    model.add(UpSampling2D(size=(2, 2)))
    model.add(Convolution2D(nch // 2, h, h, border_mode='same', W_regularizer=reg()))
    model.add(BatchNormalization(mode=0, axis=1))
    model.add(LeakyReLU(0.2))
    model.add(UpSampling2D(size=(2, 2)))
    model.add(Convolution2D(nch // 4, h, h, border_mode='same', W_regularizer=reg()))
    model.add(BatchNormalization(mode=0, axis=1))
    model.add(LeakyReLU(0.2))
    model.add(UpSampling2D(size=(2, 2)))
    model.add(Convolution2D(3, h, h, border_mode='same', W_regularizer=reg()))
    # sigmoid output keeps pixel values in [0, 1].
    model.add(Activation('sigmoid'))
    return model

def model_discriminator():
    # Discriminator: 3x32x32 image -> probability of being a real CIFAR-10 image.
    nch = 256
    h = 5
    reg = lambda: l1l2(l1=1e-7, l2=1e-7)

    c1 = Convolution2D(nch // 4, h, h, border_mode='same', W_regularizer=reg(),
                       input_shape=dim_ordering_shape((3, 32, 32)))
    c2 = Convolution2D(nch // 2, h, h, border_mode='same', W_regularizer=reg())
    c3 = Convolution2D(nch, h, h, border_mode='same', W_regularizer=reg())
    c4 = Convolution2D(1, h, h, border_mode='same', W_regularizer=reg())

    model = Sequential()
    model.add(c1)
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(LeakyReLU(0.2))
    model.add(c2)
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(LeakyReLU(0.2))
    model.add(c3)
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(LeakyReLU(0.2))
    model.add(c4)
    # Max pooling shrinks 32 -> 16 -> 8 -> 4; the 4x4 average pool then
    # reduces the single-channel map to one value per image.
    model.add(AveragePooling2D(pool_size=(4, 4), border_mode='valid'))
    model.add(Flatten())
    model.add(Activation('sigmoid'))
    return model

def example_gan(adversarial_optimizer, path, opt_g, opt_d, nb_epoch, generator, discriminator, latent_dim,
                targets=gan_targets, loss='binary_crossentropy'):
    # Skip training if a history file from a previous run already exists.
    csvpath = os.path.join(path, "history.csv")
    if os.path.exists(csvpath):
        print("Already exists: {}".format(csvpath))
        return

    print("Training: {}".format(csvpath))
    # gan (x -> yfake, yreal), z is gaussian generated on GPU
    # can also experiment with uniform_latent_sampling
    generator.summary()
    discriminator.summary()
    gan = simple_gan(generator=generator,
                     discriminator=discriminator,
                     latent_sampling=normal_latent_sampling((latent_dim,)))

    # build adversarial model: two players (generator, discriminator),
    # each updating its own weights with its own optimizer
    model = AdversarialModel(base_model=gan,
                             player_params=[generator.trainable_weights, discriminator.trainable_weights],
                             player_names=["generator", "discriminator"])
    model.adversarial_compile(adversarial_optimizer=adversarial_optimizer,
                              player_optimizers=[opt_g, opt_d],
                              loss=loss)

    # create callback to generate images from a fixed batch of 10*10 latent samples
    zsamples = np.random.normal(size=(10 * 10, latent_dim))

    def generator_sampler():
        # Reorder to channels-last and reshape into a 10x10 grid for the callback.
        xpred = dim_ordering_unfix(generator.predict(zsamples)).transpose((0, 2, 3, 1))
        return xpred.reshape((10, 10) + xpred.shape[1:])

    generator_cb = ImageGridCallback(os.path.join(path, "epoch-{:03d}.png"), generator_sampler, cmap=None)

    # train model
    xtrain, xtest = cifar10_data()
    y = targets(xtrain.shape[0])
    ytest = targets(xtest.shape[0])
    callbacks = [generator_cb]
    if K.backend() == "tensorflow":
        callbacks.append(
            TensorBoard(log_dir=os.path.join(path, 'logs'), histogram_freq=0, write_graph=True, write_images=True))
    history = fit(model, x=dim_ordering_fix(xtrain), y=y, validation_data=(dim_ordering_fix(xtest), ytest),
                  callbacks=callbacks, nb_epoch=nb_epoch,
                  batch_size=32)

    # save history to CSV
    df = pd.DataFrame(history.history)
    df.to_csv(csvpath)

    # save models
    generator.save(os.path.join(path, "generator.h5"))
    discriminator.save(os.path.join(path, "discriminator.h5"))

def main():
    # z \in R^100
    latent_dim = 100
    # x \in R^{3x32x32} (CIFAR-10 images)
    # generator (z -> x)
    generator = model_generator()
    # discriminator (x -> y)
    discriminator = model_discriminator()
    example_gan(AdversarialOptimizerSimultaneous(), "output/gan-cifar10",
                opt_g=Adam(1e-4, decay=1e-5),
                opt_d=Adam(1e-3, decay=1e-5),
                nb_epoch=100, generator=generator, discriminator=discriminator,
                latent_dim=latent_dim)


if __name__ == "__main__":
    main()
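Usage note (again a sketch; this diff does not show the file name, so "gan_cifar10.py" is an assumption): running "python gan_cifar10.py" trains for 100 epochs and writes its outputs under output/gan-cifar10: an epoch-NNN.png sample grid from the image callback, history.csv with the per-player losses, TensorBoard logs when the TensorFlow backend is active, and the final generator.h5 and discriminator.h5 models. The script depends on the keras_adversarial package and the local cifar10_utils and image_utils helper modules, so both must be importable.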