1 change: 0 additions & 1 deletion .buildkite/pipeline.build_py37.yml
@@ -74,7 +74,6 @@
- ./ci/env/env_info.sh
- ./ci/run/run_bazel_test_with_sharding.sh --config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_tag_filters=tests_dir,-multi_gpu --test_env=RAY_USE_MULTIPROCESSING_CPU_COUNT=1
--test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0
rllib/...


5 changes: 0 additions & 5 deletions .buildkite/pipeline.ml.yml
@@ -123,7 +123,6 @@
--build_tests_only
--test_tag_filters=fake_gpus,-torch_only,-tf2_only,-no_tf_static_graph,-multi_gpu
--test_arg=--framework=tf
--test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0
rllib/...

# TODO: (sven) tf2 (eager) multi-GPU
@@ -223,7 +222,6 @@
--build_tests_only
--test_tag_filters=-learning_tests,-memory_leak_tests,-examples,-tests_dir,-documentation,-multi_gpu,-no_cpu,-torch_2.x_only_benchmark
--test_env=RAY_USE_MULTIPROCESSING_CPU_COUNT=1
--test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0
rllib/...

- label: ":brain: RLlib: RLModule tests"
@@ -250,7 +248,6 @@
- ./ci/env/env_info.sh
- ./ci/run/run_bazel_test_with_sharding.sh --config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_tag_filters=examples,-multi_gpu,-gpu --test_env=RAY_USE_MULTIPROCESSING_CPU_COUNT=1
--test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0
rllib/...

- label: ":brain: RLlib: tests/ dir"
@@ -263,7 +260,6 @@
- ./ci/env/env_info.sh
- ./ci/run/run_bazel_test_with_sharding.sh --config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_tag_filters=tests_dir,-multi_gpu --test_env=RAY_USE_MULTIPROCESSING_CPU_COUNT=1
--test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0
rllib/...

- label: ":brain: RLlib: Documentation code/examples"
@@ -275,7 +271,6 @@
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_tag_filters=documentation --test_env=RAY_USE_MULTIPROCESSING_CPU_COUNT=1
--test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0
rllib/...

- label: ":octopus: Tune tests and examples (small)"
2 changes: 1 addition & 1 deletion doc/source/rllib/doc_code/getting_started.py
@@ -18,7 +18,7 @@
print(pretty_print(result))

if i % 5 == 0:
checkpoint_dir = algo.save()
checkpoint_dir = algo.save().checkpoint.path
print(f"Checkpoint saved in directory {checkpoint_dir}")
# __rllib-first-config-end__

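Note on this change: under the new persistence mode, Algorithm.save() returns a result object rather than a plain path string, and the checkpoint directory is read from its checkpoint. A minimal sketch of the updated usage, assuming `algo` is the built Algorithm from the snippet above:

    # Sketch based on the diff: the checkpoint directory now lives on the
    # returned result object, not in the return value itself.
    save_result = algo.save()
    checkpoint_dir = save_result.checkpoint.path
    print(f"Checkpoint saved in directory {checkpoint_dir}")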
15 changes: 10 additions & 5 deletions python/ray/tune/trainable/trainable.py
@@ -925,11 +925,18 @@ def restore(

"""
if _use_storage_context():
checkpoint_result: _TrainingResult = checkpoint_path
if isinstance(checkpoint_path, str):
checkpoint_path = Checkpoint.from_directory(checkpoint_path)
if isinstance(checkpoint_path, Checkpoint):
checkpoint_result = _TrainingResult(
checkpoint=checkpoint_path, metrics={}
)
else:
checkpoint_result: _TrainingResult = checkpoint_path
assert isinstance(checkpoint_result, _TrainingResult), type(
checkpoint_result
)

checkpoint = checkpoint_result.checkpoint
checkpoint_metrics = checkpoint_result.metrics
self._iteration = checkpoint_metrics.get(TRAINING_ITERATION, 0)
self._time_total = checkpoint_metrics.get(TIME_TOTAL_S, 0)
@@ -941,7 +948,6 @@ def restore(
self._timesteps_since_restore = 0
self._episodes_total = checkpoint_metrics.get(EPISODES_TOTAL)

checkpoint = checkpoint_result.checkpoint
if not _exists_at_fs_path(checkpoint.filesystem, checkpoint.path):
raise ValueError(
f"Could not recover from checkpoint as it does not exist on "
@@ -973,8 +979,7 @@
self._restored = True

logger.info(
f"Restored on {self._local_ip} from checkpoint: "
f"{checkpoint_result.checkpoint}"
f"Restored on {self._local_ip} from checkpoint: " f"{checkpoint}"
)
return True

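The branch added above normalizes whatever gets passed to Trainable.restore() into a _TrainingResult. A standalone sketch of that normalization, using the Checkpoint and _TrainingResult types referenced in the diff (import paths assumed to match those added in rllib/evaluate.py below):

    # Hypothetical helper mirroring the logic added above (not part of the PR).
    from ray.train._checkpoint import Checkpoint
    from ray.train._internal.session import _TrainingResult

    def to_training_result(checkpoint_path):
        # A plain directory path is first wrapped into a Checkpoint ...
        if isinstance(checkpoint_path, str):
            checkpoint_path = Checkpoint.from_directory(checkpoint_path)
        # ... and a Checkpoint is wrapped into a _TrainingResult with empty metrics.
        if isinstance(checkpoint_path, Checkpoint):
            return _TrainingResult(checkpoint=checkpoint_path, metrics={})
        # Anything else must already be a _TrainingResult.
        assert isinstance(checkpoint_path, _TrainingResult), type(checkpoint_path)
        return checkpoint_path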
2 changes: 1 addition & 1 deletion rllib/algorithms/tests/test_algorithm.py
@@ -127,7 +127,7 @@ def new_mapping_fn(agent_id, episode, worker, **kwargs):
self.assertTrue(f"p{j}" in pol_map)
self.assertTrue(len(pol_map) == i + 1)
algo.train()
checkpoint = algo.save()
checkpoint = algo.save().checkpoint

# Test restoring from the checkpoint (which has more policies
# than what's defined in the config dict).
@@ -57,8 +57,9 @@ def save_test(alg_name, framework="tf", multi_agent=False):
ray._private.utils.get_user_temp_dir(), "export_dir_%s" % alg_name
)

algo.train()
print("Exporting algo checkpoint", alg_name, export_dir)
export_dir = algo.save(export_dir)
export_dir = algo.save(export_dir).checkpoint.path
model_dir = os.path.join(
export_dir,
"policies",
12 changes: 11 additions & 1 deletion rllib/evaluate.py
@@ -18,6 +18,8 @@
from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID
from ray.rllib.utils.spaces.space_utils import flatten_to_single_ndarray
from ray.rllib.common import CLIArguments as cli
from ray.train._checkpoint import Checkpoint
from ray.train._internal.session import _TrainingResult
from ray.tune.utils import merge_dicts
from ray.tune.registry import get_trainable_cls, _global_registry, ENV_CREATOR

@@ -253,7 +255,15 @@ def run(

# Load state from checkpoint, if provided.
if checkpoint:
algorithm.restore(checkpoint)
if os.path.isdir(checkpoint):
checkpoint_dir = checkpoint
else:
checkpoint_dir = str(Path(checkpoint).parent)
print(f"Restoring algorithm from {checkpoint_dir}")
restore_result = _TrainingResult(
checkpoint=Checkpoint.from_directory(checkpoint_dir), metrics={}
)
algorithm.restore(restore_result)

# Do the actual rollout.
with RolloutSaver(
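For reference, a self-contained sketch of the restore path added above, where `algorithm` and `checkpoint` stand in for the built Algorithm and the CLI checkpoint argument used in rllib/evaluate.py:

    # Sketch only: accept either a checkpoint directory or a file inside one,
    # then wrap it into a _TrainingResult as the diff above does.
    import os
    from pathlib import Path

    from ray.train._checkpoint import Checkpoint
    from ray.train._internal.session import _TrainingResult

    def restore_from_checkpoint_arg(algorithm, checkpoint: str) -> None:
        checkpoint_dir = checkpoint if os.path.isdir(checkpoint) else str(Path(checkpoint).parent)
        restore_result = _TrainingResult(
            checkpoint=Checkpoint.from_directory(checkpoint_dir), metrics={}
        )
        algorithm.restore(restore_result)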
1 change: 0 additions & 1 deletion rllib/examples/connectors/adapt_connector_policy.py
@@ -126,7 +126,6 @@ def run(checkpoint_path, policy_id):
create_appo_cartpole_checkpoint(tmpdir)
policy_checkpoint_path = os.path.join(
tmpdir,
"checkpoint_000000",
"policies",
policy_id,
)
1 change: 0 additions & 1 deletion rllib/examples/connectors/run_connector_policy.py
@@ -54,7 +54,6 @@ def run(checkpoint_path, policy_id):
create_appo_cartpole_checkpoint(tmpdir)
policy_checkpoint_path = os.path.join(
tmpdir,
"checkpoint_000000",
"policies",
policy_id,
)
8 changes: 4 additions & 4 deletions rllib/examples/custom_experiment.py
@@ -2,7 +2,7 @@
import argparse

import ray
from ray import tune
from ray import train, tune
import ray.rllib.algorithms.ppo as ppo

parser = argparse.ArgumentParser()
@@ -19,8 +19,8 @@ def experiment(config):
for i in range(iterations):
train_results = algo.train()
if i % 2 == 0 or i == iterations - 1:
checkpoint = algo.save(tune.get_trial_dir())
tune.report(**train_results)
checkpoint = algo.save(train.get_context().get_trial_dir())
train.report(train_results)
algo.stop()

# Manual Eval
@@ -38,7 +38,7 @@ def experiment(config):
eval_results["eval_reward"] += reward
eval_results["eval_eps_length"] += 1
results = {**train_results, **eval_results}
tune.report(results)
train.report(results)


if __name__ == "__main__":
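The example now reports through the ray.train session API instead of the removed tune.report/tune.get_trial_dir calls. A small runnable sketch of the new reporting pattern (the dummy trainable and its metrics are illustrative, not taken from the example):

    from ray import train, tune

    def dummy_experiment(config):
        # New session API: get_trial_dir() replaces tune.get_trial_dir().
        trial_dir = train.get_context().get_trial_dir()
        for i in range(3):
            metrics = {"iteration": i, "score": i * config.get("scale", 1.0)}
            # New session API: pass the metrics dict directly (no **kwargs).
            train.report(metrics)

    tune.Tuner(dummy_experiment, param_space={"scale": 2.0}).fit()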
8 changes: 4 additions & 4 deletions rllib/examples/custom_train_fn.py
@@ -9,7 +9,7 @@
import os

import ray
from ray import tune
from ray import train, tune
from ray.rllib.algorithms.ppo import PPO, PPOConfig

parser = argparse.ArgumentParser()
@@ -21,7 +21,7 @@
)


def my_train_fn(config, reporter):
def my_train_fn(config):
iterations = config.pop("train-iterations", 10)

config = PPOConfig().update_from_dict(config).environment("CartPole-v1")
@@ -32,7 +32,7 @@ def my_train_fn(config, reporter):
for _ in range(iterations):
result = agent1.train()
result["phase"] = 1
reporter(**result)
train.report(result)
phase1_time = result["timesteps_total"]
state = agent1.save()
agent1.stop()
@@ -45,7 +45,7 @@ def my_train_fn(config, reporter):
result = agent2.train()
result["phase"] = 2
result["timesteps_total"] += phase1_time # keep time moving forward
reporter(**result)
train.report(result)
agent2.stop()


@@ -10,7 +10,8 @@
# .. train one iteration ..
my_ppo.train()
# .. and call `save()` to create a checkpoint.
path_to_checkpoint = my_ppo.save()
save_result = my_ppo.save()
path_to_checkpoint = save_result.checkpoint.path
print(
"An Algorithm checkpoint has been created inside directory: "
f"'{path_to_checkpoint}'."
@@ -43,7 +44,7 @@
my_new_ppo = my_ppo_config.build()

# Restore the old (checkpointed) state.
my_new_ppo.restore(path_to_checkpoint)
my_new_ppo.restore(save_result)

# Continue training.
my_new_ppo.train()
@@ -85,8 +86,9 @@
)

my_ma_algo = my_ma_config.build()
my_ma_algo.train()

ma_checkpoint_dir = my_ma_algo.save()
ma_checkpoint_dir = my_ma_algo.save().checkpoint.path

print(
"An Algorithm checkpoint has been created inside directory: "
@@ -185,7 +187,7 @@
# .. train one iteration ..
algo_w_5_policies.train()
# .. and call `save()` to create a checkpoint.
path_to_checkpoint = algo_w_5_policies.save()
path_to_checkpoint = algo_w_5_policies.save().checkpoint.path
print(
"An Algorithm checkpoint has been created inside directory: "
f"'{path_to_checkpoint}'. It should contain 5 policies in the 'policies/' sub dir."
@@ -284,7 +286,7 @@ def new_policy_mapping_fn(agent_id, episode, worker, **kwargs):
# 3) .. via the Algorithm (Policy) checkpoint:

# __export-models-3-begin__
checkpoint_dir = ppo.save()
checkpoint_dir = ppo.save().checkpoint.path
# .. check `checkpoint_dir` for the Algorithm checkpoint files.
# For keras you should be able to recover the model via:
# keras_model = tf.saved_model.load(checkpoint_dir + "/policies/default_policy/model/")
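Taken together, the snippets in this file now form the following round trip. A condensed sketch, reusing the `my_ppo` and `my_ppo_config` names from the doc code above:

    # Condensed round-trip sketch (not verbatim doc code).
    save_result = my_ppo.save()                        # result object, not a path
    path_to_checkpoint = save_result.checkpoint.path   # checkpoint directory on disk

    my_new_ppo = my_ppo_config.build()
    my_new_ppo.restore(save_result)                    # restore from the result object
    my_new_ppo.train()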
8 changes: 4 additions & 4 deletions rllib/examples/inference_and_serving/serve_and_rllib.py
@@ -70,9 +70,9 @@ def train_rllib_policy(config: AlgorithmConfig):
# Train for n iterations, then save, stop, and return the checkpoint path.
for _ in range(args.train_iters):
print(algo.train())
checkpoint_path = algo.save()
checkpoint_result = algo.save()
algo.stop()
return checkpoint_path
return checkpoint_result


if __name__ == "__main__":
@@ -81,14 +81,14 @@ def train_rllib_policy(config: AlgorithmConfig):
config = DQNConfig().environment("ALE/MsPacman-v5").framework(args.framework)

# Train the Algorithm for some time, then save it and get the checkpoint path.
checkpoint_path = train_rllib_policy(config)
checkpoint_result = train_rllib_policy(config)

ray.init(num_cpus=8)

# Start Ray serve (create the RLlib Policy service defined by
# our `ServeRLlibPolicy` class above).
client = serve.start()
client.create_backend("backend", ServeRLlibPolicy, config, checkpoint_path)
client.create_backend("backend", ServeRLlibPolicy, config, checkpoint_result)
client.create_endpoint(
"endpoint", backend="backend", route="/mspacman-rllib-policy"
)
4 changes: 2 additions & 2 deletions rllib/examples/serving/cartpole_server.py
@@ -245,10 +245,10 @@ def _input(ioctx):
for _ in range(args.stop_iters):
results = algo.train()
print(pretty_print(results))
checkpoint = algo.save()
checkpoint = algo.save().checkpoint
print("Last checkpoint", checkpoint)
with open(checkpoint_path, "w") as f:
f.write(checkpoint)
f.write(checkpoint.path)
if (
results["episode_reward_mean"] >= args.stop_reward
or ts >= args.stop_timesteps
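Since save() now returns a result object, only the checkpoint's directory string is written to the progress file. A sketch of the save-and-reload cycle, reusing `algo` and `checkpoint_path` from the server example above; restoring from the raw directory string relies on the string branch added to Trainable.restore() in this PR and applies under the new persistence mode:

    # Sketch: persist the checkpoint directory, then reload it after a restart.
    checkpoint = algo.save().checkpoint
    with open(checkpoint_path, "w") as f:
        f.write(checkpoint.path)

    # Later, e.g. after a server restart (assuming the file still exists):
    with open(checkpoint_path, "r") as f:
        algo.restore(f.read())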
4 changes: 2 additions & 2 deletions rllib/examples/serving/unity3d_server.py
@@ -174,9 +174,9 @@ def _input(ioctx):
print(algo.train())
if count % args.checkpoint_freq == 0:
print("Saving learning progress to checkpoint file.")
checkpoint = algo.save()
checkpoint = algo.save().checkpoint
# Write the latest checkpoint location to CHECKPOINT_FILE,
# so we can pick up from the latest one after a server re-start.
with open(checkpoint_path, "w") as f:
f.write(checkpoint)
f.write(checkpoint.path)
count += 1
5 changes: 3 additions & 2 deletions rllib/policy/tests/test_policy_checkpoint_restore.py
@@ -63,10 +63,11 @@ def test_add_policy_connector_enabled(self):
APPOConfig().environment("CartPole-v1").rollouts(enable_connectors=True)
)
algo = config.build()
algo.save(checkpoint_dir=tmpdir)
algo.train()
result = algo.save(checkpoint_dir=tmpdir)

path_to_checkpoint = os.path.join(
tmpdir, "checkpoint_000000", "policies", "default_policy"
result.checkpoint.path, "policies", "default_policy"
)

policy = Policy.from_checkpoint(path_to_checkpoint)
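The test now derives the policy sub-directory from the returned checkpoint path instead of a hard-coded checkpoint_000000 folder. A sketch of the lookup, with `algo` and `tmpdir` as set up in the test above:

    # Sketch mirroring the test change above.
    import os
    from ray.rllib.policy.policy import Policy

    result = algo.save(checkpoint_dir=tmpdir)
    path_to_checkpoint = os.path.join(result.checkpoint.path, "policies", "default_policy")
    policy = Policy.from_checkpoint(path_to_checkpoint)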
4 changes: 2 additions & 2 deletions rllib/tests/test_nested_observation_spaces.py
@@ -564,12 +564,12 @@ def test_rollout_dict_space(self):
config = PGConfig().environment("nested").framework("tf")
algo = config.build()
algo.train()
path = algo.save()
result = algo.save()
algo.stop()

# Test train works on restore
algo2 = config.build()
algo2.restore(path)
algo2.restore(result)
algo2.train()

# Test rollout works on restore
8 changes: 7 additions & 1 deletion rllib/tests/test_ray_client.py
@@ -68,9 +68,15 @@ def test_custom_experiment(self):

from ray.rllib.examples.custom_experiment import experiment

# Ray client does not seem to propagate the `fn._resources` property
# correctly for imported functions. As a workaround, we can wrap the
# imported function which forces a full transfer.
def wrapped_experiment(config):
experiment(config)

tune.Tuner(
tune.with_resources(
experiment, ppo.PPO.default_resource_request(config)
wrapped_experiment, ppo.PPO.default_resource_request(config)
Contributor (PR author) comment on this line: Btw, I have no idea why this test started failing in this PR.

),
param_space=config,
).fit()