Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[air] Move storage handling to pyarrow.fs.FileSystem #23370

Merged
merged 38 commits into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
b4611cb
Add external storage class
Mar 21, 2022
6e23b34
[ml] Introduce Storage class for general cloud storage support
Mar 21, 2022
09e8923
Option to not override existing storages
Mar 21, 2022
26a1311
Nit docs
Mar 21, 2022
c23262e
Merge branch 'master' into ml/ext-storage
Mar 22, 2022
b2215d6
Enforce prefix://
Mar 22, 2022
4368b46
Rename to RemoteStorage
Mar 22, 2022
ebea68f
Add **kwargs parameters
Mar 22, 2022
ea7e5d5
Merge branch 'master' into ml/ext-storage
Apr 4, 2022
8b1dab5
Move to pyarrow.fs.FileSystem
Apr 5, 2022
ab3b3f6
Revert unrelated changes
Apr 5, 2022
05ca6e4
Add test for clear_bucket
Apr 5, 2022
281e05b
Add fs checkpoint pa test
Apr 5, 2022
9bddedd
More specific exceptions
Apr 5, 2022
036553c
Soft pyarrow dependency
Apr 5, 2022
a616d26
Add warnings in API and in Tune
Apr 5, 2022
8f56c8f
Better hints
Apr 5, 2022
d57eb1a
Add fs hints
Apr 5, 2022
a4ec8fb
Upgrade pyarrow requirements
Apr 5, 2022
efa809c
Fix pyarrow mock tests
Apr 6, 2022
1acbc64
Fix cloud tests
Apr 6, 2022
ef14309
Upgrade xgboost ray
Apr 6, 2022
17dbad0
Merge remote-tracking branch 'upstream/master' into ml/ext-storage
Apr 7, 2022
f667cc4
Rename functions
Apr 7, 2022
ef68c20
Fix durable tests
Apr 7, 2022
21d4162
Merge branch 'master' into ml/ext-storage
Apr 8, 2022
9021cf9
Lint
Apr 8, 2022
52c7d90
Fix datasets test for pyarrow >= 6
Apr 8, 2022
d0633c3
Min pyarrow 6.0.1
Apr 8, 2022
a745b19
pyarrow dependency in tune release tests
Apr 8, 2022
abcab96
Merge remote-tracking branch 'upstream/master' into ml/ext-storage
Apr 11, 2022
e5880d7
Use urllib.parse instead of regex
Apr 11, 2022
ac6895c
Merge remote-tracking branch 'upstream/master' into ml/ext-storage
Apr 12, 2022
e4417c4
Update docstring
Apr 12, 2022
db20309
Fix aws s3 test
Apr 12, 2022
6705485
Fix path parsing
Apr 13, 2022
1afce0b
Merge branch 'master' into ml/ext-storage
Apr 13, 2022
5db21df
Include gcsfs in app config for tune cloud tests
Apr 13, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/ray/data/impl/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

logger = logging.getLogger(__name__)

MIN_PYARROW_VERSION = (4, 0, 1)
MIN_PYARROW_VERSION = (6, 0, 1)
_VERSION_VALIDATED = False


Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ def test_convert_types(ray_start_regular_shared):

arrow_ds = ray.data.range_arrow(1)
assert arrow_ds.map(lambda x: "plain_{}".format(x["value"])).take() == ["plain_0"]
assert arrow_ds.map(lambda x: {"a": (x["value"],)}).take() == [{"a": (0,)}]
assert arrow_ds.map(lambda x: {"a": (x["value"],)}).take() == [{"a": [0]}]


def test_from_items(ray_start_regular_shared):
Expand Down
32 changes: 18 additions & 14 deletions python/ray/ml/checkpoint.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
import io
import os
import shutil
import tarfile
import tempfile

import os
from typing import Optional, Union, Tuple

