# https://www.udemy.com/unsupervised-deep-learning-in-python
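# A restricted Boltzmann machine (RBM) trained with one step of contrastive
# divergence (CD-1); autoencoder.DNN uses it to greedily pretrain a deep
# network layer by layer.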
import numpy as np
import theano
import theano.tensor as T
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from theano.tensor.shared_randomstreams import RandomStreams
from util import relu, error_rate, getKaggleMNIST, init_weights
from autoencoder import DNN
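# util.py and autoencoder.py are sibling modules in this repo: util supplies the
# Kaggle MNIST loader and weight initializer, and autoencoder.DNN stacks
# unsupervised layers (here RBMs) for greedy layer-wise pretraining.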
class RBM(object):
    def __init__(self, M, an_id):
        self.M = M
        self.id = an_id
        self.rng = RandomStreams()

    def fit(self, X, learning_rate=0.1, epochs=1, batch_sz=100, show_fig=False):
        N, D = X.shape
        n_batches = N // batch_sz

        W0 = init_weights((D, self.M))
        self.W = theano.shared(W0, 'W_%s' % self.id)
        self.c = theano.shared(np.zeros(self.M), 'c_%s' % self.id)
        self.b = theano.shared(np.zeros(D), 'b_%s' % self.id)
        self.params = [self.W, self.c, self.b]
        self.forward_params = [self.W, self.c]

        # we won't use these to fit the RBM, but they will be used for backpropagation later
        # TODO: technically they should be reset before doing backprop
        self.dW = theano.shared(np.zeros(W0.shape), 'dW_%s' % self.id)
        self.dc = theano.shared(np.zeros(self.M), 'dc_%s' % self.id)
        self.db = theano.shared(np.zeros(D), 'db_%s' % self.id)
        self.dparams = [self.dW, self.dc, self.db]
        self.forward_dparams = [self.dW, self.dc]
        X_in = T.matrix('X_%s' % self.id)

        # attach this op to the object so it can be used later;
        # the hidden layer must be sigmoidal because the output is also a sigmoid
        H = T.nnet.sigmoid(X_in.dot(self.W) + self.c)
        self.hidden_op = theano.function(
            inputs=[X_in],
            outputs=H,
        )

        # we won't use this cost to do any updates,
        # but we would like to see how it changes as we do contrastive divergence
        X_hat = self.forward_output(X_in)
        # mean cross-entropy per element, so the value is comparable for any input size
        cost = -T.mean(X_in * T.log(X_hat) + (1 - X_in) * T.log(1 - X_hat))
        cost_op = theano.function(
            inputs=[X_in],
            outputs=cost,
        )
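        # note: the cost above is a reconstruction cross-entropy used only to monitor
        # training progress; the true RBM log-likelihood is intractable because of
        # the partition function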

        # do one round of Gibbs sampling to obtain X_sample
        H = self.sample_h_given_v(X_in)
        X_sample = self.sample_v_given_h(H)

        # define the objective, updates, and train function
        objective = T.mean(self.free_energy(X_in)) - T.mean(self.free_energy(X_sample))

        # need to consider X_sample constant because you can't take the gradient of random numbers in Theano
        updates = [(p, p - learning_rate*T.grad(objective, p, consider_constant=[X_sample])) for p in self.params]
        train_op = theano.function(
            inputs=[X_in],
            updates=updates,
        )
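        # the update above follows the CD-1 gradient, mean F(data) - mean F(sample):
        # it lowers the free energy of the data and raises it for the model's samples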

        costs = []
        print("training rbm: %s" % self.id)
        for i in range(epochs):
            print("epoch:", i)
            X = shuffle(X)
            for j in range(n_batches):
                batch = X[j*batch_sz:(j*batch_sz + batch_sz)]
                train_op(batch)
                the_cost = cost_op(X) # technically we could also get the cost for Xtest here
                print("j / n_batches:", j, "/", n_batches, "cost:", the_cost)
                costs.append(the_cost)
        if show_fig:
            plt.plot(costs)
            plt.show()
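
    # free energy of a visible vector with the hidden units marginalized out:
    #   F(v) = -b.v - sum_j log(1 + exp(c_j + v.W_j))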
    def free_energy(self, V):
        return -V.dot(self.b) - T.sum(T.log(1 + T.exp(V.dot(self.W) + self.c)), axis=1)
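
    # one Gibbs step is sample_h_given_v followed by sample_v_given_h;
    # both layers are Bernoulli, so we draw binary samples from sigmoid probabilities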
    def sample_h_given_v(self, V):
        p_h_given_v = T.nnet.sigmoid(V.dot(self.W) + self.c)
        h_sample = self.rng.binomial(size=p_h_given_v.shape, n=1, p=p_h_given_v)
        return h_sample

    def sample_v_given_h(self, H):
        p_v_given_h = T.nnet.sigmoid(H.dot(self.W.T) + self.b)
        v_sample = self.rng.binomial(size=p_v_given_h.shape, n=1, p=p_v_given_h)
        return v_sample

    def forward_hidden(self, X):
        return T.nnet.sigmoid(X.dot(self.W) + self.c)

    def forward_output(self, X):
        Z = self.forward_hidden(X)
        Y = T.nnet.sigmoid(Z.dot(self.W.T) + self.b)
        return Y

    @staticmethod
    def createFromArrays(W, c, b, an_id):
        rbm = RBM(W.shape[1], an_id)
        rbm.W = theano.shared(W, 'W_%s' % rbm.id)
        rbm.c = theano.shared(c, 'c_%s' % rbm.id)
        rbm.b = theano.shared(b, 'b_%s' % rbm.id)
        rbm.params = [rbm.W, rbm.c, rbm.b]
        rbm.forward_params = [rbm.W, rbm.c]
        return rbm
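
# A minimal standalone sketch (not part of the original file): fit a single RBM
# and reconstruct a few inputs through its compiled forward pass. The hidden
# size of 300 is an arbitrary illustrative choice.
def example_single_rbm():
    Xtrain, _, _, _ = getKaggleMNIST()
    rbm = RBM(300, an_id=0)
    rbm.fit(Xtrain, epochs=1)

    # compile the symbolic forward pass into a callable reconstruction function
    X_sym = T.matrix('X')
    reconstruct = theano.function(inputs=[X_sym], outputs=rbm.forward_output(X_sym))
    X_hat = reconstruct(Xtrain[:10])  # reconstructed pixel probabilities, shape (10, 784)
    print("mean squared reconstruction error:", np.mean((Xtrain[:10] - X_hat)**2))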

def main():
    Xtrain, Ytrain, Xtest, Ytest = getKaggleMNIST()
    dnn = DNN([1000, 750, 500], UnsupervisedModel=RBM)
    dnn.fit(Xtrain, Ytrain, Xtest, Ytest, epochs=3)
    # we compare with no pretraining in autoencoder.py

if __name__ == '__main__':
    main()