Add Distributed, Parallel Dataset Merging #2391
base: main
Conversation
… then we raise. This is a first step towards standardizing the dataset format; otherwise (as it is now) everything would be allowed.
…ee-based thread pool
Pull Request Overview
This PR adds parallel dataset merging capability to the LeRobot dataset aggregation system and fixes several issues with dataset creation and merging logic.
- Introduces a `num_workers` parameter to enable parallel dataset aggregation using a tree-based reduction strategy
- Fixes incorrect data file index mapping during aggregation by tracking the actual destination files used
- Replaces the previous pattern of merging default features into the provided features with a validation check ensuring the default features are present
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| src/lerobot/scripts/lerobot_edit_dataset.py | Adds num_workers configuration parameter to the merge operation config and passes it to the merge function |
| src/lerobot/datasets/lerobot_dataset.py | Changes feature merging logic from automatic merge to validation that default features are already present in provided features |
| src/lerobot/datasets/dataset_tools.py | Adds num_workers parameter to merge_datasets function and contains an incorrect Path constructor call |
| src/lerobot/datasets/aggregate.py | Implements parallel aggregation with ThreadPoolExecutor, adds file mapping logic to correctly track destination files during concatenation, and refactors the original function into _aggregate_datasets |
```diff
 if repo_id is None:
     repo_id = f"{dataset.repo_id}_modified"
-output_dir = Path(output_dir) if output_dir is not None else HF_LEROBOT_HOME / repo_id
+output_dir = Path(output_dir, exists_ok=True) if output_dir is not None else HF_LEROBOT_HOME / repo_id
```
Copilot AI · Nov 6, 2025
The `Path` constructor does not accept an `exists_ok` parameter; the similarly named `exist_ok` parameter belongs to `Path.mkdir()`. This should be `Path(output_dir)` without the `exists_ok` argument.
```diff
-output_dir = Path(output_dir, exists_ok=True) if output_dir is not None else HF_LEROBOT_HOME / repo_id
+output_dir = Path(output_dir) if output_dir is not None else HF_LEROBOT_HOME / repo_id
```
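For illustration only, here is a small sketch of the distinction the comment points at; `resolve_output_dir` and `default_root` are hypothetical names, not part of the PR:

```python
from pathlib import Path

def resolve_output_dir(output_dir: str | None, default_root: Path, repo_id: str) -> Path:
    # Path() only constructs a path object; it never touches the filesystem,
    # so it has no exists_ok/exist_ok parameter.
    resolved = Path(output_dir) if output_dir is not None else default_root / repo_id
    # Creating the directory is a separate, explicit step; mkdir's flag is exist_ok.
    resolved.mkdir(parents=True, exist_ok=True)
    return resolved
```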
```diff
-features = {**features, **DEFAULT_FEATURES}
+assert set(DEFAULT_FEATURES.keys()).issubset(set(features.keys())), (
+    f"Default features must be a subset of the provided features! Got {set(features.keys())} and {set(DEFAULT_FEATURES.keys())}"
```
Copilot AI · Nov 6, 2025
The error message format is confusing: it shows the provided features first and the default features second, which is the opposite of the assertion order and of the message text. Consider: `f"Default features {set(DEFAULT_FEATURES.keys())} must be present in the provided features {set(features.keys())}"`
| f"Default features must be a subset of the provided features! Got {set(features.keys())} and {set(DEFAULT_FEATURES.keys())}" | |
| f"Default features {set(DEFAULT_FEATURES.keys())} must be present in the provided features {set(features.keys())}" |
```python
if num_workers is None or num_workers <= 1:
    # Run aggregation sequentially
    _aggregate_datasets(
        repo_ids=repo_ids,
        aggr_repo_id=aggr_repo_id,
        aggr_root=aggr_root,
        roots=roots,
        data_files_size_in_mb=data_files_size_in_mb,
        video_files_size_in_mb=video_files_size_in_mb,
        chunk_size=chunk_size,
    )
```
Copilot AI · Nov 6, 2025
The sequential path (when num_workers is None or <= 1) does not return a value or log completion, but the parallel path returns explicitly at line 417. This creates inconsistent behavior. Either add a return statement after line 307, or move the logging and return after the entire if/elif block to ensure both paths behave consistently.
```python
    )

# Uses a parallel fan-out/fan-in strategy when num_workers is provided
elif num_workers > 1:
```
Copilot AI · Nov 6, 2025
This test is always true when reached, because the preceding branch already handles `num_workers is None or num_workers <= 1`; the `elif` can be simplified to `else`.
```diff
-elif num_workers > 1:
+else:
```
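A minimal sketch of the restructuring these two review comments suggest (the inconsistent-return note and the `elif`/`else` simplification). `run_aggregation`, `_parallel_aggregate`, and the log message are placeholders; only `_aggregate_datasets` and `num_workers` come from the PR:

```python
import logging

def _aggregate_datasets(**kwargs):  # stand-in for the PR's sequential kernel
    ...

def _parallel_aggregate(**kwargs):  # hypothetical parallel entry point
    ...

def run_aggregation(repo_ids, aggr_repo_id, num_workers=None, **kwargs):
    """Sketch: both branches funnel into one shared epilogue, so the sequential
    and parallel paths log and return consistently."""
    if num_workers is None or num_workers <= 1:
        _aggregate_datasets(repo_ids=repo_ids, aggr_repo_id=aggr_repo_id, **kwargs)
    else:  # covers every remaining case, i.e. num_workers > 1
        _parallel_aggregate(
            repo_ids=repo_ids, aggr_repo_id=aggr_repo_id, num_workers=num_workers, **kwargs
        )
    logging.info("Aggregated %d datasets into %s", len(repo_ids), aggr_repo_id)
    return aggr_repo_id
```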
What this does
This feature implements a highly scalable, parallel algorithm for merging a collection of input datasets into a single, concatenated dataset. The implementation is designed for a single, shared-memory machine (multi-core processor); however, the primitive dataset aggregation kernel `_aggregate_datasets` can also be used in a distributed setting with `datatrove`.

Core Design
Instead of a simple parallel "map" followed by a single-threaded "reduce" (which would be an $O(k)$ bottleneck, where $k$ is the number of datasets), the algorithm uses a tree-based reduction.
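For example, with $k = 8$ datasets a linear reduce performs its 7 merges strictly one after another, while a tree reduction performs the same 7 merges in only $\lceil \log_2 8 \rceil = 3$ levels (4 merges, then 2, then 1), with every merge inside a level running in parallel.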
The merge process is broken down into a series of dynamically created tasks managed by a central thread pool and task queue. This ensures workers are constantly operating on progressively more aggregated datasets.
Phase 1 (Local Merges)
The initial set of datasets is partitioned, and "Level 0" tasks (e.g., `aggregate(d1, d2)`, `aggregate(d3, d4)`) are added to the task queue. Then, tasks are dispatched to workers.

Dynamic Task Creation
A pool of worker threads grabs these tasks. When a thread completes a task (e.g., producing `partial_result_1`), its result is paired with another completed result to create and enqueue a new "Level 1" task (e.g., `merge(partial_result_1, partial_result_2)`).
Parallel Aggregation
This process repeats, with threads continuously grabbing the next available merge task from any level of the tree. This ensures all CPU cores remain fully saturated. The total aggregation time scales logarithmically, $O(\log k)$.
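To make the fan-out/fan-in idea concrete, here is a minimal, self-contained sketch. It is not the PR's implementation: it merges plain Python lists instead of LeRobot datasets, uses a simple level-synchronous reduction rather than the dynamic task queue described above, and `merge_pair`/`tree_merge` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor

def merge_pair(a: list, b: list) -> list:
    # Stand-in for the real pairwise aggregation kernel
    # (in the PR this would be an _aggregate_datasets call on two datasets).
    return a + b

def tree_merge(parts: list[list], num_workers: int = 4) -> list:
    """Pairwise tree reduction: each level merges pairs in parallel,
    so k inputs are reduced in ceil(log2(k)) levels."""
    if not parts:
        return []
    level = list(parts)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        while len(level) > 1:
            pairs = [(level[i], level[i + 1]) for i in range(0, len(level) - 1, 2)]
            futures = [pool.submit(merge_pair, a, b) for a, b in pairs]
            next_level = [f.result() for f in futures]
            if len(level) % 2 == 1:  # odd leftover is carried to the next level
                next_level.append(level[-1])
            level = next_level
    return level[0]

if __name__ == "__main__":
    datasets = [[i] for i in range(10)]  # ten toy "datasets"
    print(tree_merge(datasets, num_workers=4))  # [0, 1, 2, ..., 9]
```

The PR's version avoids this sketch's per-level barrier: as soon as two partial results are available, a new merge task is enqueued, which keeps workers busy even when individual merges take uneven amounts of time.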