
Move add_experiences out of trainer, add Trajectories #3067

Merged: 79 commits, Dec 19, 2019

Commits
9a16838  Split buffer into two buffers (PPO works)  (Nov 22, 2019)
55b2918  buffer split for SAC  (Nov 25, 2019)
38f5795  Fix buffer tests and truncate  (Nov 25, 2019)
453dd4c  Fix RL tests  (Nov 25, 2019)
b00f779  Fix demo loader and remaining tests  (Nov 25, 2019)
3b7191b  Remove MANIFEST file  (Nov 25, 2019)
9c47678  Add type hints to Buffer  (Nov 25, 2019)
a57a220  Rename append_update_buffer to append_to_update_buffer  (Nov 25, 2019)
efe29c8  Merge branch 'develop' into develop-splitbuffer  (Nov 26, 2019)
f5f9598  Non-working commit  (Nov 26, 2019)
f3459eb  Revert buffer for now  (Nov 26, 2019)
0b603c7  Another nonworking commit  (Nov 27, 2019)
ea6e79d  Runs but doesn't do anything yet  (Nov 27, 2019)
a264b48  Merge branch 'develop' into develop-agentprocessor  (Dec 4, 2019)
5e4f1bc  Use ProcessingBuffer in AgentProcessor  (Dec 4, 2019)
a5ac988  Convert to trajectory  (Dec 4, 2019)
a2e33e8  Looks like it's training  (Dec 5, 2019)
7004db8  Fix memory leak  (Dec 5, 2019)
0863ff5  Attempt reward reporting  (Dec 5, 2019)
88feb1b  Stats reporting is working  (Dec 5, 2019)
d6fe367  Clean up some stuff  (Dec 5, 2019)
8e43ecd  No longer using ProcessingBuffer for PPO  (Dec 5, 2019)
2b32d61  Move trajectory and related functions to trajectory.py  (Dec 5, 2019)
991be2c  Add back max_step logic  (Dec 6, 2019)
9b7969b  Merge branch 'master' of github.com:Unity-Technologies/ml-agents into…  (Dec 6, 2019)
5efd4e9  Remove epsilon  (Dec 6, 2019)
3bfe3df  Migrate SAC  (Dec 6, 2019)
f7649ae  Remove dead code  (Dec 6, 2019)
6b40d00  Move some common logic to buffer class  (Dec 6, 2019)
bf59521  Kill the ProcessingBuffer  (Dec 6, 2019)
68984df  Convert BC (warning) might be broken  (Dec 6, 2019)
12d4467  Fix some bugs for visual obs  (Dec 6, 2019)
2322150  Fixes for recurrent  (Dec 7, 2019)
2d084ed  Better decoupling for agent processor  (Dec 7, 2019)
295e3a0  Fix some of the tests  (Dec 7, 2019)
9334bb6  Add test for trajectory  (Dec 9, 2019)
93060b5  Fix BC and tests  (Dec 9, 2019)
3a3eb5b  Lots of test fixes  (Dec 9, 2019)
4c5bd73  Remove BootstrapExperience  (Dec 9, 2019)
1c95992  Move agent_id to Trajectory  (Dec 9, 2019)
a48e7f7  Add back next_obs  (Dec 10, 2019)
0053517  Fix test again  (Dec 10, 2019)
29797b1  Fix PPO value tests  (Dec 10, 2019)
e9dcdd9  Properly report value estimates and episode length  (Dec 10, 2019)
68a3b3d  Fix np float32 errors  (Dec 10, 2019)
6298731  Fix one more np float32 issue  (Dec 10, 2019)
cd4c09c  Merge branch 'master' into develop-agentprocessor  (Dec 10, 2019)
1a545c1  Fix some import errors  (Dec 10, 2019)
9452806  Make conversion methods part of NamedTuples  (Dec 11, 2019)
1052ad5  Add way to check if trajectory is done or max_reached  (Dec 11, 2019)
94c5f8c  Add docstring  (Dec 11, 2019)
866bf9c  Address AgentProcessor comments  (Dec 11, 2019)
03bd3e4  Allow None max steps  (Dec 12, 2019)
153368c  Merge branch 'master' into develop-agentprocessor  (Dec 12, 2019)
fd1312b  Fix tests  (Dec 12, 2019)
1a7fffd  Fix some mypy issues and remove unused code  (Dec 12, 2019)
d1b30b3  Fix numpy import  (Dec 12, 2019)
d9abe26  Remove defaultdict that didn't make sense  (Dec 12, 2019)
f090033  Fixed value estimate bug  (Dec 12, 2019)
6a1f275  Fix mypy issue  (Dec 12, 2019)
0f08718  Add stats reporter class and re-enable missing stats (#3076)  (Dec 13, 2019)
80a3359  Revert gitignore  (Dec 13, 2019)
a938d61  Normalize based on number of elements  (Dec 14, 2019)
63d6dd0  Add comment  (Dec 16, 2019)
82e8191  Merge branch 'master' into develop-agentprocessor  (Dec 16, 2019)
9a83b66  New way to update mean and var  (Dec 17, 2019)
c827581  Merge branch 'master' into develop-agentprocessor  (Dec 18, 2019)
89f9375  Fix tests  (Dec 18, 2019)
212cc3b  Add comments for normalization  (Dec 18, 2019)
10dcc1b  Remove dead code  (Dec 18, 2019)
2d72b06  Add type hints to rl_trainer  (Dec 19, 2019)
a0c76c7  Cleanup agent_processor  (Dec 19, 2019)
b1060e5  Make file creation safer  (Dec 19, 2019)
70f91af  Fix error message  (Dec 19, 2019)
8a44fc5  Clean up trajectory and splitobs  (Dec 19, 2019)
919a00b  Use .get for trainer_parameters  (Dec 19, 2019)
7122d39  Add test for normalization  (Dec 19, 2019)
cb1ec87  Float32 array in test  (Dec 19, 2019)
9d554bb  Fix comment in test  (Dec 19, 2019)
3 changes: 2 additions & 1 deletion ml-agents/mlagents/trainers/action_info.py
@@ -1,6 +1,7 @@
from typing import NamedTuple, Any, Dict
import numpy as np

ActionInfoOutputs = Dict[str, Any]
ActionInfoOutputs = Dict[str, np.ndarray]


class ActionInfo(NamedTuple):
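For context, a rough sketch (hypothetical shapes and values, not from this PR) of what a take_action_outputs dict looks like under the narrowed ActionInfoOutputs type; the keys mirror the ones read by AgentProcessor below:

    import numpy as np

    take_action_outputs = {
        "action": np.zeros((2, 3), dtype=np.float32),       # one row per agent
        "pre_action": np.zeros((2, 3), dtype=np.float32),    # continuous-control only
        "log_probs": np.zeros((2, 3), dtype=np.float32),
        "entropy": np.zeros(2, dtype=np.float32),
        "learning_rate": np.float32(3e-4),                   # scalar stat, illustrative value
    }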
198 changes: 139 additions & 59 deletions ml-agents/mlagents/trainers/agent_processor.py
@@ -1,75 +1,155 @@
from typing import List, Union
import sys
from typing import List, Dict
from collections import defaultdict, Counter

from mlagents.trainers.buffer import AgentBuffer, BufferException
from mlagents.trainers.trainer import Trainer
from mlagents.trainers.trajectory import Trajectory, AgentExperience
from mlagents.trainers.brain import BrainInfo
from mlagents.trainers.tf_policy import TFPolicy
from mlagents.trainers.action_info import ActionInfoOutputs
from mlagents.trainers.stats import StatsReporter


class ProcessingBuffer(dict):
class AgentProcessor:
"""
ProcessingBuffer contains a dictionary of AgentBuffer. The AgentBuffers are indexed by agent_id.
AgentProcessor contains a dictionary of per-agent trajectory buffers. The buffers are indexed by agent_id.
Buffer also contains an update_buffer that corresponds to the buffer used when updating the model.
One AgentProcessor should be created per agent group.
"""

def __str__(self):
return "local_buffers :\n{0}".format(
"\n".join(["\tagent {0} :{1}".format(k, str(self[k])) for k in self.keys()])
)

def __getitem__(self, key):
if key not in self.keys():
self[key] = AgentBuffer()
return super().__getitem__(key)

def reset_local_buffers(self) -> None:
def __init__(
self,
trainer: Trainer,
policy: TFPolicy,
stats_reporter: StatsReporter,
max_trajectory_length: int = sys.maxsize,
):
"""
Resets all the local AgentBuffers.
Create an AgentProcessor.
:param trainer: Trainer instance connected to this AgentProcessor. The trainer is given each Trajectory
when it is finished.
:param policy: Policy instance associated with this AgentProcessor.
:param max_trajectory_length: Maximum length of a trajectory before it is added to the trainer.
:param stats_reporter: StatsReporter instance used to write the stats. Its category usually comes from the Trainer.
"""
for buf in self.values():
buf.reset_agent()
self.experience_buffers: Dict[str, List[AgentExperience]] = defaultdict(list)
self.last_brain_info: Dict[str, BrainInfo] = {}
Review comment (Contributor): Is this the last_brain_info per agent?

Reply (Author): Yes.

self.last_take_action_outputs: Dict[str, ActionInfoOutputs] = {}
# Note: this is needed until we switch to AgentExperiences as the data input type.
# We still need some info from the policy (memories, previous actions)
# that really should be gathered by the env-manager.
self.policy = policy
self.episode_steps: Counter = Counter()
self.episode_rewards: Dict[str, float] = defaultdict(float)
self.stats_reporter = stats_reporter
self.trainer = trainer
self.max_trajectory_length = max_trajectory_length

def append_to_update_buffer(
def add_experiences(
self,
update_buffer: AgentBuffer,
agent_id: Union[int, str],
key_list: List[str] = None,
batch_size: int = None,
training_length: int = None,
curr_info: BrainInfo,
next_info: BrainInfo,
take_action_outputs: ActionInfoOutputs,
) -> None:
"""
Appends the buffer of an agent to the update buffer.
:param update_buffer: A reference to an AgentBuffer to append the agent's buffer to
:param agent_id: The id of the agent which data will be appended
:param key_list: The fields that must be added. If None: all fields will be appended.
:param batch_size: The number of elements that must be appended. If None: All of them will be.
:param training_length: The length of the samples that must be appended. If None: only takes one element.
Adds experiences to each agent's experience history.
:param curr_info: current BrainInfo.
:param next_info: next BrainInfo.
:param take_action_outputs: The outputs of the Policy's get_action method.
"""
if key_list is None:
key_list = self[agent_id].keys()
if not self[agent_id].check_length(key_list):
raise BufferException(
"The length of the fields {0} for agent {1} were not of same length".format(
key_list, agent_id
)
if take_action_outputs:
self.stats_reporter.add_stat(
"Policy/Entropy", take_action_outputs["entropy"].mean()
)
for field_key in key_list:
update_buffer[field_key].extend(
self[agent_id][field_key].get_batch(
batch_size=batch_size, training_length=training_length
)
self.stats_reporter.add_stat(
"Policy/Learning Rate", take_action_outputs["learning_rate"]
)

def append_all_agent_batch_to_update_buffer(
self,
update_buffer: AgentBuffer,
key_list: List[str] = None,
batch_size: int = None,
training_length: int = None,
) -> None:
"""
Appends the buffer of all agents to the update buffer.
:param key_list: The fields that must be added. If None: all fields will be appended.
:param batch_size: The number of elements that must be appended. If None: All of them will be.
:param training_length: The length of the samples that must be appended. If None: only takes one element.
"""
for agent_id in self.keys():
self.append_to_update_buffer(
update_buffer, agent_id, key_list, batch_size, training_length
)
for agent_id in curr_info.agents:
self.last_brain_info[agent_id] = curr_info
self.last_take_action_outputs[agent_id] = take_action_outputs

# Store the environment reward
tmp_environment_reward = next_info.rewards

for next_idx, agent_id in enumerate(next_info.agents):
stored_info = self.last_brain_info.get(agent_id, None)
if stored_info is not None:
stored_take_action_outputs = self.last_take_action_outputs[agent_id]
idx = stored_info.agents.index(agent_id)
Review comment (Contributor): Little worried about the O(N) lookup here since we're doing it N times.

Review comment (chriselion, Dec 18, 2019): You might want to do something like

    agent_id_to_index = {agent_id: i for i, agent_id in enumerate(stored_info.agents)}

outside the loop.

Reply (Author): The tricky bit here is that the stored_info might be different per iteration of the loop (some agents in next_info might not have been in the previous info and vice versa), so the index might change as well. To make matters worse, we do this indexing twice (once here, and once in the LL-Python API to convert BatchedState -> BrainInfo).

Long-term we will be removing BrainInfo (today: BatchedState -> BrainInfo -> AgentExperience; end goal: BatchedState -> AgentExperience), so I think we will be able to get away with simply adding to trajectories agent-by-agent. We won't have to store the stored_info anymore, and we will only have to do the indexing once.

obs = []
if not stored_info.local_done[idx]:
for i, _ in enumerate(stored_info.visual_observations):
obs.append(stored_info.visual_observations[i][idx])
if self.policy.use_vec_obs:
obs.append(stored_info.vector_observations[idx])
if self.policy.use_recurrent:
memory = self.policy.retrieve_memories([agent_id])[0, :]
else:
memory = None

done = next_info.local_done[next_idx]
max_step = next_info.max_reached[next_idx]

# Add the outputs of the last eval
action = stored_take_action_outputs["action"][idx]
if self.policy.use_continuous_act:
action_pre = stored_take_action_outputs["pre_action"][idx]
else:
action_pre = None
action_probs = stored_take_action_outputs["log_probs"][idx]
action_masks = stored_info.action_masks[idx]
prev_action = self.policy.retrieve_previous_action([agent_id])[0, :]

experience = AgentExperience(
obs=obs,
reward=tmp_environment_reward[next_idx],
done=done,
action=action,
action_probs=action_probs,
action_pre=action_pre,
action_mask=action_masks,
prev_action=prev_action,
max_step=max_step,
memory=memory,
)
# Add the value outputs if needed
self.experience_buffers[agent_id].append(experience)
self.episode_rewards[agent_id] += tmp_environment_reward[next_idx]
if (
next_info.local_done[next_idx]
or (
len(self.experience_buffers[agent_id])
>= self.max_trajectory_length
)
) and len(self.experience_buffers[agent_id]) > 0:
# Make next AgentExperience
next_obs = []
for i, _ in enumerate(next_info.visual_observations):
next_obs.append(next_info.visual_observations[i][next_idx])
if self.policy.use_vec_obs:
next_obs.append(next_info.vector_observations[next_idx])
trajectory = Trajectory(
steps=self.experience_buffers[agent_id],
agent_id=agent_id,
next_obs=next_obs,
)
# This will eventually be replaced with a queue
self.trainer.process_trajectory(trajectory)
self.experience_buffers[agent_id] = []
if next_info.local_done[next_idx]:
self.stats_reporter.add_stat(
"Environment/Cumulative Reward",
self.episode_rewards.get(agent_id, 0),
)
self.stats_reporter.add_stat(
"Environment/Episode Length",
self.episode_steps.get(agent_id, 0),
)
del self.episode_steps[agent_id]
del self.episode_rewards[agent_id]
elif not next_info.local_done[next_idx]:
self.episode_steps[agent_id] += 1
self.policy.save_previous_action(
curr_info.agents, take_action_outputs["action"]
)
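
For orientation, a minimal sketch (not part of this diff) of how an AgentProcessor is wired between an environment loop and a trainer. The helper names and the assumption that the trainer exposes a StatsReporter are hypothetical; the constructor arguments and the add_experiences call follow the code above:

    from mlagents.trainers.agent_processor import AgentProcessor

    def make_processor(trainer, policy, time_horizon=64):
        # One AgentProcessor per agent group; trajectories longer than time_horizon
        # are cut and handed to the trainer early.
        return AgentProcessor(
            trainer=trainer,
            policy=policy,
            stats_reporter=trainer.stats_reporter,  # assumption: the trainer owns a StatsReporter
            max_trajectory_length=time_horizon,
        )

    def on_step(processor, curr_info, next_info, take_action_outputs):
        # Called once per environment step. When an agent is done (or hits
        # max_trajectory_length), the processor assembles a Trajectory and passes it
        # to trainer.process_trajectory().
        processor.add_experiences(curr_info, next_info, take_action_outputs)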
29 changes: 29 additions & 0 deletions ml-agents/mlagents/trainers/buffer.py
@@ -255,6 +255,35 @@ def truncate(self, max_length: int, sequence_length: int = 1) -> None:
for _key in self.keys():
self[_key] = self[_key][current_length - max_length :]

def resequence_and_append(
self,
target_buffer: "AgentBuffer",
Review comment (Contributor): Why do we have this magic string here? Will it cause problems if a user names a brain AgentBuffer?

Reply (Author): This isn't a magic string - it's a type annotation :P -> https://mypy.readthedocs.io/en/latest/cheat_sheet_py3.html#miscellaneous

key_list: List[str] = None,
batch_size: int = None,
training_length: int = None,
) -> None:
"""
Takes in a batch size and training length (sequence length), and appends this AgentBuffer to target_buffer
properly padded for LSTM use. Optionally, use key_list to restrict which fields are inserted into the new
buffer.
:param target_buffer: The buffer to which to append the samples.
:param key_list: The fields that must be added. If None: all fields will be appended.
:param batch_size: The number of elements that must be appended. If None: All of them will be.
:param training_length: The length of the samples that must be appended. If None: only takes one element.
"""
if key_list is None:
key_list = list(self.keys())
if not self.check_length(key_list):
raise BufferException(
"The length of the fields {0} were not of same length".format(key_list)
)
for field_key in key_list:
target_buffer[field_key].extend(
self[field_key].get_batch(
batch_size=batch_size, training_length=training_length
)
)

@property
def num_experiences(self) -> int:
"""
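A small usage sketch of the new resequence_and_append (field names are illustrative, not prescribed): a per-agent buffer is appended to an update buffer, padded so each appended sample forms a full sequence for recurrent (LSTM) training:

    import numpy as np
    from mlagents.trainers.buffer import AgentBuffer

    agent_buffer = AgentBuffer()
    for reward in (0.0, 0.0, 1.0):
        agent_buffer["rewards"].append(reward)
        agent_buffer["vector_obs"].append(np.zeros(4, dtype=np.float32))

    update_buffer = AgentBuffer()
    agent_buffer.resequence_and_append(
        update_buffer,
        batch_size=None,     # append every element
        training_length=2,   # pad into full sequences of length 2
    )
    # Expected: 4 experiences (one padding element plus the three real ones)
    print(update_buffer.num_experiences)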
1 change: 0 additions & 1 deletion ml-agents/mlagents/trainers/components/bc/module.py
@@ -150,7 +150,6 @@ def _update_batch(
feed_dict[self.policy.model.prev_action] = mini_batch_demo[
"prev_action"
]

network_out = self.policy.sess.run(
list(self.out_dict.values()), feed_dict=feed_dict
)
@@ -31,5 +31,5 @@ def evaluate(
return RewardSignalResult(scaled_reward, unscaled_reward)

def evaluate_batch(self, mini_batch: Dict[str, np.array]) -> RewardSignalResult:
env_rews = np.array(mini_batch["environment_rewards"])
env_rews = np.array(mini_batch["environment_rewards"], dtype=np.float32)
return RewardSignalResult(self.strength * env_rews, env_rews)
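
The explicit dtype matters because NumPy builds float64 arrays from Python floats by default, while the rest of the training code works in float32 (see the "Fix np float32 errors" commits above); a quick standalone illustration:

    import numpy as np

    rewards = [0.1, 0.2, 0.3]
    print(np.array(rewards).dtype)                    # float64 (NumPy default)
    print(np.array(rewards, dtype=np.float32).dtype)  # float32, matching the training tensors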
13 changes: 7 additions & 6 deletions ml-agents/mlagents/trainers/curriculum.py
@@ -1,6 +1,7 @@
import os
import json
import math
from typing import Dict, Any, TextIO

from .exception import CurriculumConfigError, CurriculumLoadingError

@@ -51,14 +52,14 @@ def __init__(self, location):
)

@property
def lesson_num(self):
def lesson_num(self) -> int:
return self._lesson_num

@lesson_num.setter
def lesson_num(self, lesson_num):
def lesson_num(self, lesson_num: int) -> None:
self._lesson_num = max(0, min(lesson_num, self.max_lesson_num))

def increment_lesson(self, measure_val):
def increment_lesson(self, measure_val: float) -> bool:
"""
Increments the lesson number depending on the progress given.
:param measure_val: Measure of progress (either reward or percentage
@@ -87,7 +88,7 @@ def increment_lesson(self, measure_val):
return True
return False

def get_config(self, lesson=None):
def get_config(self, lesson: int = None) -> Dict[str, Any]:
"""
Returns reset parameters which correspond to the lesson.
:param lesson: The lesson you want to get the config of. If None, the
@@ -106,7 +107,7 @@ def get_config(self, lesson=None):
return config

@staticmethod
def load_curriculum_file(location):
def load_curriculum_file(location: str) -> None:
try:
with open(location) as data_file:
return Curriculum._load_curriculum(data_file)
@@ -120,7 +121,7 @@ def load_curriculum_file(location):
)

@staticmethod
def _load_curriculum(fp):
def _load_curriculum(fp: TextIO) -> None:
try:
return json.load(fp)
except json.decoder.JSONDecodeError as e:
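For reference, the lesson_num setter above clamps the requested lesson into [0, max_lesson_num]; a standalone illustration of the same expression, assuming max_lesson_num is 3:

    # max(0, min(lesson_num, max_lesson_num)) with max_lesson_num == 3
    for requested in (-5, 2, 99):
        print(max(0, min(requested, 3)))   # prints 0, 2, 3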
32 changes: 14 additions & 18 deletions ml-agents/mlagents/trainers/demo_loader.py
@@ -4,7 +4,6 @@
from typing import List, Tuple
import numpy as np
from mlagents.trainers.buffer import AgentBuffer
from mlagents.trainers.agent_processor import ProcessingBuffer
from mlagents.trainers.brain import BrainParameters, BrainInfo
from mlagents_envs.communicator_objects.agent_info_action_pair_pb2 import (
AgentInfoActionPairProto,
@@ -27,8 +26,8 @@ def make_demo_buffer(
sequence_length: int,
) -> AgentBuffer:
# Create and populate buffer using experiences
demo_process_buffer = ProcessingBuffer()
demo_buffer = AgentBuffer()
demo_raw_buffer = AgentBuffer()
demo_processed_buffer = AgentBuffer()
for idx, experience in enumerate(pair_infos):
if idx > len(pair_infos) - 2:
break
@@ -47,30 +46,27 @@
previous_action = np.array(
pair_infos[idx - 1].action_info.vector_actions, dtype=np.float32
)
demo_process_buffer[0].last_brain_info = current_brain_info
demo_process_buffer[0]["done"].append(next_brain_info.local_done[0])
demo_process_buffer[0]["rewards"].append(next_brain_info.rewards[0])
demo_raw_buffer["done"].append(next_brain_info.local_done[0])
demo_raw_buffer["rewards"].append(next_brain_info.rewards[0])
for i in range(brain_params.number_visual_observations):
demo_process_buffer[0]["visual_obs%d" % i].append(
demo_raw_buffer["visual_obs%d" % i].append(
current_brain_info.visual_observations[i][0]
)
if brain_params.vector_observation_space_size > 0:
demo_process_buffer[0]["vector_obs"].append(
demo_raw_buffer["vector_obs"].append(
current_brain_info.vector_observations[0]
)
demo_process_buffer[0]["actions"].append(
current_pair_info.action_info.vector_actions
)
demo_process_buffer[0]["prev_action"].append(previous_action)
demo_raw_buffer["actions"].append(current_pair_info.action_info.vector_actions)
demo_raw_buffer["prev_action"].append(previous_action)
if next_brain_info.local_done[0]:
demo_process_buffer.append_to_update_buffer(
demo_buffer, 0, batch_size=None, training_length=sequence_length
demo_raw_buffer.resequence_and_append(
demo_processed_buffer, batch_size=None, training_length=sequence_length
)
demo_process_buffer.reset_local_buffers()
demo_process_buffer.append_to_update_buffer(
demo_buffer, 0, batch_size=None, training_length=sequence_length
demo_raw_buffer.reset_agent()
demo_raw_buffer.resequence_and_append(
demo_processed_buffer, batch_size=None, training_length=sequence_length
)
return demo_buffer
return demo_processed_buffer


@timed