-
Notifications
You must be signed in to change notification settings - Fork 5
/
metrics.py
138 lines (117 loc) · 3.45 KB
/
metrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import functools
import time

import numpy as np
def distance_between_linear_function_and_neural_network(env, actor, K, terminal_err=0.01, rounds=10, steps=500):
    """Average per-step L2 distance between the linear function's and the
    neural network's control outputs while the NN drives the environment.

    Runs `rounds` episodes of at most `steps` steps each.  At every step the
    NN action u1 = actor.predict(x) is applied to the environment and the
    distance ||u1 - K.dot(x)|| is accumulated.  When an episode hits
    `env.bad_reward`, its accumulated steps and distance are rolled back so
    failed episodes do not contribute to the average.

    Args:
        env (DDPG.Enviorment): environment exposing terminal_err,
            bad_reward, reset(), observation() and step().
        actor (DDPG.ActorNetwork): actor with predict() and s_dim.
        K (numpy.matrix): coefficient matrix of the linear function.
        terminal_err (float): state-error threshold treated as terminal
            (temporarily installed on env, restored before returning).
        rounds (int): number of episodes.
        steps (int): maximum steps per episode.

    Returns:
        float: mean distance per counted step, or 1 if no step was counted.
    """
    distance = 0
    sum_steps = 0
    temp_env_ter_err = env.terminal_err
    env.terminal_err = terminal_err
    for i in range(rounds):
        env.reset()
        ep_distance = 0
        for s in range(steps):
            xk, r, terminal = env.observation()
            if r == env.bad_reward:
                # Roll back this episode's contribution on failure.
                # NOTE(review): assumes bad_reward implies terminal; if it
                # does not, this rollback would repeat on every remaining
                # step of the episode -- confirm against env.observation().
                sum_steps -= s
                distance -= ep_distance
            if terminal:
                break
            sum_steps += 1
            u1 = actor.predict(np.reshape(np.array(xk), (1, actor.s_dim)))
            u2 = K.dot(xk)
            env.step(u1)
            # Hoisted: the original computed np.linalg.norm(u1-u2) twice.
            step_dist = np.linalg.norm(u1 - u2)
            distance += step_dist
            ep_distance += step_dist
    env.terminal_err = temp_env_ter_err
    if sum_steps == 0:
        return 1
    return float(distance) / sum_steps
def neural_network_performance(env, actor, terminal_err=0.01, rounds=10, steps=500):
    """Average number of steps the neural network needs to reach the goal.

    A round terminates once the state error drops below `terminal_err`.
    Rounds that receive `env.bad_reward` are treated as failures: their
    steps are removed from the total and they are excluded from the
    success count.  If every round fails, returns steps+1 as a sentinel.

    Args:
        env (DDPG.Enviorment): environment exposing terminal_err,
            bad_reward, reset(), observation() and step().
        actor (DDPG.ActorNetwork): actor with predict() and s_dim.
        terminal_err (float): state-error threshold treated as terminal
            (temporarily installed on env, restored before returning).
        rounds (int): number of episodes.
        steps (int): maximum steps per episode.

    Returns:
        float: mean steps per successful round, or steps+1 if none succeed.
    """
    saved_threshold = env.terminal_err
    env.terminal_err = terminal_err
    total_steps = 0
    ok_rounds = rounds
    for _ in range(rounds):
        env.reset()
        step_idx = 0
        while step_idx < steps:
            state, reward, done = env.observation()
            if reward == env.bad_reward:
                # Failed round: drop its steps and its success credit.
                total_steps -= step_idx
                ok_rounds -= 1
            if done:
                break
            total_steps += 1
            action = actor.predict(np.reshape(np.array(state), (1, actor.s_dim)))
            env.step(action)
            step_idx += 1
    env.terminal_err = saved_threshold
    if ok_rounds == 0:
        return steps + 1
    return float(total_steps) / ok_rounds
def linear_function_performance(env, K, terminal_err=0.01, rounds=100, steps=500):
    """Average number of steps the linear controller u = K.dot(x) needs
    to drive the state error below `terminal_err`.

    Args:
        env (DDPG.Enviorment): environment exposing terminal_err,
            reset(), observation() and step().
        K (numpy.matrix): coefficient matrix of the linear function.
        terminal_err (float): state-error threshold treated as terminal
            (temporarily installed on env, restored before returning).
        rounds (int): number of episodes.
        steps (int): maximum steps per episode.

    Returns:
        float: mean steps per round.
    """
    saved_threshold = env.terminal_err
    env.terminal_err = terminal_err
    total_steps = 0
    for _ in range(rounds):
        env.reset()
        step_idx = 0
        while step_idx < steps:
            state, _, done = env.observation()
            if done:
                break
            total_steps += 1
            env.step(K.dot(state))
            step_idx += 1
    env.terminal_err = saved_threshold
    return float(total_steps) / rounds
def timeit(func):
    """Decorator that measures and prints a function's wall-clock run time.

    Args:
        func (callable): the function to measure.

    Returns:
        callable: a wrapper that forwards all arguments to func, prints
        the elapsed time to standard output, and returns func's result.
    """
    @functools.wraps(func)  # preserve func's __name__/__doc__ on the wrapper
    def wrapper(*args, **kvargs):
        start = time.time()
        ret = func(*args, **kvargs)
        elapsed = time.time() - start
        # Single-argument print form is valid on both Python 2 and 3; the
        # original multi-argument `print` statement was Python 2 only.
        print('%s run time: %s s' % (func.__name__, elapsed))
        return ret
    return wrapper
def find_boundary(x, x_max, x_min):
    """Expand the running elementwise bounds [x_min, x_max] to include x.

    Args:
        x (np.array): state.
        x_max (np.array): current per-dimension maxima.
        x_min (np.array): current per-dimension minima.

    Returns:
        tuple: (new x_max, new x_min) -- elementwise max/min of the inputs.
    """
    # np.maximum/np.minimum replace the original boolean-mask-and-multiply
    # construction; the result is identical for finite inputs and clearer.
    return np.maximum(x, x_max), np.minimum(x, x_min)