
Feat: Using fsspec to download files #348

Merged

48 commits (changes shown from 5 commits)
96ec15d
fsspec basic setup done and working for s3
deependujha Sep 1, 2024
45b59ae
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 1, 2024
74dae21
fix storage option in fsspec
deependujha Sep 2, 2024
fcb4d95
pass down `storage_options` in dataset utilities
deependujha Sep 2, 2024
3080c2c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 2, 2024
c31e259
tested successfully on S3 and GS for (mode= none | append | overwrite…
deependujha Sep 3, 2024
0c761b1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 3, 2024
2377983
fixed mypy errors and lock files when uploading/downloading
deependujha Sep 4, 2024
ffbf51d
update
deependujha Sep 4, 2024
de8b83b
fixed test `test_try_create_cache_dir`
deependujha Sep 4, 2024
e712327
fixed test: `test_reader_chunk_removal`
deependujha Sep 4, 2024
e118ba9
all tests passed
deependujha Sep 4, 2024
d3450dc
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 4, 2024
ed0fff8
update
deependujha Sep 4, 2024
08236e8
update
deependujha Sep 4, 2024
12b049b
boto3 stop bothering me
deependujha Sep 4, 2024
d560d91
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 4, 2024
27644d3
update
deependujha Sep 4, 2024
bf06cf9
update
deependujha Sep 4, 2024
bdc13f4
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 4, 2024
909b5cb
update
deependujha Sep 4, 2024
f661ae1
Merge branch 'main' into feat/using-fsspec-to-download-files
deependujha Sep 4, 2024
5671d11
tested on azure and made sure `storage_option` is working in all cases
deependujha Sep 5, 2024
2beebc9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 5, 2024
dd2e742
update
deependujha Sep 5, 2024
f555069
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 5, 2024
87a9556
update
deependujha Sep 5, 2024
8e9d448
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 5, 2024
555eb19
use s5cmd to download files if available
deependujha Sep 5, 2024
5ef4004
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 5, 2024
67205ea
add default storage_options
deependujha Sep 6, 2024
69fb43d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 6, 2024
b49a126
raise error if cloud is not supported
deependujha Sep 6, 2024
dbe8b0e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 6, 2024
5a81f04
update
deependujha Sep 6, 2024
848484a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 6, 2024
4d62fdd
fix windows error related to urllib parse scheme
deependujha Sep 6, 2024
6b961f7
Merge branch 'main' into feat/using-fsspec-to-download-files
deependujha Sep 6, 2024
e544d09
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 6, 2024
31f6e5a
Merge branch 'main' into feat/using-fsspec-to-download-files
bhimrazy Sep 16, 2024
5d1ec46
Merge branch 'main' into feat/using-fsspec-to-download-files
bhimrazy Sep 17, 2024
e68076d
cleanup commented code
deependujha Sep 18, 2024
79a3ad8
Merge branch 'main' into feat/using-fsspec-to-download-files
deependujha Sep 18, 2024
2036e37
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 18, 2024
e230ceb
update
deependujha Sep 18, 2024
e60f9ae
readme updated
deependujha Sep 18, 2024
feb5d48
increase test_dataset_resume_on_future_chunk timeout time to 120 seconds
deependujha Sep 18, 2024
b5ec077
update
deependujha Sep 18, 2024
4 changes: 4 additions & 0 deletions requirements.txt
@@ -4,3 +4,7 @@ filelock
 numpy
 boto3
 requests
+fsspec
+fsspec[s3]   # aws s3
+fsspec[gs]   # google cloud storage
+fsspec[abfs] # azure blob
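
For context, these extras resolve to the provider-specific fsspec implementations (s3fs for fsspec[s3], gcsfs for fsspec[gs], adlfs for fsspec[abfs]). A minimal sketch of the fsspec API this PR builds on; the bucket name and credential values are placeholders, not values from this PR:

import fsspec

# "s3" is registered by s3fs, "gs" by gcsfs, and "abfs" by adlfs.
fs = fsspec.filesystem("s3", key="MY_ACCESS_KEY", secret="MY_SECRET_KEY")

# Copy a remote file to a local path (placeholder paths).
fs.get("s3://my-bucket/optimized_dataset/index.json", "/tmp/index.json")
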
6 changes: 4 additions & 2 deletions src/litdata/streaming/dataset.py
@@ -155,7 +155,8 @@ def set_epoch(self, current_epoch: int) -> None:
     def _create_cache(self, worker_env: _WorkerEnv) -> Cache:
         if _should_replace_path(self.input_dir.path):
             cache_path = _try_create_cache_dir(
-                input_dir=self.input_dir.path if self.input_dir.path else self.input_dir.url
+                input_dir=self.input_dir.path if self.input_dir.path else self.input_dir.url,
+                storage_options=self.storage_options,
             )
             if cache_path is not None:
                 self.input_dir.path = cache_path
@@ -438,7 +439,8 @@ def _validate_state_dict(self) -> None:
             # In this case, validate the cache folder is the same.
             if _should_replace_path(state["input_dir_path"]):
                 cache_path = _try_create_cache_dir(
-                    input_dir=state["input_dir_path"] if state["input_dir_path"] else state["input_dir_url"]
+                    input_dir=state["input_dir_path"] if state["input_dir_path"] else state["input_dir_url"],
+                    storage_options=self.storage_options,
                 )
                 if cache_path != self.input_dir.path:
                     raise ValueError(
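
With storage_options threaded through the dataset, credentials can be supplied at construction time. A hedged usage sketch; the bucket URI and credential values are placeholders:

from litdata import StreamingDataset

dataset = StreamingDataset(
    "s3://my-bucket/optimized_dataset",  # placeholder URI
    storage_options={"key": "MY_ACCESS_KEY", "secret": "MY_SECRET_KEY"},
)
sample = dataset[0]  # chunk downloads now go through fsspec with these options
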
46 changes: 42 additions & 4 deletions src/litdata/streaming/downloader.py
@@ -18,6 +18,7 @@
 from typing import Any, Dict, List, Optional
 from urllib import parse
 
+import fsspec
 from filelock import FileLock, Timeout
 
 from litdata.constants import _AZURE_STORAGE_AVAILABLE, _GOOGLE_STORAGE_AVAILABLE, _INDEX_FILENAME
@@ -26,12 +27,17 @@
 
 class Downloader(ABC):
     def __init__(
-        self, remote_dir: str, cache_dir: str, chunks: List[Dict[str, Any]], storage_options: Optional[Dict] = {}
+        self,
+        cloud_provider: str,
+        remote_dir: str,
+        cache_dir: str,
+        chunks: List[Dict[str, Any]],
+        storage_options: Optional[Dict] = {},
     ):
         self._remote_dir = remote_dir
         self._cache_dir = cache_dir
         self._chunks = chunks
         self._storage_options = storage_options or {}
+        self.fs = fsspec.filesystem(cloud_provider, **self._storage_options)  # normalized options, safe when None
 
     def download_chunk_from_index(self, chunk_index: int) -> None:
         chunk_filename = self._chunks[chunk_index]["filename"]
@@ -188,10 +194,42 @@ def download_file(self, remote_filepath: str, local_filepath: str) -> None:
 }
 
 
+_DOWNLOADERS = {
+    "s3://": "s3",
+    "gs://": "gs",
+    "azure://": "abfs",
+    "local:": "file",
+    "": "file",
+}
+
+
+class FsspecDownloader(Downloader):
+    def __init__(
+        self,
+        cloud_provider: str,
+        remote_dir: str,
+        cache_dir: str,
+        chunks: List[Dict[str, Any]],
+        storage_options: Optional[Dict] = {},
+    ):
+        remote_dir = remote_dir.replace("local:", "")
+        super().__init__(cloud_provider, remote_dir, cache_dir, chunks, storage_options)
+
+    def download_file(self, remote_filepath: str, local_filepath: str) -> None:
+        if os.path.exists(local_filepath) or remote_filepath == local_filepath:
+            return
+        try:
+            with FileLock(local_filepath + ".lock", timeout=3):
+                self.fs.get(remote_filepath, local_filepath, recursive=True)
+        except Timeout:
+            # Another process is already downloading this file; skip.
+            pass
+
+
 def get_downloader_cls(
     remote_dir: str, cache_dir: str, chunks: List[Dict[str, Any]], storage_options: Optional[Dict] = {}
 ) -> Downloader:
-    for k, cls in _DOWNLOADERS.items():
+    for k, fs_cloud_provider in _DOWNLOADERS.items():
         if str(remote_dir).startswith(k):
-            return cls(remote_dir, cache_dir, chunks, storage_options)
+            return FsspecDownloader(fs_cloud_provider, remote_dir, cache_dir, chunks, storage_options)
     raise ValueError(f"The provided `remote_dir` {remote_dir} doesn't have a downloader associated.")
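
To illustrate the prefix dispatch, a sketch of resolving a GCS directory to an FsspecDownloader; the bucket, cache path, and credential are placeholders (token is the gcsfs credential keyword), and gcsfs must be installed:

downloader = get_downloader_cls(
    "gs://my-bucket/optimized_dataset",  # "gs://" prefix maps to the fsspec "gs" filesystem
    "/cache/optimized_dataset",
    chunks=[],
    storage_options={"token": "service-account.json"},
)
downloader.download_file(
    "gs://my-bucket/optimized_dataset/chunk-0-0.bin",
    "/cache/optimized_dataset/chunk-0-0.bin",
)
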
12 changes: 7 additions & 5 deletions src/litdata/utilities/dataset_utilities.py
@@ -38,7 +38,9 @@ def subsample_streaming_dataset(
 
     # Make sure input_dir contains cache path and remote url
     if _should_replace_path(input_dir.path):
-        cache_path = _try_create_cache_dir(input_dir=input_dir.path if input_dir.path else input_dir.url)
+        cache_path = _try_create_cache_dir(
+            input_dir=input_dir.path if input_dir.path else input_dir.url, storage_options=storage_options
+        )
         if cache_path is not None:
             input_dir.path = cache_path
 
@@ -93,7 +95,7 @@ def _should_replace_path(path: Optional[str]) -> bool:
     return path.startswith("/teamspace/datasets/") or path.startswith("/teamspace/s3_connections/")
 
 
-def _read_updated_at(input_dir: Optional[Dir]) -> str:
+def _read_updated_at(input_dir: Optional[Dir], storage_options: Optional[Dict]) -> str:
     """Read last updated timestamp from index.json file."""
     last_updation_timestamp = "0"
     index_json_content = None
@@ -107,7 +109,7 @@ def _read_updated_at(input_dir: Optional[Dir]) -> str:
         # download index.json file and read last_updation_timestamp
         with tempfile.TemporaryDirectory() as tmp_directory:
             temp_index_filepath = os.path.join(tmp_directory, _INDEX_FILENAME)
-            downloader = get_downloader_cls(input_dir.url, tmp_directory, [])
+            downloader = get_downloader_cls(input_dir.url, tmp_directory, [], storage_options)
             downloader.download_file(os.path.join(input_dir.url, _INDEX_FILENAME), temp_index_filepath)
 
             index_json_content = load_index_file(tmp_directory)
@@ -132,9 +134,9 @@ def _clear_cache_dir_if_updated(input_dir_hash_filepath: str, updated_at_hash: s
     shutil.rmtree(input_dir_hash_filepath)
 
 
-def _try_create_cache_dir(input_dir: Optional[str]) -> Optional[str]:
+def _try_create_cache_dir(input_dir: Optional[str], storage_options: Optional[Dict]) -> Optional[str]:
     resolved_input_dir = _resolve_dir(input_dir)
-    updated_at = _read_updated_at(resolved_input_dir)
+    updated_at = _read_updated_at(resolved_input_dir, storage_options)
 
     if updated_at == "0" and input_dir is not None:
         updated_at = hashlib.md5(input_dir.encode()).hexdigest()  # noqa: S324
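
When the remote index.json timestamp cannot be read ("0"), _try_create_cache_dir falls back to hashing the input_dir string itself, so the same remote directory always maps to the same local cache directory. A small sketch of that fallback; the URI and cache layout are illustrative, not the PR's exact paths:

import hashlib
import os

input_dir = "s3://my-bucket/optimized_dataset"  # placeholder URI
updated_at = hashlib.md5(input_dir.encode()).hexdigest()  # noqa: S324
cache_dir = os.path.join("/cache/chunks", updated_at)  # stable key per remote dir
print(cache_dir)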