
Update to reflect API changes. #25


Merged: 2 commits, May 23, 2017

9 changes: 3 additions & 6 deletions README.rst
@@ -67,17 +67,14 @@ on `using actors`_.
**Exercise 8:** Use ``ray.put`` to avoid serializing and copying the same
object into shared memory multiple times.

- **Exercise 9:** Use ``ray.register_class`` to enable Ray to serialize custom
- classes.

- **Exercise 10:** Parallelize a serial example that uses a neural net to perform
+ **Exercise 9:** Parallelize a serial example that uses a neural net to perform
rollouts in a gym environment.

- **Exercise 11:** Extract neural network weights from an actor on one process,
+ **Exercise 10:** Extract neural network weights from an actor on one process,
and set them in another actor. You may want to read the documentation on
`using Ray with TensorFlow`_.

- **Exercise 12:** Specify that an actor requires some GPUs. For a complete
+ **Exercise 11:** Specify that an actor requires some GPUs. For a complete
example that does something similar, you may want to see the `ResNet example`_.

.. _`Ray documentation`: http://ray.readthedocs.io/en/latest/?badge=latest
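Exercise 8 above is about ``ray.put``. Here is a minimal sketch of that pattern; the task and array names are illustrative, not from the tutorial, and only ``ray.init``, ``ray.put``, ``ray.get``, and ``@ray.remote`` are the real Ray calls:

    import numpy as np
    import ray

    ray.init(num_cpus=4)

    @ray.remote
    def norm_plus(weights, i):
        # "weights" is read from the shared-memory object store; it is not
        # re-serialized for every task invocation.
        return float(np.linalg.norm(weights)) + i

    # Store the large array once, then pass the resulting ID to many tasks.
    weights_id = ray.put(np.ones(10 ** 6))
    results = ray.get([norm_plus.remote(weights_id, i) for i in range(4)])
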
5 changes: 0 additions & 5 deletions examples/block_linear_algebra.py
@@ -22,11 +22,6 @@ def __init__(self, shape, block_size=10):
        self.block_ids = np.empty(self.num_blocks, dtype=object)


- # Note that we need to call ray.register_class so that we can pass BlockMatrix
- # objects to remote functions and return them from remote functions.
- ray.register_class(BlockMatrix)


# This is a helper function which creates an array of zeros.
@ray.remote
def zeros_helper(shape):
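The deleted ``ray.register_class(BlockMatrix)`` call is the point of this change: under the newer Ray API this pull request targets, custom classes such as ``BlockMatrix`` are serialized automatically when they are passed to or returned from remote functions. A minimal sketch, using a simplified stand-in for the tutorial's class:

    import numpy as np
    import ray

    ray.init()

    class BlockMatrix(object):
        # Simplified stand-in for the tutorial's BlockMatrix.
        def __init__(self, shape, block_size=10):
            self.shape = shape
            self.block_size = block_size
            self.num_blocks = [int(np.ceil(a / block_size)) for a in shape]
            self.block_ids = np.empty(self.num_blocks, dtype=object)

    @ray.remote
    def make_block_matrix(shape):
        # No ray.register_class call is needed; the instance is serialized
        # automatically when it is returned from the remote function.
        return BlockMatrix(shape)

    matrix = ray.get(make_block_matrix.remote((100, 100)))
    assert matrix.num_blocks == [10, 10]
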
3 changes: 0 additions & 3 deletions examples/evolution_strategies.py
@@ -22,9 +22,6 @@
# we are currently only starting two workers, so the opportunity for
# parallelism will depend on the number of workers.
#
- # EXERCISE: You will probably need to tell Ray how to serialize Config and
- # Result objects using "ray.register_class".
- #
# EXERCISE: This code uses a large numpy array of noise that is shared between
# the workers. This array is created by the function "create_shared_noise".
# Make "create_shared_noise" a remote function so that the noise object is put
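The remaining exercise text above asks for ``create_shared_noise`` to become a remote function so that the noise array is placed in the object store once and shared by the workers. A sketch of that pattern; the noise size and the consuming task are made up for illustration:

    import numpy as np
    import ray

    ray.init(num_cpus=2)

    @ray.remote
    def create_shared_noise():
        # The returned array is stored in the object store once.
        return np.random.RandomState(0).randn(10 ** 6)

    @ray.remote
    def noise_chunk_sum(noise, index, size):
        # Every task reads the same stored array rather than a fresh copy.
        return float(np.sum(noise[index:index + size]))

    noise_id = create_shared_noise.remote()
    sums = ray.get([noise_chunk_sum.remote(noise_id, 1000 * i, 1000)
                    for i in range(4)])
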
189 changes: 97 additions & 92 deletions exercises/exercise09.py
@@ -1,107 +1,112 @@
- # The goal of this exercise is to develop some intuition for what kinds of
- # objects Ray can serialize and deserialize efficiently, and what kinds are
- # handled inefficiently.
+ # The goal of this exercise is to combine a handful of lessons in a single
+ # example and to get some practice parallelizing serial code. In this exercise,
+ # we create a neural network and a gym environment and use the network to do
+ # some rollouts (that is, we use the neural net to choose actions to take in
+ # the environment). However, all of the rollouts are done serially.
#
- # HIGH-LEVEL TAKEAWAY: Whenever possible, use numpy arrays.
- #
- # You can serialize a Python object and place it in the object store with
- # ray.put. You can then retrieve it and deserialize it with ray.get. This
- # should work out of the box with all primitive data types (e.g., ints, floats,
- # strings, lists, tuples, dictionaries, and numpy arrays). However, an extra
- # line is needed to handle custom Python objects or more complex objects.
- #
- # For example, if you define a custom class, then in order to pass it to a
- # remote function, you need to call ray.register_class.
- #
- # class Foo(object):
- #     def __init__(self, a, b):
- #         self.a = a
- #         self.b = b
- #
- # ray.put(Foo(1, 2))  # This raises an exception.
- #
- # ray.register_class(Foo)  # This tells Ray to serialize objects of type
- #                          # "Foo" by turning them into a dictionary of
- #                          # their fields, e.g., {"a": 1, "b": 2}.
- # ray.get(ray.put(Foo(1, 2)))  # This should work.
- #
- # Not everything can be serialized by unpacking its fields into a dictionary,
- # so for those objects, we can fall back to pickle. E.g.,
- #
- # ray.register_class(t, pickle=True)  # This tells Ray to serialize objects
- #                                     # of type t with pickle (actually we
- #                                     # use cloudpickle under the hood).
- #
- # NOTE: This is one of the uglier parts of the API. We can make more things
- # work out of the box by falling back to pickle, but the danger is that small
- # changes to application code could cause a change in the way serialization is
- # done (from our custom serialization using Apache Arrow to pickle), which
- # could lead to a big performance hit without any explanation.
- #
- # EXERCISE: See if you can make the following code run by calling
- # ray.register_class in the appropriate places. NOTE: It is probably simpler to
- # play around with this in an ipython interpreter than to repeatedly run this
- # script. Also, if you want to drop into the interpreter in the middle of
- # this script, you can add the following lines.
- #
- # import IPython
- # IPython.embed()
+ # EXERCISE: Change this code to do rollouts in parallel by making an actor that
+ # creates both the "env" object and the "policy" object in its constructor. The
+ # "rollout" function should then be a method of the actor class.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
+ import psutil
import ray
+ import tensorflow as tf
+ import time

+ from ray_tutorial.reinforce.env import BatchedEnv
+ from ray_tutorial.reinforce.policy import ProximalPolicyLoss
+ from ray_tutorial.reinforce.filter import MeanStdFilter
+ from ray_tutorial.reinforce.rollout import rollouts, add_advantage_values

+ from ray_tutorial.reinforce.env import (NoPreprocessor, AtariRamPreprocessor,
+                                         AtariPixelPreprocessor)
+ from ray_tutorial.reinforce.models.fc_net import fc_net
+ from ray_tutorial.reinforce.models.vision_net import vision_net

+ config = {"kl_coeff": 0.2,
+           "num_sgd_iter": 30,
+           "sgd_stepsize": 5e-5,
+           "sgd_batchsize": 128,
+           "entropy_coeff": 0.0,
+           "clip_param": 0.3,
+           "kl_target": 0.01,
+           "timesteps_per_batch": 40000}


if __name__ == "__main__":
    ray.init(num_cpus=4, redirect_output=True)

-     class Foo(object):
-         def __init__(self, x):
-             self.x = x

-     # By default, Ray doesn't know how to serialize Foo objects. Make this work.
-     result = ray.get(ray.put(Foo(1)))
-     assert result.x == 1

-     class Bar(object):
-         def __init__(self, a, b):
-             self.a = a
-             self.b = b
-             self.c = np.ones((a, b))

-     class Qux(object):
-         def __init__(self, a, b):
-             self.bar = Bar(a, b)

-     # By default, Ray doesn't know how to serialize Qux objects. Make the line
-     # below work. NOTE: if Ray falls back to pickling the Qux object, then Ray
-     # will not be able to efficiently handle the large numpy array inside.
-     # You can compare the performance difference with and without pickling as
-     # follows (in ipython).
+     # For a more interesting example, try this with the following values. Note
+     # that this will require installing gym with the atari environments. You'll
+     # probably want to use a smaller batchsize for this.
    #
-     # # Time it without pickle.
-     # ray.register_class(Qux)
-     # ray.register_class(Bar)
-     # q = Qux(1000, 1000)
-     # %time q_id = ray.put(q)
-     # %time q_val = ray.get(q_id)
-     #
-     # # Time it with pickle.
-     # ray.register_class(Qux, pickle=True)
-     # q = Qux(1000, 1000)
-     # %time q_id = ray.put(q)
-     # %time q_vl = ray.get(q_id)
-     #
-     # Note that the above timing code will only work in the order listed above.
-     # Once you tell Ray to serialize the class using pickle, it will always use
-     # pickle.
-     result = ray.get(ray.put(Qux(1000, 10000)))
-     assert result.bar.a == 1000
-     assert result.bar.b == 10000
-     assert np.allclose(result.bar.c, np.ones((1000, 10000)))

-     print("Success! The example ran to completion.")
+     # name = "Pong-v0"
+     # preprocessor = AtariPixelPreprocessor()

name = "CartPole-v0"
batchsize = 100
preprocessor = NoPreprocessor()
gamma = 0.995
lam = 1.0
horizon = 2000

# Create a simulator environment. This is a wrapper containing a batch of gym
# environments. The simulator can be simulated with "env.step(action)", which
# is called within the "rollouts" function below.
env = BatchedEnv(name, batchsize, preprocessor=preprocessor)

# Create a neural net policy. Note that we create the neural net inside its
# own graph. This can help avoid variable name collisions. It shouldn't
# matter in this example, but if you create a neural net inside of a remote
# function, and multiple tasks execute that remote function on the same
# worker, then this can lead to variable name collisions.
with tf.Graph().as_default():
sess = tf.Session()
if preprocessor.shape is None:
preprocessor.shape = env.observation_space.shape
policy = ProximalPolicyLoss(env.observation_space, env.action_space,
preprocessor, config, sess)
observation_filter = MeanStdFilter(preprocessor.shape, clip=None)
reward_filter = MeanStdFilter((), clip=None)
sess.run(tf.global_variables_initializer())

# Note that directly making this function a remote function will give a
# pickling error. That happens because when we define a remote function, we
# pickle the function definition and ship the definition to the workers.
# However, this function uses "policy", which is a TensorFlow neural net, and
# TensorFlow often cannot be pickled. This could be addressed by constructing
# "policy" within the rollout function, but in this case it's better to
# create an actor that creates the policy in its constructor (so that we can
# reuse the policy between multiple calls to "rollout").
def rollout():
# Collect some rollouts.
trajectory = rollouts(policy, env, horizon, observation_filter,
reward_filter)
add_advantage_values(trajectory, gamma, lam, reward_filter)
return trajectory

# Do some rollouts to make sure that all of the neural nets have been
# constructed. This isn't relevant for the serial code, but when we create
# the neural nets in the background using actors, we don't want the time to
# create the actors to interfere with the timing measurement below. Make sure
# that this code uses all of the actors.
collected_rollouts = [rollout() for _ in range(20)]

start_time = time.time()

# Do some rollouts serially. These should be done in parallel.
collected_rollouts = [rollout() for _ in range(20)]

end_time = time.time()
duration = end_time - start_time

expected_duration = np.ceil(20 / psutil.cpu_count()) * 0.5
assert duration < expected_duration, ("Rollouts took {} seconds. This is "
"too slow.".format(duration))

print("Success! The example took {} seconds.".format(duration))