# cartpole.py -- forked from Josh-Joseph/pseudo_off-policy_policy_search
import numpy as np
import scipy.stats
from importlib import reload  # Python 3: reload() is no longer a builtin

import rl_tools
reload(rl_tools)

XBOUND = 90
THETABOUND = np.pi / 15  # 12 degrees
XDOTBOUND = 2
THETADOTBOUND = 2
INITSTATE = np.array([0, 0, 0, 0])
rnd_start = False
print("[cartpole]: Training random start is " + ("On" if rnd_start else "Off"))
class Cartpole(rl_tools.Domain):
    def __init__(self, input_pars):
        self.input_pars = input_pars
        self.N_MC_eval_samples = 250
        self.episode_length = 500
        self.data_columns = ('x', 'theta', 'xdot', 'thetadot', 'u', 'r')  # assume the 2nd to last is u and the last is r
        self.n_dim = 4
        self.bounds = np.array([[-XBOUND, XBOUND], [-THETABOUND, THETABOUND], [-XDOTBOUND, XDOTBOUND], [-THETADOTBOUND, THETADOTBOUND]]).transpose()
        self.goal = np.array([[-np.inf, np.inf], [-THETABOUND, THETABOUND], [-np.inf, np.inf], [-np.inf, np.inf]]).transpose()
        self.initstate = INITSTATE
        self.action_centers = np.array([-5, 5])
        self.n_x_centers = 1
        self.n_theta_centers = 50
        self.n_xdot_centers = 30
        self.n_thetadot_centers = 30
        self.true_pars = (1, 1, 1)
        self.optimization_pars = {'initial step size': np.array([5, 5, 5]),
                                  'start': np.array([1, 10, 1]),
                                  'maximum evaluations': 50,
                                  'only positive': True}
        #self.initial_par_search_space = [[p1, p2] for p1 in np.linspace(-0.003, -.002, 5) for p2 in np.linspace(2, 4, 5)] # TODO
        self.noise = input_pars
        self.value_iteration_threshold = 1e-5
        self.state_centers = self.construct_discrete_policy_centers()
        self.dim_centers = rl_tools.split_states_on_dim(self.state_centers)
        self.pi_init = 1 - np.int8((np.sign(self.state_centers[:, 1]) + 1) / 2)  # initial bang-bang policy: u = -5 when theta > 0, u = +5 otherwise
        self.training_data_random_start = rnd_start
    #def distance_fn(self, x1, u1, x2, u2):
    #    return 1e5*(u1 != u2) + np.sum(((x1-x2)/np.array([1e5, 0.4189, 10, 10]))**2, axis=1)
    def distance_fn(self, x1, x2):
        return np.sum(((x1 - x2) / np.array([1e5, 0.4189, 10, 10]))**2, axis=1)
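    # Note on the scaling above (editorial, not in the original): each state
    # dimension is divided by a characteristic scale before the squared
    # distance is taken. The 1e5 divisor on x makes cart position effectively
    # irrelevant to the metric, and 0.4189 is 2*THETABOUND to four digits
    # (the full theta range); the 10s on the velocity dimensions appear to
    # be hand-tuned.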
    def at_goal(self, s):
        # theta is clipped to +/-THETABOUND by the dynamics, so equality with
        # the bound detects a terminal state (the pole has fallen over).
        return np.abs(s[1]) == THETABOUND
    def reward(self, s):
        #return 1 if np.abs(s[1]) < THETABOUND else 0
        #return 0 if np.abs(s[1]) < THETABOUND else -1
        return 1 - np.abs(s[1] / THETABOUND) if np.abs(s[1]) < THETABOUND else 0
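    # Worked example (editorial, not in the original): the reward falls off
    # linearly with |theta|, e.g. reward = 1.0 at theta = 0, 0.5 at
    # |theta| = THETABOUND/2 (about 6 degrees), and 0 once theta reaches the
    # THETABOUND = pi/15 (12 degree) limit.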
    def approx_dynamics(self, s, u, pars):
        # Frictionless cart-pole equations of motion with learnable cart mass,
        # pole mass, and pole length, integrated by semi-implicit Euler
        # (velocities first, then positions) and clipped to self.bounds.
        mc = pars[0]
        mp = pars[1]
        l = pars[2]
        g = 9.8
        dt = .02
        x = s[0]
        theta = s[1]
        xdot = s[2]
        thetadot = s[3]
        s = -np.sin(theta)  # NB: shadows the state argument s from here on
        s2 = s**2
        c = -np.cos(theta)
        den = mc + mp * s2
        xdotdot = (u + (mp * s * (l * (thetadot**2) + g * c))) / den
        thetadotdot = (-u * c - mp * l * (thetadot**2) * c * s - (mp + mc) * g * s) / (l * den)
        xdot = min(max(xdot + xdotdot * dt, self.bounds[0, 2]), self.bounds[1, 2])
        thetadot = min(max(thetadot + thetadotdot * dt, self.bounds[0, 3]), self.bounds[1, 3])
        x = min(max(x + xdot * dt, self.bounds[0, 0]), self.bounds[1, 0])
        theta = min(max(theta + thetadot * dt, self.bounds[0, 1]), self.bounds[1, 1])
        return np.array([x, theta, xdot, thetadot])
    def true_dynamics(self, s, u):
        mc = self.true_pars[0]
        mp = self.true_pars[1]
        l = self.true_pars[2]
        g = 9.8
        dt = .02
        x = s[0]
        theta = s[1]
        xdot = s[2]
        thetadot = s[3]
        s = -np.sin(theta)
        s2 = s**2
        c = -np.cos(theta)
        den = mc + mp * s2
        xdotdot = (u + (mp * s * (l * (thetadot**2) + g * c))) / den
        thetadotdot = (-u * c - mp * l * (thetadot**2) * c * s - (mp + mc) * g * s) / (l * den)
        xdot = min(max(xdot + xdotdot * dt, self.bounds[0, 2]), self.bounds[1, 2])
        thetadot = min(max(thetadot + thetadotdot * dt, self.bounds[0, 3]), self.bounds[1, 3])
        x = min(max(x + xdot * dt, self.bounds[0, 0]), self.bounds[1, 0])
        theta = min(max(theta + thetadot * dt + (-s * np.sign(theta) * self.input_pars[0]) + np.random.normal(loc=0, scale=self.input_pars[1]), self.bounds[0, 1]), self.bounds[1, 1])
        return np.array([x, theta, xdot, thetadot])
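    # Editorial note: true_dynamics differs from approx_dynamics only in the
    # theta update, which adds a deterministic lean-dependent bias term,
    # -s * np.sign(theta) * self.input_pars[0], plus Gaussian noise with
    # standard deviation self.input_pars[1]. true_dynamics_pmf below encodes
    # the same transition, but as a distribution over the discrete state
    # centers instead of a single sampled successor.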
    def true_dynamics_pmf(self, s_i, a_i):
        # returns a vector of probability masses the same size as state_centers
        s = self.state_centers[s_i, :].copy()
        u = self.action_centers[a_i]
        pmf = np.zeros(self.state_centers.shape[0])
        mc = self.true_pars[0]
        mp = self.true_pars[1]
        l = self.true_pars[2]
        g = 9.8
        dt = .02
        x = s[0]
        theta = s[1]
        xdot = s[2]
        thetadot = s[3]
        s = -np.sin(theta)
        s2 = s**2
        c = -np.cos(theta)
        den = mc + mp * s2
        xdotdot = (u + (mp * s * (l * (thetadot**2) + g * c))) / den
        thetadotdot = (-u * c - mp * l * (thetadot**2) * c * s - (mp + mc) * g * s) / (l * den)
        xdot = min(max(xdot + xdotdot * dt, self.bounds[0, 2]), self.bounds[1, 2])
        thetadot = min(max(thetadot + thetadotdot * dt, self.bounds[0, 3]), self.bounds[1, 3])
        x = min(max(x + xdot * dt, self.bounds[0, 0]), self.bounds[1, 0])
        theta = min(max(theta + thetadot * dt + (-s * np.sign(theta) * self.input_pars[0]), self.bounds[0, 1]), self.bounds[1, 1])
        s = np.array([x, theta, xdot, thetadot])
        s_next_i = rl_tools.find_nearest_index_fast(self.dim_centers, s)
        if self.noise[1]:
            # Spread probability mass, Gaussian in theta, across all centers
            # that share the deterministic successor's (xdot, thetadot) cell,
            # then renormalize so the masses sum to one.
            supported_s = np.all(np.equal(self.state_centers[:, 2:], self.state_centers[s_next_i, 2:]), axis=1)
            tmp_pmf = scipy.stats.norm.pdf(self.state_centers[supported_s, 1], loc=s[1], scale=self.noise[1])
            pmf[supported_s] = tmp_pmf
            pmf /= np.sum(pmf)
        else:
            # No theta noise: the transition is deterministic.
            pmf[s_next_i] = 1.0
        return pmf
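
# ---------------------------------------------------------------------------
# Minimal usage sketch (editorial addition, not part of the original file).
# It assumes rl_tools.Domain provides construct_discrete_policy_centers() and
# that the helpers used above (rl_tools.split_states_on_dim,
# rl_tools.find_nearest_index_fast) behave as their names suggest, so treat
# this as an illustration of the intended API rather than a tested driver.
if __name__ == '__main__':
    np.random.seed(0)
    # input_pars = (theta bias scale, theta noise std), as read by
    # true_dynamics and true_dynamics_pmf above.
    domain = Cartpole((0.0, 0.01))
    state = domain.initstate.copy()
    total_reward = 0.0
    for t in range(domain.episode_length):
        # Hypothetical bang-bang controller: push against the lean.
        u = domain.action_centers[0] if state[1] > 0 else domain.action_centers[1]
        state = domain.true_dynamics(state, u)
        total_reward += domain.reward(state)
        if domain.at_goal(state):  # theta was clipped to the bound: terminal
            break
    print("rollout ended at t=%d with return %.2f" % (t, total_reward))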