Add Trajectory and Policy Queues #3113


Merged
merged 30 commits into master from develop-queue on Jan 3, 2020
Conversation

@ervteng (Contributor) commented Dec 21, 2019

This PR adds Trajectory and Policy Queues between the env_manager, AgentProcessor, and Trainers, getting one step closer to decoupling environment stepping from trainer training. (A minimal sketch of the intended wiring appears after the list below.)

However there are still two points of coupling that remain:

  • Summary and model writing. Summary writing is currently driven by the TrainerController: the TC tells the Trainers when to write out to Tensorboard. It would be better for the trainer to write out when appropriate during the trainer.advance() call. -> Addressed in PR #3124 (Move stepping logic into advance() function)

  • Curriculum. The TC still needs to know the individual trainers' rewards in order to update reset parameters in the environment. We might be able to get these stats from the AgentProcessor instead. -> Postponed to a future PR
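
For orientation, here is a minimal, self-contained sketch of the producer/consumer wiring described above. The Trajectory stub and the class bodies are illustrative assumptions, not the actual ml-agents implementation:

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, List


@dataclass
class Trajectory:
    """Stub standing in for the real ml-agents Trajectory."""
    steps: List[int] = field(default_factory=list)


class AgentProcessor:
    """Producer side: assembles trajectories and publishes them to queues."""

    def __init__(self) -> None:
        self.trajectory_queues: List[Deque[Trajectory]] = []

    def publish_trajectory_queue(self, trajectory_queue: Deque[Trajectory]) -> None:
        self.trajectory_queues.append(trajectory_queue)

    def add_experiences(self, trajectory: Trajectory) -> None:
        # Instead of calling trainer.process_trajectory() directly,
        # push the finished trajectory onto every subscribed queue.
        for traj_queue in self.trajectory_queues:
            traj_queue.append(trajectory)


class Trainer:
    """Consumer side: drains trajectory queues during advance()."""

    def __init__(self) -> None:
        self.trajectory_queues: List[Deque[Trajectory]] = []

    def subscribe_trajectory_queue(self, trajectory_queue: Deque[Trajectory]) -> None:
        self.trajectory_queues.append(trajectory_queue)

    def advance(self) -> None:
        for traj_queue in self.trajectory_queues:
            while traj_queue:
                trajectory = traj_queue.popleft()
                # The real trainer would call process_trajectory() here.
                print(f"processing trajectory with {len(trajectory.steps)} steps")


processor, trainer = AgentProcessor(), Trainer()
shared_queue: Deque[Trajectory] = deque()
processor.publish_trajectory_queue(shared_queue)
trainer.subscribe_trajectory_queue(shared_queue)
processor.add_experiences(Trajectory(steps=[1, 2, 3]))
trainer.advance()
```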

@@ -134,8 +133,8 @@ def add_experiences(
             agent_id=agent_id,
             next_obs=next_obs,
         )
-        # This will eventually be replaced with a queue
-        self.trainer.process_trajectory(trajectory)
+        for _traj_queue in self.trajectory_queues:
Contributor:

nit: don't use leading underscores for local variables.

Contributor Author:

Fixed

trajectory_queue=Queue(),
policy_queue=Queue(),
)
agent_manager.processor.publish_trajectory_queue(agent_manager.trajectory_queue)
Contributor:

It feels strange that the AgentManager doesn't subscribe to its own trajectory_queue. I'd recommend making AgentManager its own class and setting this up in AgentManager.__init__ (you can maybe do this as a namedtuple still, but I think you have to do it in __new__ instead).

Contributor Author:

I made AgentManager a subclass of AgentProcessor that contains the two queues. I didn't fold it into AgentProcessor itself, since in the future we might have processors that subscribe to multiple queues.
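
A rough sketch of that shape, assuming plain deques for the queues and heavily simplified class bodies (illustrative, not the actual ml-agents code):

```python
from collections import deque
from typing import Deque, List


class AgentProcessor:
    def __init__(self) -> None:
        self.trajectory_queues: List[Deque] = []

    def publish_trajectory_queue(self, trajectory_queue: Deque) -> None:
        self.trajectory_queues.append(trajectory_queue)


class AgentManager(AgentProcessor):
    """An AgentProcessor that also owns the queues linking it to a Trainer."""

    def __init__(self) -> None:
        super().__init__()
        self.trajectory_queue: Deque = deque()
        self.policy_queue: Deque = deque()
        # Subscribe to our own trajectory queue so callers no longer
        # need to wire it up externally.
        self.publish_trajectory_queue(self.trajectory_queue)
```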

@@ -5,13 +5,15 @@
from mlagents.tf_utils import tf

from collections import deque
from queue import Queue
Contributor:

Why do we need queue.Queue here instead of something lighter-weight like collections.deque? I don't think we need the synchronization that Queue provides (https://docs.python.org/3/library/queue.html)

Contributor Author (@ervteng, Jan 2, 2020):

Not yet :) The intention was to enable the environment stepping and trainer advancing to run in separate threads (or even processes) for algorithms like APE-X. Eventually we'd want the trainer to wait until a Trajectory is available before doing any computation. I went with Queue since the method calls are the same between Queue and multiprocessing.Queue, but are different from deque.

We could also just use deque until the time comes to change over.

Contributor:

Sounds good.

Contributor Author:

It appears that deque is actually thread-safe as long as you stick to single append and popleft operations, so I'll switch to that. Even if we go multi-threaded we can still use deque, and it's faster.

https://stackoverflow.com/questions/717148/queue-queue-vs-collections-deque
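
For reference, a standard-library side-by-side of the two APIs under discussion; the exception behavior noted in the comments is the main difference to handle when switching. (Under CPython, deque's append() and popleft() are individually atomic, which is what makes the swap safe for simple single-producer/single-consumer use.)

```python
from collections import deque
from queue import Empty, Queue
from typing import Deque

q: Queue = Queue()
q.put(1)
try:
    item = q.get_nowait()  # raises queue.Empty when exhausted
except Empty:
    pass

d: Deque[int] = deque()
d.append(1)                # deque's analogue of put()
try:
    item = d.popleft()     # raises IndexError, not queue.Empty
except IndexError:
    pass
```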

Contributor:

Should anything still be using queue.Queue()? If not, you should remove the imports and see what's breaking.

Contributor Author:

I think some of the tests still do - will switch them out

Ervin T added 4 commits January 2, 2020 14:19
* Move writing logic out of TC

* Modify trainer config files

* Fix tests

* Update Migrating doc

* Add should_still_train

* Fix write_summary

* Move logic for writing summaries

* Fix summary after loading from checkpoint

* Switch Trainer to abstract class
@ervteng changed the title from "WIP: Add Trajectory and Policy Queues" to "Add Trajectory and Policy Queues" on Jan 3, 2020
separately from an AgentManager.
"""
self.queue: Deque[Union[Trajectory, Policy]] = deque()
self.behavior_id = behavior_id
Contributor Author:

Giving the queue a behavior_id is understandably a bit weird. I did it this way so that the Trainer can retrieve which behavior id the policy queue belongs to and publish the right policy to it.

The alternative is to give the Trainer access to the AgentManager itself, but I wanted to keep the AgentManager abstraction separate from the Trainer. I'd imagine in the future, we'd want to assemble queues and Trainers in a different way.
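
A sketch of the routing this enables: the behavior_id on the queue lets the Trainer publish the right policy without holding a reference to the AgentManager. Class and method bodies here are simplified assumptions, not the actual implementation:

```python
from collections import deque
from typing import Any, Deque, Dict, List


class AgentManagerQueue:
    """A queue that knows which behavior it carries data for."""

    def __init__(self, behavior_id: str) -> None:
        self.behavior_id = behavior_id
        self.queue: Deque[Any] = deque()


class Trainer:
    def __init__(self, policies: Dict[str, Any]) -> None:
        self.policies = policies
        self.policy_queues: List[AgentManagerQueue] = []

    def publish_policy_queue(self, policy_queue: AgentManagerQueue) -> None:
        self.policy_queues.append(policy_queue)

    def push_updated_policies(self) -> None:
        # behavior_id on the queue tells the trainer which policy to
        # publish, without giving it a reference to the AgentManager.
        for policy_queue in self.policy_queues:
            policy_queue.queue.append(self.policies[policy_queue.behavior_id])
```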

@@ -19,12 +19,14 @@ The versions can be found in
* Offline Behavioral Cloning has been removed. To learn from demonstrations, use the GAIL and
Behavioral Cloning features with either PPO or SAC. See [Imitation Learning](Training-Imitation-Learning.md) for more information.
* `mlagents.envs` was renamed to `mlagents_envs`. The previous repo layout depended on [PEP420](https://www.python.org/dev/peps/pep-0420/), which caused problems with some of our tooling such as mypy and pylint.
* Trainer steps are now counted per-Agent, not per-environment as in previous versions. For instance, if you have 10 Agents in the scene, 20 environment steps now correspond to 200 steps as printed in the terminal and in Tensorboard.
Contributor:

Unless you're going to merge this into the release branch, we should make a "Migrating from 0.13 to latest" section and put these there.

Contributor Author:

Done

Adds a trajectory queue to the list of queues for the trainer to ingest Trajectories from.
:param trajectory_queue: Trajectory queue to publish to.
"""
self.trajectory_queues.append(trajectory_queue)
Contributor:

Wrong types here - without #3151 mypy doesn't understand what an AgentManagerQueue is. With it, you'll get an error like

Argument 1 to "append" of "list" has incompatible type "Queue[Any]"; expected "AgentManagerQueue"

"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Contributor:

I'd prefer to be explicit about the arguments here, but if you think it's too tedious, ignore me.

Contributor Author:

Explicit it is

@chriselion (Contributor) left a review:

Blocking until type checks are fixed.

"""
with hierarchical_timer("process_trajectory"):
for traj_queue in self.trajectory_queues:
if not traj_queue.empty():
Contributor:

I think if there are ever multiple consumers here, you could run into problems - if there's one element in the queue, both could think it's non-empty, and one would try to grab it and the other would fail and raise an exception. Might be better to just do a try/except around the get_nowait() call and avoid the not empty() check.

Contributor Author:

Added an AgentManagerQueue.Empty exception; now using try/except here and in the TC.
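
A sketch of the resulting consumption pattern, assuming an AgentManagerQueue.Empty exception along the lines described in that reply (queue internals simplified):

```python
from collections import deque
from typing import Any, Deque, List


class AgentManagerQueue:
    class Empty(Exception):
        """Raised by get_nowait() when the queue has no items."""

    def __init__(self) -> None:
        self._queue: Deque[Any] = deque()

    def put(self, item: Any) -> None:
        self._queue.append(item)

    def get_nowait(self) -> Any:
        try:
            return self._queue.popleft()
        except IndexError:
            raise self.Empty()


def advance(trajectory_queues: List[AgentManagerQueue]) -> None:
    # Drain each queue until Empty instead of checking empty() first;
    # with multiple consumers, the empty() check could race with a get.
    for traj_queue in trajectory_queues:
        while True:
            try:
                trajectory = traj_queue.get_nowait()
            except AgentManagerQueue.Empty:
                break
            # process_trajectory(trajectory) would run here.
            _ = trajectory
```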

@ervteng ervteng requested a review from chriselion January 3, 2020 19:20
@chriselion (Contributor) left a review:

Looks good, thanks for making those changes

@ervteng merged commit 81310cf into master on Jan 3, 2020
The delete-merged-branch bot deleted the develop-queue branch on January 3, 2020 23:12
The github-actions bot locked this as resolved and limited conversation to collaborators on May 17, 2021