# https://deeplearningcourses.com/c/deep-reinforcement-learning-in-python
# https://www.udemy.com/deep-reinforcement-learning-in-python
from __future__ import print_function, division
from builtins import range
# Note: you may need to update your version of future
# sudo pip install -U future
#
# This takes 4min 30s to run in Python 2.7
# But only 1min 30s to run in Python 3.5!
#
# Note: gym changed from version 0.7.3 to 0.8.0
# MountainCar episode length is capped at 200 in later versions.
# This means your agent can't learn as much in the earlier episodes
# since they are no longer as long.
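#
# Note: this script targets the older gym API, where reset() returns just the
# observation and step() returns (observation, reward, done, info).
# In gym >= 0.26 (and gymnasium), reset() returns (observation, info) and
# step() returns (observation, reward, terminated, truncated, info),
# so the calls below would need small adjustments.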
import gym
import os
import sys
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from gym import wrappers
from datetime import datetime
from sklearn.pipeline import FeatureUnion
from sklearn.preprocessing import StandardScaler
from sklearn.kernel_approximation import RBFSampler
from sklearn.linear_model import SGDRegressor
# SGDRegressor defaults:
# loss='squared_loss', penalty='l2', alpha=0.0001,
# l1_ratio=0.15, fit_intercept=True, n_iter=5, shuffle=True,
# verbose=0, epsilon=0.1, random_state=None, learning_rate='invscaling',
# eta0=0.01, power_t=0.25, warm_start=False, average=False
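#
# Note: these defaults are from an older scikit-learn; in newer releases,
# loss='squared_loss' was renamed to 'squared_error' and n_iter was
# replaced by max_iter.
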
# Inspired by https://github.com/dennybritz/reinforcement-learning
class FeatureTransformer:
  def __init__(self, env, n_components=500):
    observation_examples = np.array([env.observation_space.sample() for x in range(10000)])
    scaler = StandardScaler()
    scaler.fit(observation_examples)

    # Used to convert a state to a featurized representation.
    # We use RBF kernels with different variances to cover different parts of the space
    featurizer = FeatureUnion([
            ("rbf1", RBFSampler(gamma=5.0, n_components=n_components)),
            ("rbf2", RBFSampler(gamma=2.0, n_components=n_components)),
            ("rbf3", RBFSampler(gamma=1.0, n_components=n_components)),
            ("rbf4", RBFSampler(gamma=0.5, n_components=n_components))
            ])
    example_features = featurizer.fit_transform(scaler.transform(observation_examples))

    self.dimensions = example_features.shape[1]
    self.scaler = scaler
    self.featurizer = featurizer

  def transform(self, observations):
    # print("observations:", observations)
    scaled = self.scaler.transform(observations)
    # assert(len(scaled.shape) == 2)
    return self.featurizer.transform(scaled)
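
# Example (a sketch, not executed here): each 2-D MountainCar state becomes a
# 4 * n_components = 2000-dimensional feature vector, suitable for a linear model:
#
#   ft = FeatureTransformer(gym.make('MountainCar-v0'))
#   phi = ft.transform([np.array([-0.5, 0.0])])  # a hypothetical state
#   print(phi.shape)  # (1, 2000)
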
# Holds one SGDRegressor for each action
class Model:
  def __init__(self, env, feature_transformer, learning_rate):
    self.env = env
    self.models = []
    self.feature_transformer = feature_transformer
    for i in range(env.action_space.n):
      model = SGDRegressor(learning_rate=learning_rate)
      model.partial_fit(feature_transformer.transform([env.reset()]), [0])
      self.models.append(model)

  def predict(self, s):
    # returns a 1-D array of Q-value estimates, one per action
    X = self.feature_transformer.transform([s])
    assert(len(X.shape) == 2)
    return np.array([m.predict(X)[0] for m in self.models])

  def update(self, s, a, G):
    X = self.feature_transformer.transform([s])
    assert(len(X.shape) == 2)
    self.models[a].partial_fit(X, [G])

  def sample_action(self, s, eps):
    # eps = 0
    # Technically, we don't need to do epsilon-greedy
    # because SGDRegressor predicts 0 for all states
    # until they are updated. This works as the
    # "Optimistic Initial Values" method, since all
    # the rewards for Mountain Car are -1.
    if np.random.random() < eps:
      return self.env.action_space.sample()
    else:
      return np.argmax(self.predict(s))
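
# Usage sketch (values are illustrative, not from a real run): with
# MountainCar's 3 actions, predict(s) returns a length-3 array of Q-value
# estimates, and sample_action(s, eps) is epsilon-greedy over it:
#
#   q = model.predict(observation)   # e.g. array([-9.8, -9.1, -9.5])
#   a = model.sample_action(observation, eps=0.1)  # argmax with prob. 0.9
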
# plays one episode and returns the total (undiscounted) reward
def play_one(model, env, eps, gamma):
  observation = env.reset()
  done = False
  totalreward = 0
  iters = 0
  while not done and iters < 10000:
    action = model.sample_action(observation, eps)
    prev_observation = observation
    observation, reward, done, info = env.step(action)

    # update the model
    # (model.predict returns a 1-D array of Q-values, one per action,
    # so the max is taken over the whole array)
    G = reward + gamma*np.max(model.predict(observation))
    model.update(prev_observation, action, G)

    totalreward += reward
    iters += 1

  return totalreward
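
# The update above uses the standard Q-learning target:
#   G = r + gamma * max_{a'} Q(s', a')
# i.e. a one-step bootstrapped estimate of the return, taking the max over
# the next state's action values (off-policy).
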
def plot_cost_to_go(env, estimator, num_tiles=20):
  x = np.linspace(env.observation_space.low[0], env.observation_space.high[0], num=num_tiles)
  y = np.linspace(env.observation_space.low[1], env.observation_space.high[1], num=num_tiles)
  X, Y = np.meshgrid(x, y)
  # both X and Y will be of shape (num_tiles, num_tiles)
  Z = np.apply_along_axis(lambda _: -np.max(estimator.predict(_)), 2, np.dstack([X, Y]))
  # Z will also be of shape (num_tiles, num_tiles)

  fig = plt.figure(figsize=(10, 5))
  ax = fig.add_subplot(111, projection='3d')
  surf = ax.plot_surface(X, Y, Z,
    rstride=1, cstride=1, cmap=matplotlib.cm.coolwarm, vmin=-1.0, vmax=1.0)
  ax.set_xlabel('Position')
  ax.set_ylabel('Velocity')
  ax.set_zlabel('Cost-To-Go == -V(s)')
  ax.set_title("Cost-To-Go Function")
  fig.colorbar(surf)
  plt.show()
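
# Here V(s) = max_a Q(s, a), so the plotted cost-to-go -V(s) is roughly the
# number of steps the agent expects to need from each (position, velocity)
# pair, since every per-step reward in MountainCar is -1.
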
def plot_running_avg(totalrewards):
  N = len(totalrewards)
  running_avg = np.empty(N)
  for t in range(N):
    running_avg[t] = totalrewards[max(0, t-100):(t+1)].mean()
  plt.plot(running_avg)
  plt.title("Running Average")
  plt.show()

if __name__ == '__main__':
  env = gym.make('MountainCar-v0')
  ft = FeatureTransformer(env)
  model = Model(env, ft, "constant")
  # learning_rate = 10e-5
  # eps = 1.0
  gamma = 0.99

  if 'monitor' in sys.argv:
    filename = os.path.basename(__file__).split('.')[0]
    monitor_dir = './' + filename + '_' + str(datetime.now())
    env = wrappers.Monitor(env, monitor_dir)

  N = 300
  totalrewards = np.empty(N)
  for n in range(N):
    # eps = 1.0/(0.1*n+1)
    eps = 0.1*(0.97**n)
    # eps = 0.5/np.sqrt(n+1)
    totalreward = play_one(model, env, eps, gamma)
    totalrewards[n] = totalreward
    print("episode:", n, "total reward:", totalreward)
  print("avg reward for last 100 episodes:", totalrewards[-100:].mean())
  print("total steps:", -totalrewards.sum())

  plt.plot(totalrewards)
  plt.title("Rewards")
  plt.show()

  plot_running_avg(totalrewards)

  # plot the optimal state-value function
  plot_cost_to_go(env, model)