diff --git a/chapter08/maze.py b/chapter08/maze.py
index ec941d66..eb154151 100644
--- a/chapter08/maze.py
+++ b/chapter08/maze.py
@@ -247,43 +247,46 @@ def sample(self):
 
 # Model containing a priority queue for Prioritized Sweeping
 class PriorityModel(TrivialModel):
-
     def __init__(self, rand=np.random):
         TrivialModel.__init__(self, rand)
         # maintain a priority queue
-        self.priorityQueue = PriorityQueue()
+        self.priority_queue = PriorityQueue()
         # track predecessors for every state
         self.predecessors = dict()
 
     # add a @state-@action pair into the priority queue with priority @priority
     def insert(self, priority, state, action):
         # note the priority queue is a minimum heap, so we use -priority
-        self.priorityQueue.add_item((tuple(state), action), -priority)
+        self.priority_queue.add_item((tuple(state), action), -priority)
 
     # @return: whether the priority queue is empty
     def empty(self):
-        return self.priorityQueue.empty()
+        return self.priority_queue.empty()
 
     # get the first item in the priority queue
     def sample(self):
-        (state, action), priority = self.priorityQueue.pop_item()
-        newState, reward = self.model[state][action]
-        return -priority, list(state), action, list(newState), reward
+        (state, action), priority = self.priority_queue.pop_item()
+        next_state, reward = self.model[state][action]
+        state = deepcopy(state)
+        next_state = deepcopy(next_state)
+        return -priority, list(state), action, list(next_state), reward
 
     # feed the model with previous experience
-    def feed(self, currentState, action, newState, reward):
-        TrivialModel.feed(self, currentState, action, newState, reward)
-        if tuple(newState) not in self.predecessors.keys():
-            self.predecessors[tuple(newState)] = set()
-        self.predecessors[tuple(newState)].add((tuple(currentState), action))
+    def feed(self, state, action, next_state, reward):
+        state = deepcopy(state)
+        next_state = deepcopy(next_state)
+        TrivialModel.feed(self, state, action, next_state, reward)
+        if tuple(next_state) not in self.predecessors.keys():
+            self.predecessors[tuple(next_state)] = set()
+        self.predecessors[tuple(next_state)].add((tuple(state), action))
 
     # get all seen predecessors of a state @state
     def predecessor(self, state):
         if tuple(state) not in self.predecessors.keys():
             return []
         predecessors = []
-        for statePre, actionPre in list(self.predecessors[tuple(state)]):
-            predecessors.append([list(statePre), actionPre, self.model[statePre][actionPre][1]])
+        for state_pre, action_pre in list(self.predecessors[tuple(state)]):
+            predecessors.append([list(state_pre), action_pre, self.model[state_pre][action_pre][1]])
         return predecessors
 
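The model keys queue entries on (tuple(state), action) and negates the priority because the
underlying heap is a min-heap: popping the smallest -priority returns the pair with the largest
absolute TD error. The repo's PriorityQueue is visible here only through add_item, pop_item and
empty; a minimal sketch of a queue with that interface (illustrative only; the real class may
also deduplicate or reprioritize entries already in the queue) could look like:

    import heapq
    import itertools

    class SimplePriorityQueue:
        def __init__(self):
            self.heap = []
            self.counter = itertools.count()  # tie-breaker for equal priorities

        def add_item(self, item, priority=0):
            # min-heap, so callers pass -priority to pop the largest priority first
            heapq.heappush(self.heap, (priority, next(self.counter), item))

        def pop_item(self):
            priority, _, item = heapq.heappop(self.heap)
            return item, priority

        def empty(self):
            return len(self.heap) == 0
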
@@ -329,13 +332,13 @@ def dyna_q(q_value, model, maze, dyna_params):
     return steps
 
 # play for an episode for prioritized sweeping algorithm
-# @stateActionValues: state action pair values, will be updated
+# @q_value: state action pair values, will be updated
 # @model: model instance for planning
 # @maze: a maze instance containing all information about the environment
-# @dynaParams: several params for the algorithm
-# @return: # of backups happened in planning phase in this episode
-def prioritizedSweeping(stateActionValues, model, maze, dynaParams):
-    currentState = maze.START_STATE
+# @dyna_params: several params for the algorithm
+# @return: # of backups during this episode
+def prioritized_sweeping(q_value, model, maze, dyna_params):
+    state = maze.START_STATE
 
     # track the steps in this episode
     steps = 0
@@ -343,51 +346,51 @@
     # track the backups in planning phase
     backups = 0
 
-    while currentState not in maze.GOAL_STATES:
+    while state not in maze.GOAL_STATES:
         steps += 1
 
         # get action
-        action = choose_action(currentState, stateActionValues, maze, dynaParams)
+        action = choose_action(state, q_value, maze, dyna_params)
 
         # take action
-        newState, reward = maze.step(currentState, action)
+        next_state, reward = maze.step(state, action)
 
         # feed the model with experience
-        model.feed(currentState, action, newState, reward)
+        model.feed(state, action, next_state, reward)
 
         # get the priority for current state action pair
-        priority = np.abs(reward + dynaParams.gamma * np.max(stateActionValues[newState[0], newState[1], :]) -
-                          stateActionValues[currentState[0], currentState[1], action])
+        priority = np.abs(reward + dyna_params.gamma * np.max(q_value[next_state[0], next_state[1], :]) -
+                          q_value[state[0], state[1], action])
 
-        if priority > dynaParams.theta:
-            model.insert(priority, currentState, action)
+        if priority > dyna_params.theta:
+            model.insert(priority, state, action)
 
         # start planning
-        planningStep = 0
+        planning_step = 0
 
         # planning for several steps,
         # although planning until the priority queue becomes empty would converge much faster
-        while planningStep < dynaParams.planningSteps and not model.empty():
+        while planning_step < dyna_params.planning_steps and not model.empty():
             # get the sample with the highest priority from the model
-            priority, sampleState, sampleAction, sampleNewState, sampleReward = model.sample()
+            priority, state_, action_, next_state_, reward_ = model.sample()
 
             # update the state action value for the sample
-            delta = sampleReward + dynaParams.gamma * np.max(stateActionValues[sampleNewState[0], sampleNewState[1], :]) - \
-                    stateActionValues[sampleState[0], sampleState[1], sampleAction]
-            stateActionValues[sampleState[0], sampleState[1], sampleAction] += dynaParams.alpha * delta
+            delta = reward_ + dyna_params.gamma * np.max(q_value[next_state_[0], next_state_[1], :]) - \
+                    q_value[state_[0], state_[1], action_]
+            q_value[state_[0], state_[1], action_] += dyna_params.alpha * delta
 
             # deal with all the predecessors of the sample state
-            for statePre, actionPre, rewardPre in model.predecessor(sampleState):
-                priority = np.abs(rewardPre + dynaParams.gamma * np.max(stateActionValues[sampleState[0], sampleState[1], :]) -
-                                  stateActionValues[statePre[0], statePre[1], actionPre])
-                if priority > dynaParams.theta:
-                    model.insert(priority, statePre, actionPre)
-            planningStep += 1
+            for state_pre, action_pre, reward_pre in model.predecessor(state_):
+                priority = np.abs(reward_pre + dyna_params.gamma * np.max(q_value[state_[0], state_[1], :]) -
+                                  q_value[state_pre[0], state_pre[1], action_pre])
+                if priority > dyna_params.theta:
+                    model.insert(priority, state_pre, action_pre)
+            planning_step += 1
 
-        currentState = newState
+        state = next_state
 
         # update the # of backups
-        backups += planningStep
+        backups += planning_step + 1
 
     return backups
 
@@ -510,7 +513,6 @@ def figure_8_4():
     plt.savefig('../images/figure_8_4.png')
     plt.close()
 
-
 # Figure 8.5, ShortcutMaze
 def figure_8_5():
     # set up a shortcut maze instance
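A state-action pair enters the queue only when its priority, the magnitude of the one-step
Q-learning error, exceeds dyna_params.theta; planning then repeatedly pops the most urgent pair
and pushes its predecessors whose errors also clear the threshold. A toy computation of the
priority rule with hypothetical values (gamma and theta match the settings used in example_8_4
below):

    import numpy as np

    gamma, theta = 0.95, 0.0001
    reward = 0.0                               # hypothetical transition reward
    q_next = np.array([0.0, 0.5, 0.2, 0.1])    # hypothetical Q(next_state, .)
    q_sa = 0.3                                 # hypothetical Q(state, action)

    priority = np.abs(reward + gamma * np.max(q_next) - q_sa)
    # priority is about 0.175, well above theta, so the pair would be inserted
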
@@ -552,90 +554,66 @@ def figure_8_5():
     plt.savefig('../images/figure_8_5.png')
     plt.close()
 
-# Helper function to display best actions, just for debug
-def printActions(stateActionValues, maze):
-    bestActions = []
-    for i in range(0, maze.WORLD_HEIGHT):
-        bestActions.append([])
-        for j in range(0, maze.WORLD_WIDTH):
-            if [i, j] in maze.GOAL_STATES:
-                bestActions[-1].append('G')
-                continue
-            if [i, j] in maze.obstacles:
-                bestActions[-1].append('X')
-                continue
-            bestAction = np.argmax(stateActionValues[i, j, :])
-            if bestAction == maze.ACTION_UP:
-                bestActions[-1].append('U')
-            if bestAction == maze.ACTION_DOWN:
-                bestActions[-1].append('D')
-            if bestAction == maze.ACTION_LEFT:
-                bestActions[-1].append('L')
-            if bestAction == maze.ACTION_RIGHT:
-                bestActions[-1].append('R')
-    for row in bestActions:
-        print(row)
-    print('')
-
 # Check whether state-action values are already optimal
-def checkPath(stateActionValues, maze):
+def check_path(q_values, maze):
     # get the length of optimal path
     # 14 is the length of optimal path of the original maze
     # 1.2 means it's a relaxed optimal path
-    maxSteps = 14 * maze.resolution * 1.2
-    currentState = maze.START_STATE
+    max_steps = 14 * maze.resolution * 1.2
+    state = maze.START_STATE
     steps = 0
-    while currentState not in maze.GOAL_STATES:
-        bestAction = np.argmax(stateActionValues[currentState[0], currentState[1], :])
-        currentState, _ = maze.step(currentState, bestAction)
+    while state not in maze.GOAL_STATES:
+        action = np.argmax(q_values[state[0], state[1], :])
+        state, _ = maze.step(state, action)
         steps += 1
-        if steps > maxSteps:
+        if steps > max_steps:
             return False
     return True
 
-# Figure 8.7, mazes with different resolution
-def figure8_7():
+# Example 8.4, mazes with different resolution
+def example_8_4():
     # get the original 6 * 9 maze
-    originalMaze = Maze()
+    original_maze = Maze()
 
     # set up the parameters for each algorithm
-    paramsDyna = DynaParams()
-    paramsDyna.planning_steps = 5
-    paramsDyna.alpha = 0.5
-    paramsDyna.gamma = 0.95
+    params_dyna = DynaParams()
+    params_dyna.planning_steps = 5
+    params_dyna.alpha = 0.5
+    params_dyna.gamma = 0.95
 
-    paramsPrioritized = DynaParams()
-    paramsPrioritized.theta = 0.0001
-    paramsPrioritized.planning_steps = 5
-    paramsPrioritized.alpha = 0.5
-    paramsPrioritized.gamma = 0.95
+    params_prioritized = DynaParams()
+    params_prioritized.theta = 0.0001
+    params_prioritized.planning_steps = 5
+    params_prioritized.alpha = 0.5
+    params_prioritized.gamma = 0.95
 
-    params = [paramsPrioritized, paramsDyna]
+    params = [params_prioritized, params_dyna]
 
     # set up models for planning
     models = [PriorityModel, TrivialModel]
-    methodNames = ['Prioritized Sweeping', 'Dyna-Q']
+    method_names = ['Prioritized Sweeping', 'Dyna-Q']
 
     # due to the limitations of my machine, I can only perform experiments for 5 mazes
-    # say 1st maze has w * h states, then k-th maze has w * h * k * k states
-    numOfMazes = 5
+    # assuming the 1st maze has w * h states, the k-th maze has w * h * k * k states
+    num_of_mazes = 5
 
     # build all the mazes
-    mazes = [originalMaze.extend_maze(i) for i in range(1, numOfMazes + 1)]
-    methods = [prioritizedSweeping, dyna_q]
-
-    # track the # of backups
-    backups = np.zeros((2, numOfMazes))
+    mazes = [original_maze.extend_maze(i) for i in range(1, num_of_mazes + 1)]
+    methods = [prioritized_sweeping, dyna_q]
 
     # My machine cannot afford too many runs...
     runs = 5
+
+    # track the # of backups
+    backups = np.zeros((runs, 2, num_of_mazes))
+
     for run in range(0, runs):
-        for i in range(0, len(methodNames)):
+        for i in range(0, len(method_names)):
             for mazeIndex, maze in zip(range(0, len(mazes)), mazes):
-                print('run:', run, methodNames[i], 'maze size:', maze.WORLD_HEIGHT * maze.WORLD_WIDTH)
+                print('run %d, %s, maze size %d' % (run, method_names[i], maze.WORLD_HEIGHT * maze.WORLD_WIDTH))
 
                 # initialize the state action values
-                currentStateActionValues = np.copy(maze.stateActionValues)
+                q_value = np.zeros(maze.q_size)
 
                 # track steps / backups for each episode
                 steps = []
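check_path follows the greedy policy from START_STATE and accepts any route that is at most 1.2
times the known 14-step optimum (scaled by the maze's resolution), so a run stops once a relaxed
optimal policy is found. The backups array now keeps one slot per run, and the averaging later
becomes an explicit mean over the first axis instead of accumulate-then-divide; a toy sketch with
the same shapes:

    import numpy as np

    runs, n_methods, n_mazes = 5, 2, 5             # shapes used above
    backups = np.zeros((runs, n_methods, n_mazes))
    backups[0, 1, 2] = 120.0                       # hypothetical per-run total
    mean_backups = backups.mean(axis=0)            # shape (2, 5): methods x mazes
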
@@ -645,34 +623,36 @@ def figure8_7():
 
             # play for an episode
             while True:
-                steps.append(methods[i](currentStateActionValues, model, maze, params[i]))
+                steps.append(methods[i](q_value, model, maze, params[i]))
 
                 # print best actions w.r.t. current state-action values
                 # printActions(currentStateActionValues, maze)
 
                 # check whether the (relaxed) optimal path is found
-                if checkPath(currentStateActionValues, maze):
+                if check_path(q_value, maze):
                     break
 
             # update the total steps / backups for this maze
-            backups[i][mazeIndex] += np.sum(steps)
+            backups[run, i, mazeIndex] = np.sum(steps)
 
-    # Dyna-Q performs several backups per step
-    backups[1, :] *= paramsDyna.planning_steps
+    backups = backups.mean(axis=0)
 
-    # average over independent runs
-    backups /= float(runs)
+    # Dyna-Q performs planning_steps + 1 backups per real step: one direct update plus planning_steps planned updates
+    backups[1, :] *= params_dyna.planning_steps + 1
 
-    plt.figure(3)
-    for i in range(0, len(methodNames)):
-        plt.plot(np.arange(1, numOfMazes + 1), backups[i, :], label=methodNames[i])
+    for i in range(0, len(method_names)):
+        plt.plot(np.arange(1, num_of_mazes + 1), backups[i, :], label=method_names[i])
     plt.xlabel('maze resolution factor')
     plt.ylabel('backups until optimal solution')
    plt.yscale('log')
     plt.legend()
+    plt.savefig('../images/example_8_4.png')
+    plt.close()
+
 
 if __name__ == '__main__':
     # figure_8_2()
     # figure_8_4()
-    figure_8_5()
+    # figure_8_5()
+    example_8_4()
diff --git a/images/example_8_4.png b/images/example_8_4.png
new file mode 100644
index 00000000..d551f43b
Binary files /dev/null and b/images/example_8_4.png differ
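
After averaging over runs, the two methods are put on a comparable backup count: in dyna_q each
real step performs one direct Q-learning update plus planning_steps planned updates, hence the
planning_steps + 1 scaling, and prioritized_sweeping likewise adds planning_step + 1 per real
step (the updates actually drained from the queue, plus one for the step itself). A
back-of-envelope check with the parameters configured above:

    planning_steps = 5            # value set in example_8_4
    episode_steps = 100           # hypothetical episode length
    dyna_q_backups = episode_steps * (planning_steps + 1)
    assert dyna_q_backups == 600  # one direct + five planned updates per real step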