[air] Move storage handling to pyarrow.fs.FileSystem #23370
Conversation
python/ray/ml/storage.py (Outdated)

```python
S3_PREFIX = "s3://"
GS_PREFIX = "gs://"
HDFS_PREFIX = "hdfs://"
ALLOWED_REMOTE_PREFIXES = (S3_PREFIX, GS_PREFIX, HDFS_PREFIX)
```
These are kept here because Tune still depends on them (the original file has been moved from `ray.util.ml_utils.cloud`).
python/ray/ml/storage.py (Outdated)

```python
register_storage(GS_PREFIX, GSStorage(), override=False)
register_storage(HDFS_PREFIX, HDFSStorage(), override=False)
```
The following methods are kept from `ray.util.ml_utils.cloud`.
python/ray/ml/storage.py (Outdated)

```python
class S3Storage(Storage):
    def upload(self, local_source: str, remote_target: str) -> None:
```
The sync template has more args, like `options` and `exclude_template`. Is this something that we want to support with `Storage`?
I've thought about this, and the problem is that `options` and excludes are specific to the providers and won't work with others (e.g. HDFS does not support excludes at all). I think for those cases we'll likely drop the `exclude_template` but extend the `S3Storage` with Tune-specific features. Alternatively, we can support `**kwargs` to enable some of these things.
yeah, I am also not sure if those features are necessary. If we can keep things simple, we should.
I've added a `**kwargs` parameter to the base API. This will enable us to add arguments in the future.
For Ray Tune, I think we'll have something like `upload(source, target, sync=True, exclude=["*/checkpoint_*"])` or so.
Different storages can then choose to ignore these parameters (e.g. `sync` likely only applies to S3, and `exclude` only to a select few storages).
With `**kwargs` we're flexible enough to make these changes in the future.
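A minimal sketch of what that could look like (class and method names here are illustrative, not the final API from this PR):

```python
from abc import ABC, abstractmethod


class Storage(ABC):
    """Hypothetical sketch of a base API with a **kwargs escape hatch."""

    @abstractmethod
    def upload(self, local_source: str, remote_target: str, **kwargs) -> None:
        """Upload a local file or directory to remote storage."""


class DummyStorage(Storage):
    """Illustrative provider that silently ignores unsupported options."""

    def __init__(self):
        self.uploads = []

    def upload(self, local_source: str, remote_target: str, **kwargs) -> None:
        # This backend supports neither ``sync`` nor ``exclude``; drop them.
        kwargs.pop("sync", None)
        kwargs.pop("exclude", None)
        self.uploads.append((local_source, remote_target))
```

A caller could then pass `sync=True, exclude=[...]` uniformly and let each provider decide what to honor.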
A more general question: where does this fit in the context of the Ray external storage proposal? https://docs.google.com/document/d/128a6gSZVdgHnjNIQPomN2TuE-RPwo7j7pd0slECZ318/edit#heading=h.mi7e05f1j7f1
Ray external storage provides a Ray-based API to store and load data. In that sense it acts similar to S3/GS or other providers for our current means (even though it internally uses these providers for durable storage). We'll likely support the storage API in the future in Ray Tune (and AIR), and we may even make this the preferred way of configuring cloud storage. That said, in the current world we're still dealing with cloud storage directly, and we thus need a good and general way to do this. After all, this is just a translator between local storage and remote storage; if this can be solved in the future by a higher-level API, that would be preferable.
@krfricke, it makes sense, but why not just use `pyarrow.fs.FileSystem` as the interface then? It meets all the requirements for the PR, and supports URIs simply via `pyarrow.fs.FileSystem.from_uri()`.
```python
if not protocol_match:
    return None, None

return protocol_match.group(1), protocol_match.group(2)
```
Nit: You might want to look at using `urllib.parse.urlparse()` for URL parsing and grabbing the protocol, e.g. as done here. Should at least allow us to get rid of the regex!
Ah that's great - updated, thanks!
minor comments, looks very good.
```python
if protocol in known_implementations:
    return known_implementations[protocol]["err"]

return "Make sure to install and register your fsspec-compatible filesystem."
```
nit, can you instead do:

```python
if protocol not in known_implementations:
    return "Make sure to install and register your fsspec-compatible filesystem."
return known_implementations[protocol]["err"]
```

this way, all the early returns are for error conditions, and the final return is success.
also, what do these "err" messages look like if it's a known implementation?
In this case actually all returns are error conditions: the filesystem URI (e.g. `foo://bar`) could not be associated with an installed filesystem, so we raise an error describing how to fix this.
We thus follow a progression here. pyarrow is the base and required for everything, so that is checked first. If pyarrow is installed but the URI is not resolved (pyarrow only supports local, S3, and HDFS), the filesystem can be provided with fsspec. Thus the user should install fsspec next if it's not already there.
fsspec provides a dict (`from fsspec.registry import known_implementations`) which contains all fsspec-compatible filesystems that are "known" to the fsspec maintainers. Here is the current definition: https://github.com/fsspec/filesystem_spec/blob/master/fsspec/registry.py#L87
For instance, for `dropbox://something`, if it's not available, the user would get the message: `DropboxFileSystem requires "dropboxdrivefs", "requests" and "dropbox" to be installed`.
Only as a last resort do we end with the generic "Make sure to install and register your fsspec-compatible filesystem.", which should usually only come up when a user wants to use a custom URI and filesystem implementation.
IMO this order makes sense and we should keep it that way.
```python
    This utility is needed e.g. for the mock filesystem to work.
    """
    # This function is a private API used in testing. If this is required
```
Move to the docstring of the function?
Any smart trick to make sure a Python function can only be used by `_test.py` files?
I think we should be fine leaving it as a `_private` method to limit the scope. I've updated the docstring, ptal.
```python
fs, bucket_path = get_fs_and_path(uri)
if not fs:
    raise ValueError(
```
Actually, looking at how `get_fs_and_path` is used everywhere, it may be better to just raise in the util func itself, and say that if `get_fs_and_path` returns, `fs` must be a valid Arrow fs?
I would like to keep it as is because we can raise more actionable error messages that way - i.e. include the operation ("Could not clear URI contents", "Could not upload to URI...") etc. Let me know if you feel strongly about this
```python
yield
os.environ["AWS_SESSION_TOKEN"] = credentials["session_token"]

yield credentials
```
@clarkzinzow I ran into problems on the CI machines with pyarrow 6.0.1 (but not with 4.0.1) and manual debugging suggested that there were credential issues. Calling `fs.create_dir("tmp/test")` multiple times would result in 403 errors from `moto_server`. Passing the credentials as kwargs to `pa.fs.S3FileSystem` solved the problem for me.
I'm not sure how this comes about, and I couldn't reproduce it locally, but since this is a testing setup I think this adjustment should be fine.
I found the same error in your open PRs updating to PyArrow 6.0.1, so this might be helpful.
what's the difference between the two versions? Do we want to update the version in this PR?
@krfricke Thanks for debugging that! That change sounds good to unblock; very strange that it's no longer respecting the default AWS credentials config, which should pull from those environment variables. I've looked at the version diff for Arrow's `S3FileSystem`, both the thin Cython layer and the C++ core bit, and nothing obvious pops out. I'll look into debugging this if it pops up as an issue again!
@xwjiang2010 I think the Arrow version update makes sense; we have ~3 PRs in-flight that require Arrow 6.0.1+.
It's only not working on Buildkite instances fwiw, I've debugged in a repro instance. Happy to hop on a call to show you how to debug this efficiently
Note that release tests are currently failing, I'm investigating.
AWS tests passing: https://buildkite.com/ray-project/release-tests-pr/builds/441
Why are these changes needed?
Currently we have hardcoded support for three external/cloud storage providers: S3, GS, and HDFS. In the future it is likely that we want to support more providers.
This PR introduces a `Storage` class that provides support for uploading, downloading, and deleting from storage providers. As the first application, this allows us to register a storage provider for `test://` URIs that can be used in unit testing.
The naming has been chosen to be `Storage` to avoid confusion with the existing `ExternalStorage` package for Ray core, and to be general enough for any kind of storage (contrasting with e.g. `CloudStorage`).
Related issue number
Checks
- I've run `scripts/format.sh` to lint the changes in this PR.