Issue
When loading data with my data loader from a streaming dataset stored in S3, I'm receiving multiple warnings:
Skipping chunk-1-153.bin.7e0CabeA: Not a valid chunk file. It will be excluded from cache size calculation.
Skipping chunk-2-197.bin.E72711bF: Not a valid chunk file. It will be excluded from cache size calculation.
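In case it helps narrow this down, below is the kind of quick check I can run against the local chunk cache. The cache path is only a placeholder, and the idea that the extra suffix after .bin (e.g. .7e0CabeA) marks a temporary/partial file is my own guess, not something I've confirmed in litdata:

import re
from pathlib import Path

# Placeholder: point this at wherever the litdata chunk cache lives on your machine/Studio.
CACHE_DIR = Path("/path/to/litdata/cache")

# Finalized chunk files look like "chunk-<i>-<j>.bin"; the flagged files carry
# an extra suffix after ".bin" (assumption: leftover/in-progress downloads).
FINAL_CHUNK = re.compile(r"^chunk-\d+-\d+\.bin$")

finalized, suffixed = [], []
for path in sorted(CACHE_DIR.glob("chunk-*")):
    (finalized if FINAL_CHUNK.match(path.name) else suffixed).append(path)

print(f"{len(finalized)} finalized chunk files")
print(f"{len(suffixed)} files with an extra suffix (the ones being skipped):")
for path in suffixed:
    print(f"  {path.name}  ({path.stat().st_size} bytes)")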
Context
- The dataset is stored in S3 within Lightning AI studio
- My account has read-only access to this data
- I have not rebuilt or modified this dataset recently
- The same code works fine with other Lightning data datasets
- The warnings originate from litdata/src/litdata/streaming/reader.py, line 456 (commit 4f4b38b)
Questions
- What causes a chunk file to be considered "not valid"?
- Why would previously working .bin files suddenly be flagged as invalid?
- Does this warning indicate a data loading problem, or is it just a calculation issue?
- Is there a way to validate/repair these chunk files? (rough check sketched below)
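For that last question, I was imagining something along these lines as a rough sanity check. It assumes the dataset's index.json lists each chunk under a "chunks" key with "filename" and "chunk_bytes" fields; I haven't verified that layout across litdata versions, so treat it as a sketch only:

import json
from pathlib import Path

# Placeholder: a local copy of the dataset (or its cache), with index.json
# sitting next to the chunk-*.bin files.
DATASET_DIR = Path("/path/to/local/dataset/copy")

index = json.loads((DATASET_DIR / "index.json").read_text())

for chunk in index.get("chunks", []):
    filename = chunk["filename"]          # e.g. "chunk-1-153.bin"
    expected = chunk.get("chunk_bytes")   # size recorded when the dataset was written
    path = DATASET_DIR / filename
    if not path.exists():
        print(f"MISSING   {filename}")
    elif expected is not None and path.stat().st_size != expected:
        print(f"MISMATCH  {filename}: {path.stat().st_size} bytes on disk, {expected} expected")
    else:
        print(f"OK        {filename}")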
Impact
This warning may confuse users, particularly new ones, who might think their data isn't loading at all.
Additional Information
- Lightning AI version: [version]
- Litdata version: 0.2.40 (the dataset was created with a different version, though I'm not sure which one)
- Python version: 3.12.9
- Operating system: Linux ip-10-192-11-192 5.15.0-1077-aws #84~20.04.1-Ubuntu SMP Mon Jan 20 22:14:54 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Possible Solutions
- Improve error messaging to clarify if data loading is affected
- Add documentation about chunk file validation criteria
- Provide troubleshooting steps when this warning appears
Code snippet
For what it's worth, this is the code I'm using; as mentioned, the same code does not trigger the warning on a different streaming dataset.
import json

import numpy as np
import torch
from litdata import StreamingDataLoader, StreamingDataset
from litdata.streaming.serializers import JPEGSerializer
from torchvision.transforms import v2
from transformers import AutoImageProcessor

# DataConfig, config, and my_collate_fn come from my project code.


class GroupedFastDataset(StreamingDataset):
    def __init__(self, input_dir: str | None, config: DataConfig, use_augmentations: bool, *args, **kwargs):
        self.input_dir = input_dir or config.input_dir
        super().__init__(self.input_dir, *args, **kwargs)
        self.config = config
        self.serializer = JPEGSerializer()
        self.processor = AutoImageProcessor.from_pretrained(
            config.model_name, use_fast=True
        )
        # Only apply transforms during training
        if use_augmentations and self.config.transforms is not None:
            all_transforms = []
            if "Rotate" in self.config.transforms:
                all_transforms.append(v2.RandomRotation(degrees=(-180, 180)))
            if "RandomCrop" in self.config.transforms:
                all_transforms.append(
                    v2.RandomResizedCrop(
                        size=224,
                        scale=(0.8, 1.0),  # Only crop 0-20% of the image
                        ratio=(0.75, 1.33),  # Keep aspect ratio relatively balanced
                        antialias=True,
                    )
                )
            if "GaussianBlur" in self.config.transforms:
                all_transforms.append(
                    v2.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 2.0))
                )
            if "ColorJitter" in self.config.transforms:
                all_transforms.append(
                    v2.ColorJitter(
                        brightness=0.2, contrast=0.2, saturation=0.2, hue=0.05
                    )
                )
            if "Cutout" in self.config.transforms:
                all_transforms.append(
                    v2.RandomErasing(p=0.5, scale=(0.02, 0.33), ratio=(0.3, 3.3))
                )
            if "GaussianNoise" in self.config.transforms:
                all_transforms.append(v2.GaussianNoise(mean=0.0, sigma=0.05))
            self.transforms = v2.Compose(all_transforms)
        else:
            self.transforms = None

    def __getitem__(self, index) -> dict[str, torch.Tensor | str | bool | dict | int]:
        data = super().__getitem__(index)
        if self.config.byte_encoded_list_keys is not None:
            for k in self.config.byte_encoded_list_keys:
                data[k] = json.loads(data[k].decode("utf-8"))
        if self.config.jpegs_hex_encoded:
            image_bytes = [bytes.fromhex(img) for img in data["img_bytes"]]
        else:
            image_bytes = data["img_bytes"]  # assumption: already raw bytes when not hex-encoded
        # Deserialize raw JPEG bytes into image tensors
        imgs_deserialised = [self.serializer.deserialize(img) for img in image_bytes]
        # Apply torchvision transforms to normalized images
        if self.transforms is not None:
            imgs_float = [img.float() / 255.0 for img in imgs_deserialised]
            imgs_transformed = [self.transforms(img) for img in imgs_float]
            processed = self.processor(
                images=imgs_transformed, return_tensors="pt", do_rescale=False
            )
        else:
            processed = self.processor(images=imgs_deserialised, return_tensors="pt")
        if "provided_by_a_consumer" in data and isinstance(
            data["provided_by_a_consumer"][0], np.number
        ):
            data["provided_by_a_consumer"] = [
                bool(x) for x in data["provided_by_a_consumer"]
            ]
        return {
            "images": processed["pixel_values"],
            "image_ids": data["image_id"],
            "num_items": int(data["content_length"]),
            "work_ids": data["work_id"],
            "provided_by_a_consumer": data["provided_by_a_consumer"],
            "best_resolution_urls": data["best_resolution_url"],
        }


train_dataset = GroupedFastDataset(
    input_dir=config.data.train_s3_path, config=config.data, use_augmentations=True
)

dataloader = StreamingDataLoader(
    train_dataset,
    batch_size=16,
    collate_fn=my_collate_fn,
    pin_memory=True,
    num_workers=8,
    shuffle=True,
    drop_last=True,
)