[tune] New persistence mode cleanup: Tune internals (ray-project#40175)
This PR removes the old persistence codepath from all Tune internals except for Trainable, FunctionTrainable, and the Tune session. Those larger changes are split out into a follow-up PR.
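In practice, the cleanup collapses a branch that was repeated at nearly every call site: code that used to switch on `_use_storage_context()` now reads paths and the syncer directly off the `StorageContext`. A simplified before/after sketch of that recurring pattern (condensed from the hunks below; not a complete method):

    # Before: every call site branched between the new and legacy codepaths.
    if _use_storage_context():
        experiment_local_path = self._storage.experiment_local_path
        experiment_fs_path = self._storage.experiment_fs_path
        syncer = self._storage.syncer
    else:
        experiment_local_path = self._legacy_local_checkpoint_dir
        experiment_fs_path = self._legacy_remote_checkpoint_dir
        syncer = self._legacy_syncer

    # After: the StorageContext is the single source of truth.
    experiment_local_path = self._storage.experiment_local_path
    experiment_fs_path = self._storage.experiment_fs_path
    syncer = self._storage.syncer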

---------

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
justinvyu authored Oct 6, 2023
1 parent c5d2a42 commit 4b0aa52
Showing 21 changed files with 158 additions and 2,032 deletions.
1,057 changes: 9 additions & 1,048 deletions python/ray/tune/analysis/experiment_analysis.py

Large diffs are not rendered by default.

185 changes: 23 additions & 162 deletions python/ray/tune/execution/experiment_state.py
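The hunks below repeatedly read three members off the storage context: `experiment_local_path`, `experiment_fs_path`, and `syncer`. As orientation, a minimal stand-in with just that surface (an illustrative sketch only; the real `ray.train._internal.storage.StorageContext` derives these values internally rather than taking them as constructor arguments):

    from typing import Any, Optional

    class StorageContext:
        """Illustrative stand-in showing only the members used in this diff."""

        def __init__(
            self,
            experiment_local_path: str,    # staging dir for experiment state on the driver
            experiment_fs_path: str,       # experiment dir on the persistent storage filesystem
            syncer: Optional[Any] = None,  # Syncer instance, or None to disable driver syncing
        ):
            self.experiment_local_path = experiment_local_path
            self.experiment_fs_path = experiment_fs_path
            self.syncer = syncer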
@@ -11,10 +11,9 @@
 
 from ray.air._internal.remote_storage import list_at_uri
 from ray.air._internal.uri_utils import _join_path_or_uri
-from ray.train._internal.storage import _use_storage_context, StorageContext
+from ray.train._internal.storage import StorageContext
 from ray.tune.experiment import Trial
 from ray.tune.impl.out_of_band_serialize_dataset import out_of_band_serialize_dataset
-from ray.train._internal.syncer import SyncConfig, get_node_to_storage_syncer
 
 logger = logging.getLogger(__name__)

@@ -125,34 +124,11 @@ class _ExperimentCheckpointManager:
     def __init__(
         self,
         *,
+        storage: Optional[StorageContext],
         checkpoint_period: Union[int, float, str],
-        # TODO(justinvyu): Make `storage` required
-        storage: Optional[StorageContext] = None,
         sync_every_n_trial_checkpoints: Optional[int] = None,
-        # TODO(justinvyu): Remove these args.
-        sync_config: SyncConfig = None,
-        local_checkpoint_dir: str = None,
-        remote_checkpoint_dir: str = None,
     ):
         self._storage = storage
-        if _use_storage_context():
-            assert storage
-
-            self._legacy_local_checkpoint_dir = None
-            self._legacy_remote_checkpoint_dir = None
-            self._legacy_sync_config = None
-            self._legacy_syncer = None
-        else:
-            # Checkpoint directories
-            self._legacy_local_checkpoint_dir = local_checkpoint_dir
-            self._legacy_remote_checkpoint_dir = remote_checkpoint_dir
-
-            # Synch to/from cloud
-            self._legacy_sync_config = sync_config or SyncConfig()
-            # Resolves syncer="auto" to an actual syncer if needed
-            self._legacy_syncer = get_node_to_storage_syncer(
-                self._legacy_sync_config, self._legacy_remote_checkpoint_dir
-            )
 
         # Last save + sync time
         self._last_save_time = 0.0
@@ -216,8 +192,7 @@ def checkpoint(
         force: bool = False,
         wait: bool = False,
     ):
-        """Saves execution state to `self._legacy_local_checkpoint_dir`.
+        """Saves execution state to the local experiment directory.
 
         Overwrites the current session checkpoint, which starts when self
         is instantiated. Throttle depends on self._checkpoint_period.
@@ -231,11 +206,7 @@ def checkpoint(
             wait: Wait until sync to cloud has finished.
         """
-        experiment_local_path = (
-            self._storage.experiment_local_path
-            if _use_storage_context()
-            else self._legacy_local_checkpoint_dir
-        )
+        experiment_local_path = self._storage.experiment_local_path
         if not experiment_local_path:
             return

@@ -269,33 +240,18 @@ def checkpoint(
         return experiment_local_path
 
     def sync_up(self, force: bool = False, wait: bool = False) -> bool:
-        # self._legacy_remote_checkpoint_dir can be empty in tests, but shouldn't
-        # be empty when using in end-to-end tune.
-        # Todo (krfricke): We may want to not store directories in this manager
-        # but instead always pass them from the trial runner.
-        syncer = self._storage.syncer if _use_storage_context() else self._legacy_syncer
+        syncer = self._storage.syncer
 
-        if not syncer:  # or not self._legacy_remote_checkpoint_dir:
+        if not syncer:
             return False
 
-        if _use_storage_context():
-            # Always exclude checkpoints in the new persistence path.
-            # TODO(justinvyu, krfricke): Ideally, this excludes all trial directories.
-            # But for now, this is needed to upload driver artifacts that live in the
-            # trial directory.
-            exclude = _DRIVER_SYNC_EXCLUDE_PATTERNS
-            experiment_local_path = self._storage.experiment_local_path
-            experiment_fs_path = self._storage.experiment_fs_path
-        else:
-            if bool(self._legacy_remote_checkpoint_dir):
-                # If an upload dir is given, trainable actors upload checkpoints
-                # themselves. Then the driver does not need to sync checkpoints.
-                exclude = _DRIVER_SYNC_EXCLUDE_PATTERNS
-            else:
-                # Otherwise, we sync the full trial dir.
-                exclude = None
-            experiment_local_path = self._legacy_local_checkpoint_dir
-            experiment_fs_path = self._legacy_remote_checkpoint_dir
+        # Always exclude checkpoints in the new persistence path.
+        # TODO(justinvyu, krfricke): Ideally, this excludes all trial directories.
+        # But for now, this is needed to upload driver artifacts that live in the
+        # trial directory.
+        exclude = _DRIVER_SYNC_EXCLUDE_PATTERNS
+        experiment_local_path = self._storage.experiment_local_path
+        experiment_fs_path = self._storage.experiment_fs_path
 
         if force:
             # Wait until previous sync command finished
@@ -406,91 +362,18 @@ def sync_down_experiment_state(self) -> None:
         )
         logger.debug(f"Copied {matches} from:\n{remote_path}\n-> {local_path}")
 
-    def sync_down(self, force: bool = False, wait: bool = False) -> bool:
-        assert not _use_storage_context()
-        if not self._legacy_syncer or not self._legacy_remote_checkpoint_dir:
-            return False
-
-        if bool(self._legacy_remote_checkpoint_dir):
-            # If an upload dir is given, trainable actors upload checkpoints
-            # themselves. Then the driver does not need to sync checkpoints.
-            exclude = ["*/checkpoint_*"]
-        else:
-            # Otherwise, we sync the full trial dir.
-            exclude = None
-        syncer = self._legacy_syncer
-        experiment_local_path = self._legacy_local_checkpoint_dir
-        experiment_fs_path = self._legacy_remote_checkpoint_dir
-
-        if force:
-            # Wait until previous sync command finished
-            try:
-                syncer.wait()
-            except TimeoutError as e:
-                logger.warning(
-                    "The previous sync of the experiment directory from the cloud "
-                    f"timed out with the error: {str(e)}\nSyncing will be retried. "
-                    + _EXPERIMENT_SYNC_TIMEOUT_MESSAGE
-                )
-            except Exception as e:
-                logger.warning(
-                    "The previous sync of the experiment directory from the cloud "
-                    f"failed with the error: {str(e)}\nSyncing will be retried. "
-                )
-
-            synced = syncer.sync_down(
-                remote_dir=experiment_fs_path,
-                local_dir=experiment_local_path,
-                exclude=exclude,
-            )
-        else:
-            synced = syncer.sync_down_if_needed(
-                remote_dir=experiment_fs_path,
-                local_dir=experiment_local_path,
-                exclude=exclude,
-            )
-
-        if wait:
-            try:
-                syncer.wait()
-            except Exception as e:
-                raise RuntimeError(
-                    "Downloading the remote experiment directory from the cloud "
-                    f"(remote path: {experiment_fs_path}) to the driver "
-                    f"(local path: {experiment_local_path}) failed. "
-                    "Please check the error message above. "
-                    "If you expected an experiment to already exist, check if "
-                    "you supplied the correct restoration path."
-                ) from e
-
-        return synced
-
     def _resume_auto(self) -> bool:
-        if _use_storage_context():
-            experiment_local_path = self._storage.experiment_local_path
-            experiment_fs_path = self._storage.experiment_fs_path
-            syncer = self._storage.syncer
-        else:
-            experiment_local_path = self._legacy_local_checkpoint_dir
-            experiment_fs_path = self._legacy_remote_checkpoint_dir
-            syncer = self._legacy_syncer
+        experiment_local_path = self._storage.experiment_local_path
+        experiment_fs_path = self._storage.experiment_fs_path
+        syncer = self._storage.syncer
 
         if experiment_fs_path and syncer:
             logger.info(
                 f"Trying to find and download experiment checkpoint at "
                 f"{experiment_fs_path}"
             )
             try:
-                if _use_storage_context():
-                    self.sync_down_experiment_state()
-                else:
-                    # Todo: This syncs the entire experiment including trial
-                    # checkpoints. We should exclude these in the future.
-                    syncer.sync_down_if_needed(
-                        remote_dir=experiment_fs_path,
-                        local_dir=experiment_local_path,
-                    )
-                    syncer.wait()
+                self.sync_down_experiment_state()
             except Exception:
                 logger.exception(
                     "Got error when trying to sync down.\n"
@@ -504,15 +387,15 @@ def _resume_auto(self) -> bool:
                     "when trying to download the experiment checkpoint. "
                     "Please check the previous warning message for more "
                    "details. "
-                    "Ray Tune will now start a new experiment."
+                    "Starting a new run..."
                 )
                 return False
             if not _experiment_checkpoint_exists(experiment_local_path):
                 logger.warning(
                     "A remote checkpoint was fetched, but no checkpoint "
                     "data was found. This can happen when e.g. the cloud "
                     "bucket exists but does not contain any data. "
-                    "Ray Tune will start a new, fresh run."
+                    "Starting a new run..."
                 )
                 return False
             logger.info(
@@ -521,10 +404,7 @@ def _resume_auto(self) -> bool:
             )
             return True
         elif not _experiment_checkpoint_exists(experiment_local_path):
-            logger.info(
-                "No local checkpoint was found. "
-                "Ray Tune will now start a new experiment."
-            )
+            logger.info("No local checkpoint was found. Starting a new run...")
             return False
         logger.info(
             "A local experiment checkpoint was found and will be used "
@@ -551,17 +431,8 @@ def resume(self, resume_type: Union[str, bool]) -> Optional[_ResumeConfig]:
 
         resume_type, resume_config = _resume_str_to_config(resume_type)
 
-        if _use_storage_context():
-            experiment_local_path = self._storage.experiment_local_path
-            experiment_fs_path = self._storage.experiment_fs_path
-        else:
-            # Not clear if we need this assertion, since we should always have a
-            # local checkpoint dir.
-            assert self._legacy_local_checkpoint_dir or (
-                self._legacy_remote_checkpoint_dir and self._legacy_syncer
-            )
-            experiment_local_path = self._legacy_local_checkpoint_dir
-            experiment_fs_path = self._legacy_remote_checkpoint_dir
+        experiment_local_path = self._storage.experiment_local_path
+        experiment_fs_path = self._storage.experiment_fs_path
 
         if resume_type == "AUTO":
             if self._resume_auto():
@@ -591,22 +462,12 @@ def resume(self, resume_type: Union[str, bool]) -> Optional[_ResumeConfig]:
                 f"Try downloading from remote directory? " f"({experiment_fs_path})"
             ):
                return None
-            if not experiment_fs_path or not self._legacy_syncer:
-                raise ValueError(
-                    "Called resume from remote without remote directory or "
-                    "without valid syncer. "
-                    "Fix this by passing a `SyncConfig` object with "
-                    "`upload_dir` set to `Tuner(sync_config=...)`."
-                )
 
             # Try syncing down the upload directory.
             logger.info(
                 f"Downloading experiment checkpoint from " f"{experiment_fs_path}"
             )
-            if _use_storage_context():
-                self.sync_down_experiment_state()
-            else:
-                self.sync_down(force=True, wait=True)
+            self.sync_down_experiment_state()
 
             if not _experiment_checkpoint_exists(experiment_local_path):
                 raise ValueError(
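Taken together, constructing the checkpoint manager after this change reduces to roughly the following (a sketch assembled from the hunks above; `storage_context` is a hypothetical variable standing in for the `StorageContext` wired up by the Tune controller):

    # Sketch, not verbatim source: the legacy sync_config / local_checkpoint_dir /
    # remote_checkpoint_dir arguments are gone; StorageContext carries everything.
    manager = _ExperimentCheckpointManager(
        storage=storage_context,
        checkpoint_period="auto",
        sync_every_n_trial_checkpoints=None,
    )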
(Diffs for the remaining 19 changed files are not shown.)
