Add Trajectory and Policy Queues #3113
```
@@ -134,8 +133,8 @@ def add_experiences(
            agent_id=agent_id,
            next_obs=next_obs,
        )
        # This will eventually be replaced with a queue
        self.trainer.process_trajectory(trajectory)
        for _traj_queue in self.trajectory_queues:
```
nit: don't use leading underscores for local variables.
Fixed
```
            trajectory_queue=Queue(),
            policy_queue=Queue(),
        )
        agent_manager.processor.publish_trajectory_queue(agent_manager.trajectory_queue)
```
This feels strange - the AgentManager doesn't subscribe to its own trajectory_queue. I'd recommend making AgentManager its own class and setting this up in `AgentManager.__init__` (you can maybe do this as a namedtuple still, but I think you'd have to do it in `__new__` instead).
I made AgentManager a subclass of AgentProcessor, that contains 2 queues. Didn't integrate it with AgentProcessor since in the future we might have processors that subscribe to multiple queues.
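A minimal sketch of the design described above: an AgentManager that owns two queues and subscribes to its own trajectory queue at construction time, so the caller no longer has to wire it up. The class bodies here are illustrative stand-ins for the real ml-agents classes, not the actual implementation.

```python
from collections import deque
from typing import Any, List


class AgentManagerQueue:
    """Stand-in queue tagged with the behavior it belongs to."""

    def __init__(self, behavior_id: str):
        self.behavior_id = behavior_id
        self._queue: deque = deque()

    def put(self, item: Any) -> None:
        self._queue.append(item)


class AgentProcessor:
    """Stand-in processor that consumes from its subscribed queues."""

    def __init__(self) -> None:
        self.trajectory_queues: List[AgentManagerQueue] = []

    def publish_trajectory_queue(self, trajectory_queue: AgentManagerQueue) -> None:
        self.trajectory_queues.append(trajectory_queue)


class AgentManager(AgentProcessor):
    """Subscribes to its own trajectory queue, as suggested in the review."""

    def __init__(self, behavior_id: str):
        super().__init__()
        self.trajectory_queue = AgentManagerQueue(behavior_id)
        self.policy_queue = AgentManagerQueue(behavior_id)
        # The manager wires itself up instead of relying on the caller.
        self.publish_trajectory_queue(self.trajectory_queue)
```

With this shape, `agent_manager.processor.publish_trajectory_queue(...)` at every call site becomes unnecessary.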
```
@@ -5,13 +5,15 @@
from mlagents.tf_utils import tf

from collections import deque
from queue import Queue
```
Why do we need `queue.Queue` here instead of something lighter-weight like `collections.deque`? I don't think we need the synchronization that Queue provides (https://docs.python.org/3/library/queue.html).
Not yet :) The intention was to enable the environment stepping and trainer advancing to run in separate threads (or even processes) for algorithms like APE-X. Eventually we'd want the trainer to wait until a Trajectory is available before doing any computation. I went with `Queue` since the method calls are the same between `Queue` and `multiprocessing.Queue`, but are different from `deque`.

We could also just use `deque` until the time comes to change over.
Sounds good.
It appears that `deque` is actually thread-safe as long as you're using just `get` and `put`, so I'll switch to that. Even if we go multi-thread we can still use `deque`, and it's faster. https://stackoverflow.com/questions/717148/queue-queue-vs-collections-deque
Should anything still be using queue.Queue()? If not, you should remove the imports and see what's breaking.
I think some of the tests still do - will switch them out
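A minimal illustration of the trade-off discussed above: in CPython, `deque.append()` and `deque.popleft()` are atomic, so a single-producer/single-consumer setup can use a plain `deque` without `queue.Queue`'s locking overhead. The "put" and "get" names map onto `append` and `popleft`:

```python
from collections import deque

q: deque = deque()

# Producer side (the "put"): append trajectories as they complete.
q.append("trajectory_1")
q.append("trajectory_2")

# Consumer side (the "get"): popleft() preserves FIFO order and raises
# IndexError (rather than blocking) when the deque is empty.
item = q.popleft()
```

The main behavioral difference from `queue.Queue` is that an empty `deque` raises `IndexError` immediately instead of offering a blocking `get()`.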
* Move writing logic out of TC
* Modify trainer config files
* Fix tests
* Update Migrating doc
* Add should_still_train
* Fix write_summary
* Move logic for writing summaries
* Fix summary after loading from checkpoint
* Switch Trainer to abstract class
```
        separately from an AgentManager.
        """
        self.queue: Deque[Union[Trajectory, Policy]] = deque()
        self.behavior_id = behavior_id
```
Giving the queue a behavior_id is understandably a bit weird. I did it this way so that the Trainer can retrieve which behavior id the policy queue belongs to and publish the right policy to it.
The alternative is to give the Trainer access to the AgentManager itself, but I wanted to keep the AgentManager abstraction separate from the Trainer. I'd imagine in the future, we'd want to assemble queues and Trainers in a different way.
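A sketch of the lookup this enables: because each policy queue carries its `behavior_id`, the trainer can route an updated policy to the right queue without ever holding a reference to the AgentManager. The class and function below are illustrative stand-ins, not the real ml-agents API.

```python
from collections import deque
from typing import Any, List


class AgentManagerQueue:
    """Stand-in queue tagged with the behavior it belongs to."""

    def __init__(self, behavior_id: str):
        self.behavior_id = behavior_id
        self._queue: deque = deque()

    def put(self, item: Any) -> None:
        self._queue.append(item)

    def get_nowait(self) -> Any:
        return self._queue.popleft()


def publish_policy(
    policy: Any, behavior_id: str, policy_queues: List[AgentManagerQueue]
) -> None:
    # Match on the queue's own tag rather than asking the AgentManager,
    # keeping the Trainer decoupled from the manager abstraction.
    for q in policy_queues:
        if q.behavior_id == behavior_id:
            q.put(policy)
```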
docs/Migrating.md
Outdated
```
@@ -19,12 +19,14 @@ The versions can be found in
* Offline Behavioral Cloning has been removed. To learn from demonstrations, use the GAIL and
  Behavioral Cloning features with either PPO or SAC. See [Imitation Learning](Training-Imitation-Learning.md) for more information.
* `mlagents.envs` was renamed to `mlagents_envs`. The previous repo layout depended on [PEP420](https://www.python.org/dev/peps/pep-0420/), which caused problems with some of our tooling such as mypy and pylint.
* Trainer steps are now counted per-Agent, not per-environment as in previous versions. For instance, if you have 10 Agents in the scene, 20 environment steps now corresponds to 200 steps as printed in the terminal and in Tensorboard.
```
Unless you're going to merge this into the release branch, we should make a "Migrating from 0.13 to latest" section and put these there.
Done
```
        Adds a trajectory queue to the list of queues for the trainer to ingest Trajectories from.
        :param trajectory_queue: Trajectory queue to publish to.
        """
        self.trajectory_queues.append(trajectory_queue)
```
Wrong types here - without #3151, mypy doesn't understand what an AgentManagerQueue is. With it, you'll get an error like `Argument 1 to "append" of "list" has incompatible type "Queue[Any]"; expected "AgentManagerQueue"`.
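A sketch of the annotation the reviewer is asking for: declare the subscriber list with the queue type that is actually appended, so mypy can flag callers that pass the wrong type. `AgentManagerQueue` below is a minimal stand-in for the class introduced in this PR.

```python
from typing import List


class AgentManagerQueue:
    """Stand-in for the queue type introduced in this PR."""


class Trainer:
    def __init__(self) -> None:
        # Annotated as List[AgentManagerQueue] (not List[Queue]) so that
        # mypy checks what publish_trajectory_queue() callers pass in.
        self.trajectory_queues: List[AgentManagerQueue] = []

    def publish_trajectory_queue(
        self, trajectory_queue: AgentManagerQueue
    ) -> None:
        """Adds a trajectory queue for the trainer to ingest Trajectories from."""
        self.trajectory_queues.append(trajectory_queue)
```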
```
        """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
```
I'd prefer to be explicit about the arguments here, but if you think it's too tedious, ignore me.
Explicit it is
Blocking until type checks are fixed.
```
        """
        with hierarchical_timer("process_trajectory"):
            for traj_queue in self.trajectory_queues:
                if not traj_queue.empty():
```
I think if there are ever multiple consumers here, you could run into problems - if there's one element in the queue, both could think it's non-empty, and one would try to grab it and the other would fail and raise an exception. Might be better to just do a try/except around the `get_nowait()` call, and avoid the `not empty()` check.
Added an AgentManagerQueue.Empty exception and now using try/except here and in TC
Looks good, thanks for making those changes
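A sketch of the fix described above, under the assumption that `AgentManagerQueue` wraps a `deque`: `get_nowait()` translates the deque's `IndexError` into an `AgentManagerQueue.Empty` exception, and the consumer attempts the get directly instead of the check-then-act `empty()` test that races with other consumers.

```python
from collections import deque
from typing import Any, List


class AgentManagerQueue:
    class Empty(Exception):
        """Raised when get_nowait() is called on an empty queue."""

    def __init__(self) -> None:
        self._queue: deque = deque()

    def put(self, item: Any) -> None:
        self._queue.append(item)

    def get_nowait(self) -> Any:
        try:
            return self._queue.popleft()
        except IndexError:
            raise self.Empty("The AgentManagerQueue is empty.")


def drain(queue: AgentManagerQueue) -> List[Any]:
    # Consume everything currently available without a racy empty() check:
    # just attempt the get and stop when the queue reports Empty.
    items: List[Any] = []
    while True:
        try:
            items.append(queue.get_nowait())
        except AgentManagerQueue.Empty:
            break
    return items
```

With multiple consumers, each `get_nowait()` either wins the element or raises `Empty`; no consumer ever acts on a stale emptiness check.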
This PR adds Trajectory and Policy Queues between the env_manager, AgentProcessor, and Trainers. Getting one step closer to decoupling the environment stepping and the trainer training.
However there are still two points of coupling that remain:
* Summary and model writing. When the summaries are written is currently driven by the TrainerController - the TC tells the Trainers when to write out to Tensorboard. It would be better to have the trainer write out when appropriate during the `trainer.advance()` call. -> Addressed in PR: Move stepping logic into advance() function #3124
* Curriculum. Somehow, the TC still needs to know what the individual trainers' rewards are to update reset parameters in the environment. I think we might be able to get these stats from the AgentProcessor instead. -> Postponed to a future PR