4 changes: 2 additions & 2 deletions ci/travis/format.sh
@@ -24,7 +24,7 @@ check_command_exist() {
if ! [ -x "$(command -v "$1")" ]; then
echo "$1 not installed. pip install $1==$VERSION"
exit 1
fi
fi
}

check_command_exist yapf
@@ -104,7 +104,7 @@ format_changed() {
yapf --in-place "${YAPF_EXCLUDES[@]}" "${YAPF_FLAGS[@]}"
if which flake8 >/dev/null; then
git diff --name-only --diff-filter=ACRM "$MERGEBASE" -- '*.py' | xargs -P 5 \
flake8 --inline-quotes '"' --no-avoid-escape --exclude=python/ray/core/generated/,streaming/python/generated,doc/source/conf.py,python/ray/cloudpickle/,python/ray/thirdparty_files/ --ignore=C408,E121,E123,E126,E226,E24,E704,W503,W504,W605
flake8 --inline-quotes '"' --no-avoid-escape --exclude=python/ray/core/generated/,streaming/python/generated,doc/source/conf.py,python/ray/cloudpickle/,python/ray/thirdparty_files/ --ignore=C408,E121,E123,E126,E226,E24,E704,W503,W504,W605,F821
Contributor Author:
Ignore F821 (undefined name) so we can keep forward-reference type annotations, i.e. class names in quotes, which we use to avoid circular import errors (see the sketch below).

fi
fi

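For illustration, a minimal sketch of the quoted forward-reference pattern that F821 would otherwise flag; the module and names below are hypothetical, not part of this PR:

# Hypothetical module that needs RolloutWorker only as a type hint. Importing
# it at module level would create a circular import, so the name is only
# referenced as a string and never imported here.

def run_one_episode(worker: "RolloutWorker") -> None:
    # flake8/pyflakes reports the quoted "RolloutWorker" as F821 (undefined
    # name), which is why F821 is added to the ignore list above.
    worker.sample()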
7 changes: 4 additions & 3 deletions rllib/agents/ppo/ppo.py
@@ -117,9 +117,10 @@ def validate_config(config):
if isinstance(config["entropy_coeff"], int):
config["entropy_coeff"] = float(config["entropy_coeff"])
if config["sgd_minibatch_size"] > config["train_batch_size"]:
raise ValueError(
"Minibatch size {} must be <= train batch size {}.".format(
config["sgd_minibatch_size"], config["train_batch_size"]))
raise ValueError("`sgd_minibatch_size` ({}) must be <= "
"`train_batch_size` ({}).".format(
config["sgd_minibatch_size"],
config["train_batch_size"]))
if config["batch_mode"] == "truncate_episodes" and not config["use_gae"]:
raise ValueError(
"Episode truncation is not supported without a value "
6 changes: 2 additions & 4 deletions rllib/agents/ppo/ppo_tf_policy.py
@@ -168,7 +168,7 @@ def postprocess_ppo_gae(policy,
episode=None):
"""Adds the policy logits, VF preds, and advantages to the trajectory."""

completed = sample_batch["dones"][-1]
completed = sample_batch[SampleBatch.DONES][-1]
if completed:
last_r = 0.0
else:
@@ -207,9 +207,7 @@ def __init__(self, config):
self.kl_coeff_val = config["kl_coeff"]
self.kl_target = config["kl_target"]
self.kl_coeff = get_variable(
float(self.kl_coeff_val),
tf_name="kl_coeff",
trainable=False)
float(self.kl_coeff_val), tf_name="kl_coeff", trainable=False)

def update_kl(self, sampled_kl):
if sampled_kl > 2.0 * self.kl_target:
10 changes: 5 additions & 5 deletions rllib/agents/ppo/ppo_torch_policy.py
@@ -112,7 +112,7 @@ def reduce_mean_valid(t):


def ppo_surrogate_loss(policy, model, dist_class, train_batch):
logits, state = model.from_batch(train_batch)
logits, state = model.from_batch(train_batch, is_training=True)
action_dist = dist_class(logits, model)

mask = None
@@ -194,10 +194,10 @@ def value(ob, prev_action, prev_reward, *state):
SampleBatch.PREV_REWARDS: convert_to_torch_tensor(
np.asarray([prev_reward]), self.device),
"is_training": False,
}, [convert_to_torch_tensor(np.asarray([s]), self.device) for
s in state],
convert_to_torch_tensor(
np.asarray([1]), self.device))
}, [
convert_to_torch_tensor(np.asarray([s]), self.device)
for s in state
], convert_to_torch_tensor(np.asarray([1]), self.device))
return self.model.value_function()[0]

else:
12 changes: 8 additions & 4 deletions rllib/agents/trainer.py
@@ -112,12 +112,16 @@
"env": None,
# Unsquash actions to the upper and lower bounds of env's action space
"normalize_actions": False,
# Whether to clip rewards prior to experience postprocessing. Setting to
# None means clip for Atari only.
# Whether to clip rewards during Policy's postprocessing.
# None (default): Clip for Atari only (r=sign(r)).
# True: r=sign(r): Fixed rewards -1.0, 1.0, or 0.0.
# False: Never clip.
# [float value]: Clip at -value and + value.
# Tuple[value1, value2]: Clip at value1 and value2.
"clip_rewards": None,
# Whether to np.clip() actions to the action space low/high range spec.
# Whether to clip actions to the action space's low/high range spec.
"clip_actions": True,
# Whether to use rllib or deepmind preprocessors by default
# Whether to use "rllib" or "deepmind" preprocessors by default
"preprocessor_pref": "deepmind",
# The default learning rate.
"lr": 0.0001,
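As a rough illustration only (not RLlib's actual implementation; the Atari check is an assumption here), the clip_rewards settings documented above could be applied like this:

import numpy as np

def apply_clip_rewards(reward, clip_rewards, is_atari=False):
    # None (default): sign-clip for Atari only, otherwise leave untouched.
    if clip_rewards is None:
        return float(np.sign(reward)) if is_atari else float(reward)
    # True: always clip to -1.0, 0.0, or 1.0.
    if clip_rewards is True:
        return float(np.sign(reward))
    # False: never clip.
    if clip_rewards is False:
        return float(reward)
    # Single float value: clip to [-value, +value].
    if isinstance(clip_rewards, (int, float)):
        return float(np.clip(reward, -clip_rewards, clip_rewards))
    # Tuple (low, high): clip to [low, high].
    low, high = clip_rewards
    return float(np.clip(reward, low, high))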
35 changes: 16 additions & 19 deletions rllib/evaluation/postprocessing.py
@@ -39,10 +39,7 @@ def compute_advantages(rollout: SampleBatch,
processed rewards.
"""

traj = {}
trajsize = len(rollout[SampleBatch.ACTIONS])
for key in rollout:
traj[key] = np.stack(rollout[key])
rollout_size = len(rollout[SampleBatch.ACTIONS])
Contributor Author:
We shouldn't copy the entire batch here; skipping the copy significantly speeds up postprocessing.


assert SampleBatch.VF_PREDS in rollout or not use_critic, \
"use_critic=True but values not found"
@@ -54,13 +51,13 @@
[rollout[SampleBatch.VF_PREDS],
np.array([last_r])])
delta_t = (
traj[SampleBatch.REWARDS] + gamma * vpred_t[1:] - vpred_t[:-1])
rollout[SampleBatch.REWARDS] + gamma * vpred_t[1:] - vpred_t[:-1])
# This formula for the advantage comes from:
# "Generalized Advantage Estimation": https://arxiv.org/abs/1506.02438
traj[Postprocessing.ADVANTAGES] = discount(delta_t, gamma * lambda_)
traj[Postprocessing.VALUE_TARGETS] = (
traj[Postprocessing.ADVANTAGES] +
traj[SampleBatch.VF_PREDS]).copy().astype(np.float32)
rollout[Postprocessing.ADVANTAGES] = discount(delta_t, gamma * lambda_)
rollout[Postprocessing.VALUE_TARGETS] = (
rollout[Postprocessing.ADVANTAGES] +
rollout[SampleBatch.VF_PREDS]).copy().astype(np.float32)
else:
rewards_plus_v = np.concatenate(
[rollout[SampleBatch.REWARDS],
@@ -69,18 +66,18 @@
gamma)[:-1].copy().astype(np.float32)

if use_critic:
traj[Postprocessing.
ADVANTAGES] = discounted_returns - rollout[SampleBatch.
VF_PREDS]
traj[Postprocessing.VALUE_TARGETS] = discounted_returns
rollout[Postprocessing.
ADVANTAGES] = discounted_returns - rollout[SampleBatch.
VF_PREDS]
rollout[Postprocessing.VALUE_TARGETS] = discounted_returns
else:
traj[Postprocessing.ADVANTAGES] = discounted_returns
traj[Postprocessing.VALUE_TARGETS] = np.zeros_like(
traj[Postprocessing.ADVANTAGES])
rollout[Postprocessing.ADVANTAGES] = discounted_returns
rollout[Postprocessing.VALUE_TARGETS] = np.zeros_like(
rollout[Postprocessing.ADVANTAGES])

traj[Postprocessing.ADVANTAGES] = traj[
rollout[Postprocessing.ADVANTAGES] = rollout[
Postprocessing.ADVANTAGES].copy().astype(np.float32)

assert all(val.shape[0] == trajsize for val in traj.values()), \
assert all(val.shape[0] == rollout_size for key, val in rollout.items()), \
"Rollout stacked incorrectly!"
return SampleBatch(traj)
return rollout
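For context, a minimal sketch of the reverse discounted cumulative sum that discount() performs in the GAE branch above (assumed behavior, not necessarily RLlib's exact implementation), i.e. out[t] = x[t] + gamma * out[t + 1]:

import numpy as np

def discount(x, gamma):
    # Reverse discounted cumulative sum used for the GAE advantages.
    out = np.zeros(len(x), dtype=np.float32)
    running = 0.0
    for t in reversed(range(len(x))):
        running = x[t] + gamma * running
        out[t] = running
    return out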
2 changes: 2 additions & 0 deletions rllib/evaluation/rollout_worker.py
@@ -334,6 +334,8 @@ def wrap(env):
# Deepmind wrappers already handle all preprocessing
self.preprocessing_enabled = False

# If clip_rewards not explicitly set to False, switch it
Contributor Author:
clip_rewards has been moved into the base Policy's postprocess_trajectory method.

Contributor:
Won't this break reward clipping for policies that don't call that method?

Contributor Author:
True, if users don't use the build_policy methods, it would. The thought was that reward clipping is really a postprocessing step.
Undid these changes again.

Contributor:
Sounds good. Let's be careful about breaking changes like that.

Contributor Author:
+1

# on here (clip between -1.0 and 1.0).
if clip_rewards is None:
clip_rewards = True

17 changes: 11 additions & 6 deletions rllib/evaluation/sampler.py
@@ -345,7 +345,7 @@ def get_data(self) -> SampleBatchType:
raise RuntimeError("Sampling thread has died")
rollout = self.queue.get(timeout=600.0)

# Propagate errors
# Propagate errors.
if isinstance(rollout, BaseException):
raise rollout

@@ -436,8 +436,8 @@ def _env_runner(worker: "RolloutWorker",
terminal condition, and other fields as dictated by `policy`.
"""

# Try to get Env's max_episode_steps prop. If it doesn't exist, catch
# error and continue.
# Try to get Env's `max_episode_steps` prop. If it doesn't exist, ignore
# error and continue with max_episode_steps=None.
max_episode_steps = None
try:
max_episode_steps = base_env.get_unwrapped()[0].spec.max_episode_steps
@@ -642,6 +642,7 @@ def _process_observations(
large_batch_threshold: int = max(1000, rollout_fragment_length * 10) if \
rollout_fragment_length != float("inf") else 5000

# For each environment.
# type: EnvID, Dict[AgentID, EnvObsType]
for env_id, agent_obs in unfiltered_obs.items():
is_new_episode: bool = env_id not in active_episodes
@@ -757,8 +758,10 @@ def _process_observations(
callbacks.on_episode_step(
worker=worker, base_env=base_env, episode=episode)

# Cut the batch if we're not packing multiple episodes into one,
# or if we've exceeded the requested batch size.
# Cut the batch if ...
# - all-agents-done and not packing multiple episodes into one
# (batch_mode="complete_episodes")
# - or if we've exceeded the rollout_fragment_length.
if episode.batch_builder.has_pending_agent_data():
# Sanity check, whether all agents have done=True, if done[__all__]
# is True.
@@ -775,6 +778,7 @@
elif all_agents_done:
episode.batch_builder.postprocess_batch_so_far(episode)

# Episode is done.
if all_agents_done:
# Handle episode termination.
batch_builder_pool.append(episode.batch_builder)
@@ -959,7 +963,8 @@ def _process_policy_eval_results(
available to Models. Default: False.

Returns:
actions_to_send: Nested dict of env id -> agent id -> agent replies.
actions_to_send: Nested dict of env id -> agent id -> actions to be
sent to Env (np.ndarrays).
"""

actions_to_send: Dict[EnvID, Dict[AgentID, EnvActionType]] = \
3 changes: 3 additions & 0 deletions rllib/examples/env/multi_agent.py
@@ -1,6 +1,7 @@
import gym

from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.rllib.examples.env.stateless_cartpole import StatelessCartPole
from ray.rllib.tests.test_rollout_worker import MockEnv, MockEnv2


@@ -164,3 +165,5 @@ def step(self, action_dict):
MultiAgentCartPole = make_multiagent("CartPole-v0")
MultiAgentMountainCar = make_multiagent("MountainCarContinuous-v0")
MultiAgentPendulum = make_multiagent("Pendulum-v0")
MultiAgentStatelessCartPole = make_multiagent(
Contributor Author:
Added for LSTM multi-agent testing purposes.

lambda config: StatelessCartPole(config))
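A hedged usage sketch for the new env; the num_agents config key is assumed from the other make_multiagent envs in this file:

from ray.rllib.examples.env.multi_agent import MultiAgentStatelessCartPole

# Two StatelessCartPole agents behind one MultiAgentEnv, e.g. for multi-agent
# LSTM tests (observations lack velocities, so memory helps).
env = MultiAgentStatelessCartPole({"num_agents": 2})
obs = env.reset()  # dict: agent id -> observation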
3 changes: 1 addition & 2 deletions rllib/execution/common.py
@@ -24,8 +24,7 @@

# Asserts that an object is a type of SampleBatch.
def _check_sample_batch_type(batch):
if not isinstance(batch, SampleBatch) and not isinstance(
batch, MultiAgentBatch):
if not isinstance(batch, (SampleBatch, MultiAgentBatch)):
raise ValueError("Expected either SampleBatch or MultiAgentBatch, "
"got {}: {}".format(type(batch), batch))

4 changes: 2 additions & 2 deletions rllib/models/action_dist.py
@@ -17,9 +17,9 @@ class ActionDistribution:

@DeveloperAPI
def __init__(self, inputs: List[TensorType], model: ModelV2):
"""Initialize the action dist.
"""Initializes an ActionDist object.

Arguments:
Args:
inputs (Tensors): input vector to compute samples from.
model (ModelV2): reference to model producing the inputs. This
is mainly useful if you want to use model variables to compute
1 change: 0 additions & 1 deletion rllib/models/preprocessors.py
@@ -58,7 +58,6 @@ def check_shape(self, observation: Any) -> None:
observation = np.array(observation)
try:
if not self._obs_space.contains(observation):
print()
raise ValueError(
"Observation outside expected value range",
self._obs_space, observation)
2 changes: 1 addition & 1 deletion rllib/offline/input_reader.py
@@ -92,7 +92,7 @@ def tf_input_ops(self, queue_size: int = 1) -> Dict[str, TensorType]:
class _QueueRunner(threading.Thread):
"""Thread that feeds a TF queue from a InputReader."""

def __init__(self, input_reader: InputReader, queue: tf1.FIFOQueue,
def __init__(self, input_reader: InputReader, queue: "tf1.FIFOQueue",
keys: List[str], dtypes: "tf.dtypes.DType"):
threading.Thread.__init__(self)
self.sess = tf1.get_default_session()
11 changes: 9 additions & 2 deletions rllib/policy/rnn_sequencing.py
@@ -109,7 +109,10 @@ def pad_batch_to_sequences_of_same_size(batch,


@DeveloperAPI
def add_time_dimension(padded_inputs, seq_lens, framework="tf"):
def add_time_dimension(padded_inputs,
seq_lens,
framework="tf",
time_major=False):
"""Adds a time dimension to padded inputs.

Arguments:
@@ -127,6 +130,7 @@ def add_time_dimension(padded_inputs, seq_lens, framework="tf"):
# input batch must be padded to the max seq length given here. That is,
# batch_size == len(seq_lens) * max(seq_lens)
if framework == "tf":
assert time_major is False, "time-major not supported yet for tf!"
padded_batch_size = tf.shape(padded_inputs)[0]
max_seq_len = padded_batch_size // tf.shape(seq_lens)[0]

@@ -142,7 +146,10 @@

# Dynamically reshape the padded batch to introduce a time dimension.
new_batch_size = padded_batch_size // max_seq_len
new_shape = (new_batch_size, max_seq_len) + padded_inputs.shape[1:]
if time_major:
new_shape = (max_seq_len, new_batch_size) + padded_inputs.shape[1:]
else:
new_shape = (new_batch_size, max_seq_len) + padded_inputs.shape[1:]
return torch.reshape(padded_inputs, new_shape)


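A rough usage sketch of the new time_major flag on the torch path; shapes and the seq_lens argument type are assumptions for illustration:

import numpy as np
import torch
from ray.rllib.policy.rnn_sequencing import add_time_dimension

# 2 sequences (B=2), max length 3 (T=3), 4 features, padded into [B*T, F] = [6, 4].
padded = torch.arange(2 * 3 * 4, dtype=torch.float32).reshape(6, 4)
seq_lens = np.array([3, 3])

b_major = add_time_dimension(padded, seq_lens, framework="torch")
# b_major.shape == (2, 3, 4), i.e. [B, T, F]
t_major = add_time_dimension(padded, seq_lens, framework="torch", time_major=True)
# t_major.shape == (3, 2, 4), i.e. [T, B, F] (the branch added above)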
2 changes: 1 addition & 1 deletion rllib/policy/sample_batch.py
@@ -211,7 +211,7 @@ def split_by_episode(self) -> List["SampleBatch"]:

@PublicAPI
def slice(self, start: int, end: int) -> "SampleBatch":
"""Returns a slice of the row data of this batch.
"""Returns a slice of the row data of this batch (w/o copying).

Args:
start (int): Starting index.
2 changes: 2 additions & 0 deletions rllib/policy/tf_policy_template.py
@@ -154,6 +154,8 @@ def postprocess_trajectory(self,
sample_batch,
other_agent_batches=None,
episode=None):
# Call super's postprocess_trajectory first.
sample_batch = Policy.postprocess_trajectory(self, sample_batch)
if postprocess_fn:
return postprocess_fn(self, sample_batch, other_agent_batches,
episode)
32 changes: 27 additions & 5 deletions rllib/utils/memory.py
@@ -22,13 +22,21 @@ def aligned_array(size, dtype, align=64):
return output


def concat_aligned(items):
def concat_aligned(items, time_major=None):
"""Concatenate arrays, ensuring the output is 64-byte aligned.

We only align float arrays; other arrays are concatenated as normal.

This should be used instead of np.concatenate() to improve performance
when the output array is likely to be fed into TensorFlow.

Args:
items (List(np.ndarray)): The list of items to concatenate and align.
time_major (bool): Whether the data in items is time-major, in which
case, we will concatenate along axis=1.

Returns:
np.ndarray: The concat'd and aligned array.
"""

if len(items) == 0:
@@ -41,11 +49,25 @@
and items[0].dtype in [np.float32, np.float64, np.uint8]):
dtype = items[0].dtype
flat = aligned_array(sum(s.size for s in items), dtype)
batch_dim = sum(s.shape[0] for s in items)
new_shape = (batch_dim, ) + items[0].shape[1:]
if time_major is not None:
if time_major is True:
batch_dim = sum(s.shape[1] for s in items)
new_shape = (
items[0].shape[0],
batch_dim,
) + items[0].shape[2:]
else:
batch_dim = sum(s.shape[0] for s in items)
new_shape = (
batch_dim,
items[0].shape[1],
) + items[0].shape[2:]
else:
batch_dim = sum(s.shape[0] for s in items)
new_shape = (batch_dim, ) + items[0].shape[1:]
output = flat.reshape(new_shape)
assert output.ctypes.data % 64 == 0, output.ctypes.data
np.concatenate(items, out=output)
np.concatenate(items, out=output, axis=1 if time_major else 0)
return output
else:
return np.concatenate(items)
return np.concatenate(items, axis=1 if time_major else 0)
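A short usage sketch of the new time_major path in concat_aligned (shapes chosen for illustration):

import numpy as np
from ray.rllib.utils.memory import concat_aligned

# Two time-major float blocks of shape [T, B, F]; with time_major=True the
# concatenation runs along the batch axis (axis=1).
a = np.ones((5, 2, 4), dtype=np.float32)
b = np.ones((5, 3, 4), dtype=np.float32)
out = concat_aligned([a, b], time_major=True)
assert out.shape == (5, 5, 4)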