fix: multi-node parquet indexing #583


Merged

Conversation

deependujha
Collaborator

Before submitting
  • Was this discussed/agreed via a GitHub issue? (not needed for typos and docs improvements)
  • Did you read the contributor guideline, Pull Request section?
  • Did you make sure to update the docs?
  • Did you write any new necessary tests?

What does this PR do?

Fixes #578

PR review

Anyone in the community is free to review the PR once the tests have passed.
If we didn't discuss your PR in a GitHub issue, there's a high chance it will not be merged.

Did you have fun?

Make sure you had fun coding 🙃

Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR fixes issues with multi-node Parquet dataset indexing by introducing distributed synchronization using a new barrier utility and updating indexing logic across the Hugging Face dataset and Parquet writer modules.

  • Introduces a new maybe_barrier function in torch_utils to synchronize distributed processes.
  • Updates the index_hf_dataset function to better handle multi-node environments via _DistributedEnv and barrier synchronization.
  • Adjusts the index_parquet_dataset function in the streaming writer to ensure only one process writes the index in a distributed setting.
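
As a rough illustration of the first bullet, a maybe_barrier helper along these lines would synchronize ranks only when a process group is actually running (a minimal sketch; the real implementation in src/litdata/utilities/torch_utils.py may differ):

```python
import torch.distributed as dist

def maybe_barrier() -> None:
    # Synchronize all ranks only when a distributed process group has
    # actually been initialized; otherwise this is a no-op, so plain
    # single-process runs are unaffected.
    if dist.is_available() and dist.is_initialized():
        dist.barrier()
```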

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| src/litdata/utilities/torch_utils.py | Added a new maybe_barrier function for distributed process sync. |
| src/litdata/utilities/hf_dataset.py | Updated distributed synchronization and index retrieval logic. |
| src/litdata/streaming/writer.py | Integrated maybe_barrier and refined index creation in distributed mode. |

Comments suppressed due to low confidence (1)

src/litdata/utilities/hf_dataset.py:39

  • The condition for reusing the existing index may fail in scenarios where nodes have an unequal number of processes. Consider refactoring this check to use a more robust distributed coordination mechanism.
if (env.num_nodes == 1 and env.global_rank == 0) or (env.num_nodes > 1 and env.global_rank % env.num_nodes == 0):
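
To see why this check is fragile (a hypothetical layout, not taken from the PR): with 2 nodes running 4 processes each, `global_rank % num_nodes == 0` selects several ranks per node rather than a single writer:

```python
# Hypothetical setup: 2 nodes x 4 processes each => global ranks 0..7,
# node 0 holding ranks 0-3 and node 1 holding ranks 4-7.
num_nodes = 2
world_size = 8

passing = [rank for rank in range(world_size) if rank % num_nodes == 0]
print(passing)  # [0, 2, 4, 6] -- ranks 0 and 2 (node 0) and 4 and 6 (node 1)
                # all pass, so multiple processes may race to write the index.
```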


codecov bot commented May 6, 2025

Codecov Report

Attention: Patch coverage is 92.72727% with 4 lines in your changes missing coverage. Please review.

Project coverage is 79%. Comparing base (76ec34b) to head (337f7a3).
Report is 1 commit behind head on main.

Additional details and impacted files
@@         Coverage Diff         @@
##           main   #583   +/-   ##
===================================
- Coverage    79%    79%   -0%     
===================================
  Files        40     41    +1     
  Lines      6112   6135   +23     
===================================
+ Hits       4819   4835   +16     
- Misses     1293   1300    +7     

@deependujha deependujha requested a review from Copilot May 6, 2025 18:18
Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR fixes issues with multi-node indexing for Parquet datasets by improving process synchronization and avoiding concurrent index creation. Key changes include:

  • Adding synchronization utilities (maybe_barrier and is_local_rank_0) in torch_utils.
  • Updating the HF dataset indexing logic to use these utilities.
  • Adjusting the Parquet dataset writer to ensure proper barrier synchronization in distributed settings.
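
The is_local_rank_0 helper mentioned above could plausibly read the launcher's environment (a sketch assuming LOCAL_RANK is set, as torchrun does; the actual implementation may differ):

```python
import os

def is_local_rank_0() -> bool:
    # LOCAL_RANK is exported per node by torchrun and similar launchers;
    # default to "0" so non-distributed runs behave like local rank 0.
    return int(os.environ.get("LOCAL_RANK", "0")) == 0
```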

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| src/litdata/utilities/torch_utils.py | Added utility functions for distributed synchronization. |
| src/litdata/utilities/hf_dataset.py | Updated cache index creation and synchronization via barriers. |
| src/litdata/streaming/writer.py | Modified distributed index creation with barrier synchronization. |

Comments suppressed due to low confidence (1)

src/litdata/utilities/hf_dataset.py:34

  • Consider inserting a barrier (maybe_barrier()) before returning cache_directory to ensure that all processes are synchronized and the index is fully available before any process proceeds.
if cache_directory:
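
Applied to the quoted line, the suggestion would look roughly like this (a sketch of the reviewer's proposal, not the merged code):

```python
if cache_directory:
    # Wait until the rank responsible for writing the index has finished
    # before any rank returns and starts reading from the cache.
    maybe_barrier()
    return cache_directory
```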

Collaborator

@bhimrazy bhimrazy left a comment

Nice, @deependujha. This seems like a more solid solution.

Maybe, if it can be made to work as a decorator, we could also introduce one, something like local_rank_zero_only, inspired by this.
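
For reference, such a decorator might look like this (a hypothetical sketch modeled on Lightning's rank_zero_only, not code from this PR):

```python
import functools
import os
from typing import Any, Callable, Optional, TypeVar

T = TypeVar("T")

def local_rank_zero_only(fn: Callable[..., T]) -> Callable[..., Optional[T]]:
    # Run the wrapped function only on local rank 0; every other rank
    # skips the call and receives None instead.
    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Optional[T]:
        if int(os.environ.get("LOCAL_RANK", "0")) == 0:
            return fn(*args, **kwargs)
        return None
    return wrapper
```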

@bhimrazy bhimrazy requested a review from Borda May 7, 2025 08:58
@deependujha
Collaborator Author

I also thought of a decorator inspired by Lightning, but right now I'm only interested in blocking a segment of code, not a whole function.

Maybe we'll introduce it in the future.

@deependujha
Collaborator Author

Regarding the failing docs build in CI:

An issue is open in the Sphinx repo: sphinx-doc/sphinx#13533

@deependujha deependujha merged commit 716af8b into Lightning-AI:main May 11, 2025
32 checks passed
@deependujha deependujha deleted the fix/multi-node-parquet-indexing branch May 11, 2025 15:59
Development

Successfully merging this pull request may close these issues.

There is still a risk of race condition when indexing an s3 bucket with multi node (Parquet datasets)