add incremental processing example #1101


Merged
merged 2 commits into from
May 20, 2025
64 changes: 64 additions & 0 deletions examples/incremental_processing/delta.py
@@ -0,0 +1,64 @@
#!/usr/bin/env python
"""
File Generator Script using DataChain Delta

This script demonstrates:
1. Creating numbered text files in a 'test' directory
2. Using DataChain's delta flag for incremental dataset processing

Each execution:
- Creates a new numbered file in the 'test' directory
- Updates a DataChain dataset to track these files incrementally
"""

import re
import time

from utils import generate_next_file

import datachain as dc
from datachain import C, File


def extract_file_number(file: File) -> int:
    """Extract file number from the filename."""
    match = re.search(r"file-(\d+)\.txt", file.name)
    if match:
        return int(match.group(1))
    return -1


def process_files_with_delta():
    """
    Process files in the test directory using DataChain with delta mode.
    This demonstrates incremental processing - only new files are processed.
    """
    chain = (
        dc.read_storage("test/", update=True, delta=True, delta_on="file.path")
Contributor
Note that without an explicit delta_compare it will look at all fields in the schema except file.path (since that's in delta_on already) to decide whether a file has changed. This means that two rows need to be identical (all fields the same) in order not to count as "modified / changed". If it counts them as changed, there is no performance gain from the delta update. You usually want to set `delta_compare=["file.version", "file.etag"]`.

There is a case, though, with non-versioned sources where file.version and file.etag are set randomly on every re-index, which causes the same thing to happen regardless, as it will catch everything as modified. In these cases, and in every other case where the user doesn't want to (or can't) track changed rows, the workaround is to set delta_compare to be the same as delta_on, but we need a better way.
Options are:

  • Make the default delta_compare=None to disable tracking changed rows instead of looking at all fields. If we go down this path, then DataChain.compare() and DataChain.diff() need to be changed as well to be consistent.
  • Add an additional flag for this, e.g. delta_ignore_changed.

I'm leaning more toward the first option, although the user then needs to explicitly set all fields in some cases (we lose the "shortcut" of the default being all fields). I don't have a strong opinion though.
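The delta_on / delta_compare semantics described here can be sketched in plain Python. This is an illustrative model only, not DataChain's actual implementation; `classify_rows` and the example field names are made up for the sketch. Rows are matched by the delta_on key, and a matched row only counts as unchanged if every compared field is equal — which is why comparing all fields by default can wipe out the performance gain:

```python
def classify_rows(old_rows, new_rows, delta_on, delta_compare=None):
    """Split new_rows into unchanged / modified / added relative to old_rows.

    If delta_compare is None, every field except the delta_on key is
    compared -- two rows must be fully identical to count as unchanged.
    """
    old_by_key = {row[delta_on]: row for row in old_rows}
    unchanged, modified, added = [], [], []
    for row in new_rows:
        key = row[delta_on]
        if key not in old_by_key:
            added.append(row)
            continue
        old = old_by_key[key]
        # Default: compare everything except the key itself.
        fields = delta_compare or [f for f in row if f != delta_on]
        if all(old.get(f) == row.get(f) for f in fields):
            unchanged.append(row)
        else:
            modified.append(row)
    return unchanged, modified, added


old = [{"path": "a.txt", "etag": "1", "processed_at": "t0"}]
new = [
    {"path": "a.txt", "etag": "1", "processed_at": "t1"},
    {"path": "b.txt", "etag": "2", "processed_at": "t1"},
]

# Comparing all fields: a.txt looks modified because processed_at differs.
u, m, a = classify_rows(old, new, "path")
assert (len(u), len(m), len(a)) == (0, 1, 1)

# Comparing only etag: a.txt is correctly seen as unchanged.
u, m, a = classify_rows(old, new, "path", delta_compare=["etag"])
assert (len(u), len(m), len(a)) == (1, 0, 1)
```

Setting delta_compare to the delta_on fields themselves makes `fields` trivially equal for every matched row, which is the "disable change tracking" workaround mentioned above.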

Member Author
thanks, I think we are fine in this particular example (?)

There is the case though with non-versioned sources where file.version and file.etag are randomly set every time on re-index

where did you experience this?

Member Author

@ilongin please let me know ^^

Contributor
@ilongin May 20, 2025
I thought I saw it in one of our gs buckets but now I checked and it seems like it's ok. version and etag should be set to empty string if they don't exist.

Regarding your example, yeah, you don't need to set anything, as none of the columns will change since you only append new files. If you re-created files every time when calling generate_next_file, it would be a problem: for local files we use mtime as the etag, which would mean delta would find all files modified every time.
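The mtime-as-etag point can be seen with plain Python. This is a small sketch of why re-creating a local file makes it look modified even when the bytes are identical; `local_etag` is a hypothetical stand-in for the mtime-based etag described above, not a DataChain API:

```python
import os
import tempfile


def local_etag(path: str) -> str:
    """Hypothetical stand-in for an mtime-based etag on a local file."""
    return str(os.stat(path).st_mtime_ns)


with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "file-0.txt")
    with open(path, "w") as f:
        f.write("This is file number 0\n")
    etag_before = local_etag(path)

    # Re-create the file with identical content, then bump mtime by 1ms
    # to simulate the later timestamp a real rewrite would get.
    with open(path, "w") as f:
        f.write("This is file number 0\n")
    st = os.stat(path)
    os.utime(path, ns=(st.st_atime_ns, st.st_mtime_ns + 1_000_000))
    etag_after = local_etag(path)

# Same bytes, different "etag" -- delta would flag the file as modified.
assert etag_before != etag_after
```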

        .filter(C("file.path").glob("*.txt"))
        .map(file_number=extract_file_number)
        .map(content=lambda file: file.read_text())
        .map(processed_at=lambda: time.strftime("%Y-%m-%d %H:%M:%S"))
        .save(name="test_files")
    )

    # Show information about the dataset
    print(f"\nProcessed files. Total records: {chain.count()}")
    print("\nDataset versions:")
    test_dataset = dc.datasets().filter(C("name") == "test_files")

    for version in test_dataset.collect("version"):
        print(f"- Version: {version}")

    # Show the last 3 records to demonstrate the incremental processing
    print("\nLatest files processed:")
    chain.order_by("file_number", descending=True).limit(3).show()


if __name__ == "__main__":
    # Generate a new file
    new_file = generate_next_file()
    print(f"Created new file: {new_file}")

    # Process new files using delta update
    process_files_with_delta()
41 changes: 41 additions & 0 deletions examples/incremental_processing/utils.py
@@ -0,0 +1,41 @@
#!/usr/bin/env python
"""
File Generator Helper

This helper creates numbered text files in a 'test' directory each time it runs.
The files follow the naming pattern: file-0.txt, file-1.txt, file-2.txt, etc.

Each execution, the script:

1. Creates the 'test' directory if it doesn't exist
2. Finds the highest numbered file currently present
3. Creates a new file with the next number in sequence
4. Adds timestamped content to the file
"""

import re
import time
from pathlib import Path


def generate_next_file() -> Path:
    """
    Generate the next numbered text file in the 'test' directory.
    """
    test_dir = Path("test")
    test_dir.mkdir(exist_ok=True)

    max_num = -1
    for file in test_dir.glob("file-*.txt"):
        if file.is_file():
            match = re.search(r"file-(\d+)\.txt", file.name)
            if match:
                max_num = max(max_num, int(match.group(1)))

    next_num = max_num + 1
    new_file_path = test_dir / f"file-{next_num}.txt"
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    content = f"This is file number {next_num}\nCreated at: {timestamp}\n"
    new_file_path.write_text(content)

    return new_file_path
11 changes: 11 additions & 0 deletions tests/examples/test_examples.py
@@ -12,6 +12,10 @@

multimodal_examples = sorted(glob.glob("examples/multimodal/**/*.py", recursive=True))

incremental_processing_examples = sorted(
    glob.glob("examples/incremental_processing/delta.py", recursive=True)
)

computer_vision_examples = sorted(
    [
        filename
@@ -86,6 +90,13 @@ def test_multimodal(example):
)


@pytest.mark.examples
@pytest.mark.incremental_processing
@pytest.mark.parametrize("example", incremental_processing_examples)
def test_incremental_processing_examples(example):
    smoke_test(example)


@pytest.mark.examples
@pytest.mark.computer_vision
@pytest.mark.parametrize("example", computer_vision_examples)