Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

torchdata/datapipes/iter/load/fsspec: added fsspec datapipes #116

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
pip3 install -r requirements.txt
pip3 install --pre torch -f https://download.pytorch.org/whl/nightly/cpu/torch_nightly.html
- name: Install test requirements
run: pip3 install expecttest iopath==0.1.9 numpy pytest rarfile
run: pip3 install expecttest fsspec iopath==0.1.9 numpy pytest rarfile
- name: Build TorchData
run: python setup.py develop
- name: Run DataPipes tests with pytest
Expand Down
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ ignore_missing_imports = True

[mypy-rarfile.*]
ignore_missing_imports = True

[mypy-fsspec.*]
ignore_missing_imports = True
165 changes: 165 additions & 0 deletions test/test_fsspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# Copyright (c) Facebook, Inc. and its affiliates.
import expecttest
import os
import unittest
import warnings

from torchdata.datapipes.iter import (
FileLister,
IterableWrapper,
FSSpecFileLister,
FSSpecFileOpener,
FSSpecSaver,
)

from _utils._common_utils_for_test import (
create_temp_dir,
create_temp_files,
reset_after_n_next_calls,
)

try:
import fsspec

HAS_FSSPEC = True
except ImportError:
HAS_FSSPEC = False
skipIfNoFSSpec = unittest.skipIf(not HAS_FSSPEC, "no fsspec")


class TestDataPipeFSSpec(expecttest.TestCase):
def setUp(self):
self.temp_dir = create_temp_dir()
self.temp_files = create_temp_files(self.temp_dir)
self.temp_sub_dir = create_temp_dir(self.temp_dir.name)
self.temp_sub_files = create_temp_files(self.temp_sub_dir, 4, False)

def tearDown(self):
try:
self.temp_sub_dir.cleanup()
self.temp_dir.cleanup()
except Exception as e:
warnings.warn(
f"TestDataPipeLocalIO was not able to cleanup temp dir due to {e}"
)

def _write_text_files(self):
def filepath_fn(name: str) -> str:
return os.path.join(self.temp_dir.name, os.path.basename(name))

name_to_data = {"1.text": b"DATA", "2.text": b"DATA", "3.text": b"DATA"}
source_dp = IterableWrapper(sorted(name_to_data.items()))
saver_dp = source_dp.save_to_disk(filepath_fn=filepath_fn, mode="wb")
list(saver_dp)

@skipIfNoFSSpec
def test_fsspec_file_lister_iterdatapipe(self):
datapipe = FSSpecFileLister(root="file://" + self.temp_sub_dir.name)

# check all file paths within sub_folder are listed
for path in datapipe:
self.assertIn(
path.split("://")[1],
{
fsspec.implementations.local.make_path_posix(file)
for file in self.temp_sub_files
},
)

@skipIfNoFSSpec
def test_fsspec_file_loader_iterdatapipe(self):
datapipe1 = FSSpecFileLister(root="file://" + self.temp_sub_dir.name)
datapipe2 = FSSpecFileOpener(datapipe1)

# check contents of file match
for _, f in datapipe2:
self.assertEqual(f.read(), "0123456789abcdef")

# Reset Test: Ensure the resulting streams are still readable after the DataPipe is reset/exhausted
self._write_text_files()
lister_dp = FileLister(self.temp_dir.name, "*.text")
fsspec_file_loader_dp = FSSpecFileOpener(lister_dp, mode="rb")

n_elements_before_reset = 2
res_before_reset, res_after_reset = reset_after_n_next_calls(
fsspec_file_loader_dp, n_elements_before_reset
)
self.assertEqual(2, len(res_before_reset))
self.assertEqual(3, len(res_after_reset))
for _name, stream in res_before_reset:
self.assertEqual(b"DATA", stream.read())
for _name, stream in res_after_reset:
self.assertEqual(b"DATA", stream.read())

@skipIfNoFSSpec
def test_fsspec_saver_iterdatapipe(self):
def filepath_fn(name: str) -> str:
return "file://" + os.path.join(self.temp_dir.name, os.path.basename(name))

