+ '''
+ Contains the agent implementing the Q-learning algorithm.
+
+ Refs:
+ https://discussions.udacity.com/t/how-do-i-capture-two-states-in-order-to-implement-the-q-learning-algorithm/191327/2
+
+ '''
+
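+ # Note: ALL_ACTIONS referenced below is assumed to be the tuple of valid actions
+ # (None, 'forward', 'left', 'right') defined elsewhere in this file.
+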
import random
import pdb
+ from pprint import pprint
+
from environment import Agent, Environment
from planner import RoutePlanner
from simulator import Simulator
@@ -17,60 +27,90 @@ def __init__(self, env):
        self.color = 'red'  # override color
        self.planner = RoutePlanner(self.env, self)  # simple route planner to get next_waypoint
        # TODO: Initialize any additional variables here
-         # row will be state - light, oncoming, left, right
-         # col will be action - None, forward, left, right
-         self.Q_table = [[0] * 4] * 4
-         # learning variable
+         self.state = None
+         self.reward = None
+         self.action = None
+         self.alpha = 0.8  # learning rate
        self.gamma = 0.8
-         #
-         self.state = []
+         self.epsilon = 0.8  # exploration rate
+         self.prev_state = None
+         self.prev_reward = None
+         self.prev_action = None
+         self.Q = dict()  # Q-table keyed by (state, action)

    def reset(self, destination=None):
        self.planner.route_to(destination)
        # TODO: Prepare for a new trip; reset any variables here, if required

+     def get_q_val(self, state, action):
+         # return Q[(state, action)], initialising unseen pairs to 0
+         try:
+             return self.Q[(state, action)]
+         except KeyError:
+             self.Q[(state, action)] = 0
+             return 0
+
+     def best_q_action(self, s):
+         # greedy selection to find the best action according to Q
+         # returns (action, expected reward)
+         best_action = random.choice(ALL_ACTIONS)
+         max_reward = self.get_q_val(s, best_action)
+         if self.prev_state or (random.random() > self.epsilon):
+             for act in ALL_ACTIONS:
+                 tmp = self.get_q_val(s, act)
+                 if max_reward <= tmp:
+                     max_reward = tmp
+                     best_action = act
+         return best_action, max_reward
+
+     def update_q_policy(self):
+         # Q-learning policy update
+         if self.prev_state:
+             s, r, a = self.prev_state, self.prev_reward, self.prev_action
+             s1, r1, a1 = self.state, self.reward, self.action
+             self.Q[(s, a)] = self.get_q_val(s, a) + \
+                 self.alpha * (r +
+                               self.gamma * self.get_q_val(s1, a1) -
+                               self.get_q_val(s, a))
+
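+     # A minimal sketch (for reference) of the textbook Q-learning update that
+     # update_q_policy() above approximates, with the greedily chosen next action a1
+     # standing in for an explicit max over ALL_ACTIONS:
+     #
+     #     Q[(s, a)] += alpha * (r + gamma * max(Q[(s1, a_)] for a_ in ALL_ACTIONS) - Q[(s, a)])
+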
    def update(self, t):
        # Gather inputs
        self.next_waypoint = self.planner.next_waypoint()  # from route planner, also displayed by simulator
        inputs = self.env.sense(self)
        deadline = self.env.get_deadline(self)

        # TODO: Update state
-         self.state = [inputs[_] for _ in ALL_ACTIONS]
+         state = (inputs['light'],
+                  inputs['left'],
+                  inputs['oncoming'],
+                  inputs['right'],
+                  self.next_waypoint)
+         self.state = state

        # TODO: Select action according to your policy
-         action = None
-         if GEAR == 0:
-             # manual driving - set
-             pdb.set_trace()  # update action accordingly
-         elif GEAR == 1:
-             # random
-             action = (None, 'forward', 'left', 'right')[random.randrange(0, 4)]
-         elif GEAR == 2:
-             # controlled
-             if inputs['light'] == 'red' and \
-                     not (inputs['left'] == inputs['left'] == inputs['left'] == None):
-                 action = None
-             else:
-                 action = self.next_waypoint
-         elif GEAR == 3:
-             # reckless
-             action = self.next_waypoint
-         elif GEAR == 4:
-             # controlled2
-             if inputs['light'] == 'red' and \
-                     not (inputs['left'] == inputs['left'] == inputs['left'] == None):
-                 action = None
-                 if self.next_waypoint == 'right':
-                     action = self.next_waypoint
-             else:
-                 action = self.next_waypoint
+         action, expected_reward = self.best_q_action(self.state)

        # Execute action and get reward
        reward = self.env.act(self, action)
+         self.action = action
+         self.reward = reward
+         print 'LearningAgent.update(): expected_reward = {}, received_reward = {}'.format(expected_reward, reward)

        # TODO: Learn policy based on state, action, reward
-
+         self.update_q_policy()
+
+         # pass values along for the next state
+         self.prev_state = state
+         self.prev_action = action
+         self.prev_reward = reward
+         if deadline == 0:
+             pprint(self.Q)
+             # pdb.set_trace()

        print "LearningAgent.update(): deadline = {}, inputs = {}, action = {}, reward = {}".format(deadline, inputs, action, reward)  # [debug]

@@ -84,10 +124,10 @@ def run():
    # NOTE: You can set enforce_deadline=False while debugging to allow longer trials

    # Now simulate it
-     sim = Simulator(e, update_delay=0.5, display=True)  # create simulator (uses pygame when display=True, if available)
+     sim = Simulator(e, update_delay=0.0005, display=False)  # create simulator (uses pygame when display=True, if available)
    # NOTE: To speed up simulation, reduce update_delay and/or set display=False

-     sim.run(n_trials=2)  # run for a specified number of trials
+     sim.run(n_trials=100)  # run for a specified number of trials
    # NOTE: To quit midway, press Esc or close pygame window, or hit Ctrl+C on the command-line