app.py
import numpy as np
import cvxpy as cp

from train import idx_state


class FeatureEstimate:
    """Gaussian radial-basis-function features over a 2-D (position, velocity) state."""

    def __init__(self, feature_num, env):
        self.env = env
        self.feature_num = feature_num
        self.feature = np.ones(self.feature_num)

    def gaussian_function(self, x, mu):
        # Gaussian kernel centered at mu with a fixed standard deviation of 1.
        return np.exp(-np.power(x - mu, 2.) / (2 * np.power(1., 2.)))
    def get_features(self, state):
        # Gaussian centers are spaced evenly across the observation space;
        # the first half of the vector covers position, the second half velocity.
        env_low = self.env.observation_space.low
        env_high = self.env.observation_space.high
        env_distance = (env_high - env_low) / (self.feature_num - 1)

        for i in range(int(self.feature_num / 2)):
            # position
            self.feature[i] = self.gaussian_function(
                state[0], env_low[0] + i * env_distance[0])
            # velocity
            self.feature[i + int(self.feature_num / 2)] = self.gaussian_function(
                state[1], env_low[1] + i * env_distance[1])

        return self.feature


def calc_feature_expectation(feature_num, gamma, q_table, demonstrations, env):
    # Monte-Carlo estimate of the learner's discounted feature expectations:
    # roll out the greedy policy w.r.t. q_table for as many episodes as there
    # are expert demonstrations.
    feature_estimate = FeatureEstimate(feature_num, env)
    feature_expectations = np.zeros(feature_num)
    demo_num = len(demonstrations)

    for _ in range(demo_num):
        state = env.reset()
        demo_length = 0
        done = False

        while not done:
            demo_length += 1
            state_idx = idx_state(env, state)
            action = np.argmax(q_table[state_idx])
            next_state, reward, done, _ = env.step(action)

            features = feature_estimate.get_features(next_state)
            feature_expectations += (gamma ** demo_length) * np.array(features)
            state = next_state

    feature_expectations = feature_expectations / demo_num
    return feature_expectations


def expert_feature_expectation(feature_num, gamma, demonstrations, env):
    # Discounted feature expectations of the expert, averaged over all demonstrations.
    feature_estimate = FeatureEstimate(feature_num, env)
    feature_expectations = np.zeros(feature_num)

    for demo_num in range(len(demonstrations)):
        for demo_length in range(len(demonstrations[0])):
            state = demonstrations[demo_num][demo_length]
            features = feature_estimate.get_features(state)
            feature_expectations += (gamma ** demo_length) * np.array(features)

    feature_expectations = feature_expectations / len(demonstrations)
    return feature_expectations


def QP_optimizer(feature_num, learner, expert):
    # Max-margin step of apprenticeship learning: find the smallest-norm weight
    # vector that separates the expert feature expectations from every stored
    # learner feature expectation by a margin of at least 2.
    w = cp.Variable(feature_num)

    obj_func = cp.Minimize(cp.norm(w))
    constraints = [(expert - learner) @ w >= 2]  # '@' is matrix multiplication in cvxpy

    prob = cp.Problem(obj_func, constraints)
    prob.solve()

    if prob.status == "optimal":
        print("status:", prob.status)
        print("optimal value", prob.value)

        weights = np.squeeze(np.asarray(w.value))
        return weights, prob.status
    else:
        print("status:", prob.status)

        weights = np.zeros(feature_num)
        return weights, prob.status


def add_feature_expectation(learner, temp_learner):
    # Append the newest learner feature expectation to the stack after an RL step.
    learner = np.vstack([learner, temp_learner])
    return learner


def subtract_feature_expectation(learner):
    # If the QP status is infeasible, drop the oldest learner feature expectation.
    learner = learner[1:, :]
    return learner
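

# --- Hypothetical usage sketch (illustration only, not part of the training code) ---
# A minimal, self-contained example of the max-margin step on synthetic feature
# expectations. The feature_num, expert, and learner values below are placeholders
# chosen for illustration; in the actual project they come from expert demonstrations
# and from rollouts of the learner's Q-table policy.
if __name__ == "__main__":
    feature_num = 4
    expert = np.array([0.9, 0.1, 0.8, 0.2])      # placeholder expert feature expectation
    learner = np.array([[0.5, 0.5, 0.5, 0.5]])   # one learner policy, stored as a row

    weights, status = QP_optimizer(feature_num, learner, expert)
    if status == "optimal":
        # after the next RL step, the new learner estimate would be appended
        learner = add_feature_expectation(learner, np.array([0.6, 0.4, 0.7, 0.3]))
    else:
        # infeasible QP: drop the oldest learner estimate and retry
        learner = subtract_feature_expectation(learner)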