Skip to content

Commit

Permalink
gfile: add support for fsspec filesystems (tensorflow#5248)
Browse files Browse the repository at this point in the history
This adds support for fsspec filesystem backends to the gfile compat library. This allows for tensorboard to be used with a wider range of file system backends.

This adds a new GFile filesystem backend that's used if no other registered filesystem matches. In that case, fsspec is checked for an implementation that supports the path's protocol, and if one is available a wrapped version of it is returned.

fsspec provides semantics similar to normal usage; however, it doesn't support seeking for files opened in text mode, which can be problematic for longer text files. In practice this works regardless, since most of tensorboard's larger files are in a binary format.

Tests:
```sh
$ bazel run //tensorboard -- --logdir file:///tmp/torchxrun/lightning_logs/version_1/
$ bazel test //tensorboard/compat/tensorflow_stub:gfile_fsspec_test //tensorboard/summary/writer:event_file_writer_fsspec_test --test_output=errors
```

See: tensorflow#5165
  • Loading branch information
d4l3k authored and yatbear committed Mar 27, 2023
1 parent d618093 commit e4d2493
Show file tree
Hide file tree
Showing 8 changed files with 931 additions and 1 deletion.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ jobs:
- name: 'Bazel: run manual tests'
run: |
bazel test //tensorboard/compat/tensorflow_stub:gfile_s3_test &&
bazel test //tensorboard/summary/writer:event_file_writer_s3_test
bazel test //tensorboard/summary/writer:event_file_writer_s3_test &&
bazel test //tensorboard/compat/tensorflow_stub:gfile_fsspec_test &&
bazel test //tensorboard/summary/writer:event_file_writer_fsspec_test
build-data-server-pip:
runs-on: ${{ matrix.platform }}
Expand Down
7 changes: 7 additions & 0 deletions tensorboard/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,13 @@ py_library(name = "expect_requests_installed")
# optional dependency.
py_library(name = "expect_pandas_installed")

# This is a dummy rule used as a fsspec dependency in open-source.
# We expect fsspec to already be installed on the system, e.g. via
# `pip install fsspec`.
# NOTE: Unlike other parallel dependencies in this file, fsspec is an
# optional dependency.
py_library(name = "expect_fsspec_installed")

py_library(
name = "data_compat",
srcs = ["data_compat.py"],
Expand Down
14 changes: 14 additions & 0 deletions tensorboard/compat/tensorflow_stub/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ py_library(
srcs_version = "PY3",
deps = [
"//tensorboard:expect_absl_flags_installed",
"//tensorboard:expect_fsspec_installed",
"//tensorboard:expect_numpy_installed",
"//tensorboard/compat/proto:protos_all_py_pb2",
],
Expand Down Expand Up @@ -59,3 +60,16 @@ py_test(
"//tensorboard:test",
],
)

py_test(
    name = "gfile_fsspec_test",
    size = "small",
    srcs = ["io/gfile_fsspec_test.py"],
    srcs_version = "PY3",
    # NOTE(review): presumably marks tests runnable without TensorFlow
    # installed, matching the other manual tests — confirm tag semantics.
    tags = ["support_notf"],
    deps = [
        ":tensorflow_stub",
        # fsspec is an optional dependency; this dummy rule documents it.
        "//tensorboard:expect_fsspec_installed",
        "//tensorboard:test",
    ],
)
247 changes: 247 additions & 0 deletions tensorboard/compat/tensorflow_stub/io/gfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import glob as py_glob
import io
import os
import os.path
import sys
import tempfile

Expand All @@ -35,6 +36,13 @@
except ImportError:
S3_ENABLED = False

# fsspec is an optional dependency: if it isn't installed we simply
# disable the fsspec-backed filesystem support rather than failing.
try:
    import fsspec

    FSSPEC_ENABLED = True
except ImportError:
    FSSPEC_ENABLED = False

if sys.version_info < (3, 0):
# In Python 2 FileExistsError is not defined and the
# error manifests it as OSError.
Expand Down Expand Up @@ -69,6 +77,8 @@ def get_filesystem(filename):
if index >= 0:
prefix = filename[:index]
fs = _REGISTERED_FILESYSTEMS.get(prefix, None)
if fs is None:
fs = _get_fsspec_filesystem(filename)
if fs is None:
raise ValueError("No recognized filesystem for prefix %s" % prefix)
return fs
Expand Down Expand Up @@ -401,6 +411,242 @@ def stat(self, filename):
raise


class FSSpecFileSystem(object):
    """Provides filesystem access via fsspec.

    The current gfile interface doesn't map perfectly to the fsspec
    interface, leading to some notable inefficiencies:

    * Reads and writes to files cause the file to be reopened each time,
      which can cause a performance hit when accessing local file systems.
    * walk doesn't use the native fsspec walk function so performance may
      be slower.

    See https://github.com/tensorflow/tensorboard/issues/5286 for more
    info on limitations.
    """

    # URL pieces: "protocol://path", plus fsspec's chained form
    # "outer::inner://path" joined by CHAIN_SEPARATOR.
    SEPARATOR = "://"
    CHAIN_SEPARATOR = "::"

    def _validate_path(self, path):
        """Raises InvalidArgumentError when a chained URL has a path
        component ("://") in any link other than the last one."""
        parts = path.split(self.CHAIN_SEPARATOR)
        for part in parts[:-1]:
            if self.SEPARATOR in part:
                raise errors.InvalidArgumentError(
                    None,
                    None,
                    "fsspec URL must only have paths in the last chained filesystem, got {}".format(
                        path
                    ),
                )

    def _translate_errors(func):
        """Decorator mapping fsspec's FileNotFoundError onto the
        errors.NotFoundError that callers of this API expect."""

        def func_wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except FileNotFoundError as e:
                raise errors.NotFoundError(None, None, str(e))

        return func_wrapper

    def _fs_path(self, filename):
        """Resolves `filename` into an `(fsspec_filesystem, path)` pair.

        Accepts bytes or str; bytes are decoded as UTF-8 first.
        """
        if isinstance(filename, bytes):
            filename = filename.decode("utf-8")
        self._validate_path(filename)

        fs, path = fsspec.core.url_to_fs(filename)
        return fs, path

    @_translate_errors
    def exists(self, filename):
        """Determines whether a path exists or not."""
        fs, path = self._fs_path(filename)
        return fs.exists(path)

    def _join(self, sep, paths):
        """Joins `paths` with the given separator.

        An absolute component (one starting with `sep`) discards
        everything accumulated before it, mirroring os.path.join.
        """
        result = []
        for part in paths:
            if part.startswith(sep):
                result = []
            if result and result[-1] and not result[-1].endswith(sep):
                result.append(sep)
            result.append(part)
        return "".join(result)

    @_translate_errors
    def join(self, path, *paths):
        """Join paths with a slash, preserving any chain/protocol prefix."""
        self._validate_path(path)

        # Split off the chained-filesystem prefix (everything up to and
        # including the last "::") so only the final URL is joined.
        before, sep, last_path = path.rpartition(self.CHAIN_SEPARATOR)
        chain_prefix = before + sep
        protocol, path = fsspec.core.split_protocol(last_path)
        fs = fsspec.get_filesystem_class(protocol)
        if protocol:
            chain_prefix += protocol + self.SEPARATOR
        # Use the target filesystem's own separator for the join.
        return chain_prefix + self._join(fs.sep, ((path,) + paths))

    @_translate_errors
    def read(self, filename, binary_mode=False, size=None, continue_from=None):
        """Reads contents of a file to a string.

        Args:
            filename: string, a path
            binary_mode: bool, read as binary if True, otherwise text
            size: int, number of bytes or characters to read, otherwise
                read all the contents of the file (from the continuation
                marker, if present).
            continue_from: An opaque value returned from a prior invocation of
                `read(...)` marking the last read position, so that reading
                may continue from there. Otherwise read from the beginning.

        Returns:
            A tuple of `(data, continuation_token)` where `data' provides either
            bytes read from the file (if `binary_mode == true`) or the decoded
            string representation thereof (otherwise), and `continuation_token`
            is an opaque value that can be passed to the next invocation of
            `read(...) ' in order to continue from the last read position.

        Raises:
            errors.NotFoundError: if the file does not exist.
            errors.InvalidArgumentError: if `continue_from` is given but the
                underlying stream is not seekable.
        """
        fs, path = self._fs_path(filename)

        mode = "rb" if binary_mode else "r"
        encoding = None if binary_mode else "utf8"
        # Check on the already-resolved filesystem rather than the
        # module-level exists(), which would redundantly re-run
        # fsspec.core.url_to_fs on the same filename.
        if not fs.exists(path):
            raise errors.NotFoundError(
                None, None, "Not Found: " + compat.as_text(filename)
            )
        with fs.open(path, mode, encoding=encoding) as f:
            if continue_from is not None:
                if not f.seekable():
                    raise errors.InvalidArgumentError(
                        None,
                        None,
                        "{} is not seekable".format(filename),
                    )
                offset = continue_from.get("opaque_offset", None)
                if offset is not None:
                    f.seek(offset)

            data = f.read(size)
            # The new offset may not be `offset + len(data)`, due to decoding
            # and newline translation.
            # So, just measure it in whatever terms the underlying stream uses.
            continuation_token = (
                {"opaque_offset": f.tell()} if f.seekable() else {}
            )
            return (data, continuation_token)

    @_translate_errors
    def write(self, filename, file_content, binary_mode=False):
        """Writes string file contents to a file.

        Args:
            filename: string, a path
            file_content: string, the contents
            binary_mode: bool, write as binary if True, otherwise text
        """
        self._write(filename, file_content, "wb" if binary_mode else "w")

    @_translate_errors
    def append(self, filename, file_content, binary_mode=False):
        """Append string file contents to a file.

        Args:
            filename: string, a path
            file_content: string, the contents to append
            binary_mode: bool, write as binary if True, otherwise text
        """
        self._write(filename, file_content, "ab" if binary_mode else "a")

    def _write(self, filename, file_content, mode):
        """Shared implementation of write/append: opens `filename` in
        `mode` and writes `file_content` coerced to bytes or text."""
        fs, path = self._fs_path(filename)
        encoding = None if "b" in mode else "utf8"
        with fs.open(path, mode, encoding=encoding) as f:
            compatify = compat.as_bytes if "b" in mode else compat.as_text
            f.write(compatify(file_content))

    def _get_chain_protocol_prefix(self, filename):
        """Returns everything up to and including the final protocol
        separator, e.g. "zip://f::s3://" for "zip://f::s3://bucket/x"."""
        chain_prefix, chain_sep, last_path = filename.rpartition(
            self.CHAIN_SEPARATOR
        )
        protocol, sep, _ = last_path.rpartition(self.SEPARATOR)
        return chain_prefix + chain_sep + protocol + sep

    @_translate_errors
    def glob(self, filename):
        """Returns a list of files that match the given pattern(s)."""
        if isinstance(filename, bytes):
            filename = filename.decode("utf-8")

        fs, path = self._fs_path(filename)
        files = fs.glob(path)

        # check if applying the original chaining is required.
        if (
            self.SEPARATOR not in filename
            and self.CHAIN_SEPARATOR not in filename
        ):
            return files

        # fs.glob returns bare paths; restore the protocol/chain prefix
        # of the query so results round-trip through this filesystem.
        prefix = self._get_chain_protocol_prefix(filename)

        return [
            file
            if (self.SEPARATOR in file or self.CHAIN_SEPARATOR in file)
            else prefix + file
            for file in files
        ]

    @_translate_errors
    def isdir(self, dirname):
        """Returns whether the path is a directory or not."""
        fs, path = self._fs_path(dirname)
        return fs.isdir(path)

    @_translate_errors
    def listdir(self, dirname):
        """Returns a list of entries contained within a directory."""
        fs, path = self._fs_path(dirname)
        files = fs.listdir(path, detail=False)
        # fsspec returns full paths; callers expect bare entry names.
        files = [os.path.basename(fname) for fname in files]
        return files

    @_translate_errors
    def makedirs(self, dirname):
        """Creates a directory and all parent/intermediate directories."""
        fs, path = self._fs_path(dirname)
        return fs.makedirs(path, exist_ok=True)

    @_translate_errors
    def stat(self, filename):
        """Returns file statistics (currently only the size) for a path."""
        fs, path = self._fs_path(filename)
        return StatData(fs.size(path))


# Module-level singleton: the wrapper holds no per-path state, so every
# fsspec-backed path can share this one instance.
_FSSPEC_FILESYSTEM = FSSpecFileSystem()


def _get_fsspec_filesystem(filename):
    """
    _get_fsspec_filesystem checks if the provided protocol is known to fsspec
    and if so returns the filesystem wrapper for it.

    Returns None when fsspec is not installed or the protocol is not
    supported, so the caller can raise its own "No recognized filesystem"
    error.
    """
    if not FSSPEC_ENABLED:
        return None

    segment = filename.partition(FSSpecFileSystem.CHAIN_SEPARATOR)[0]
    protocol = segment.partition(FSSpecFileSystem.SEPARATOR)[0]
    try:
        # get_filesystem_class raises (rather than returning a falsy
        # value) for unsupported protocols: ValueError for an unknown
        # protocol, ImportError when the backing package isn't installed.
        if fsspec.get_filesystem_class(protocol):
            return _FSSPEC_FILESYSTEM
    except (ImportError, ValueError):
        pass
    return None


# The empty prefix handles plain local paths (no "scheme://" part).
register_filesystem("", LocalFileSystem())
# s3:// support is optional and registered only when its import succeeded.
if S3_ENABLED:
    register_filesystem("s3", S3FileSystem())
Expand Down Expand Up @@ -514,6 +760,7 @@ def write(self, file_content):
# write the first chunk to truncate file if it already exists
self.fs.write(self.filename, file_content, self.binary_mode)
self.write_started = True

else:
# append the later chunks
self.fs.append(self.filename, file_content, self.binary_mode)
Expand Down
Loading

0 comments on commit e4d2493

Please sign in to comment.