# Functional Test: Saving some data
name_to_data = {"1.txt": b"DATA1", "2.txt": b"DATA2", "3.txt": b"DATA3"}
source_dp = IterableWrapper(sorted(name_to_data.items()))
saver_dp = source_dp.save_by_fsspec(filepath_fn=filepath_fn, mode="wb")
res_file_paths = list(saver_dp)
expected_paths = [filepath_fn(name) for name in name_to_data.keys()]
self.assertEqual(expected_paths, res_file_paths)
for name in name_to_data.keys():
p = filepath_fn(name).split("://")[1]
with open(p, "r") as f:
self.assertEqual(name_to_data[name], f.read().encode())

# Reset Test:
saver_dp = FSSpecSaver(source_dp, filepath_fn=filepath_fn, mode="wb")
n_elements_before_reset = 2
res_before_reset, res_after_reset = reset_after_n_next_calls(
saver_dp, n_elements_before_reset
)
self.assertEqual([filepath_fn("1.txt"), filepath_fn("2.txt")], res_before_reset)
self.assertEqual(expected_paths, res_after_reset)
for name in name_to_data.keys():
p = filepath_fn(name).split("://")[1]
with open(p, "r") as f:
self.assertEqual(name_to_data[name], f.read().encode())

# __len__ Test: returns the length of source DataPipe
self.assertEqual(3, len(saver_dp))

@skipIfNoFSSpec
def test_fsspec_memory_list(self):
fs = fsspec.filesystem("memory")
fs.mkdir("foo")
fs.touch("foo/bar1")
fs.touch("foo/bar2")

datapipe = FSSpecFileLister(root="memory://foo")
self.assertEqual(set(datapipe), {"memory:///foo/bar1", "memory:///foo/bar2"})

datapipe = FSSpecFileLister(root="memory://foo/bar1")
self.assertEqual(set(datapipe), {"memory://foo/bar1"})

@skipIfNoFSSpec
def test_fsspec_memory_load(self):
fs = fsspec.filesystem("memory")
with fs.open("file", "w") as f:
f.write("hello")
with fs.open("file2", "w") as f:
f.write("hello2")

files = ["memory://file", "memory://file2"]
datapipe = FSSpecFileOpener(files)
self.assertEqual([f.read() for _, f in datapipe], ["hello", "hello2"])

@skipIfNoFSSpec
def test_fsspec_memory_save(self):
def filepath_fn(name: str) -> str:
return "memory://" + name

name_to_data = {"1.txt": b"DATA1", "2.txt": b"DATA2"}
source_dp = IterableWrapper(sorted(name_to_data.items()))
saver_dp = FSSpecSaver(source_dp, filepath_fn=filepath_fn, mode="wb")

self.assertEqual(set(saver_dp), {"memory://1.txt", "memory://2.txt"})


