gfile: add support for fsspec filesystems #5248

Merged (1 commit) on Sep 13, 2021
.github/workflows/ci.yml (3 additions, 1 deletion)
@@ -110,7 +110,9 @@ jobs:
       - name: 'Bazel: run manual tests'
        run: |
          bazel test //tensorboard/compat/tensorflow_stub:gfile_s3_test &&
-         bazel test //tensorboard/summary/writer:event_file_writer_s3_test
+         bazel test //tensorboard/summary/writer:event_file_writer_s3_test &&
+         bazel test //tensorboard/compat/tensorflow_stub:gfile_fsspec_test &&
+         bazel test //tensorboard/summary/writer:event_file_writer_fsspec_test

  build-data-server-pip:
    runs-on: ${{ matrix.platform }}
tensorboard/BUILD (7 additions, 0 deletions)
@@ -488,6 +488,13 @@ py_library(name = "expect_requests_installed")
# optional dependency.
py_library(name = "expect_pandas_installed")

# This is a dummy rule used as an fsspec dependency in open-source.
# We expect fsspec to already be installed on the system, e.g. via
# `pip install fsspec`.
# NOTE: Unlike other parallel dependencies in this file, fsspec is an
# optional dependency.
py_library(name = "expect_fsspec_installed")

py_library(
    name = "data_compat",
    srcs = ["data_compat.py"],
tensorboard/compat/tensorflow_stub/BUILD (14 additions, 0 deletions)
@@ -16,6 +16,7 @@ py_library(
srcs_version = "PY3",
deps = [
"//tensorboard:expect_absl_flags_installed",
"//tensorboard:expect_fsspec_installed",
"//tensorboard:expect_numpy_installed",
"//tensorboard/compat/proto:protos_all_py_pb2",
],
@@ -59,3 +60,16 @@ py_test(
"//tensorboard:test",
],
)

py_test(
    name = "gfile_fsspec_test",
    size = "small",
    srcs = ["io/gfile_fsspec_test.py"],
    srcs_version = "PY3",
    tags = ["support_notf"],
    deps = [
        ":tensorflow_stub",
        "//tensorboard:expect_fsspec_installed",
        "//tensorboard:test",
    ],
)
tensorboard/compat/tensorflow_stub/io/gfile.py (247 additions, 0 deletions)
@@ -24,6 +24,7 @@
import glob as py_glob
import io
import os
import os.path
import sys
import tempfile

@@ -35,6 +36,13 @@
except ImportError:
    S3_ENABLED = False

try:
    import fsspec

    FSSPEC_ENABLED = True
except ImportError:
    FSSPEC_ENABLED = False

if sys.version_info < (3, 0):
    # In Python 2 FileExistsError is not defined and the
    # error manifests as OSError.
@@ -69,6 +77,8 @@ def get_filesystem(filename):
    if index >= 0:
        prefix = filename[:index]
    fs = _REGISTERED_FILESYSTEMS.get(prefix, None)
    if fs is None:
        fs = _get_fsspec_filesystem(filename)
    if fs is None:
        raise ValueError("No recognized filesystem for prefix %s" % prefix)
    return fs
@@ -401,6 +411,242 @@ def stat(self, filename):
                raise


class FSSpecFileSystem(object):
    """Provides filesystem access via fsspec.

    The current gfile interface doesn't map perfectly to the fsspec
    interface, leading to some notable inefficiencies:

    * Reads and writes to files cause the file to be reopened each time,
      which can cause a performance hit when accessing local file systems.
    * walk doesn't use the native fsspec walk function, so performance may
      be slower.

    See https://github.com/tensorflow/tensorboard/issues/5286 for more info
    on limitations.
    """

    SEPARATOR = "://"
    CHAIN_SEPARATOR = "::"

    def _validate_path(self, path):
        parts = path.split(self.CHAIN_SEPARATOR)
        for part in parts[:-1]:
            if self.SEPARATOR in part:
                raise errors.InvalidArgumentError(
                    None,
                    None,
                    "fsspec URL must only have paths in the last chained filesystem, got {}".format(
                        path
                    ),
                )

    def _translate_errors(func):
        def func_wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except FileNotFoundError as e:
                raise errors.NotFoundError(None, None, str(e))

        return func_wrapper
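For context, a minimal sketch of what this decorator buys callers (a hypothetical usage; it assumes fsspec is installed and uses the memory:// protocol purely for illustration):

    from tensorboard.compat.tensorflow_stub import errors
    from tensorboard.compat.tensorflow_stub.io import gfile

    try:
        # fsspec raises the builtin FileNotFoundError for a missing path;
        # the wrapper re-raises it as the stub's NotFoundError.
        gfile.FSSpecFileSystem().stat("memory://no/such/file")
    except errors.NotFoundError as e:
        print("translated:", e)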

    def _fs_path(self, filename):
        if isinstance(filename, bytes):
            filename = filename.decode("utf-8")
        self._validate_path(filename)

        fs, path = fsspec.core.url_to_fs(filename)
        return fs, path

    @_translate_errors
    def exists(self, filename):
        """Determines whether a path exists or not."""
        fs, path = self._fs_path(filename)
        return fs.exists(path)

    def _join(self, sep, paths):
        """Joins the given paths with the separator, collapsing duplicate
        separators and restarting at absolute paths, like os.path.join."""
        result = []
        for part in paths:
            if part.startswith(sep):
                result = []
            if result and result[-1] and not result[-1].endswith(sep):
                result.append(sep)
            result.append(part)
        return "".join(result)

    @_translate_errors
    def join(self, path, *paths):
        """Join paths with a slash."""
        self._validate_path(path)

        before, sep, last_path = path.rpartition(self.CHAIN_SEPARATOR)
        chain_prefix = before + sep
        protocol, path = fsspec.core.split_protocol(last_path)
        fs = fsspec.get_filesystem_class(protocol)
        if protocol:
            chain_prefix += protocol + self.SEPARATOR
        return chain_prefix + self._join(fs.sep, ((path,) + paths))

Review thread on join (resolved):

nfelt (Contributor, Sep 9, 2021): It should be possible to simplify this to something like:

    _, _, base_url = path.rpartition(self.CHAIN_SEPARATOR)
    protocol, path = fsspec.core.split_protocol(base_url)
    fs = fsspec.get_filesystem_class(protocol)
    return fs.sep.join((path,) + paths)

Also, it'd be better if the join step would work like os.path.join() and collapse adjacent separators (and handle absolute paths), though I see the existing S3 implementation doesn't do that. I think a helper like this would suffice:

    def _join(sep, parts):
        result = []
        for part in parts:
            if part.startswith(sep):
                result = []
            if result and result[-1] and not result[-1].endswith(sep):
                result.append(sep)
            result.append(part)
        return "".join(result)

d4l3k (Author): done

nfelt (Contributor): I realize it's an edge case, but mind adding back the `and result[-1]` part of the condition? That was deliberate; without it we get different behavior from os.path.join():

    >>> _join("/", ("foo", "", "bar"))
    'foo/bar'
    >>> _join_without_extra_check("/", ("foo", "", "bar"))
    'foo//bar'
    >>> os.path.join("foo", "", "bar")
    'foo/bar'

d4l3k (Author): added and updated unit test to check for this case
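To make the agreed-upon semantics concrete, a hedged sketch (assumes fsspec is installed; memory:// and the sample paths are chosen only for illustration):

    from tensorboard.compat.tensorflow_stub.io import gfile

    fs = gfile.FSSpecFileSystem()
    # Separator-aware joining, mirroring os.path.join:
    assert fs.join("memory://a", "b") == "memory://a/b"
    assert fs.join("memory://a/", "b") == "memory://a/b"  # no doubled slash
    assert fs.join("memory://a", "/b") == "memory:///b"  # absolute part resets
    # Chained URLs keep the chain prefix intact:
    assert fs.join("simplecache::memory://a", "b") == "simplecache::memory://a/b"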

    @_translate_errors
    def read(self, filename, binary_mode=False, size=None, continue_from=None):
        """Reads contents of a file to a string.

        Args:
            filename: string, a path
            binary_mode: bool, read as binary if True, otherwise text
            size: int, number of bytes or characters to read, otherwise
                read all the contents of the file (from the continuation
                marker, if present).
            continue_from: An opaque value returned from a prior invocation of
                `read(...)` marking the last read position, so that reading
                may continue from there. Otherwise read from the beginning.

        Returns:
            A tuple of `(data, continuation_token)` where `data` provides either
            bytes read from the file (if `binary_mode == True`) or the decoded
            string representation thereof (otherwise), and `continuation_token`
            is an opaque value that can be passed to the next invocation of
            `read(...)` in order to continue from the last read position.
        """
        fs, path = self._fs_path(filename)

        mode = "rb" if binary_mode else "r"
        encoding = None if binary_mode else "utf8"
        if not exists(filename):
            raise errors.NotFoundError(
                None, None, "Not Found: " + compat.as_text(filename)
            )
        with fs.open(path, mode, encoding=encoding) as f:
            if continue_from is not None:
                if not f.seekable():
                    raise errors.InvalidArgumentError(
                        None,
                        None,
                        "{} is not seekable".format(filename),
                    )
                offset = continue_from.get("opaque_offset", None)
                if offset is not None:
                    f.seek(offset)

            data = f.read(size)
            # The new offset may not be `offset + len(data)`, due to decoding
            # and newline translation.
            # So, just measure it in whatever terms the underlying stream uses.
            continuation_token = (
                {"opaque_offset": f.tell()} if f.seekable() else {}
            )
            return (data, continuation_token)
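A short usage sketch of the continuation-token protocol (hypothetical; the memory:// filesystem stands in for any fsspec-backed store):

    from tensorboard.compat.tensorflow_stub.io import gfile

    fs = gfile.FSSpecFileSystem()
    fs.write("memory://logs/tail.txt", "hello world")
    data, token = fs.read("memory://logs/tail.txt", size=5)
    # data == "hello"; the token opaquely records the stream position.
    rest, _ = fs.read("memory://logs/tail.txt", continue_from=token)
    # rest == " world". Note the file is reopened on every call, one of the
    # inefficiencies called out in the class docstring above.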

    @_translate_errors
    def write(self, filename, file_content, binary_mode=False):
        """Writes string file contents to a file.

        Args:
            filename: string, a path
            file_content: string, the contents
            binary_mode: bool, write as binary if True, otherwise text
        """
        self._write(filename, file_content, "wb" if binary_mode else "w")

    @_translate_errors
    def append(self, filename, file_content, binary_mode=False):
        """Append string file contents to a file.

        Args:
            filename: string, a path
            file_content: string, the contents to append
            binary_mode: bool, write as binary if True, otherwise text
        """
        self._write(filename, file_content, "ab" if binary_mode else "a")

    def _write(self, filename, file_content, mode):
        fs, path = self._fs_path(filename)
        encoding = None if "b" in mode else "utf8"
        with fs.open(path, mode, encoding=encoding) as f:
            compatify = compat.as_bytes if "b" in mode else compat.as_text
            f.write(compatify(file_content))

    def _get_chain_protocol_prefix(self, filename):
        chain_prefix, chain_sep, last_path = filename.rpartition(
            self.CHAIN_SEPARATOR
        )
        protocol, sep, _ = last_path.rpartition(self.SEPARATOR)
        return chain_prefix + chain_sep + protocol + sep

    @_translate_errors
    def glob(self, filename):
        """Returns a list of files that match the given pattern(s)."""
        if isinstance(filename, bytes):
            filename = filename.decode("utf-8")

        fs, path = self._fs_path(filename)
        files = fs.glob(path)

        # Check whether the original chaining/protocol prefix needs to be
        # re-applied, since fsspec returns bare paths.
        if (
            self.SEPARATOR not in filename
            and self.CHAIN_SEPARATOR not in filename
        ):
            return files

        prefix = self._get_chain_protocol_prefix(filename)

        return [
            file
            if (self.SEPARATOR in file or self.CHAIN_SEPARATOR in file)
            else prefix + file
            for file in files
        ]
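A hedged illustration of the prefix handling (memory:// and the run layout are hypothetical; exact path separators depend on the backing filesystem):

    from tensorboard.compat.tensorflow_stub.io import gfile

    fs = gfile.FSSpecFileSystem()
    fs.write("memory://runs/run1/events", "x")
    fs.write("memory://runs/run2/events", "x")
    for match in fs.glob("memory://runs/*/events"):
        # fsspec returns bare paths, so glob re-applies the "memory://"
        # prefix, keeping results usable by the other gfile calls.
        assert match.startswith("memory://")
        assert fs.exists(match)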

    @_translate_errors
    def isdir(self, dirname):
        """Returns whether the path is a directory or not."""
        fs, path = self._fs_path(dirname)
        return fs.isdir(path)

    @_translate_errors
    def listdir(self, dirname):
        """Returns a list of entries contained within a directory."""
        fs, path = self._fs_path(dirname)
        files = fs.listdir(path, detail=False)
        files = [os.path.basename(fname) for fname in files]
        return files

    @_translate_errors
    def makedirs(self, dirname):
        """Creates a directory and all parent/intermediate directories."""
        fs, path = self._fs_path(dirname)
        return fs.makedirs(path, exist_ok=True)

    @_translate_errors
    def stat(self, filename):
        """Returns file statistics for a given path."""
        fs, path = self._fs_path(filename)
        return StatData(fs.size(path))


_FSSPEC_FILESYSTEM = FSSpecFileSystem()


def _get_fsspec_filesystem(filename):
    """Checks whether the given URL's protocol is known to fsspec and, if
    so, returns the shared FSSpecFileSystem wrapper for it."""
    if not FSSPEC_ENABLED:
        return None

    segment = filename.partition(FSSpecFileSystem.CHAIN_SEPARATOR)[0]
    protocol = segment.partition(FSSpecFileSystem.SEPARATOR)[0]
    if fsspec.get_filesystem_class(protocol):
        return _FSSPEC_FILESYSTEM
    else:
        return None


register_filesystem("", LocalFileSystem())
if S3_ENABLED:
register_filesystem("s3", S3FileSystem())
@@ -514,6 +760,7 @@ def write(self, file_content):
            # write the first chunk to truncate file if it already exists
            self.fs.write(self.filename, file_content, self.binary_mode)
            self.write_started = True

        else:
            # append the later chunks
            self.fs.append(self.filename, file_content, self.binary_mode)