
[rllib] Improve performance for small rollouts #812

Merged (12 commits, Aug 6, 2017)
46 changes: 44 additions & 2 deletions python/ray/rllib/policy_gradient/agent.py
@@ -8,6 +8,7 @@

from tensorflow.python import debug as tf_debug

import numpy as np
import ray

from ray.rllib.parallel import LocalSyncParallelOptimizer
@@ -16,6 +17,7 @@
from ray.rllib.policy_gradient.loss import ProximalPolicyLoss
from ray.rllib.policy_gradient.filter import MeanStdFilter
from ray.rllib.policy_gradient.rollout import rollouts, add_advantage_values
from ray.rllib.policy_gradient.utils import flatten, concatenate

# TODO(pcm): Make sure that both observation_filter and reward_filter
# are correctly handled, i.e. (a) the values are accumulated across
@@ -82,8 +84,8 @@ def __init__(
assert config["sgd_batchsize"] % len(devices) == 0, \
"Batch size must be evenly divisible by devices"
if is_remote:
self.batch_size = 1
self.per_device_batch_size = 1
self.batch_size = config["rollout_batchsize"]
self.per_device_batch_size = config["rollout_batchsize"]
else:
self.batch_size = config["sgd_batchsize"]
self.per_device_batch_size = int(self.batch_size / len(devices))
@@ -148,11 +150,51 @@ def load_weights(self, weights):
self.variables.set_weights(weights)

def compute_trajectory(self, gamma, lam, horizon):
"""Compute a single rollout on the agent and return."""
trajectory = rollouts(
self.common_policy,
self.env, horizon, self.observation_filter, self.reward_filter)
add_advantage_values(trajectory, gamma, lam, self.reward_filter)
return trajectory

def compute_steps(self, gamma, lam, horizon, min_steps_per_task=-1):
Review comment (Collaborator): would it make sense to define this method in terms of compute_trajectory?

"""Compute multiple rollouts and concatenate the results.

Args:
gamma: MDP discount factor
lam: GAE(lambda) parameter
horizon: Number of steps after which a rollout gets cut
min_steps_per_task: Lower bound on the number of states to be
collected.

Returns:
trajectory: Dictionary of concatenated rollout data (observations, rewards, etc.).
total_rewards: Total rewards of the trajectories.
trajectory_lengths: Lengths of the trajectories.
"""
num_steps_so_far = 0
trajectories = []
total_rewards = []
trajectory_lengths = []
while True:
trajectory = self.compute_trajectory(gamma, lam, horizon)
total_rewards.append(
trajectory["raw_rewards"].sum(axis=0).mean())
trajectory_lengths.append(
np.logical_not(trajectory["dones"]).sum(axis=0).mean())
trajectory = flatten(trajectory)
not_done = np.logical_not(trajectory["dones"])
# Filtering out states that are done. We do this because
# trajectories are batched and cut only if all the trajectories
# in the batch terminated, so we can potentially get rid of
# some of the states here.
trajectory = {key: val[not_done]
for key, val in trajectory.items()}
num_steps_so_far += trajectory["raw_rewards"].shape[0]
trajectories.append(trajectory)
if num_steps_so_far >= min_steps_per_task:
break
return concatenate(trajectories), total_rewards, trajectory_lengths
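A toy illustration of the done-state filtering performed in compute_steps (array shapes are assumed purely for illustration; this is not code from the PR). After flatten(), each per-step array can be masked along its first axis, and steps whose "dones" flag is already True are dropped before the batch is concatenated:

import numpy as np

# Two of the four flattened steps below belong to already-terminated
# trajectories and get dropped before the batch is concatenated.
trajectory = {
    "dones":       np.array([False, False, True, True]),
    "raw_rewards": np.array([1.0, 2.0, 0.0, 0.0]),
}
not_done = np.logical_not(trajectory["dones"])
trajectory = {key: val[not_done] for key, val in trajectory.items()}
print(trajectory["raw_rewards"])  # [1. 2.] -- only the two live steps remain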


RemoteAgent = ray.remote(Agent)
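A driver-side usage sketch of the new method, mirroring what collect_samples in rollout.py does for a single agent. The agents list and config dict are assumed to come from PolicyGradient.__init__ and DEFAULT_CONFIG; this is an illustration, not code from the PR:

import ray

agent = agents[0]  # one of the remote Agent actors created by PolicyGradient
steps_id = agent.compute_steps.remote(
    config["gamma"], config["lambda"],
    config["horizon"], config["min_steps_per_task"])
# Blocks until the agent has collected at least min_steps_per_task steps
# (after filtering out already-done states) across one or more rollouts.
trajectory, total_rewards, trajectory_lengths = ray.get(steps_id)
print("steps:", trajectory["raw_rewards"].shape[0],
      "rollout batches:", len(trajectory_lengths))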
34 changes: 31 additions & 3 deletions python/ray/rllib/policy_gradient/policy_gradient.py
@@ -18,10 +18,19 @@


DEFAULT_CONFIG = {
# Discount factor of the MDP
"gamma": 0.995,
# Number of steps after which the rollout gets cut
"horizon": 2000,
# GAE(lambda) parameter
"lambda": 1.0,
# Initial coefficient for KL divergence
"kl_coeff": 0.2,
# Number of SGD iterations in each outer loop
"num_sgd_iter": 30,
# Number of outer loop iterations
"max_iterations": 1000,
# Stepsize of SGD
"sgd_stepsize": 5e-5,
# TODO(pcm): Expose the choice between gpus and cpus
# as a command line argument.
@@ -31,17 +40,34 @@
"log_device_placement": False,
"allow_soft_placement": True,
},
"sgd_batchsize": 128, # total size across all devices
# Batch size used for policy evaluation during rollouts
"rollout_batchsize": 1,
# Total SGD batch size across all devices
"sgd_batchsize": 128,
# Coefficient of the entropy regularizer
"entropy_coeff": 0.0,
# PPO clip parameter
"clip_param": 0.3,
# Target value for KL divergence
"kl_target": 0.01,
"model": {"free_logstd": False},
# Number of timesteps collected in each outer loop
"timesteps_per_batch": 40000,
# Each task performs rollouts until at least this
# number of steps is obtained
"min_steps_per_task": 1000,
# Number of actors used to collect the rollouts
"num_agents": 5,
# Dump TensorFlow timeline after this many SGD minibatches
"full_trace_nth_sgd_batch": -1,
# Whether to profile data loading
"full_trace_data_load": False,
# If this is True, the TensorFlow debugger is invoked if an Inf or NaN
# is detected
"use_tf_debugger": False,
"write_logs": True, # write checkpoints and tensorflow logging?
# If True, we write checkpoints and TensorFlow logs
"write_logs": True,
# Name of the model checkpoint file
"model_checkpoint_file": "iteration-%s.ckpt"}


@@ -79,6 +105,7 @@ def __init__(self, env_name, config, upload_dir=None):
self.env_name, 1, self.preprocessor, self.config,
self.logdir, True)
for _ in range(config["num_agents"])]
self.start_time = time.time()

def train(self):
agents = self.agents
@@ -108,7 +135,7 @@ def train(self):
weights = ray.put(model.get_weights())
[a.load_weights.remote(weights) for a in agents]
trajectory, total_reward, traj_len_mean = collect_samples(
agents, config["timesteps_per_batch"], config["gamma"], 1.0, 2000)
agents, config)
print("total reward is ", total_reward)
print("trajectory length mean is ", traj_len_mean)
print("timesteps:", trajectory["dones"].shape[0])
@@ -213,6 +240,7 @@ def train(self):
print("load time:", load_time)
print("sgd time:", sgd_time)
print("sgd examples/s:", len(trajectory["observations"]) / sgd_time)
print("total time so far:", time.time() - self.start_time)

result = TrainingResult(
self.experiment_id.hex, j, total_reward, traj_len_mean, info)
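A rough back-of-the-envelope reading of the default values above (illustrative only; it ignores the steps dropped by the done-state filtering):

tasks_per_batch = 40000 // 1000        # timesteps_per_batch / min_steps_per_task -> about 40 tasks
tasks_per_agent = tasks_per_batch / 5  # spread across num_agents -> about 8 tasks per agent

Grouping many short rollouts into each compute_steps task this way keeps the per-task overhead small relative to the work done, which is presumably the performance gain for small rollouts that the PR title refers to.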
40 changes: 17 additions & 23 deletions python/ray/rllib/policy_gradient/rollout.py
@@ -6,7 +6,7 @@
import ray

from ray.rllib.policy_gradient.filter import NoFilter
from ray.rllib.policy_gradient.utils import flatten, concatenate
from ray.rllib.policy_gradient.utils import concatenate


def rollouts(policy, env, horizon, observation_filter=NoFilter(),
@@ -72,42 +72,36 @@ def add_advantage_values(trajectory, gamma, lam, reward_filter):
trajectory["advantages"] = advantages


@ray.remote
def compute_trajectory(policy, env, gamma, lam, horizon, observation_filter,
reward_filter):
trajectory = rollouts(policy, env, horizon, observation_filter,
reward_filter)
add_advantage_values(trajectory, gamma, lam, reward_filter)
return trajectory


def collect_samples(agents, num_timesteps, gamma, lam, horizon,
observation_filter=NoFilter(), reward_filter=NoFilter()):
def collect_samples(agents,
config,
observation_filter=NoFilter(),
reward_filter=NoFilter()):
num_timesteps_so_far = 0
trajectories = []
total_rewards = []
traj_len_means = []
trajectory_lengths = []
# This variable maps the object IDs of trajectories that are currently
# being computed to the agent they are computed on; we start some
# initial tasks here.
agent_dict = {agent.compute_trajectory.remote(gamma, lam, horizon):
agent_dict = {agent.compute_steps.remote(
config["gamma"], config["lambda"],
config["horizon"], config["min_steps_per_task"]):
agent for agent in agents}
while num_timesteps_so_far < num_timesteps:
while num_timesteps_so_far < config["timesteps_per_batch"]:
# TODO(pcm): Make wait support arbitrary iterators and remove the
# conversion to list here.
[next_trajectory], waiting_trajectories = ray.wait(
list(agent_dict.keys()))
agent = agent_dict.pop(next_trajectory)
# Start task with next trajectory and record it in the dictionary.
agent_dict[agent.compute_trajectory.remote(gamma, lam, horizon)] = (
agent_dict[agent.compute_steps.remote(
config["gamma"], config["lambda"],
config["horizon"], config["min_steps_per_task"])] = (
agent)
trajectory = flatten(ray.get(next_trajectory))
not_done = np.logical_not(trajectory["dones"])
total_rewards.append(
trajectory["raw_rewards"][not_done].sum(axis=0).mean())
traj_len_means.append(not_done.sum(axis=0).mean())
trajectory = {key: val[not_done] for key, val in trajectory.items()}
trajectory, rewards, lengths = ray.get(next_trajectory)
total_rewards.extend(rewards)
trajectory_lengths.extend(lengths)
num_timesteps_so_far += len(trajectory["dones"])
trajectories.append(trajectory)
return (concatenate(trajectories), np.mean(total_rewards),
np.mean(traj_len_means))
np.mean(trajectory_lengths))
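The scheduling pattern used above (keep exactly one in-flight compute_steps task per agent, and as soon as one finishes, consume its result and resubmit on the same agent) can be isolated into a small stand-alone sketch. Everything below is a hypothetical toy example, not code from this PR:

import ray

ray.init()

@ray.remote
def work(i):
    # Stand-in for agent.compute_steps.remote(...).
    return i * i

# Map in-flight object IDs to the worker index they were launched for,
# analogous to agent_dict above.
pending = {work.remote(i): i for i in range(5)}
results = []
while len(results) < 20:
    # ray.wait returns as soon as at least one task has finished.
    [ready], _ = ray.wait(list(pending.keys()))
    i = pending.pop(ready)
    results.append(ray.get(ready))
    # Immediately launch a replacement so every worker stays busy.
    pending[work.remote(i)] = i
# As in collect_samples, any tasks still in flight are simply left pending.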