[rllib] Rename PolicyEvaluator => RolloutWorker #4820

Merged 26 commits on Jun 2, 2019
Changes from 1 commit
fix test
ericl committed May 24, 2019
commit e4224d8967faa4124563944a226cc4fb84f594c0
4 changes: 2 additions & 2 deletions doc/source/rllib-concepts.rst
@@ -48,9 +48,9 @@ Most interaction with deep learning frameworks is isolated to the `Policy interf
Policy Evaluation
-----------------

Given an environment and policy, policy evaluation produces `batches <https://github.com/ray-project/ray/blob/master/python/ray/rllib/policy/sample_batch.py>`__ of experiences. This is your classic "environment interaction loop". Efficient policy evaluation can be burdensome to get right, especially when leveraging vectorization, RNNs, or when operating in a multi-agent environment. RLlib provides a `PolicyEvaluator <https://github.com/ray-project/ray/blob/master/python/ray/rllib/evaluation/policy_evaluator.py>`__ class that manages all of this, and this class is used in most RLlib algorithms.
Given an environment and policy, policy evaluation produces `batches <https://github.com/ray-project/ray/blob/master/python/ray/rllib/policy/sample_batch.py>`__ of experiences. This is your classic "environment interaction loop". Efficient policy evaluation can be burdensome to get right, especially when leveraging vectorization, RNNs, or when operating in a multi-agent environment. RLlib provides a `RolloutWorker <https://github.com/ray-project/ray/blob/master/python/ray/rllib/evaluation/rollout_worker.py>`__ class that manages all of this, and this class is used in most RLlib algorithms.

You can use policy evaluation standalone to produce batches of experiences. This can be done by calling ``ev.sample()`` on an evaluator instance, or ``ev.sample.remote()`` in parallel on evaluator instances created as Ray actors (see ``PolicyEvaluator.as_remote()``).
You can use rollout workers standalone to produce batches of experiences. This can be done by calling ``worker.sample()`` on a worker instance, or ``worker.sample.remote()`` in parallel on worker instances created as Ray actors (see `WorkerSet <https://github.com/ray-project/ray/blob/master/python/ray/rllib/evaluation/worker_set.py>`__).

Here is an example of creating a set of policy evaluation actors and using them to gather experiences in parallel. The trajectories are concatenated, the policy learns on the trajectory batch, and then we broadcast the policy weights to the evaluators for the next round of rollouts:

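The concrete example that follows that sentence in rllib-concepts.rst is collapsed in this diff view. As a stand-in, here is a minimal sketch of the same gather-learn-broadcast pattern; it is not the file's example. The RandomPolicy placeholder, the Policy import path, the positional RolloutWorker arguments (an env_creator followed by a policy class), and the "default_policy" weights key are assumptions about the RLlib API at the time of this PR:

    import gym
    import ray
    from ray.rllib.evaluation import SampleBatch
    from ray.rllib.evaluation.rollout_worker import RolloutWorker
    from ray.rllib.policy.policy import Policy  # import path assumed for this era


    class RandomPolicy(Policy):
        """Placeholder policy that acts randomly; real code would use a TF/Torch policy."""

        def compute_actions(self, obs_batch, state_batches=None, **kwargs):
            return [self.action_space.sample() for _ in obs_batch], [], {}

        def learn_on_batch(self, samples):
            return {}  # no learning here; a real policy returns learner stats

        def get_weights(self):
            return {}

        def set_weights(self, weights):
            pass


    ray.init()
    env = gym.make("CartPole-v0")
    local_policy = RandomPolicy(env.observation_space, env.action_space, {})

    # Remote rollout workers created as Ray actors (cf. RolloutWorker.as_remote()).
    workers = [
        RolloutWorker.as_remote().remote(
            lambda ctx: gym.make("CartPole-v0"),  # env_creator
            RandomPolicy)                         # policy class
        for _ in range(2)
    ]

    for _ in range(3):
        # Gather experiences from all workers in parallel and concatenate them.
        batch = SampleBatch.concat_samples(
            ray.get([w.sample.remote() for w in workers]))

        # Learn on the combined batch, then broadcast weights for the next round.
        local_policy.learn_on_batch(batch)
        # "default_policy" is assumed to be the default policy id in this version.
        weights = ray.put({"default_policy": local_policy.get_weights()})
        for w in workers:
            w.set_weights.remote(weights)

In practice the placeholder policy would be a real policy class (for example the PPOTFPolicy imported in the test file below), and the loop would run until a stopping criterion is met.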
10 changes: 3 additions & 7 deletions python/ray/rllib/evaluation/metrics.py
@@ -39,9 +39,7 @@ def get_learner_stats(grad_info):


@DeveloperAPI
def collect_metrics(local_worker=None,
remote_workers=[],
timeout_seconds=180):
def collect_metrics(local_worker=None, remote_workers=[], timeout_seconds=180):
"""Gathers episode metrics from RolloutWorker instances."""

episodes, num_dropped = collect_episodes(
@@ -51,15 +49,13 @@


@DeveloperAPI
def collect_episodes(local_worker=None,
remote_workers=[],
def collect_episodes(local_worker=None, remote_workers=[],
timeout_seconds=180):
"""Gathers new episodes metrics tuples from the given evaluators."""
Contributor review comment: fix

if remote_workers:
pending = [
a.apply.remote(lambda ev: ev.get_metrics())
for a in remote_workers
a.apply.remote(lambda ev: ev.get_metrics()) for a in remote_workers
]
collected, _ = ray.wait(
pending, num_returns=len(pending), timeout=timeout_seconds * 1.0)
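For orientation, a brief usage sketch of the function reformatted above; the local_worker and remote_workers variables and the episode_reward_mean key are illustrative assumptions, not part of this diff:

    from ray.rllib.evaluation.metrics import collect_metrics

    # local_worker: a RolloutWorker in this process; remote_workers: a list of
    # RolloutWorker Ray actor handles (both hypothetical variables).
    result = collect_metrics(
        local_worker=local_worker,
        remote_workers=remote_workers,
        timeout_seconds=180)  # workers that miss the timeout are counted as dropped
    print(result.get("episode_reward_mean"))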
50 changes: 23 additions & 27 deletions python/ray/rllib/tests/test_optimizers.py
@@ -12,6 +12,7 @@
from ray.rllib.agents.ppo.ppo_policy import PPOTFPolicy
from ray.rllib.evaluation import SampleBatch
from ray.rllib.evaluation.rollout_worker import RolloutWorker
from ray.rllib.evaluation.worker_set import WorkerSet
from ray.rllib.optimizers import AsyncGradientsOptimizer, AsyncSamplesOptimizer
from ray.rllib.optimizers.aso_tree_aggregator import TreeAggregator
from ray.rllib.tests.mock_evaluator import _MockEvaluator
@@ -29,9 +30,8 @@ def testBasic(self):
local = _MockEvaluator()
remotes = ray.remote(_MockEvaluator)
remote_workers = [remotes.remote() for i in range(5)]
workers = WorkerSet._from_existing(local, remotes)
test_optimizer = AsyncGradientsOptimizer(
workers, grads_per_step=10)
workers = WorkerSet._from_existing(local, remote_workers)
test_optimizer = AsyncGradientsOptimizer(workers, grads_per_step=10)
test_optimizer.step()
self.assertTrue(all(local.get_weights() == 0))

@@ -118,30 +118,28 @@ def setUpClass(cls):

def testSimple(self):
local, remotes = self._make_evs()
optimizer = AsyncSamplesOptimizer(local, remotes)
workers = WorkerSet._from_existing(local, remotes)
optimizer = AsyncSamplesOptimizer(workers)
self._wait_for(optimizer, 1000, 1000)

def testMultiGPU(self):
local, remotes = self._make_evs()
optimizer = AsyncSamplesOptimizer(
local, remotes, num_gpus=2, _fake_gpus=True)
workers = WorkerSet._from_existing(local, remotes)
optimizer = AsyncSamplesOptimizer(workers, num_gpus=2, _fake_gpus=True)
self._wait_for(optimizer, 1000, 1000)

def testMultiGPUParallelLoad(self):
local, remotes = self._make_evs()
workers = WorkerSet._from_existing(local, remotes)
optimizer = AsyncSamplesOptimizer(
local,
remotes,
num_gpus=2,
num_data_loader_buffers=2,
_fake_gpus=True)
workers, num_gpus=2, num_data_loader_buffers=2, _fake_gpus=True)
self._wait_for(optimizer, 1000, 1000)

def testMultiplePasses(self):
local, remotes = self._make_evs()
workers = WorkerSet._from_existing(local, remotes)
optimizer = AsyncSamplesOptimizer(
local,
remotes,
workers,
minibatch_buffer_size=10,
num_sgd_iter=10,
sample_batch_size=10,
@@ -152,9 +150,9 @@ def testMultiplePasses(self):

def testReplay(self):
local, remotes = self._make_evs()
workers = WorkerSet._from_existing(local, remotes)
optimizer = AsyncSamplesOptimizer(
local,
remotes,
workers,
replay_buffer_num_slots=100,
replay_proportion=10,
sample_batch_size=10,
@@ -169,9 +167,9 @@ def testReplay(self):

def testReplayAndMultiplePasses(self):
local, remotes = self._make_evs()
workers = WorkerSet._from_existing(local, remotes)
optimizer = AsyncSamplesOptimizer(
local,
remotes,
workers,
minibatch_buffer_size=10,
num_sgd_iter=10,
replay_buffer_num_slots=100,
@@ -190,45 +188,43 @@ def testReplayAndMultiplePasses(self):

def testMultiTierAggregationBadConf(self):
local, remotes = self._make_evs()
workers = WorkerSet._from_existing(local, remotes)
aggregators = TreeAggregator.precreate_aggregators(4)
optimizer = AsyncSamplesOptimizer(
local, remotes, num_aggregation_workers=4)
optimizer = AsyncSamplesOptimizer(workers, num_aggregation_workers=4)
self.assertRaises(ValueError,
lambda: optimizer.aggregator.init(aggregators))

def testMultiTierAggregation(self):
local, remotes = self._make_evs()
workers = WorkerSet._from_existing(local, remotes)
aggregators = TreeAggregator.precreate_aggregators(1)
optimizer = AsyncSamplesOptimizer(
local, remotes, num_aggregation_workers=1)
optimizer = AsyncSamplesOptimizer(workers, num_aggregation_workers=1)
optimizer.aggregator.init(aggregators)
self._wait_for(optimizer, 1000, 1000)

def testRejectBadConfigs(self):
local, remotes = self._make_evs()
workers = WorkerSet._from_existing(local, remotes)
self.assertRaises(
ValueError, lambda: AsyncSamplesOptimizer(
local, remotes,
num_data_loader_buffers=2, minibatch_buffer_size=4))
optimizer = AsyncSamplesOptimizer(
local,
remotes,
workers,
num_gpus=2,
train_batch_size=100,
sample_batch_size=50,
_fake_gpus=True)
self._wait_for(optimizer, 1000, 1000)
optimizer = AsyncSamplesOptimizer(
local,
remotes,
workers,
num_gpus=2,
train_batch_size=100,
sample_batch_size=25,
_fake_gpus=True)
self._wait_for(optimizer, 1000, 1000)
optimizer = AsyncSamplesOptimizer(
local,
remotes,
workers,
num_gpus=2,
train_batch_size=100,
sample_batch_size=74,
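Taken together, the test changes in this file replace the old pattern of passing the local and remote evaluators straight into an optimizer with a single WorkerSet argument. A minimal sketch of the new pattern, modeled on testBasic above (it assumes a local Ray runtime and reuses the _MockEvaluator test helper imported in this file):

    import ray
    from ray.rllib.evaluation.worker_set import WorkerSet
    from ray.rllib.optimizers import AsyncGradientsOptimizer
    from ray.rllib.tests.mock_evaluator import _MockEvaluator

    ray.init()

    local = _MockEvaluator()
    remote_cls = ray.remote(_MockEvaluator)
    remote_workers = [remote_cls.remote() for _ in range(5)]

    # Before this PR, optimizers took the local and remote evaluators directly;
    # now the existing workers are wrapped in a WorkerSet and passed as one object.
    workers = WorkerSet._from_existing(local, remote_workers)
    optimizer = AsyncGradientsOptimizer(workers, grads_per_step=10)
    optimizer.step()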