Move add_experiences out of trainer, add Trajectories #3067
Conversation
self.episode_steps: Counter = Counter()
self.episode_rewards: Dict[str, float] = defaultdict(float)
self.stats_reporter = stats_reporter
if max_trajectory_length:
Maybe make this default to a huge number instead (like sys.maxsize)? That might get rid of the need for ignore_max_length.
Redid the logic to use sys.maxsize as the default; trainer_controller.py now feeds that in when no time_horizon is specified.
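A minimal sketch of that default, assuming the constructor keeps the max_trajectory_length name from the diff above (the exact signature is an assumption):

import sys
from collections import Counter, defaultdict
from typing import Dict

class AgentProcessor:
    def __init__(self, stats_reporter, max_trajectory_length: int = sys.maxsize):
        self.episode_steps: Counter = Counter()
        self.episode_rewards: Dict[str, float] = defaultdict(float)
        self.stats_reporter = stats_reporter
        # Defaulting to sys.maxsize means trajectories are effectively never cut
        # by length, so a separate ignore_max_length flag is no longer needed.
        self.max_trajectory_length = max_trajectory_length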
if stored_info is not None:
    stored_take_action_outputs = self.last_take_action_outputs[agent_id]
    idx = stored_info.agents.index(agent_id)
    next_idx = next_info.agents.index(agent_id)
You shouldn't need to look this up since you're already iterating over next_info.agents, right? Just change
for agent_id in next_info.agents:
to
for next_idx, agent_id in enumerate(next_info.agents):
Good call - changed.
stored_info = self.last_brain_info.get(agent_id, None)
if stored_info is not None:
    stored_take_action_outputs = self.last_take_action_outputs[agent_id]
    idx = stored_info.agents.index(agent_id)
Little worried about the O(N) lookup here since we're doing it N times.
You might want to do something like
agent_id_to_index = {agent_id: i for i, agent_id in enumerate(stored_info.agents)}
outside the loop.
The tricky bit here is that the stored_info might be different per iteration of the loop (some agents in next_info might not have been in the previous info and vice-versa), so the index might change as well. To make matters worse, we do this indexing twice (once here, and once in the LL-Python API to convert BatchedState -> BrainInfo).
Long-term we will be removing BrainInfo (today: BatchedState -> BrainInfo -> AgentExperience; end goal: BatchedState -> AgentExperience), so I think we will be able to get away with simply adding to trajectories agent-by-agent. We won't have to store the stored_info anymore, and in that case we will only have to do the indexing once.
@@ -43,230 +30,39 @@ def __init__(self, *args, **kwargs):
# collected_rewards is a dictionary from name of reward signal to a dictionary of agent_id to cumulative reward
# used for reporting only. We always want to report the environment reward to Tensorboard, regardless
# of what reward signals are actually present.
self.collected_rewards = {"environment": {}}
self.processing_buffer = ProcessingBuffer()
self.collected_rewards = {"environment": defaultdict(lambda: 0)}
nit: type hint
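One possible annotation for this nit, assuming agent IDs are strings as elsewhere in the diff (the nested value type is inferred from the comment above, not taken from the code):

# Inside the trainer's __init__:
# reward-signal name -> (agent_id -> cumulative reward)
self.collected_rewards: Dict[str, Dict[str, float]] = {
    "environment": defaultdict(lambda: 0)
}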
ml-agents/mlagents/trainers/stats.py
filewriter_dir = "{basedir}/{category}".format(
    basedir=self.base_dir, category=category
)
if not os.path.exists(filewriter_dir):
I might have missed this before - you can just do os.makedirs(filewriter_dir, exist_ok=True) instead of checking os.path.exists(). See https://docs.python.org/3/library/os.html#os.makedirs
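A minimal sketch of the suggested simplification, using the names from the diff above:

# Inside TensorboardWriter, replacing the exists() check:
import os

filewriter_dir = "{basedir}/{category}".format(
    basedir=self.base_dir, category=category
)
# Creates intermediate directories as needed and is a no-op if the path already exists.
os.makedirs(filewriter_dir, exist_ok=True)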
assert statssummary2.num == 10
assert statssummary1.mean == 4.5
assert statssummary2.mean == 4.5
assert round(statssummary1.std, 1) == 2.9
I think you can also do assert statssummary1.std == pytest.approx(2.9) - maybe a little cleaner.
pytest.approx(2.9, abs=0.1) does the same thing, I think.
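For reference, a sketch of the two assertions being compared (statssummary1 is the object from the test snippet above):

import pytest

# The existing assertion rounds to one decimal place:
assert round(statssummary1.std, 1) == 2.9
# The suggested alternative uses an explicit absolute tolerance:
assert statssummary1.std == pytest.approx(2.9, abs=0.1)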
# Test write_stats
base_dir = "base_dir"
category = "category1"
tb_writer = TensorboardWriter(base_dir)
Will this actually make a directory somewhere if I run tests locally? If so, you should do something like
import tempfile
...
with tempfile.TemporaryDirectory(prefix="unittest-") as base_dir:
# rest of test here
Then the directory will get automatically cleaned up when the context manager closes.
Confirmed that this creates ./base_dir and ./base_dir/category1 locally. Please use a tempfile.
Test now uses tempfile.
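A rough sketch of the tempfile-based version of that test; TensorboardWriter and the category name come from the diff, while the test name and the elided write_stats call are placeholders:

import tempfile

from mlagents.trainers.stats import TensorboardWriter

def test_tensorboard_writer_uses_temp_dir():
    # The temporary directory is removed automatically when the block exits,
    # so running the tests locally no longer leaves a ./base_dir behind.
    with tempfile.TemporaryDirectory(prefix="unittest-") as base_dir:
        category = "category1"
        tb_writer = TensorboardWriter(base_dir)
        # ... rest of the original write_stats test, unchanged, goes here ...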
def write_tensorboard_text(self, key: str, input_dict: Dict[str, Any]) -> None:
    """
    Saves text to Tensorboard.
    Note: Only works on tensorflow r1.2 or above.
nit: we only support tf >= 1.7, so we can drop this comment
trainer,
trainer.policy,
trainer.parameters["time_horizon"]
if "time_horizon" in trainer.parameters
replace with trainer.parameters.get("time_horizon")
Good call - I now use .get() with a default value of sys.maxsize.
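A minimal sketch of the resulting call site (the surrounding constructor arguments are as shown in the diff; passing the value straight through is an assumption):

import sys

# In trainer_controller.py, when constructing the AgentProcessor:
# fall back to sys.maxsize when no time_horizon is specified in the config,
# so trajectories are effectively never truncated by length.
max_trajectory_length = trainer.parameters.get("time_horizon", sys.maxsize)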
""" | ||
agent_buffer_trajectory = AgentBuffer() | ||
for step, exp in enumerate(self.steps): | ||
vec_vis_obs = SplitObservations.from_observations(exp.obs) |
Note that you're calling SplitObservations.from_observations twice on each element (except maybe the start and end). Might be worth changing the loop to something like
vec_vis_obs = SplitObservations.from_observations(self.steps[0].obs)
for step, exp in enumerate(self.steps):
    # get next_vec_vis_obs same as now
    # rest of loop
    vec_vis_obs = next_vec_vis_obs
but then again, from_observations looks pretty fast unless you're concatenating large vector observations.
Yep, fixed
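A hedged sketch of the restructured loop; AgentBuffer, SplitObservations, and self.steps come from the diff, while how the final step's "next" observation is obtained is an assumption:

# Compute the split observations for step 0 once, then carry each step's
# result forward so from_observations runs only once per element.
vec_vis_obs = SplitObservations.from_observations(self.steps[0].obs)
for step, exp in enumerate(self.steps):
    if step < len(self.steps) - 1:
        next_vec_vis_obs = SplitObservations.from_observations(self.steps[step + 1].obs)
    else:
        # Hypothetical: the last step's "next" observation would come from
        # wherever the trajectory stores it (e.g. a next_obs field).
        next_vec_vis_obs = vec_vis_obs
    # ... rest of the loop body, appending to agent_buffer_trajectory ...
    vec_vis_obs = next_vec_vis_obs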
:param obs: List of numpy arrays (observation)
:returns: A SplitObservations object.
"""
vis_obs_indices = []
I think you should avoid creating the indices arrays here. You can form vis_obs and the input to np.concatenate() directly in the loop through obs.
Good call - I now just build the list directly. I think we use the same logic in the brain_conversion_utils.py file as well.
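A sketch of building the lists directly while looping over obs (the rank-based split between visual and vector observations is an assumption for illustration, not necessarily the exact rule used in SplitObservations):

import numpy as np

vis_obs = []
vec_obs_parts = []
for observation in obs:
    # Assumption: visual observations are rank-3 (H x W x C), vector
    # observations are rank-1, per the docstring above.
    if len(observation.shape) == 3:
        vis_obs.append(observation)
    else:
        vec_obs_parts.append(observation)
vec_obs = (
    np.concatenate(vec_obs_parts, axis=0)
    if vec_obs_parts
    else np.array([], dtype=np.float32)
)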
agent_buffer_trajectory["prev_action"].append(exp.prev_action)

# Add the value outputs if needed
nit: not sure this comment matches the code.
Nope - removed.
if not os.path.exists(self.summary_path):
    os.makedirs(self.summary_path)
self.stats_reporter = StatsReporter(self.summary_path)
# if not os.path.exists(self.summary_path):
nit: dead code.
Thanks - removed
self.last_take_action_outputs[agent_id] = take_action_outputs

# Store the environment reward
tmp_environment_reward = np.array(next_info.rewards, dtype=np.float32)
Do you need to convert this to an np.array? Looks like you only use it twice and always look it up by index.
At some point this was necessary but not anymore - fixed.
mean_current_observation - self.running_mean
# Based on Welford's algorithm for running mean and standard deviation, for batch updates. Discussion here:
# https://stackoverflow.com/questions/56402955/whats-the-formula-for-welfords-algorithm-for-variance-std-with-batch-updates
steps_increment = tf.shape(vector_input)[0]
Is it possible to unit test this with some synthetic data?
Added a test for normalization in test_ppo
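Not the exact test that was added, but a small self-contained sketch of how the batched Welford update can be checked against synthetic data. The TF variables in the diff track an analogous running mean and variance; the helper below is a NumPy stand-in, not the ml-agents API:

import numpy as np

def update_normalization(batch, mean, var_sum, steps):
    """One batched Welford update. var_sum is the running sum of squared
    deviations (M2); variance = var_sum / steps."""
    batch_size = batch.shape[0]
    batch_mean = batch.mean(axis=0)
    batch_m2 = ((batch - batch_mean) ** 2).sum(axis=0)
    new_steps = steps + batch_size
    delta = batch_mean - mean
    new_mean = mean + delta * batch_size / new_steps
    new_var_sum = var_sum + batch_m2 + delta ** 2 * steps * batch_size / new_steps
    return new_mean, new_var_sum, new_steps

# Synthetic check: feed batches of random observations and compare against
# the mean/std computed over all of the data at once.
rng = np.random.RandomState(0)
data = rng.randn(1000, 3) * 5.0 + 2.0
mean, var_sum, steps = np.zeros(3), np.zeros(3), 0
for batch in np.split(data, 10):
    mean, var_sum, steps = update_normalization(batch, mean, var_sum, steps)
assert np.allclose(mean, data.mean(axis=0))
assert np.allclose(np.sqrt(var_sum / steps), data.std(axis=0))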
Looks good overall, some minor feedback. Feel free to punt some of it to a followup PR
Moves add_experiences out of the trainer class and into a separate AgentProcessor class. Also introduces the Trajectory abstraction.
In this new design, the AgentProcessor is given the BrainInfos through an add_experiences method. This then assembles Trajectories (lists of AgentExperiences) that are passed to a Trainer. The trainer can assume all AgentExperiences within a Trajectory belong to the same agent, and ingest it into the update buffer as appropriate.
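A rough sketch of the data structures in this flow; obs, prev_action, and steps appear in the diff, while the remaining fields and the agent_id attribute are assumptions for illustration:

from typing import List, NamedTuple

import numpy as np

class AgentExperience(NamedTuple):
    # One step for a single agent, as assembled by the AgentProcessor.
    obs: List[np.ndarray]
    reward: float
    done: bool
    action: np.ndarray
    prev_action: np.ndarray

class Trajectory(NamedTuple):
    # All steps belong to the same agent, so the trainer can ingest the whole
    # trajectory into its update buffer without re-grouping by agent.
    steps: List[AgentExperience]
    agent_id: str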
NOTE: The new normalization changes cause a slight degradation in some environments (Crawler, Reacher) and a boost in others (Walker). They also seem to improve performance with SAC (makes sense). The new normalization should be more correct: before, we were taking the running mean and std of the mean obs across all agents, while now we process every experience separately.
ToDo for this PR:
- Add TensorBoard logger that spans both inference stats (entropy, value estimates, rewards, etc.) and training stats.
- Verify training performance (reward) is the same. Currently seeing a small degradation in Crawler, Walker, and Reacher.

ToDo in subsequent PRs:
- AgentManager NamedTuple.
- Make process_trajectory a private method in the Trainer, and rather give the trainer an advance() method that will ingest a trajectory from the queue and call process_trajectory.