Skip to content

Commit

Permalink
torchdata/datapipes/iter/load/fsspec: added fsspec datapipes (#116)
Browse files Browse the repository at this point in the history
Summary:
This adds fsspec datapipes that are equivalent to the existing iopath ones. The tests are largely equivalent and test the `file://` and `memory://` filesystems to ensure compatibility.

Closes #114

Pull Request resolved: #116

Test Plan: pytest tests/test_fsspec.py

Reviewed By: ejguan

Differential Revision: D32933676

Pulled By: d4l3k

fbshipit-source-id: 30a0b9da385f5d9328d07105234a92d7259ae05d
  • Loading branch information
d4l3k authored and facebook-github-bot committed Dec 10, 2021
1 parent 6309113 commit f102d25
Show file tree
Hide file tree
Showing 5 changed files with 316 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
pip3 install -r requirements.txt
pip3 install --pre torch -f https://download.pytorch.org/whl/nightly/cpu/torch_nightly.html
- name: Install test requirements
run: pip3 install expecttest iopath==0.1.9 numpy pytest rarfile
run: pip3 install expecttest fsspec iopath==0.1.9 numpy pytest rarfile
- name: Build TorchData
run: python setup.py develop
- name: Run DataPipes tests with pytest
Expand Down
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ ignore_missing_imports = True

[mypy-rarfile.*]
ignore_missing_imports = True

[mypy-fsspec.*]
ignore_missing_imports = True
165 changes: 165 additions & 0 deletions test/test_fsspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# Copyright (c) Facebook, Inc. and its affiliates.
import expecttest
import os
import unittest
import warnings

from torchdata.datapipes.iter import (
FileLister,
IterableWrapper,
FSSpecFileLister,
FSSpecFileOpener,
FSSpecSaver,
)

from _utils._common_utils_for_test import (
create_temp_dir,
create_temp_files,
reset_after_n_next_calls,
)

# fsspec is an optional dependency: record whether it can be imported so the
# tests that need it can be skipped instead of failing.
try:
    import fsspec
except ImportError:
    HAS_FSSPEC = False
else:
    HAS_FSSPEC = True
# Decorator that skips a test when fsspec is not importable.
skipIfNoFSSpec = unittest.skipIf(not HAS_FSSPEC, "no fsspec")


class TestDataPipeFSSpec(expecttest.TestCase):
    # Tests for the fsspec-backed lister, opener and saver DataPipes against
    # the `file://` and `memory://` filesystems.
    # NOTE: comments (not docstrings) are used on test methods so that the
    # test runner keeps reporting tests by method name.

    def setUp(self):
        # One temp dir with files, and a second temp dir created with the
        # first one's path as argument (presumably nested inside it).
        self.temp_dir = create_temp_dir()
        self.temp_files = create_temp_files(self.temp_dir)
        self.temp_sub_dir = create_temp_dir(self.temp_dir.name)
        self.temp_sub_files = create_temp_files(self.temp_sub_dir, 4, False)

    def tearDown(self):
        # Best-effort cleanup: a failure here is reported as a warning so it
        # does not mask the actual test result.
        try:
            self.temp_sub_dir.cleanup()
            self.temp_dir.cleanup()
        except Exception as e:
            # Use the real class name so the warning points at this test case.
            warnings.warn(
                f"{type(self).__name__} was not able to cleanup temp dir due to {e}"
            )

    def _write_text_files(self):
        # Writes three small "*.text" files into temp_dir via `save_to_disk`.
        def filepath_fn(name: str) -> str:
            return os.path.join(self.temp_dir.name, os.path.basename(name))

        name_to_data = {"1.text": b"DATA", "2.text": b"DATA", "3.text": b"DATA"}
        source_dp = IterableWrapper(sorted(name_to_data.items()))
        saver_dp = source_dp.save_to_disk(filepath_fn=filepath_fn, mode="wb")
        list(saver_dp)

    @skipIfNoFSSpec
    def test_fsspec_file_lister_iterdatapipe(self):
        datapipe = FSSpecFileLister(root="file://" + self.temp_sub_dir.name)

        # check all file paths within sub_folder are listed
        for path in datapipe:
            self.assertIn(
                path.split("://")[1],
                {
                    fsspec.implementations.local.make_path_posix(file)
                    for file in self.temp_sub_files
                },
            )

    @skipIfNoFSSpec
    def test_fsspec_file_loader_iterdatapipe(self):
        datapipe1 = FSSpecFileLister(root="file://" + self.temp_sub_dir.name)
        datapipe2 = FSSpecFileOpener(datapipe1)

        # check contents of file match
        for _, f in datapipe2:
            self.assertEqual(f.read(), "0123456789abcdef")

        # Reset Test: Ensure the resulting streams are still readable after the DataPipe is reset/exhausted
        self._write_text_files()
        lister_dp = FileLister(self.temp_dir.name, "*.text")
        fsspec_file_loader_dp = FSSpecFileOpener(lister_dp, mode="rb")

        n_elements_before_reset = 2
        res_before_reset, res_after_reset = reset_after_n_next_calls(
            fsspec_file_loader_dp, n_elements_before_reset
        )
        self.assertEqual(2, len(res_before_reset))
        self.assertEqual(3, len(res_after_reset))
        for _name, stream in res_before_reset:
            self.assertEqual(b"DATA", stream.read())
        for _name, stream in res_after_reset:
            self.assertEqual(b"DATA", stream.read())

    @skipIfNoFSSpec
    def test_fsspec_saver_iterdatapipe(self):
        def filepath_fn(name: str) -> str:
            return "file://" + os.path.join(self.temp_dir.name, os.path.basename(name))

        # Functional Test: Saving some data
        name_to_data = {"1.txt": b"DATA1", "2.txt": b"DATA2", "3.txt": b"DATA3"}
        source_dp = IterableWrapper(sorted(name_to_data.items()))
        saver_dp = source_dp.save_by_fsspec(filepath_fn=filepath_fn, mode="wb")
        res_file_paths = list(saver_dp)
        expected_paths = [filepath_fn(name) for name in name_to_data.keys()]
        self.assertEqual(expected_paths, res_file_paths)
        for name in name_to_data.keys():
            # strip the "file://" prefix to read the file back with plain `open`
            p = filepath_fn(name).split("://")[1]
            with open(p, "r") as f:
                self.assertEqual(name_to_data[name], f.read().encode())

        # Reset Test:
        saver_dp = FSSpecSaver(source_dp, filepath_fn=filepath_fn, mode="wb")
        n_elements_before_reset = 2
        res_before_reset, res_after_reset = reset_after_n_next_calls(
            saver_dp, n_elements_before_reset
        )
        self.assertEqual([filepath_fn("1.txt"), filepath_fn("2.txt")], res_before_reset)
        self.assertEqual(expected_paths, res_after_reset)
        for name in name_to_data.keys():
            p = filepath_fn(name).split("://")[1]
            with open(p, "r") as f:
                self.assertEqual(name_to_data[name], f.read().encode())

        # __len__ Test: returns the length of source DataPipe
        self.assertEqual(3, len(saver_dp))

    @skipIfNoFSSpec
    def test_fsspec_memory_list(self):
        fs = fsspec.filesystem("memory")
        fs.mkdir("foo")
        fs.touch("foo/bar1")
        fs.touch("foo/bar2")

        # listing a directory yields every entry with the protocol prefix
        datapipe = FSSpecFileLister(root="memory://foo")
        self.assertEqual(set(datapipe), {"memory:///foo/bar1", "memory:///foo/bar2"})

        # listing a single file yields the root url unchanged
        datapipe = FSSpecFileLister(root="memory://foo/bar1")
        self.assertEqual(set(datapipe), {"memory://foo/bar1"})

    @skipIfNoFSSpec
    def test_fsspec_memory_load(self):
        fs = fsspec.filesystem("memory")
        with fs.open("file", "w") as f:
            f.write("hello")
        with fs.open("file2", "w") as f:
            f.write("hello2")

        files = ["memory://file", "memory://file2"]
        datapipe = FSSpecFileOpener(files)
        self.assertEqual([f.read() for _, f in datapipe], ["hello", "hello2"])

    @skipIfNoFSSpec
    def test_fsspec_memory_save(self):
        def filepath_fn(name: str) -> str:
            return "memory://" + name

        name_to_data = {"1.txt": b"DATA1", "2.txt": b"DATA2"}
        source_dp = IterableWrapper(sorted(name_to_data.items()))
        saver_dp = FSSpecSaver(source_dp, filepath_fn=filepath_fn, mode="wb")

        self.assertEqual(set(saver_dp), {"memory://1.txt", "memory://2.txt"})


# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
8 changes: 8 additions & 0 deletions torchdata/datapipes/iter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
IoPathFileLoaderIterDataPipe as IoPathFileLoader,
IoPathSaverIterDataPipe as IoPathSaver,
)
from torchdata.datapipes.iter.load.fsspec import (
FSSpecFileListerIterDataPipe as FSSpecFileLister,
FSSpecFileOpenerIterDataPipe as FSSpecFileOpener,
FSSpecSaverIterDataPipe as FSSpecSaver,
)
from torchdata.datapipes.iter.transform.bucketbatcher import BucketBatcherIterDataPipe as BucketBatcher
from torchdata.datapipes.iter.util.cacheholder import (
EndOnDiskCacheHolderIterDataPipe as EndOnDiskCacheHolder,
Expand Down Expand Up @@ -80,6 +85,9 @@
"EndOnDiskCacheHolder",
"Enumerator",
"Extractor",
"FSSpecFileLister",
"FSSpecFileOpener",
"FSSpecSaver",
"FileLister",
"FileLoader",
"Filter",
Expand Down
139 changes: 139 additions & 0 deletions torchdata/datapipes/iter/load/fsspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Copyright (c) Facebook, Inc. and its affiliates.
import os
import posixpath

from typing import Any, Callable, Iterator, List, Optional, Tuple, Union

from torch.utils.data.datapipes.utils.common import match_masks

from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe
from torchdata.datapipes.utils import StreamWrapper

try:
import fsspec

except ImportError:
fsspec = None

# Payload types accepted as the data element of the (metadata, data) tuples
# consumed by FSSpecSaverIterDataPipe.
U = Union[bytes, bytearray, str]


def _assert_fsspec() -> None:
    """Raise if the optional `fsspec` dependency is not available.

    Raises:
        ModuleNotFoundError: if the module-level `fsspec` name is ``None``.
    """
    if fsspec is None:
        # Adjacent string literals are concatenated verbatim, so every fragment
        # must carry its own separating space.
        raise ModuleNotFoundError(
            "Package `fsspec` is required to be installed to use this datapipe. "
            "Please use `pip install fsspec` or `conda install -c conda-forge fsspec` "
            "to install the package"
        )


class FSSpecFileListerIterDataPipe(IterDataPipe[str]):
    r""":class:`FSSpecFileListerIterDataPipe`.
    Iterable DataPipe to list the contents of the directory at the provided `root` pathname or url,
    and yields the full pathname or url for each file within the directory.
    If `root` refers to a single file, `root` itself is yielded unchanged.
    Args:
        root: The root fsspec path directory to list files from
        masks: Unix style filter string or string list for filtering file name(s)
    Raises:
        ModuleNotFoundError: if `fsspec` is not installed (raised on construction)
    """

    def __init__(
        self,
        root: str,
        masks: Union[str, List[str]] = "",
    ) -> None:
        _assert_fsspec()

        self.root: str = root
        self.masks = masks

    def __iter__(self) -> Iterator[str]:
        fs, path = fsspec.core.url_to_fs(self.root)

        # `fs.protocol` is a plain string on some filesystems and a tuple of
        # aliases on others; normalize so both shapes are handled the same way.
        if isinstance(fs.protocol, str):
            protocols: Tuple[str, ...] = (fs.protocol,)
        else:
            protocols = tuple(fs.protocol)

        # The protocol alias that `root` was written with, if any. It is reused
        # as the prefix of yielded entries so results look like the input.
        root_protocol: Optional[str] = next((p for p in protocols if self.root.startswith(p)), None)
        is_local = "file" in protocols or root_protocol is None

        if fs.isfile(path):
            yield self.root
            return

        # `detail=False` makes `ls` return path strings on every backend
        # instead of relying on each implementation's default.
        for file_name in fs.ls(path, detail=False):
            if not match_masks(file_name, self.masks):
                continue

            # ensure the file name has the full fsspec protocol path
            if file_name.startswith(protocols):
                yield file_name
                continue

            # Local paths follow the host OS separator; remote ones are posix.
            if is_local:
                abs_path = os.path.join(path, file_name)
            else:
                abs_path = posixpath.join(path, file_name)

            if root_protocol is None:
                yield abs_path
            else:
                yield root_protocol + "://" + abs_path


@functional_datapipe("open_file_by_fsspec")
class FSSpecFileOpenerIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]):
    r""":class:`FSSpecFileOpenerIterDataPipe`.
    Iterable DataPipe that opens every fsspec path produced by the input datapipe
    and yields a tuple of the original pathname/url and the opened file stream
    wrapped in a `StreamWrapper`.
    Args:
        source_datapipe: Iterable DataPipe that provides the pathnames or urls
        mode: An optional string that specifies the mode in which the file is opened ('r' by default)
    """

    def __init__(self, source_datapipe: IterDataPipe[str], mode: str = "r") -> None:
        # Fail early when the optional dependency is missing.
        _assert_fsspec()

        self.mode: str = mode
        self.source_datapipe: IterDataPipe[str] = source_datapipe

    def __iter__(self) -> Iterator[Tuple[str, StreamWrapper]]:
        for uri in self.source_datapipe:
            # Resolve the url into its filesystem and the path within it.
            filesystem, resolved_path = fsspec.core.url_to_fs(uri)
            stream = StreamWrapper(filesystem.open(resolved_path, self.mode))
            yield uri, stream

    def __len__(self) -> int:
        # One opened stream is produced per source path.
        return len(self.source_datapipe)


