# td_lambda.py -- forked from lazyprogrammer/machine_learning_examples
# https://deeplearningcourses.com/c/deep-reinforcement-learning-in-python
# https://www.udemy.com/deep-reinforcement-learning-in-python
from __future__ import print_function, division
from builtins import range
# Note: you may need to update your version of future
# sudo pip install -U future
#
# Note: gym changed from version 0.7.3 to 0.8.0
# MountainCar episode length is capped at 200 in later versions.
# This means your agent can't learn as much in the earlier episodes
# since they are no longer as long.
#
# Adapt the Q-learning script (q_learning.py) to use the TD(lambda) method instead.
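#
# TD(lambda) in brief: instead of updating only toward the most recent
# one-step target, each weight keeps an eligibility trace that records how
# much recently visited states contributed to it. Every step the traces decay
# by gamma*lambda_, and the TD error is applied to all weights in proportion
# to their traces; lambda_=0 recovers the ordinary one-step update, while
# lambda_=1 approaches a Monte Carlo update.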
import gym
import os
import sys
import numpy as np
import matplotlib.pyplot as plt
from gym import wrappers
from datetime import datetime
# code we already wrote
from q_learning import plot_cost_to_go, FeatureTransformer, plot_running_avg


class BaseModel:
  def __init__(self, D):
    self.w = np.random.randn(D) / np.sqrt(D)

  def partial_fit(self, input_, target, eligibility, lr=10e-3):
    self.w += lr*(target - input_.dot(self.w))*eligibility

  def predict(self, X):
    X = np.array(X)
    return X.dot(self.w)
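
# Note on BaseModel.partial_fit: this is the semi-gradient TD(lambda) update
#   w <- w + lr * (target - w.dot(x)) * e
# where e is the eligibility trace for the chosen action and (target - w.dot(x))
# is the TD error. With linear function approximation the gradient of the
# prediction with respect to w is just the feature vector x, which is what gets
# accumulated into e (see Model.update below). Also note that the default
# lr=10e-3 equals 1e-2, not 1e-3.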


# Holds one BaseModel for each action
class Model:
  def __init__(self, env, feature_transformer):
    self.env = env
    self.models = []
    self.feature_transformer = feature_transformer

    D = feature_transformer.dimensions
    self.eligibilities = np.zeros((env.action_space.n, D))
    for i in range(env.action_space.n):
      model = BaseModel(D)
      self.models.append(model)

  def predict(self, s):
    X = self.feature_transformer.transform([s])
    assert(len(X.shape) == 2)
    return np.array([m.predict(X)[0] for m in self.models])

  def update(self, s, a, G, gamma, lambda_):
    X = self.feature_transformer.transform([s])
    assert(len(X.shape) == 2)
    self.eligibilities *= gamma*lambda_
    self.eligibilities[a] += X[0]
    self.models[a].partial_fit(X[0], G, self.eligibilities[a])

  def sample_action(self, s, eps):
    if np.random.random() < eps:
      return self.env.action_space.sample()
    else:
      return np.argmax(self.predict(s))
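
# Note on Model.update: these are "accumulating" eligibility traces. Every
# step, all traces decay by gamma*lambda_, and the feature vector of the
# current state is added onto the trace of the action that was taken. A common
# alternative (not used here) is the "replacing traces" variant, which would
# assign self.eligibilities[a] = X[0] instead of adding to it.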


# plays one episode and returns the total reward
# (note: this relies on the module-level `env` created in the main block below)
def play_one(model, eps, gamma, lambda_):
  observation = env.reset()
  done = False
  totalreward = 0
  iters = 0
  # while not done and iters < 200:
  while not done and iters < 10000:
    action = model.sample_action(observation, eps)
    prev_observation = observation
    observation, reward, done, info = env.step(action)

    # update the model
    # model.predict returns a 1-D array of Q-values (one per action),
    # so the max is taken over all actions
    G = reward + gamma*np.max(model.predict(observation))
    model.update(prev_observation, action, G, gamma, lambda_)

    totalreward += reward
    iters += 1

  return totalreward
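
# Note on the target G above: this is the Q-learning (off-policy) bootstrap
# target G = r + gamma * max_a' Q(s', a'), combined with eligibility traces.
# Strictly speaking, Watkins's Q(lambda) would also reset the traces to zero
# whenever an exploratory (non-greedy) action is taken; this simpler
# implementation does not do that.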


if __name__ == '__main__':
  env = gym.make('MountainCar-v0')
  ft = FeatureTransformer(env)
  model = Model(env, ft)
  gamma = 0.99
  lambda_ = 0.7

  if 'monitor' in sys.argv:
    filename = os.path.basename(__file__).split('.')[0]
    monitor_dir = './' + filename + '_' + str(datetime.now())
    env = wrappers.Monitor(env, monitor_dir)

  N = 300
  totalrewards = np.empty(N)
  costs = np.empty(N)
  for n in range(N):
    # epsilon decays exponentially from 0.1 toward 0
    # eps = 1.0/(0.1*n+1)
    eps = 0.1*(0.97**n)
    # eps = 0.5/np.sqrt(n+1)
    totalreward = play_one(model, eps, gamma, lambda_)
    totalrewards[n] = totalreward
    print("episode:", n, "total reward:", totalreward)
  print("avg reward for last 100 episodes:", totalrewards[-100:].mean())
  print("total steps:", -totalrewards.sum())

  plt.plot(totalrewards)
  plt.title("Rewards")
  plt.show()

  plot_running_avg(totalrewards)

  # plot the optimal state-value function
  plot_cost_to_go(env, model)
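
# Usage (assumes an older gym release with the 4-tuple env.step() API, as
# noted at the top of this file):
#   python td_lambda.py            # train for 300 episodes and show the plots
#   python td_lambda.py monitor    # also record videos via gym.wrappers.Monitor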