Move add_experiences out of trainer, add Trajectories #3067

Merged
ervteng merged 79 commits into master from develop-agentprocessor on Dec 19, 2019

Conversation

@ervteng (Contributor) commented Dec 10, 2019:

Moves add_experiences out of the trainer class and into a separate AgentProcessor class. Also introduces the Trajectory abstraction.

In this new design, the AgentProcessor is given BrainInfos through an add_experiences method. It then assembles Trajectories (lists of AgentExperiences) that are passed to a Trainer. The trainer can assume that all AgentExperiences within a Trajectory belong to the same agent, and ingest the trajectory into its update buffer as appropriate.
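
For readers skimming the design, here is a minimal sketch of the shapes this description implies. The field names are illustrative only and not necessarily the ones in the PR:

from typing import List, NamedTuple
import numpy as np

class AgentExperience(NamedTuple):
    # One step of a single agent, as assembled by the AgentProcessor.
    obs: List[np.ndarray]        # vector and/or visual observations
    reward: float
    done: bool
    action: np.ndarray
    prev_action: np.ndarray

class Trajectory(NamedTuple):
    # A contiguous run of experiences, all from the same agent, handed to a Trainer.
    steps: List[AgentExperience]
    agent_id: str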

NOTE: The new normalization changes cause a slight degradation in some environments (Crawler, Reacher) and a boost in others (Walker). They also seem to improve performance with SAC, which makes sense. The new normalization should be more correct: previously we took the running mean and std of the mean observation across all agents, whereas now every experience is normalized separately.

ToDo for this PR:

  • Add a TensorBoard logger that spans both inference stats (entropy, value estimates, rewards, etc.) and training stats.
  • Verify that training performance (reward) is the same. Currently seeing a small degradation in Crawler, Walker, and Reacher.

ToDo in subsequent PRs:

  • Add Queues between the AgentProcessor and Trainers, to be stored as part of the AgentManager NamedTuple (a rough sketch follows this list).
  • Make process_trajectory a private method in the Trainer, and instead give the trainer an advance() method that ingests a trajectory from the queue and calls process_trajectory.
  • Remove the reference to Trainer in AgentProcessor.
  • Change the internal buffer representation inside the Trainer to be made of AgentExperiences rather than the old AgentBuffer format.
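
A rough, purely illustrative sketch of what the planned AgentManager pairing could look like (names and fields are assumptions about future work, not code from this PR):

from queue import Queue
from typing import NamedTuple

class AgentManager(NamedTuple):
    # Pairs an AgentProcessor with the queue its Trainer will drain in advance().
    processor: "AgentProcessor"
    trajectory_queue: "Queue[Trajectory]"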

self.episode_steps: Counter = Counter()
self.episode_rewards: Dict[str, float] = defaultdict(float)
self.stats_reporter = stats_reporter
if max_trajectory_length:

Contributor:
Maybe make this default to a huge number instead (like sys.maxsize)? That might get rid of the need for ignore_max_length

Contributor (Author):
Redid the logic to use sys.maxsize as the default; trainer_controller.py now feeds that in when no time_horizon is specified.

if stored_info is not None:
stored_take_action_outputs = self.last_take_action_outputs[agent_id]
idx = stored_info.agents.index(agent_id)
next_idx = next_info.agents.index(agent_id)

Contributor:
You shouldn't need to look this up since you're already iterating over next_info.agents, right? Just change

for agent_id in next_info.agents:

to

for next_idx, agent_id in enumerate(next_info.agents):

Contributor (Author):
Good call - changed.

stored_info = self.last_brain_info.get(agent_id, None)
if stored_info is not None:
stored_take_action_outputs = self.last_take_action_outputs[agent_id]
idx = stored_info.agents.index(agent_id)

Contributor:
Little worried about the O(N) lookup here since we're doing it N times.

@chriselion (Contributor) commented Dec 18, 2019:
You might want to do something like

agent_id_to_index = {agent_id: i for i, agent_id in enumerate(stored_info.agents)}

outside the loop.

Contributor (Author):
The tricky bit here is that the stored_info might be different per iteration of the loop (some agents in next_info might not have been in the previous info and vice versa), so the index might change as well. To make matters worse, we do this indexing twice (once here, and once in the LL-Python API to convert BatchedState -> BrainInfo).

Long-term we will be removing BrainInfo (today: BatchedState -> BrainInfo -> AgentExperience; end goal: BatchedState -> AgentExperience), so I think we will be able to get away with simply adding to trajectories agent-by-agent. We won't have to store the stored_info anymore, and in that case we will only have to do the indexing once.
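
Purely as an illustration of the trade-off being discussed, one way to avoid the repeated .index() calls without assuming a single shared stored_info would be to build each BrainInfo's agent-id-to-index map lazily and cache it. This is not what the PR does; the helper below is hypothetical:

from typing import Dict

_index_maps: Dict[int, Dict[str, int]] = {}

def agent_index(info, agent_id: str) -> int:
    # Build the agent_id -> index map at most once per BrainInfo object,
    # even when different agents reference different stored_infos.
    key = id(info)
    if key not in _index_maps:
        _index_maps[key] = {a_id: i for i, a_id in enumerate(info.agents)}
    return _index_maps[key][agent_id]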

@@ -43,230 +30,39 @@ def __init__(self, *args, **kwargs):
# collected_rewards is a dictionary from name of reward signal to a dictionary of agent_id to cumulative reward
# used for reporting only. We always want to report the environment reward to Tensorboard, regardless
# of what reward signals are actually present.
self.collected_rewards = {"environment": {}}
self.processing_buffer = ProcessingBuffer()
self.collected_rewards = {"environment": defaultdict(lambda: 0)}

Contributor:
nit: type hint
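
A hedged guess at what that hint might look like, given the comment above describing agent_id-to-cumulative-reward dictionaries (the exact types depend on what the trainer stores):

from collections import defaultdict
from typing import Dict

# Hypothetical annotation for the reviewer's nit.
collected_rewards: Dict[str, Dict[str, float]] = {
    "environment": defaultdict(lambda: 0.0)
}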

filewriter_dir = "{basedir}/{category}".format(
basedir=self.base_dir, category=category
)
if not os.path.exists(filewriter_dir):

Contributor:
I might have missed this before - you can just do os.makedirs(filewriter_dir, exist_ok=True) instead of checking os.path.exists(). see https://docs.python.org/3/library/os.html#os.makedirs

assert statssummary2.num == 10
assert statssummary1.mean == 4.5
assert statssummary2.mean == 4.5
assert round(statssummary1.std, 1) == 2.9

Contributor:
I think you can also do assert statssummary1.std == pytest.approx(2.9) - maybe a little cleaner.

Contributor (Author):
pytest.approx(2.9, abs=0.1) does the same thing I think
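
For reference, a tiny self-contained example of the two spellings being compared (the values here are made up):

import pytest

# Both accept a value within 0.1 of 2.9; the default pytest.approx tolerances
# are much tighter than 0.1, which is why the abs argument is needed here.
assert round(2.87, 1) == 2.9
assert 2.87 == pytest.approx(2.9, abs=0.1)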

# Test write_stats
base_dir = "base_dir"
category = "category1"
tb_writer = TensorboardWriter(base_dir)

Contributor:
Will this actually make a directory somewhere if I run tests locally? If so, you should do something like

import tempfile
...
with tempfile.TemporaryDirectory(prefix="unittest-") as base_dir:
    # rest of test here

Then the directory will get automatically cleaned up when the context manager closes.

Contributor:
Confirmed that this creates ./base_dir and ./base_dir/category1 locally. Please use a tempfile.

Contributor (Author):
Test now uses tempfile.
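
A minimal sketch of the tempfile pattern, assuming the TensorboardWriter class added in this PR (the write call is elided, since its exact signature isn't shown here):

import os
import tempfile

def test_tensorboard_writer_uses_tempdir():
    with tempfile.TemporaryDirectory(prefix="unittest-") as base_dir:
        tb_writer = TensorboardWriter(base_dir)
        # ... exercise tb_writer as in the real test; any files it creates
        # land under base_dir/<category> ...
        assert os.path.isdir(base_dir)
    # base_dir and everything beneath it are deleted when the context exits.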

def write_tensorboard_text(self, key: str, input_dict: Dict[str, Any]) -> None:
"""
Saves text to Tensorboard.
Note: Only works on tensorflow r1.2 or above.

Contributor:
nit: we only support tf >= 1.7, so we can drop this comment

trainer,
trainer.policy,
trainer.parameters["time_horizon"]
if "time_horizon" in trainer.parameters

Contributor:
replace with trainer.parameters.get("time_horizon")

Contributor (Author):
Good call - I now use .get() with a default value of sys.maxsize.
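
For illustration, the conditional lookup collapses to a single expression with .get() and a sys.maxsize default (the parameters dict below is made up):

import sys

parameters = {"batch_size": 1024}  # hypothetical trainer.parameters
time_horizon = parameters.get("time_horizon", sys.maxsize)
assert time_horizon == sys.maxsize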

"""
agent_buffer_trajectory = AgentBuffer()
for step, exp in enumerate(self.steps):
vec_vis_obs = SplitObservations.from_observations(exp.obs)

Contributor:
Note that you're calling SplitObservations.from_observations twice on each element (except maybe the start and end). Might be worth changing the loop to something like

vec_vis_obs = SplitObservations.from_observations(self.steps[0].obs)
for step, exp in enumerate(self.steps):
  # get next_vec_vis_obs same as now
  # rest of loop
  vec_vis_obs = next_vec_vis_obs

but then again, from_observations looks pretty fast unless you're concatenating large vector observations.

Contributor (Author):
Yep, fixed

:param obs: List of numpy arrays (observation)
:returns: A SplitObservations object.
"""
vis_obs_indices = []

Contributor:
I think you should avoid creating the indices arrays here. You can form vis_obs and the input to np.concatenate() directly in the loop through obs.

Contributor (Author):
Good call - I now just build the list directly.

I think we use the same logic in the brain_conversion_utils.py file as well
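
A rough sketch of the "build the lists directly" approach, with the class shape inferred from how it is used above; the field names and the rank-based split are assumptions, not the PR's exact code:

from typing import List, NamedTuple
import numpy as np

class SplitObservations(NamedTuple):
    vector_observations: np.ndarray
    visual_observations: List[np.ndarray]

    @staticmethod
    def from_observations(obs: List[np.ndarray]) -> "SplitObservations":
        # Partition observations by rank in a single pass; no index lists needed.
        vis_obs: List[np.ndarray] = []
        vec_obs: List[np.ndarray] = []
        for observation in obs:
            if observation.ndim == 3:   # visual observation (H, W, C)
                vis_obs.append(observation)
            else:                       # vector observation
                vec_obs.append(observation)
        vector = (
            np.concatenate(vec_obs, axis=0) if vec_obs else np.zeros(0, dtype=np.float32)
        )
        return SplitObservations(vector_observations=vector, visual_observations=vis_obs)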


agent_buffer_trajectory["prev_action"].append(exp.prev_action)

# Add the value outputs if needed

Contributor:
nit: not sure this comment matches the code.

Contributor (Author):
Nope - removed.

if not os.path.exists(self.summary_path):
os.makedirs(self.summary_path)
self.stats_reporter = StatsReporter(self.summary_path)
# if not os.path.exists(self.summary_path):

Contributor:
nit: dead code.

Contributor (Author):
Thanks - removed

self.last_take_action_outputs[agent_id] = take_action_outputs

# Store the environment reward
tmp_environment_reward = np.array(next_info.rewards, dtype=np.float32)

Contributor:
Do you need to convert this to an np.array? Looks like you only use it twice and always look it up by index.

Contributor (Author):
At some point this was necessary but not anymore - fixed.

mean_current_observation - self.running_mean
# Based on Welford's algorithm for running mean and standard deviation, for batch updates. Discussion here:
# https://stackoverflow.com/questions/56402955/whats-the-formula-for-welfords-algorithm-for-variance-std-with-batch-updates
steps_increment = tf.shape(vector_input)[0]

Contributor:
Is it possible to unit test this with some synthetic data?

Contributor (Author):
Added a test for normalization in test_ppo
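
For reference, here is a self-contained NumPy sketch of the batched Welford update and the kind of synthetic-data check the reviewer is asking about. It mirrors, but is not, the TF code above or the test added in test_ppo:

import numpy as np

def welford_batch_update(mean, var_sum, steps, batch):
    # Chan et al. parallel/batched form of Welford's algorithm:
    # combine the running stream (mean, var_sum, steps) with one batch.
    batch_steps = batch.shape[0]
    total_steps = steps + batch_steps
    batch_mean = batch.mean(axis=0)
    delta = batch_mean - mean
    new_mean = mean + delta * batch_steps / total_steps
    # Accumulate the sum of squared deviations (M2) for the combined stream.
    batch_m2 = ((batch - batch_mean) ** 2).sum(axis=0)
    new_var_sum = var_sum + batch_m2 + delta ** 2 * steps * batch_steps / total_steps
    return new_mean, new_var_sum, total_steps

rng = np.random.RandomState(0)
data = rng.randn(1000, 3) * 5.0 + 2.0   # synthetic vector observations
mean, m2, steps = np.zeros(3), np.zeros(3), 0
for chunk in np.array_split(data, 10):
    mean, m2, steps = welford_batch_update(mean, m2, steps, chunk)
assert np.allclose(mean, data.mean(axis=0))
assert np.allclose(np.sqrt(m2 / steps), data.std(axis=0))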

@chriselion (Contributor) left a comment:
Looks good overall, some minor feedback. Feel free to punt some of it to a follow-up PR.

@ervteng merged commit dfe9c11 into master on Dec 19, 2019.
The develop-agentprocessor branch was deleted on December 19, 2019.