
Conversation

@fracapuano (Collaborator) commented Nov 6, 2025

What this does

This feature implements a scalable, parallel algorithm for merging a collection of input datasets into a single, concatenated dataset. The implementation targets a single shared-memory machine (multi-core processor), but the primitive dataset aggregation kernel _aggregate_datasets can also be used in a distributed setting with datatrove.

Core Design

The algorithm uses a tree-based reduction. A simple parallel "map" followed by a single-threaded "reduce" would leave an $O(k)$ bottleneck, where $k$ is the number of datasets; a tree-based reduction instead merges pairs in parallel at every level. For example, merging $k = 8$ datasets takes 7 serial merge steps with a single-threaded reduce, but only 3 levels ($\log_2 8 = 3$) with a balanced tree.

The merge process is broken down into a series of dynamically created tasks managed by a central thread pool and task queue, so that workers continuously operate on progressively more aggregated datasets.

Phase 1 (Local Merges)

The initial set of datasets is partitioned, and "Level 0" tasks (e.g., aggregate(d1, d2), aggregate(d3, d4)) are added to the task queue, from which they are dispatched to workers; see the pairing sketch below.
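
To make the pairing concrete, here is a small illustration (hypothetical names, not the PR's code); an unpaired leftover simply carries over to the next level:

# Hypothetical illustration of Level 0 task creation, not the PR's implementation.
datasets = ["d1", "d2", "d3", "d4", "d5"]
level0_tasks = [(datasets[i], datasets[i + 1]) for i in range(0, len(datasets) - 1, 2)]
# level0_tasks == [("d1", "d2"), ("d3", "d4")]; "d5" carries over to Level 1.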

Dynamic Task Creation

A pool of worker threads picks up these tasks. When a thread completes one (e.g., producing partial_result_1), its output is paired with another completed result to create and enqueue a new "Level 1" task (e.g., merge(partial_result_1, partial_result_2)).

Parallel Aggregation

This process repeats, with threads continuously grabbing the next available merge task from any level of the tree, which keeps all CPU cores busy.
With enough workers, the critical path is the depth of the tree, so total aggregation time scales logarithmically: $O(\log k)$.
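
A minimal sketch of the pattern with concurrent.futures (illustrative only; merge_fn stands in for the pairwise _aggregate_datasets kernel, and the real implementation differs):

from concurrent.futures import ThreadPoolExecutor, as_completed

def tree_merge(items, merge_fn, num_workers=4):
    # Pairwise tree reduction: ~log2(k) levels instead of k - 1 serial merges.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        pending = list(items)  # inputs plus finished partial results
        in_flight = set()      # futures for merges currently running
        while len(pending) + len(in_flight) > 1:
            # Dispatch a merge task for every available pair.
            while len(pending) >= 2:
                a, b = pending.pop(), pending.pop()
                in_flight.add(pool.submit(merge_fn, a, b))
            # Feed the next completed partial result back in as a new input.
            done = next(as_completed(in_flight))
            in_flight.remove(done)
            pending.append(done.result())
        return pending[0]

Because workers pick up whichever merge is ready next, the reduction naturally mixes levels instead of waiting for each level to drain.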

Copilot AI (Contributor) left a comment

Pull Request Overview

This PR adds parallel dataset merging capability to the LeRobot dataset aggregation system and fixes several issues with dataset creation and merging logic.

  • Introduces a num_workers parameter to enable parallel dataset aggregation using a tree-based reduction strategy
  • Fixes incorrect data file index mapping during aggregation by tracking actual destination files used
  • Replaces the previous pattern of merging default features into the provided features with a validation check that ensures the default features are present

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.

  • src/lerobot/scripts/lerobot_edit_dataset.py: Adds the num_workers configuration parameter to the merge operation config and passes it to the merge function
  • src/lerobot/datasets/lerobot_dataset.py: Changes feature merging logic from an automatic merge to a validation that default features are already present in the provided features
  • src/lerobot/datasets/dataset_tools.py: Adds the num_workers parameter to merge_datasets and contains an incorrect Path constructor call
  • src/lerobot/datasets/aggregate.py: Implements parallel aggregation with ThreadPoolExecutor, adds file-mapping logic to correctly track destination files during concatenation, and refactors the original function into _aggregate_datasets


  if repo_id is None:
      repo_id = f"{dataset.repo_id}_modified"
- output_dir = Path(output_dir) if output_dir is not None else HF_LEROBOT_HOME / repo_id
+ output_dir = Path(output_dir, exists_ok=True) if output_dir is not None else HF_LEROBOT_HOME / repo_id
Copilot AI commented Nov 6, 2025

The Path constructor does not accept an exists_ok parameter. This parameter belongs to the Path.mkdir() method. This should be Path(output_dir) without the exists_ok argument.

Suggested change
output_dir = Path(output_dir, exists_ok=True) if output_dir is not None else HF_LEROBOT_HOME / repo_id
output_dir = Path(output_dir) if output_dir is not None else HF_LEROBOT_HOME / repo_id
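
For reference, the directory-creation flag belongs on mkdir, whose parameter is spelled exist_ok (illustrative snippet, not part of the PR):

output_dir = Path(output_dir) if output_dir is not None else HF_LEROBOT_HOME / repo_id
output_dir.mkdir(parents=True, exist_ok=True)  # exist_ok is a mkdir() argument, not a Path() argument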


- features = {**features, **DEFAULT_FEATURES}
+ assert set(DEFAULT_FEATURES.keys()).issubset(set(features.keys())), (
+     f"Default features must be a subset of the provided features! Got {set(features.keys())} and {set(DEFAULT_FEATURES.keys())}"
Copilot AI commented Nov 6, 2025

The error message format is confusing: it shows the provided features first and the default features second, which is the opposite of the assertion order and the message text. Consider: f"Default features {set(DEFAULT_FEATURES.keys())} must be present in the provided features {set(features.keys())}"

Suggested change
f"Default features must be a subset of the provided features! Got {set(features.keys())} and {set(DEFAULT_FEATURES.keys())}"
f"Default features {set(DEFAULT_FEATURES.keys())} must be present in the provided features {set(features.keys())}"

Comment on lines +297 to +307
if num_workers is None or num_workers <= 1:
    # Run aggregation sequentially
    _aggregate_datasets(
        repo_ids=repo_ids,
        aggr_repo_id=aggr_repo_id,
        aggr_root=aggr_root,
        roots=roots,
        data_files_size_in_mb=data_files_size_in_mb,
        video_files_size_in_mb=video_files_size_in_mb,
        chunk_size=chunk_size,
    )
Copilot AI commented Nov 6, 2025

The sequential path (when num_workers is None or <= 1) does not return a value or log completion, but the parallel path returns explicitly at line 417. This creates inconsistent behavior. Either add a return statement after line 307, or move the logging and return after the entire if/elif block to ensure both paths behave consistently.

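A sketch of the comment's second option (hypothetical structure; _parallel_aggregate is a stand-in name for the tree-based branch, not the PR's actual code):

if num_workers is None or num_workers <= 1:
    result = _aggregate_datasets(...)   # sequential path
else:
    result = _parallel_aggregate(...)   # hypothetical name for the parallel path
logging.info("Aggregation complete")    # shared logging after the branch
return result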
    )

# Uses a parallel fan-out/fan-in strategy when num_workers is provided
elif num_workers > 1:
Copilot AI commented Nov 6, 2025

This test is always true when reached: the preceding if already handles num_workers is None or num_workers <= 1, so the elif can simply be else.

Suggested change
elif num_workers > 1:
else:

@fracapuano added the enhancement, dataset, and performance labels Nov 6, 2025
@fracapuano fracapuano self-assigned this Nov 6, 2025