import ray
from ray import cloudpickle as pickle
from ray.util.annotations import DeveloperAPI
from ray.util.ml_utils.cloud import (
upload_to_bucket,
is_cloud_target,
download_from_bucket,
from ray.ml.utils.remote_storage import (
upload_to_uri,
is_non_local_path_uri,
download_from_uri,
fs_hint,
)

from ray.util.annotations import DeveloperAPI

_DICT_CHECKPOINT_FILE_NAME = "dict_checkpoint.pkl"
_FS_CHECKPOINT_KEY = "fs_checkpoint"
Expand Down Expand Up @@ -331,7 +330,7 @@ def to_directory(self, path: Optional[str] = None) -> str:
shutil.copytree(local_path, path)
elif external_path:
# If this exists on external storage (e.g. cloud), download
download_from_bucket(bucket=external_path, local_path=path)
download_from_uri(uri=external_path, local_path=path)
else:
raise RuntimeError(
f"No valid location found for checkpoint {self}: {self._uri}"
Expand All @@ -358,7 +357,7 @@ def from_uri(cls, uri: str) -> "Checkpoint":
def to_uri(self, uri: str) -> str:
"""Write checkpoint data to location URI (e.g. cloud storage).

ARgs:
Args:
uri (str): Target location URI to write data to.

Returns:
Expand All @@ -368,7 +367,12 @@ def to_uri(self, uri: str) -> str:
local_path = uri[7:]
return self.to_directory(local_path)

assert is_cloud_target(uri)
if not is_non_local_path_uri(uri):
raise RuntimeError(
f"Cannot upload checkpoint to URI: Provided URI "
f"does not belong to a registered storage provider: `{uri}`. "
f"Hint: {fs_hint(uri)}"
)

cleanup = False

Expand All @@ -377,7 +381,7 @@ def to_uri(self, uri: str) -> str:
cleanup = True
local_path = self.to_directory()

upload_to_bucket(bucket=uri, local_path=local_path)
upload_to_uri(local_path=local_path, uri=uri)

if cleanup:
shutil.rmtree(local_path)
Expand Down Expand Up @@ -429,7 +433,7 @@ def __setstate__(self, state):

def _get_local_path(path: Optional[str]) -> Optional[str]:
"""Check if path is a local path. Otherwise return None."""
if path is None or is_cloud_target(path):
if path is None or is_non_local_path_uri(path):
return None
if path.startswith("file://"):
path = path[7:]
Expand All @@ -440,7 +444,7 @@ def _get_local_path(path: Optional[str]) -> Optional[str]:

def _get_external_path(path: Optional[str]) -> Optional[str]:
"""Check if path is an external path. Otherwise return None."""
if not isinstance(path, str) or not is_cloud_target(path):
if not isinstance(path, str) or not is_non_local_path_uri(path):
return None
return path

Expand Down
102 changes: 78 additions & 24 deletions python/ray/ml/tests/test_checkpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,40 @@
import tempfile
import unittest
from typing import Any
from unittest.mock import patch

import ray
from ray.ml.checkpoint import Checkpoint
from ray.ml.tests.utils import mock_s3_sync
from ray.ml.utils.remote_storage import delete_at_uri, _ensure_directory


class CheckpointsConversionTest(unittest.TestCase):
def setUp(self):
self.tmpdir = os.path.realpath(tempfile.mkdtemp())
self.tmpdir_pa = os.path.realpath(tempfile.mkdtemp())

self.checkpoint_dict_data = {"metric": 5, "step": 4}
self.checkpoint_dir_data = {"metric": 2, "step": 6}

self.cloud_uri = "s3://invalid"
self.local_mock_cloud_path = os.path.realpath(tempfile.mkdtemp())
self.mock_s3 = mock_s3_sync(self.local_mock_cloud_path)
# We test two different in-memory filesystems as "cloud" providers,
# one for fsspec and one for pyarrow.fs

# fsspec URI
self.cloud_uri = "memory:///cloud/bucket"
# pyarrow URI
self.cloud_uri_pa = "mock://cloud/bucket/"

self.checkpoint_dir = os.path.join(self.tmpdir, "existing_checkpoint")
os.mkdir(self.checkpoint_dir, 0o755)
with open(os.path.join(self.checkpoint_dir, "test_data.pkl"), "wb") as fp:
pickle.dump(self.checkpoint_dir_data, fp)

self.old_dir = os.getcwd()
os.chdir(self.tmpdir)

def tearDown(self):
os.chdir(self.old_dir)
shutil.rmtree(self.tmpdir)
shutil.rmtree(self.local_mock_cloud_path)
shutil.rmtree(self.tmpdir_pa)

def _prepare_dict_checkpoint(self) -> Checkpoint:
# Create checkpoint from dict
Expand Down Expand Up @@ -111,17 +117,35 @@ def test_dict_checkpoint_uri(self):
"""Test conversion from dict to cloud checkpoint and back."""
checkpoint = self._prepare_dict_checkpoint()

with patch("subprocess.check_call", self.mock_s3):
# Convert into dict checkpoint
location = checkpoint.to_uri(self.cloud_uri)
self.assertIsInstance(location, str)
self.assertIn("s3://", location)
# Upload the checkpoint to the (fsspec in-memory) cloud URI
location = checkpoint.to_uri(self.cloud_uri)
self.assertIsInstance(location, str)
self.assertIn("memory://", location)

# Create from URI
checkpoint = Checkpoint.from_uri(location)
self.assertTrue(checkpoint._uri)

self._assert_dict_checkpoint(checkpoint)

def test_dict_checkpoint_uri_pa(self):
"""Test conversion from dict to cloud checkpoint and back (pyarrow.fs mock URI)."""
checkpoint = self._prepare_dict_checkpoint()

# Clean up mock bucket
delete_at_uri(self.cloud_uri_pa)
_ensure_directory(self.cloud_uri_pa)

# Upload the checkpoint to the (pyarrow mock) cloud URI
location = checkpoint.to_uri(self.cloud_uri_pa)
self.assertIsInstance(location, str)
self.assertIn("mock://", location)

# Create from dict
checkpoint = Checkpoint.from_uri(location)
self.assertTrue(checkpoint._uri)
# Create from dict
checkpoint = Checkpoint.from_uri(location)
self.assertTrue(checkpoint._uri)

self._assert_dict_checkpoint(checkpoint)
self._assert_dict_checkpoint(checkpoint)

def _prepare_fs_checkpoint(self) -> Checkpoint:
# Create checkpoint from fs
Expand Down Expand Up @@ -204,17 +228,47 @@ def test_fs_checkpoint_uri(self):
"""Test conversion from fs to cloud checkpoint and back."""
checkpoint = self._prepare_fs_checkpoint()

with patch("subprocess.check_call", self.mock_s3):
# Convert into dict checkpoint
location = checkpoint.to_uri(self.cloud_uri)
self.assertIsInstance(location, str)
self.assertIn("s3://", location)
# Upload the checkpoint to the (fsspec in-memory) cloud URI
location = checkpoint.to_uri(self.cloud_uri)
self.assertIsInstance(location, str)
self.assertIn("memory://", location)

# Create from URI
checkpoint = Checkpoint.from_uri(location)
self.assertTrue(checkpoint._uri)

self._assert_fs_checkpoint(checkpoint)

def test_fs_checkpoint_uri_pa(self):
"""Test conversion from fs to cloud checkpoint and back (pyarrow.fs mock URI)."""
checkpoint = self._prepare_fs_checkpoint()

# Clean up mock bucket
delete_at_uri(self.cloud_uri_pa)
_ensure_directory(self.cloud_uri_pa)

# Upload the checkpoint to the (pyarrow mock) cloud URI
location = checkpoint.to_uri(self.cloud_uri_pa)
self.assertIsInstance(location, str)
self.assertIn("mock://", location)

# Create from URI
checkpoint = Checkpoint.from_uri(location)
self.assertTrue(checkpoint._uri)

self._assert_fs_checkpoint(checkpoint)

def test_fs_delete_at_uri(self):
"""Test that delete_at_uri removes checkpoint data previously uploaded to a URI."""
checkpoint = self._prepare_fs_checkpoint()

# Create from dict
checkpoint = Checkpoint.from_uri(location)
self.assertTrue(checkpoint._uri)
# Upload the checkpoint to the cloud URI, then delete it again
location = checkpoint.to_uri(self.cloud_uri)
delete_at_uri(location)

self._assert_fs_checkpoint(checkpoint)
checkpoint = Checkpoint.from_uri(location)
with self.assertRaises(FileNotFoundError):
checkpoint.to_directory()


class CheckpointsSerdeTest(unittest.TestCase):
Expand Down
30 changes: 0 additions & 30 deletions python/ray/ml/tests/utils.py

This file was deleted.

Loading