# -*- coding: utf-8 -*-
"""
@created on: 9/21/19,
@author: Shreesha N,
@version: v0.0.1
@system name: badgod
Description:

.. todo::

"""
import numpy as np
import random
from collections import defaultdict

"""
    Monte-Carlo
    In this problem, we will implement an AI player for Blackjack.
    The main goal of this problem is to get familiar with the Monte-Carlo algorithm.
    We can test the correctness of our code
    by typing 'nosetests -v mc_test.py' in the terminal.
"""
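
# The functions below assume the classic OpenAI Gym Blackjack environment.
# A minimal setup sketch (the environment id 'Blackjack-v0' is an assumption
# based on older gym releases, not something this file pins down):
#
#   import gym
#   env = gym.make('Blackjack-v0')
#   observation = env.reset()  # (player's sum, dealer's showing card, usable ace?)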


def initial_policy(observation):
    """A policy that sticks if the player score is >= 20 and hits otherwise

    Parameters:
    -----------
    observation:
        3-tuple of (player's sum, dealer's showing card, usable ace?)
    Returns:
    --------
    action: 0 or 1
        0: STICK
        1: HIT
    """
    return 0 if observation[0] >= 20 else 1

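# For example, the observation (21, 10, True) maps to action 0 (STICK) because
# the player's sum is >= 20, while (13, 7, False) maps to action 1 (HIT):
#
#   assert initial_policy((21, 10, True)) == 0
#   assert initial_policy((13, 7, False)) == 1
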

def play_step(env, action_to_take):
    """
    Given the action to be taken, plays one step in the environment and returns the resulting values.

    :param env: function
        OpenAI gym environment.
    :param action_to_take: int
        Action index to be taken for the current step.

    :return: next_state: 3-tuple
        (Player's sum, Dealer's showing card, Boolean indicating if the player has a usable ACE).
    :return: reward: int
        Reward received for choosing the given action.
    :return: done: boolean
        Boolean indicating whether a terminal state has been reached.
    """
    next_state, reward, done, info = env.step(action_to_take)
    return next_state, reward, done


def get_random_episode(env, policy):
    """
    Generates a single episode by following the given policy until a terminal state is reached.

    :param env: function
        OpenAI gym environment.
    :param policy: function
        The policy to be followed while choosing an action.
    :return: list
        List of (state, action, reward) tuples for the generated episode.
    """
    new_set_of_episodes = []
    current_state = env.reset()
    while True:
        action_to_take = policy(current_state)
        next_state, reward, done = play_step(env, action_to_take)
        new_set_of_episodes.append((current_state, action_to_take, reward))
        if done:
            break
        current_state = next_state
    return new_set_of_episodes
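
# A generated episode is a list of (state, action, reward) triples, e.g.
# (values are illustrative):
#
#   episode = get_random_episode(env, initial_policy)
#   # [((13, 10, False), 1, 0.0), ((19, 10, False), 0, 1.0)]
#
# In the Blackjack environment the final reward is typically +1 for a win,
# -1 for a loss and 0 for a draw.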


def mc_prediction(policy, env, n_episodes, gamma=1.0):
    """
    Estimates the value function of the given policy by sampling episodes
    and applying the first-visit Monte Carlo algorithm.

    :param policy: function
        A function that maps an observation to an action.
    :param env: function
        OpenAI gym environment.
    :param n_episodes: int
        Number of episodes to sample.
    :param gamma: float
        Gamma discount factor.
    :return V: defaultdict(float)
        A dictionary that maps from state to value.
    """

    # initialize empty dictionaries
    returns_sum = defaultdict(float)
    returns_count = defaultdict(float)
    # a dictionary that maps state -> value
    V = defaultdict(float)
    for _ in range(n_episodes):
        new_set_of_episodes = get_random_episode(env, policy)
        states_set = set([episode[0] for episode in new_set_of_episodes])
        for state in states_set:
            first_occurrence = next(i for i, x in enumerate(new_set_of_episodes) if x[0] == state)
            total_reward = sum([(gamma ** power) * episode[2] for power, episode in
                                enumerate(new_set_of_episodes[first_occurrence:])])
            returns_sum[state] += total_reward
            returns_count[state] += 1
            V[state] = returns_sum[state] / returns_count[state]
    return V
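
# First-visit MC averages, per state s, the returns
#   G_t = r_{t+1} + gamma * r_{t+2} + gamma^2 * r_{t+3} + ...
# computed from the first time s occurs in each sampled episode. A usage
# sketch (the episode count and queried state are illustrative):
#
#   V = mc_prediction(initial_policy, env, n_episodes=10000, gamma=1.0)
#   print(V[(21, 3, True)])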


def epsilon_greedy(Q, state, nA, epsilon=0.1):
    """
    An epsilon-greedy method to select an action based on the Q values of a state.

    :param Q: dict()
        A dictionary that maps from state -> action-values,
        where Q[s][a] is the estimated action value corresponding to state s and action a.
    :param state:
        Current state.
    :param nA: int
        Number of actions in the environment.
    :param epsilon: float
        The probability to select a random action, range between 0 and 1.
    :return: int
        Action chosen for the current state.

    Hints:
    ------
    With probability (1 - epsilon) choose the greedy action.
    With probability epsilon choose an action at random.
    """
    # Start with probability epsilon / nA for every action, then put the
    # remaining (1 - epsilon) mass on the current greedy action.
    A = np.ones(nA) * epsilon / float(nA)
    best_action = np.argmax(Q[state])
    A[best_action] += (1.0 - epsilon)
    return np.random.choice(np.arange(len(A)), p=A)
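
# With nA = 2 and epsilon = 0.1 the sampling distribution starts as
# [0.05, 0.05]; the remaining 0.9 goes to argmax(Q[state]), giving e.g.
# [0.95, 0.05] when action 0 currently looks best. Sketch (the state shown
# is illustrative):
#
#   action = epsilon_greedy(Q, (13, 10, False), nA=2, epsilon=0.1)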


def generate_random_episode_greedy(Q, nA, epsilon, env):
    """
    Generates a single episode, choosing each action with the epsilon-greedy method.

    :param Q: dict()
        A dictionary that maps from state -> action-values,
        where Q[s][a] is the estimated action value corresponding to state s and action a.
    :param nA: int
        Number of actions in the environment.
    :param epsilon: float
        The probability to select a random action, range between 0 and 1.
    :param env: function
        OpenAI gym environment.
    :return: list
        List of (state, action, reward) tuples for the generated episode.
    """
    new_set_of_episodes = []
    current_state = env.reset()
    while True:
        action_to_take = epsilon_greedy(Q, current_state, nA, epsilon)
        next_state, reward, done = play_step(env, action_to_take)
        new_set_of_episodes.append((current_state, action_to_take, reward))
        if done:
            break
        current_state = next_state
    return new_set_of_episodes


def mc_control_epsilon_greedy(env, n_episodes, gamma=1.0, epsilon=0.1):
    """
    Monte Carlo control with an epsilon-greedy behaviour policy.
    Finds an optimal epsilon-greedy policy.

    :param env: function
        OpenAI gym environment.
    :param n_episodes: int
        Number of episodes to sample.
    :param gamma: float
        Gamma discount factor.
    :param epsilon: float
        The probability to select a random action, range between 0 and 1.
    :return: Q: dict()
        A dictionary that maps from state -> action-values,
        where Q[s][a] is the estimated action value corresponding to state s and action a.

    Hint:
    -----
    You could consider decaying epsilon, i.e. epsilon = epsilon - (0.1 / n_episodes)
    after each episode, keeping epsilon > 0.

    """
    returns_sum = defaultdict(float)
    returns_count = defaultdict(float)
    # a nested dictionary that maps state -> (action -> action-value)
    Q = defaultdict(lambda: np.zeros(env.action_space.n))

    def decay(epsilon):
        return epsilon - (0.1 / n_episodes)

    for _ in range(n_episodes):
        new_set_of_episodes = generate_random_episode_greedy(Q, env.action_space.n, epsilon, env)
        epsilon = decay(epsilon)
        for step in range(len(new_set_of_episodes)):
            state_action_pair = (new_set_of_episodes[step][0], new_set_of_episodes[step][1])
            first_occurrence = next(
                i for i, episode in enumerate(new_set_of_episodes) if (episode[0], episode[1]) == state_action_pair)
            g = sum([(gamma ** power) * episode[2] for power, episode in
                     enumerate(new_set_of_episodes[first_occurrence:])])
            returns_sum[state_action_pair] += g
            returns_count[state_action_pair] += 1
            Q[state_action_pair[0]][state_action_pair[1]] = returns_sum[state_action_pair] / returns_count[
                state_action_pair]
    return Q
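

# A minimal end-to-end sketch. The environment id 'Blackjack-v0' and the
# episode counts below are illustrative assumptions (this module does not
# pin a specific gym version or environment id):
if __name__ == '__main__':
    import gym

    env = gym.make('Blackjack-v0')
    # Evaluate the fixed stick-on-20 policy with first-visit MC prediction.
    V = mc_prediction(initial_policy, env, n_episodes=10000, gamma=1.0)
    # Learn action values with epsilon-greedy MC control and read off the
    # greedy action for each visited state.
    Q = mc_control_epsilon_greedy(env, n_episodes=50000, gamma=1.0, epsilon=0.1)
    greedy_policy = {state: int(np.argmax(action_values)) for state, action_values in Q.items()}
    print('estimated V for %d states, greedy policy for %d states' % (len(V), len(greedy_policy)))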