td_lambda.py (forked from lazyprogrammer/machine_learning_examples)

# https://deeplearningcourses.com/c/deep-reinforcement-learning-in-python
# https://www.udemy.com/deep-reinforcement-learning-in-python
from __future__ import print_function, division
from builtins import range
# Note: you may need to update your version of future
# sudo pip install -U future
#
# Note: gym changed from version 0.7.3 to 0.8.0
# MountainCar episode length is capped at 200 in later versions.
# This means your agent can't learn as much in the earlier episodes
# since they are no longer as long.
#
# Adapt Q-Learning script to use TD(lambda) method instead
import gym
import os
import sys
import numpy as np
import matplotlib.pyplot as plt
from gym import wrappers
from datetime import datetime
# code we already wrote
from q_learning import plot_cost_to_go, FeatureTransformer, plot_running_avg


class BaseModel:
  # linear value model: one weight vector, trained with eligibility-trace updates
  def __init__(self, D):
    self.w = np.random.randn(D) / np.sqrt(D)

  def partial_fit(self, input_, target, eligibility, lr=1e-2):
    # semi-gradient update: step along the eligibility trace, scaled by the TD error
    self.w += lr*(target - input_.dot(self.w))*eligibility

  def predict(self, X):
    X = np.array(X)
    return X.dot(self.w)
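

# Illustrative sketch (not part of the original script): partial_fit above applies
# the semi-gradient TD(lambda) rule w <- w + lr * (target - w.dot(x)) * e, where e
# is the eligibility trace maintained in Model.update below via e <- gamma*lambda*e + x.
# The helper below is hypothetical, is never called by the script, and only shows the
# two-step update on plain numpy arrays.
def _td_lambda_update_sketch(w, e, x, target, gamma=0.9999, lambda_=0.7, lr=1e-2):
  e = gamma * lambda_ * e + x              # decay the trace, then accumulate current features
  w = w + lr * (target - w.dot(x)) * e     # step along the trace, scaled by the TD error
  return w, e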


# Holds one BaseModel for each action
class Model:
  def __init__(self, env, feature_transformer):
    self.env = env
    self.models = []
    self.feature_transformer = feature_transformer

    D = feature_transformer.dimensions
    # one eligibility trace per action, each the same length as the feature vector
    self.eligibilities = np.zeros((env.action_space.n, D))
    for i in range(env.action_space.n):
      model = BaseModel(D)
      self.models.append(model)

  def predict(self, s):
    X = self.feature_transformer.transform([s])
    assert(len(X.shape) == 2)
    result = np.stack([m.predict(X) for m in self.models]).T
    assert(len(result.shape) == 2)
    return result

  def update(self, s, a, G, gamma, lambda_):
    X = self.feature_transformer.transform([s])
    assert(len(X.shape) == 2)
    # decay every trace, then add the current features to the taken action's trace
    self.eligibilities *= gamma*lambda_
    self.eligibilities[a] += X[0]
    self.models[a].partial_fit(X[0], G, self.eligibilities[a])

  def sample_action(self, s, eps):
    # epsilon-greedy: explore with probability eps, otherwise act greedily
    if np.random.random() < eps:
      return self.env.action_space.sample()
    else:
      return np.argmax(self.predict(s))
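

# Illustrative sketch (not part of the original script): Model.update keeps one trace
# row per action; every row decays each step, but only the taken action's row receives
# the current feature vector. The hypothetical helper below, never called here, spells
# that out for an (n_actions x D) trace matrix.
def _per_action_trace_sketch(eligibilities, a, x, gamma=0.9999, lambda_=0.7):
  eligibilities = eligibilities * gamma * lambda_   # decay all rows (all actions)
  eligibilities[a] = eligibilities[a] + x           # accumulate features only for action a
  return eligibilities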


# plays one episode and returns the total reward
def play_one(model, env, eps, gamma, lambda_):
  observation = env.reset()
  done = False
  totalreward = 0
  iters = 0
  # while not done and iters < 200:
  while not done and iters < 10000:
    action = model.sample_action(observation, eps)
    prev_observation = observation
    observation, reward, done, info = env.step(action)

    # update the model using the Q-learning-style target G = r + gamma*max_a' Q(s', a')
    next = model.predict(observation)
    assert(next.shape == (1, env.action_space.n))
    G = reward + gamma*np.max(next[0])
    model.update(prev_observation, action, G, gamma, lambda_)

    totalreward += reward
    iters += 1

  return totalreward
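

# Worked sketch (illustrative only): the target G used inside play_one is the one-step
# greedy backup reward + gamma * max_a' Q(s', a'). The numbers below are made up;
# in the script the next-state values come from model.predict.
def _backup_target_sketch():
  reward = -1.0                              # MountainCar-v0 gives -1 per step
  gamma = 0.9999
  next_q = np.array([-40.2, -39.7, -41.0])   # hypothetical Q(s', a') for the 3 actions
  return reward + gamma * np.max(next_q)     # greedy one-step target G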


if __name__ == '__main__':
  env = gym.make('MountainCar-v0')
  ft = FeatureTransformer(env)
  model = Model(env, ft)
  gamma = 0.9999
  lambda_ = 0.7

  if 'monitor' in sys.argv:
    filename = os.path.basename(__file__).split('.')[0]
    monitor_dir = './' + filename + '_' + str(datetime.now())
    env = wrappers.Monitor(env, monitor_dir)

  N = 300
  totalrewards = np.empty(N)
  costs = np.empty(N)
  for n in range(N):
    # eps = 1.0/(0.1*n+1)
    eps = 0.1*(0.97**n)
    # eps = 0.5/np.sqrt(n+1)
    totalreward = play_one(model, env, eps, gamma, lambda_)
    totalrewards[n] = totalreward
    print("episode:", n, "total reward:", totalreward)
  print("avg reward for last 100 episodes:", totalrewards[-100:].mean())
  print("total steps:", -totalrewards.sum())

  plt.plot(totalrewards)
  plt.title("Rewards")
  plt.show()

  plot_running_avg(totalrewards)

  # plot the optimal state-value function
  plot_cost_to_go(env, model)
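

# Usage note: this script imports FeatureTransformer, plot_cost_to_go and
# plot_running_avg from q_learning.py in the same repo, so run it from that directory.
#   python td_lambda.py            # train for 300 episodes and show the plots
#   python td_lambda.py monitor    # additionally record episodes with gym's Monitor wrapper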