@functional_datapipe("save_by_fsspec")
class FSSpecSaverIterDataPipe(IterDataPipe[str]):
    r"""
    Iterable DataPipe that consumes tuples of metadata and data, writes each data item
    to a target fsspec path, and yields that path once the write has completed.
    The target path is the metadata itself, or the result of `filepath_fn(metadata)`
    when `filepath_fn` is provided.
    Args:
        source_datapipe: Iterable DataPipe with tuples of metadata and data
        mode: Mode in which the file will be opened to write the data ("w" by default)
        filepath_fn: Function that takes in metadata and returns the target path of the new file
    """

    def __init__(
        self,
        source_datapipe: IterDataPipe[Tuple[Any, U]],
        mode: str = "w",
        filepath_fn: Optional[Callable] = None,
    ):
        # Fail early when the optional dependency is missing.
        _assert_fsspec()

        self.filepath_fn: Optional[Callable] = filepath_fn
        self.mode: str = mode
        self.source_datapipe: IterDataPipe[Tuple[Any, U]] = source_datapipe

    def __iter__(self) -> Iterator[str]:
        for meta, payload in self.source_datapipe:
            # Without a mapping function the metadata is used as the path.
            if self.filepath_fn is None:
                target = meta
            else:
                target = self.filepath_fn(meta)

            filesystem, resolved_path = fsspec.core.url_to_fs(target)
            with filesystem.open(resolved_path, self.mode) as out:
                out.write(payload)
            yield target

    def __len__(self) -> int:
        # One path is yielded per source item.
        return len(self.source_datapipe)

0 comments on commit f102d25

Please sign in to comment.