if __name__ == "__main__":
unittest.main()
8 changes: 8 additions & 0 deletions torchdata/datapipes/iter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
IoPathFileLoaderIterDataPipe as IoPathFileLoader,
IoPathSaverIterDataPipe as IoPathSaver,
)
from torchdata.datapipes.iter.load.fsspec import (
FSSpecFileListerIterDataPipe as FSSpecFileLister,
FSSpecFileOpenerIterDataPipe as FSSpecFileOpener,
FSSpecSaverIterDataPipe as FSSpecSaver,
)
d4l3k marked this conversation as resolved.
Show resolved Hide resolved
from torchdata.datapipes.iter.transform.bucketbatcher import BucketBatcherIterDataPipe as BucketBatcher
from torchdata.datapipes.iter.util.cacheholder import (
EndOnDiskCacheHolderIterDataPipe as EndOnDiskCacheHolder,
Expand Down Expand Up @@ -80,6 +85,9 @@
"EndOnDiskCacheHolder",
"Enumerator",
"Extractor",
"FSSpecFileLister",
"FSSpecFileOpener",
"FSSpecSaver",
"FileLister",
"FileLoader",
"Filter",
Expand Down
139 changes: 139 additions & 0 deletions torchdata/datapipes/iter/load/fsspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Copyright (c) Facebook, Inc. and its affiliates.
import os
import posixpath

from typing import Any, Callable, Iterator, List, Optional, Tuple, Union

from torch.utils.data.datapipes.utils.common import match_masks

from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe
from torchdata.datapipes.utils import StreamWrapper

try:
import fsspec

except ImportError:
fsspec = None

U = Union[bytes, bytearray, str]


def _assert_fsspec() -> None:
if fsspec is None:
raise ModuleNotFoundError(
"Package `fsspec` is required to be installed to use this datapipe."
"Please use `pip install fsspec` or `conda install -c conda-forge fsspec`"
"to install the package"
)


class FSSpecFileListerIterDataPipe(IterDataPipe[str]):
r""":class:`FSSpecFileListerIterDataPipe`.
Iterable DataPipe to list the contents of the directory at the provided `root` pathname or url,
and yields the full pathname or url for each file within the directory.
Args:
root: The root fsspec path directory to list files from
masks: Unix style filter string or string list for filtering file name(s)
"""

def __init__(
self,
root: str,
masks: Union[str, List[str]] = "",
) -> None:
_assert_fsspec()

self.root: str = root
self.masks = masks

def __iter__(self) -> Iterator[str]:
fs, path = fsspec.core.url_to_fs(self.root)
is_local = fs.protocol == "file" or not self.root.startswith(fs.protocol)
if fs.isfile(path):
yield self.root
else:
for file_name in fs.ls(path):
if not match_masks(file_name, self.masks):
continue

# ensure the file name has the full fsspec protocol path
if file_name.startswith(fs.protocol):
yield file_name
else:
if is_local:
abs_path = os.path.join(path, file_name)
else:
abs_path = posixpath.join(path, file_name)

if self.root.startswith(fs.protocol):
yield fs.protocol + "://" + abs_path
else:
yield abs_path


@functional_datapipe("open_file_by_fsspec")
class FSSpecFileOpenerIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]):
r""":class:`FSSpecFileOpenerIterDataPipe`.
Iterable DataPipe to open files from input datapipe which contains fsspec paths
and yields a tuple of pathname and opened file stream.
Args:
source_datapipe: Iterable DataPipe that provides the pathnames or urls
mode: An optional string that specifies the mode in which the file is opened ('r' by default)
"""

def __init__(self, source_datapipe: IterDataPipe[str], mode: str = "r") -> None:
_assert_fsspec()

self.source_datapipe: IterDataPipe[str] = source_datapipe
self.mode: str = mode

def __iter__(self) -> Iterator[Tuple[str, StreamWrapper]]:
for file_uri in self.source_datapipe:
fs, path = fsspec.core.url_to_fs(file_uri)
file = fs.open(path, self.mode)
yield file_uri, StreamWrapper(file)

def __len__(self) -> int:
return len(self.source_datapipe)


@functional_datapipe("save_by_fsspec")
class FSSpecSaverIterDataPipe(IterDataPipe[str]):
r"""
Iterable DataPipe that takes in a DataPipe of tuples of metadata and data, saves the data
to the target path (generated by the filepath_fn and metadata), and yields the resulting fsspec
path
Args:
source_datapipe: Iterable DataPipe with tuples of metadata and data
mode: Mode in which the file will be opened for write the data ("w" by default)
filepath_fn: Function that takes in metadata nad returns the target path of the new file
"""

def __init__(
self,
source_datapipe: IterDataPipe[Tuple[Any, U]],
mode: str = "w",
filepath_fn: Optional[Callable] = None,
):
_assert_fsspec()

self.source_datapipe: IterDataPipe[Tuple[Any, U]] = source_datapipe
self.mode: str = mode
self.filepath_fn: Optional[Callable] = filepath_fn

def __iter__(self) -> Iterator[str]:
for meta, data in self.source_datapipe:
filepath = meta if self.filepath_fn is None else self.filepath_fn(meta)
fs, path = fsspec.core.url_to_fs(filepath)
with fs.open(path, self.mode) as f:
f.write(data)
yield filepath

def __len__(self) -> int:
return len(self.source_datapipe)