
[air] Move storage handling to pyarrow.fs.FileSystem #23370

Merged: 38 commits merged into ray-project:master on Apr 13, 2022

Conversation

krfricke (Contributor):

Why are these changes needed?

Currently we have hardcoded support for three external/cloud storage providers: S3, GS, and HDFS. In the future it is likely that we want to support more providers.
This PR introduces a Storage class that provides support for uploading, downloading, and deleting from storage providers. As the first application, this allows us to register a storage provider for test:// URIs that can be used in unit testing.
The name Storage was chosen to avoid confusion with the existing ExternalStorage package in Ray core, and to be general enough for any kind of storage (in contrast to, e.g., CloudStorage).

Related issue number

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

S3_PREFIX = "s3://"
GS_PREFIX = "gs://"
HDFS_PREFIX = "hdfs://"
ALLOWED_REMOTE_PREFIXES = (S3_PREFIX, GS_PREFIX, HDFS_PREFIX)
krfricke (Contributor, Author):

These are kept here because Tune still depends on them (the original file has been moved from ray.util.ml_utils.cloud)

register_storage(GS_PREFIX, GSStorage(), override=False)
register_storage(HDFS_PREFIX, HDFSStorage(), override=False)


krfricke (Contributor, Author):

The following methods are kept from ray.util.ml_utils.cloud.py



class S3Storage(Storage):
    def upload(self, local_source: str, remote_target: str) -> None:
Contributor:

the sync template has more args like options and exclude_template. Is this something that we want to support with Storage?

krfricke (Contributor, Author):

I've thought about this, and the problem is that options and excludes are specific to the providers and won't work with others (e.g. HDFS does not support excludes at all). I think for those cases we'll likely drop the exclude_template but extend the S3Storage with tune-specific features. Alternatively, we can support kwargs to enable some of these things.

Contributor:

yeah, I am also not sure if those features are necessary. If we can keep things simple, we should.

krfricke (Contributor, Author):

I've added a **kwargs parameter to the base API. This will enable us to add arguments in the future.

For Ray Tune, I think we'll have something like

upload(source, target, sync=True, exclude=["*/checkpoint_*"]) or so.

Different storages can then choose to ignore these parameters (e.g. sync likely only applies to S3, and exclude only to a select few storages).

With kwargs we're flexible enough to make these changes in the future.
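The **kwargs-based base API described above could look roughly like this. This is a sketch of the design choice, not the PR's actual code; the sync/exclude handling and the last_call bookkeeping are hypothetical.

```python
from typing import Any, List, Optional


class Storage:
    """Base API accepting **kwargs so subclasses can add options later (sketch)."""

    def upload(self, local_source: str, remote_target: str, **kwargs: Any) -> None:
        raise NotImplementedError


class S3Storage(Storage):
    def upload(
        self,
        local_source: str,
        remote_target: str,
        sync: bool = True,
        exclude: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        # A real S3 backend could honor sync/exclude; other backends
        # (e.g. HDFS, which supports no excludes) would simply ignore them.
        self.last_call = (local_source, remote_target, sync, exclude)
```

Callers can then pass provider-specific options without changing the base signature.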

@xwjiang2010 (Contributor):

A more general question - where does this fit in the context of Ray External storage proposal? https://docs.google.com/document/d/128a6gSZVdgHnjNIQPomN2TuE-RPwo7j7pd0slECZ318/edit#heading=h.mi7e05f1j7f1

@ericl ericl added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Mar 21, 2022
@krfricke (Contributor, Author):

Ray external storage provides a Ray-based API to store and load data. In that sense it acts similarly to S3/GS or other providers for our current purposes (even though it internally uses these providers for durable storage). We'll likely support the storage API in the future in Ray Tune (and AIR), and we may even make this the preferred way of configuring cloud storage.

That said, in the current world we're still dealing with cloud storage directly, and we thus need a good and general way to do this. After all, this is just a translator between local storage and remote storage - if this can be solved in the future by a higher-level API, that would be preferable.

@ericl (Contributor) left a comment:

@krfricke , it makes sense, but why not just use pyarrow.fs.FileSystem as the interface then? It meets all the requirements for the PR, and supports URIs simply via pyarrow.fs.FileSystem.from_uri().

@krfricke krfricke requested review from ericl and matthewdeng April 8, 2022 23:59
@krfricke krfricke requested a review from gjoliver April 8, 2022 23:59
if not protocol_match:
    return None, None

return protocol_match.group(1), protocol_match.group(2)
Contributor:

Nit: You might want to look at using urllib.parse.urlparse() for URL parsing and grabbing the protocol, e.g. as done here. Should at least allow us to get rid of the regex!
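For illustration, urllib.parse.urlparse() splits out the scheme without a regex:

```python
from urllib.parse import urlparse

# urlparse splits a URI into scheme, netloc (bucket), and path components.
parsed = urlparse("s3://bucket/path/to/dir")
protocol = parsed.scheme            # "s3"
remainder = parsed.netloc + parsed.path  # "bucket/path/to/dir"
```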

krfricke (Contributor, Author):

Ah that's great - updated, thanks!

@gjoliver (Member) left a comment:

minor comments, looks very good.

if protocol in known_implementations:
    return known_implementations[protocol]["err"]

return "Make sure to install and register your fsspec-compatible filesystem."
Member:

nit, can you instead do:

if protocol not in known_implementations:
    return "Make sure to install and register your fsspec-compatible filesystem."

return known_implementations[protocol]["err"]

this way, all the early returns are for error conditions, and the final return is success.
also, what do these "err" messages look like if it's a known implementation?

krfricke (Contributor, Author):

In this case all returns are actually error conditions - the filesystem URI (e.g. foo://bar) could not be associated with an installed filesystem, so we raise an error describing how to fix this.

We thus follow a progression here - pyarrow is the base requirement for everything, so it is checked first. If pyarrow is installed but cannot resolve the URI (pyarrow natively supports only local, s3, and hdfs), the filesystem can be provided via fsspec. Thus the user should install fsspec next if it's not already there.

fsspec provides a dict (from fsspec.registry import known_implementations) that contains all fsspec-compatible filesystems "known" to the fsspec maintainers. Here is the current definition: https://github.com/fsspec/filesystem_spec/blob/master/fsspec/registry.py#L87

For instance, for dropbox://something, if it's not available, the user would get the message

            'DropboxFileSystem requires "dropboxdrivefs",'
            '"requests" and "dropbox" to be installed'

Only as a last resort do we fall back to the generic "Make sure to install and register your fsspec-compatible filesystem.", which should usually only come up when a user wants to use a custom URI and filesystem implementation.

IMO this order makes sense and we should keep it that way
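The progression described above can be sketched as follows. The function name and message wording are hypothetical, and the importlib-based availability checks are an assumption, not the PR's actual implementation; only the known_implementations lookup and the generic fallback string come from the diff.

```python
import importlib.util


def missing_filesystem_hint(protocol: str) -> str:
    """Sketch of the pyarrow -> fsspec -> generic error progression."""
    # 1. pyarrow is the base requirement for everything.
    if importlib.util.find_spec("pyarrow") is None:
        return "pyarrow is required for remote storage. Install it via pip."
    # 2. pyarrow natively supports only local, s3, and hdfs; other schemes
    #    need an fsspec-provided filesystem.
    if importlib.util.find_spec("fsspec") is None:
        return (
            f"pyarrow could not resolve '{protocol}://' natively. "
            "Install fsspec to add support for more filesystems."
        )
    # 3. If fsspec knows the protocol, surface its provider-specific hint.
    from fsspec.registry import known_implementations

    if protocol in known_implementations:
        return known_implementations[protocol]["err"]
    # 4. Last resort: a generic hint for custom URIs/filesystems.
    return "Make sure to install and register your fsspec-compatible filesystem."
```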


This utility is needed e.g. for the mock filesystem to work.
"""
# This function is a private API used in testing. If this is required
Member:

move to doc string of the function?
any smart trick to make sure a python function can only be used by _test.py files?

krfricke (Contributor, Author):

I think we should be fine leaving it as a _private method to limit the scope. I've updated the docstring, ptal


fs, bucket_path = get_fs_and_path(uri)
if not fs:
    raise ValueError(
Member:

actually, looking at how get_fs_and_path is used everywhere, it may be better to just raise in the util func itself, and guarantee that if get_fs_and_path returns, fs is a valid arrow fs?

krfricke (Contributor, Author):

I would like to keep it as is because we can raise more actionable error messages that way - i.e. include the operation ("Could not clear URI contents", "Could not upload to URI...") etc. Let me know if you feel strongly about this

yield
os.environ["AWS_SESSION_TOKEN"] = credentials["session_token"]

yield credentials
krfricke (Contributor, Author):

@clarkzinzow I ran into problems on the CI machines with pyarrow 6.0.1 (but not with 4.0.1) and manual debugging suggested that there were credential issues. Calling fs.create_dir("tmp/test") multiple times would result in 403 errors from moto_server. Passing the credentials as kwargs to pa.fs.S3FileSystem solved the problem for me.

I'm not sure how this comes about, and I couldn't reproduce it locally, but since this is a testing setup I think this adjustment should be fine.

I found the same error in your open PRs updating to PyArrow 6.0.1 so this might be helpful.

Contributor:

what's the difference between the two versions? Do we want to update the version in this PR?

@clarkzinzow (Contributor), Apr 12, 2022:

@krfricke Thanks for debugging that! That change sounds good to unblock, very strange that it's no longer respecting the default AWS credentials config, which should pull from those environment variables. I've looked at the version diff for Arrow's S3FileSystem, both the thin Cython layer and the C++ core bit, nothing obvious pops out. I'll look into debugging this if it pops up as an issue again!

@xwjiang2010 I think the Arrow version update makes sense, we have ~3 PRs in-flight that require Arrow 6.0.1+.

krfricke (Contributor, Author):

It's only not working on Buildkite instances fwiw, I've debugged in a repro instance. Happy to hop on a call to show you how to debug this efficiently

@krfricke krfricke added the do-not-merge Do not merge this PR! label Apr 12, 2022
@krfricke (Contributor, Author):

Note that release tests are currently failing; I'm investigating.

@krfricke commented Apr 13, 2022

@krfricke krfricke added tests-ok The tagger certifies test failures are unrelated and assumes personal liability. and removed do-not-merge Do not merge this PR! labels Apr 13, 2022
@ericl ericl merged commit e3bd598 into ray-project:master Apr 13, 2022
@krfricke krfricke deleted the ml/ext-storage branch April 14, 2022 05:10