Feat: Add support for parquet files #443
Changes from all commits
@@ -5,3 +5,4 @@ pyarrow
 tqdm
 lightning-sdk==0.1.46 # Must be pinned to ensure compatibility
 google-cloud-storage
+polars
@@ -24,7 +24,13 @@
 import numpy as np
 import torch

-from litdata.constants import _FORCE_DOWNLOAD_TIME, _MAX_WAIT_TIME, _NUMPY_DTYPES_MAPPING, _TORCH_DTYPES_MAPPING
+from litdata.constants import (
+    _FORCE_DOWNLOAD_TIME,
+    _MAX_WAIT_TIME,
+    _NUMPY_DTYPES_MAPPING,
+    _POLARS_AVAILABLE,
+    _TORCH_DTYPES_MAPPING,
+)
 from litdata.streaming.serializers import Serializer
 from litdata.utilities._pytree import PyTree, tree_unflatten
 from litdata.utilities.encryption import Encryption, EncryptionLevel
@@ -412,3 +418,84 @@ def close(self, chunk_index: int) -> None:
     @classmethod
     def encode_data(cls, data: List[bytes], _: List[int], flattened: List[Any]) -> Tuple[bytes, Optional[int]]:
         return data[0], flattened[0].shape[0]
+
+
+class ParquetLoader(BaseItemLoader):
+    def __init__(self) -> None:
+        if not _POLARS_AVAILABLE:
+            raise ModuleNotFoundError("Please, run: `pip install polars`")
Review comment: Might be good to prepend "You are using the Parquet item loader, which depends on Polars." to the error message before "Please run: `pip install polars`".

Review comment: Do we need to make bound checks on the version?

Author reply: I'm not sure of the exact version bound. To be on the safer side for now, I've simply updated the requirement. Is this fine, or should I refine it further?
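On the version-bound question: litdata's availability flags are typically declared with RequirementCache from lightning-utilities, which accepts a full requirement specifier, so a bound can be enforced where the flag is defined. A minimal sketch, assuming that pattern; the `>=1.0.0` bound is illustrative, not a value taken from this PR:

```python
# Hedged sketch of how _POLARS_AVAILABLE could be declared (e.g., in
# litdata/constants.py, which this diff does not show). RequirementCache
# accepts a PEP 508 specifier, so the version bound lives with the flag.
from lightning_utilities.core.imports import RequirementCache

_POLARS_AVAILABLE = RequirementCache("polars>=1.0.0")  # bound is an assumption

if not _POLARS_AVAILABLE:
    # str() on the cache renders a readable message naming the missing
    # or too-old requirement.
    raise ModuleNotFoundError(str(_POLARS_AVAILABLE))
```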
+
+        self._chunk_filepaths: Dict[str, bool] = {}
+
+    def setup(
+        self,
+        config: Dict,
+        chunks: List,
+        serializers: Dict[str, Serializer],
+        region_of_interest: Optional[List[Tuple[int, int]]] = None,
+    ) -> None:
+        self._config = config
+        self._chunks = chunks
+        self._serializers = {**serializers}
+        self._data_format = self._config["data_format"]
+        self._shift_idx = len(self._data_format) * 4
+        self.region_of_interest = region_of_interest
+        self._df: Dict[str, Any] = {}
+
+    def generate_intervals(self) -> List[Interval]:
+        intervals = []
+        begin = 0
+        end = 0
+        for idx, curr_chunk in enumerate(self._chunks):
+            end += curr_chunk["chunk_size"]
+            start_idx, end_idx = begin, end
+            if self.region_of_interest is not None:
+                start_idx = begin + self.region_of_interest[idx][0]
+                end_idx = begin + self.region_of_interest[idx][1]
+
+            intervals.append(Interval(begin, start_idx, end_idx, end))
+            begin += curr_chunk["chunk_size"]
+        return intervals
+
+    def pre_load_chunk(self, chunk_index: int, chunk_filepath: str) -> None:
Review comment: Is there a fundamental reason why we're not pre-loading, or is it just for sequencing?

Author reply: Apologies for the oversight, and thanks for pointing that out! I've made the necessary changes to include pre-loading as suggested. (A hedged sketch of one possible implementation appears after the diff below.)
+        """Logic to load the chunk in background to gain some time."""
+        pass
+
+    def load_item_from_chunk(
+        self,
+        index: int,
+        chunk_index: int,
+        chunk_filepath: str,
+        begin: int,
+        filesize_bytes: int,
+    ) -> Any:
+        """Returns an item loaded from a chunk."""
+        if chunk_filepath in self._chunk_filepaths and not os.path.isfile(chunk_filepath):
+            del self._chunk_filepaths[chunk_filepath]
+
+        if chunk_filepath not in self._chunk_filepaths:
+            exists = os.path.exists(chunk_filepath) and os.stat(chunk_filepath).st_size >= filesize_bytes
+
+            while not exists:
+                sleep(0.1)
+                exists = os.path.exists(chunk_filepath) and os.stat(chunk_filepath).st_size >= filesize_bytes
+
+            self._chunk_filepaths[chunk_filepath] = True
+
+        return self.get_df(chunk_filepath).row(index - begin)
+
+    def get_df(self, chunk_filepath: str) -> Any:
+        import polars as pl
+
+        if chunk_filepath not in self._df:
+            self._df[chunk_filepath] = pl.scan_parquet(chunk_filepath).collect()
+        return self._df[chunk_filepath]
+
+    def delete(self, chunk_index: int, chunk_filepath: str) -> None:
+        """Delete a chunk from the local filesystem."""
+        if os.path.exists(chunk_filepath):
+            os.remove(chunk_filepath)
+        if chunk_filepath in self._df:
+            del self._df[chunk_filepath]
+
+    def encode_data(self, data: List[bytes], sizes: List[int], flattened: List[Any]) -> Any:
+        pass
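Regarding the pre-loading thread above: since get_df caches the collected DataFrame per chunk file, a pre-load can simply warm that cache. A minimal sketch under that assumption; the implementation that was actually merged is not shown in this diff:

```python
# Hedged sketch: one way ParquetLoader.pre_load_chunk could warm the cache so
# load_item_from_chunk finds the DataFrame already collected. Assumes the
# chunk file is fully downloaded by the time this is called.
def pre_load_chunk(self, chunk_index: int, chunk_filepath: str) -> None:
    """Logic to load the chunk in background to gain some time."""
    if os.path.exists(chunk_filepath):
        self.get_df(chunk_filepath)  # populates the self._df cache
```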
Review comment: "Index" may not be immediately clear to users, imo. Ultimately, what users get is the ability to stream Parquet datasets, so I'd use "Stream Parquet datasets" as the title; the index is a technical detail.

Review comment: I'd also add a line or two explaining how big of a deal this is : ) "Stream Parquet files directly without converting them to the LitData optimized binary format", or something of this nature.

Author reply: I've made the changes. Since this PR was already merged, the new changes are in PR #460.
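For context on what "Stream Parquet datasets" means in practice, here is a hedged usage sketch. It assumes ParquetLoader is passed to StreamingDataset via the item_loader argument, mirroring how litdata's other item loaders (e.g., TokensLoader) are used; the dataset path is a placeholder:

```python
# Hedged usage sketch (assumed wiring; the path below is hypothetical).
from litdata import StreamingDataset
from litdata.streaming.item_loader import ParquetLoader

# Stream rows straight from Parquet-backed chunks, without converting the
# data to LitData's optimized binary format first.
dataset = StreamingDataset(
    "s3://my-bucket/my-parquet-dataset/",  # placeholder input directory
    item_loader=ParquetLoader(),
)

print(dataset[0])  # one row of the underlying Parquet data
```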