[rllib] A3C Refactoring (ray-project#1166)
* fixing policy

* Compute Action is singular, fixed weird issue with arrays

* remove vestige

* extraneous ipdb

* Can Drop in Pytorch Model

* lint

* naming

* finish comments
richardliaw authored Oct 29, 2017
1 parent 4cace09 commit dc66a2d
Showing 12 changed files with 404 additions and 344 deletions.
4 changes: 2 additions & 2 deletions doc/source/example-a3c.rst
@@ -98,7 +98,7 @@ We use a Ray Actor to simulate the environment.
self.policy.set_weights(params)
rollout = self.pull_batch_from_queue()
batch = process_rollout(rollout, gamma=0.99, lambda_=1.0)
gradient = self.policy.get_gradients(batch)
gradient = self.policy.compute_gradients(batch)
info = {"id": self.id,
"size": len(batch.a)}
return gradient, info
@@ -138,7 +138,7 @@ global model parameters. The main training script looks like the following.
obs += info["size"]
# apply update, get the weights from the model, start a new task on the same actor object
policy.model_update(gradient)
policy.apply_gradients(gradient)
parameters = policy.get_weights()
gradient_list.extend([agents[info["id"]].compute_gradient(parameters)])
return policy
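For orientation, here is a condensed, self-contained sketch of the asynchronous driver loop that the two snippets above describe: workers return (gradient, info) pairs from compute_gradient, and the driver applies each gradient as it arrives, then ships fresh weights back to the worker that produced it. The names and the actor-method call syntax follow the snippets above (newer Ray releases require .remote() on actor method calls); max_batches stands in for the config-driven limit, and metrics collection is omitted.

import ray


def run_async_updates(policy, agents, max_batches):
    # Seed every worker with the current weights.
    parameters = policy.get_weights()
    gradient_list = [agent.compute_gradient(parameters) for agent in agents]
    batches_so_far = len(gradient_list)
    while gradient_list:
        # Block until at least one worker finishes its batch.
        done_id, gradient_list = ray.wait(gradient_list)
        gradient, info = ray.get(done_id)[0]
        policy.apply_gradients(gradient)   # central update
        parameters = policy.get_weights()  # refreshed weights
        if batches_so_far < max_batches:
            batches_so_far += 1
            # Hand the updated weights back to the worker that just reported.
            gradient_list.extend(
                [agents[info["id"]].compute_gradient(parameters)])
    return policy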
83 changes: 8 additions & 75 deletions python/ray/rllib/a3c/a3c.py
@@ -4,14 +4,12 @@

import numpy as np
import pickle
import tensorflow as tf
import six.moves.queue as queue
import os

import ray
from ray.rllib.agent import Agent
from ray.rllib.a3c.runner import RunnerThread, process_rollout
from ray.rllib.a3c.envs import create_and_wrap
from ray.rllib.a3c.runner import RemoteRunner
from ray.rllib.a3c.shared_model import SharedModel
from ray.rllib.a3c.shared_model_lstm import SharedModelLSTM
from ray.tune.result import TrainingResult
@@ -24,76 +22,11 @@
"use_lstm": True,
"model": {"grayscale": True,
"zero_mean": False,
"dim": 42}
"dim": 42,
"channel_major": True}
}


@ray.remote
class Runner(object):
"""Actor object to start running simulation on workers.
The gradient computation is also executed from this object.
"""
def __init__(self, env_creator, policy_cls, actor_id, batch_size,
preprocess_config, logdir):
env = create_and_wrap(env_creator, preprocess_config)
self.id = actor_id
# TODO(rliaw): should change this to be just env.observation_space
self.policy = policy_cls(env.observation_space.shape, env.action_space)
self.runner = RunnerThread(env, self.policy, batch_size)
self.env = env
self.logdir = logdir
self.start()

def pull_batch_from_queue(self):
"""Take a rollout from the queue of the thread runner."""
rollout = self.runner.queue.get(timeout=600.0)
if isinstance(rollout, BaseException):
raise rollout
while not rollout.terminal:
try:
part = self.runner.queue.get_nowait()
if isinstance(part, BaseException):
raise rollout
rollout.extend(part)
except queue.Empty:
break
return rollout

def get_completed_rollout_metrics(self):
"""Returns metrics on previously completed rollouts.
Calling this clears the queue of completed rollout metrics.
"""
completed = []
while True:
try:
completed.append(self.runner.metrics_queue.get_nowait())
except queue.Empty:
break
return completed

def start(self):
summary_writer = tf.summary.FileWriter(
os.path.join(self.logdir, "agent_%d" % self.id))
self.summary_writer = summary_writer
self.runner.start_runner(self.policy.sess, summary_writer)

def compute_gradient(self, params):
self.policy.set_weights(params)
rollout = self.pull_batch_from_queue()
batch = process_rollout(rollout, gamma=0.99, lambda_=1.0)
gradient, info = self.policy.get_gradients(batch)
if "summary" in info:
self.summary_writer.add_summary(
tf.Summary.FromString(info['summary']),
self.policy.local_steps)
self.summary_writer.flush()
info = {"id": self.id,
"size": len(batch.a)}
return gradient, info


class A3CAgent(Agent):
_agent_name = "A3C"
_default_config = DEFAULT_CONFIG
@@ -107,9 +40,9 @@ def _init(self):
self.policy = policy_cls(
self.env.observation_space.shape, self.env.action_space)
self.agents = [
Runner.remote(self.env_creator, policy_cls, i,
self.config["batch_size"],
self.config["model"], self.logdir)
RemoteRunner.remote(self.env_creator, policy_cls, i,
self.config["batch_size"],
self.config["model"], self.logdir)
for i in range(self.config["num_workers"])]
self.parameters = self.policy.get_weights()

@@ -122,7 +55,7 @@ def _train(self):
while gradient_list:
done_id, gradient_list = ray.wait(gradient_list)
gradient, info = ray.get(done_id)[0]
self.policy.model_update(gradient)
self.policy.apply_gradients(gradient)
self.parameters = self.policy.get_weights()
if batches_so_far < max_batches:
batches_so_far += 1
@@ -168,5 +101,5 @@ def _restore(self, checkpoint_path):
self.policy.set_weights(self.parameters)

def compute_action(self, observation):
actions = self.policy.compute_actions(observation)
actions = self.policy.compute_action(observation)
return actions[0]
37 changes: 37 additions & 0 deletions python/ray/rllib/a3c/common.py
@@ -0,0 +1,37 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import scipy.signal
from collections import namedtuple


def discount(x, gamma):
return scipy.signal.lfilter([1], [1, -gamma], x[::-1], axis=0)[::-1]


def process_rollout(rollout, gamma, lambda_=1.0):
"""Given a rollout, compute its returns and the advantage."""
batch_si = np.asarray(rollout.states)
batch_a = np.asarray(rollout.actions)
rewards = np.asarray(rollout.rewards)
vpred_t = np.asarray(rollout.values + [rollout.r])

rewards_plus_v = np.asarray(rollout.rewards + [rollout.r])
batch_r = discount(rewards_plus_v, gamma)[:-1]
delta_t = rewards + gamma * vpred_t[1:] - vpred_t[:-1]
# This formula for the advantage comes from "Generalized Advantage Estimation":
# https://arxiv.org/abs/1506.02438
batch_adv = discount(delta_t, gamma * lambda_)

features = rollout.features[0]
return Batch(batch_si, batch_a, batch_adv, batch_r, rollout.terminal,
features)


Batch = namedtuple(
"Batch", ["si", "a", "adv", "r", "terminal", "features"])

CompletedRollout = namedtuple(
"CompletedRollout", ["episode_length", "episode_reward"])
84 changes: 7 additions & 77 deletions python/ray/rllib/a3c/policy.py
@@ -2,99 +2,29 @@
from __future__ import division
from __future__ import print_function

import tensorflow as tf
import ray
import gym


class Policy(object):
"""The policy base class."""
def __init__(self, ob_space, action_space, name="local", summarize=True):
self.local_steps = 0
self.summarize = summarize
worker_device = "/job:localhost/replica:0/task:0/cpu:0"
self.g = tf.Graph()
with self.g.as_default(), tf.device(worker_device):
with tf.variable_scope(name):
self.setup_graph(ob_space, action_space)
assert all([hasattr(self, attr)
for attr in ["vf", "logits", "x", "var_list"]])
print("Setting up loss")
self.setup_loss(action_space)
self.setup_gradients()
self.initialize()
pass

def setup_graph(self):
def apply_gradients(self, grads):
raise NotImplementedError

def setup_loss(self, action_space):
if isinstance(action_space, gym.spaces.Box):
ac_size = action_space.shape[0]
self.ac = tf.placeholder(tf.float32, [None, ac_size], name="ac")
elif isinstance(action_space, gym.spaces.Discrete):
self.ac = tf.placeholder(tf.int64, [None], name="ac")
else:
raise NotImplemented(
"action space" + str(type(action_space)) +
"currently not supported")
self.adv = tf.placeholder(tf.float32, [None], name="adv")
self.r = tf.placeholder(tf.float32, [None], name="r")

log_prob = self.curr_dist.logp(self.ac)

# The "policy gradients" loss: its derivative is precisely the policy
# gradient. Notice that self.ac is a placeholder that is provided
# externally. adv will contain the advantages, as calculated in
# process_rollout.
self.pi_loss = - tf.reduce_sum(log_prob * self.adv)

delta = self.vf - self.r
self.vf_loss = 0.5 * tf.reduce_sum(tf.square(delta))
self.entropy = tf.reduce_sum(self.curr_dist.entropy())
self.loss = self.pi_loss + 0.5 * self.vf_loss - self.entropy * 0.01

def setup_gradients(self):
grads = tf.gradients(self.loss, self.var_list)
self.grads, _ = tf.clip_by_global_norm(grads, 40.0)
grads_and_vars = list(zip(self.grads, self.var_list))
opt = tf.train.AdamOptimizer(1e-4)
self._apply_gradients = opt.apply_gradients(grads_and_vars)

def initialize(self):
if self.summarize:
bs = tf.to_float(tf.shape(self.x)[0])
tf.summary.scalar("model/policy_loss", self.pi_loss / bs)
tf.summary.scalar("model/value_loss", self.vf_loss / bs)
tf.summary.scalar("model/entropy", self.entropy / bs)
tf.summary.scalar("model/grad_gnorm", tf.global_norm(self.grads))
tf.summary.scalar("model/var_gnorm", tf.global_norm(self.var_list))
self.summary_op = tf.summary.merge_all()

self.sess = tf.Session(graph=self.g, config=tf.ConfigProto(
intra_op_parallelism_threads=1, inter_op_parallelism_threads=2))
self.variables = ray.experimental.TensorFlowVariables(self.loss,
self.sess)
self.sess.run(tf.global_variables_initializer())

def model_update(self, grads):
feed_dict = {self.grads[i]: grads[i]
for i in range(len(grads))}
self.sess.run(self._apply_gradients, feed_dict=feed_dict)

def get_weights(self):
weights = self.variables.get_weights()
return weights
raise NotImplementedError

def set_weights(self, weights):
self.variables.set_weights(weights)
raise NotImplementedError

def get_gradients(self, batch):
def compute_gradients(self, batch):
raise NotImplementedError

def get_vf_loss(self):
raise NotImplementedError

def compute_actions(self, observations):
def compute_action(self, observations):
"""Compute action for a _single_ observation"""
raise NotImplementedError

def value(self, ob):
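After this change Policy is a pure interface, which is what allows a PyTorch model to drop in next to the TensorFlow ones. The toy subclass below is only an illustration of that contract and is not part of the commit: the linear "model", the SGD step, and the assumption of a Discrete action space are placeholders for whatever a real implementation (e.g. SharedModel) provides.

import numpy as np

from ray.rllib.a3c.policy import Policy


class ToyLinearPolicy(Policy):
    """Minimal sketch of the refactored Policy contract (illustrative only)."""

    def __init__(self, ob_space, action_space, name="local", summarize=True):
        # A single weight matrix stands in for a real TF/PyTorch model;
        # assumes a gym.spaces.Discrete action space.
        self.w = np.zeros((int(np.prod(ob_space)), action_space.n))

    def apply_gradients(self, grads):
        self.w -= 1e-4 * grads  # plain SGD step, for illustration

    def get_weights(self):
        return self.w

    def set_weights(self, weights):
        self.w = weights

    def compute_gradients(self, batch):
        # A real policy computes (gradient, info) from the processed rollout.
        return np.zeros_like(self.w), {}

    def compute_action(self, observation):
        # Return a sequence so callers can index the single action,
        # as A3CAgent.compute_action does above.
        logits = np.asarray(observation).reshape(1, -1).dot(self.w)
        return [int(np.argmax(logits))]

    def value(self, ob):
        return 0.0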
(Diffs for the remaining 8 changed files are not shown here.)
