import numpy as np


class BlackJackAgent:
    """Tabular Monte Carlo agent for the Blackjack environment."""

    def __init__(self, method, env, function='V', gamma=0.99, epsilon=0.1):
        self.method = method
        # A state is (player sum, dealer showing card, usable ace).
        player_sums = range(env.observation_space[0].n)
        dealer_cards = range(env.observation_space[1].n)
        usable_ace = [True, False]
        self.values = {(i, j, b): 0 for i in player_sums for j in dealer_cards for b in usable_ace}
        self.vreturns = {(i, j, b): [] for i in player_sums for j in dealer_cards for b in usable_ace}
        # Q-values start at an optimistic 10 to encourage early exploration.
        self.qs = {(i, j, b, a): 10 for i in player_sums for j in dealer_cards for b in usable_ace for a in range(env.action_space.n)}
        self.qreturns = {(i, j, b, a): [] for i in player_sums for j in dealer_cards for b in usable_ace for a in range(env.action_space.n)}
        self.value_function = lambda i, j, k: self.values[(i, j, k)]
        self.q_function = lambda i, j, k, l: self.qs[(i, j, k, l)]
        self.get_state_name = lambda state: (state[0], state[1], state[2])
        self.get_state_action_name = lambda state, action: (state[0], state[1], state[2], action)
        self.gamma = gamma
        self.actions = list(range(env.action_space.n))
        self.policy = {state: 0 for state in self.values.keys()}
        self.epsilon = epsilon
        self.function = function

    def choose_action(self, state):
        sum_, show, ace = state
        if self.method == 'lucky':
            return self.feeling_lucky(sum_)
        if self.method == 'egreedy':
            return self.epsilon_greedy(state)
        raise ValueError(f'unknown method: {self.method}')

    def epsilon_greedy(self, state):
        # Explore with probability epsilon, otherwise follow the current greedy policy.
        if np.random.random() < self.epsilon:
            return np.random.choice(self.actions)
        state_name = self.get_state_name(state)
        return self.policy[state_name]

    def feeling_lucky(self, sum_):
        # Naive baseline: hit (1) below 20, otherwise stick (0).
        if sum_ < 20:
            return 1
        return 0

    def update(self, rewards, states, actions, function='V'):
        # First-visit Monte Carlo update over one completed episode.
        visited = set()
        if self.function == 'V':
            for i, state in enumerate(states):
                state_name = self.get_state_name(state)
                if state_name in visited:
                    continue
                # Discounted return from the first visit onwards (gamma^0 for the
                # immediate reward).
                G = 0
                for j, reward in enumerate(rewards[i:]):
                    G += self.gamma ** j * reward
                self.vreturns[state_name].append(G)
                self.values[state_name] = np.mean(self.vreturns[state_name])
                visited.add(state_name)
        elif self.function == 'Q':
            for i, (state, action) in enumerate(zip(states, actions)):
                state_action_name = self.get_state_action_name(state, action)
                if state_action_name in visited:
                    continue
                G = 0
                for j, reward in enumerate(rewards[i:]):
                    G += self.gamma ** j * reward
                self.qreturns[state_action_name].append(G)
                self.qs[state_action_name] = np.mean(self.qreturns[state_action_name])
                visited.add(state_action_name)
            # Greedy policy improvement over the full action set for every visited state.
            for state in states:
                Q_prime, A_prime = -np.inf, None
                for action in self.actions:
                    state_action_name = self.get_state_action_name(state, action)
                    curr_Q = self.qs[state_action_name]
                    if curr_Q > Q_prime:
                        Q_prime = curr_Q
                        A_prime = action
                state_name = self.get_state_name(state)
                self.policy[state_name] = A_prime
        else:
            raise NotImplementedError


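# Usage sketch (not part of the original module): one way to run first-visit
# Monte Carlo prediction with BlackJackAgent. The gymnasium import, the
# 'Blackjack-v1' env id, the 5-tuple step API and the episode count are
# assumptions about the surrounding training script, not code from this repo.
def run_blackjack_demo(num_episodes=50_000):
    import gymnasium as gym

    env = gym.make('Blackjack-v1')
    agent = BlackJackAgent('lucky', env, function='V')
    for _ in range(num_episodes):
        obs, _ = env.reset()
        states, actions, rewards = [], [], []
        done = False
        while not done:
            action = agent.choose_action(obs)
            states.append(obs)
            actions.append(action)
            obs, reward, terminated, truncated, _ = env.step(action)
            rewards.append(reward)
            done = terminated or truncated
        # Episode finished: update the value estimates from the collected trajectory.
        agent.update(rewards, states, actions)
    return agent

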
class CartPoleNoob:
    """Tabular TD(0) agent for CartPole using a discretised pole angle."""

    def __init__(self, method, env, function='V', alpha=0.1, gamma=0.99, epsilon=0.1, n_bins=10):
        self.method = method
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.function = function
        self.actions = list(range(env.action_space.n))
        # Bin edges over the pole-angle range (+/- 12 degrees = +/- 0.2094 rad).
        self.rad = np.linspace(-0.2094, 0.2094, n_bins)
        self.values = {r: 0 for r in range(len(self.rad) + 1)}
        # Q-values start at an optimistic 10 to encourage early exploration.
        self.qs = {(r, a): 10 for r in range(len(self.rad) + 1) for a in self.actions}

    def choose_action(self, state):
        if self.method == 'naive':
            return self.naive_action(state)
        if self.method == 'egreedy':
            return self.epsilon_greedy(state)
        raise ValueError(f'unknown method: {self.method}')

    def naive_action(self, state):
        # Push the cart towards the side the pole is leaning to.
        if state[2] < 0:
            return 0
        return 1

    def epsilon_greedy(self, state):
        # Explore with probability epsilon, otherwise act greedily on Q.
        if np.random.random() < self.epsilon:
            return np.random.choice(self.actions)
        s = self.get_bucket_index([state[2]])[0]
        action = np.array([self.qs[(s, a)] for a in self.actions]).argmax()
        return action

    def get_bucket_index(self, states):
        # Map pole angles to discrete bucket indices.
        inds = np.digitize(states, self.rad)
        return inds

    def update(self, state, action, reward, state_):
        r, r_ = self.get_bucket_index([state[2], state_[2]])
        if self.function == 'V':
            # TD(0) update with a bootstrapped value target.
            self.values[r] += self.alpha * (reward + self.gamma * self.values[r_] - self.values[r])
        elif self.function == 'Q':
            # Q-learning: bootstrap from the greedy action in the next state.
            Q_ = np.array([self.qs[(r_, a)] for a in self.actions]).max()
            self.qs[(r, action)] += self.alpha * (reward + self.gamma * Q_ - self.qs[(r, action)])
        self.decrease_eps()

    def decrease_eps(self):
        # Linearly anneal epsilon down to a floor of 0.01.
        self.epsilon = max(0.01, self.epsilon - 1e-5)
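

# Usage sketch (not part of the original module): a TD(0) Q-learning loop for
# CartPoleNoob on gymnasium's CartPole-v1. The gymnasium import, the
# 'CartPole-v1' env id, the 5-tuple step API and the episode count are
# assumptions about the surrounding training script, not code from this repo.
def run_cartpole_demo(num_episodes=5_000):
    import gymnasium as gym

    env = gym.make('CartPole-v1')
    agent = CartPoleNoob('egreedy', env, function='Q')
    for _ in range(num_episodes):
        obs, _ = env.reset()
        done = False
        while not done:
            action = int(agent.choose_action(obs))
            obs_, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            # One-step TD update after every transition.
            agent.update(obs, action, reward, obs_)
            obs = obs_
    return agent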