
Conversation

@saggl saggl commented Nov 5, 2025

What this does

Adds real-time video encoding capability to LeRobot dataset recording, eliminating post-episode encoding delays and significantly improving data collection workflow efficiency.

This PR introduces a new RealtimeVideoEncoder class that streams frames directly to ffmpeg during recording instead of saving PNGs and batch-encoding videos after each episode. Key improvements:

  • Real-time encoding: Frames are encoded to video concurrently with data capture using ffmpeg subprocess and background threading
  • Reduced I/O: Eliminates intermediate PNG file writes when real-time encoding is enabled
  • Backward compatible: Traditional batch encoding remains the default; real-time encoding is opt-in via --dataset.realtime_encoding=true
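The streaming design described above can be sketched as follows. This is an illustrative stand-in only, not the PR's actual RealtimeVideoEncoder; the class and argument names here are invented. Frames are piped as raw bytes to an ffmpeg subprocess through a bounded queue drained by a background writer thread:

```python
import queue
import subprocess
import threading


class StreamingEncoderSketch:
    """Illustrative stand-in for the PR's RealtimeVideoEncoder."""

    def __init__(self, path, width, height, fps, codec="libsvtav1", max_queue_size=120):
        # ffmpeg reads raw rgb24 frames from stdin and encodes them on the fly.
        self.cmd = [
            "ffmpeg", "-y",
            "-f", "rawvideo", "-pix_fmt", "rgb24",
            "-s", f"{width}x{height}", "-r", str(fps),
            "-i", "-",
            "-c:v", codec, str(path),
        ]
        self.frame_queue = queue.Queue(maxsize=max_queue_size)
        self.proc = None
        self.thread = None

    def start(self):
        self.proc = subprocess.Popen(self.cmd, stdin=subprocess.PIPE)
        self.thread = threading.Thread(target=self._writer, daemon=True)
        self.thread.start()

    def _writer(self):
        while True:
            frame = self.frame_queue.get()
            if frame is None:  # sentinel: no more frames
                self.proc.stdin.close()
                return
            self.proc.stdin.write(frame)  # frame: raw rgb24 bytes

    def add_frame(self, frame_bytes):
        self.frame_queue.put(frame_bytes, timeout=5.0)

    def finalize(self, timeout=30.0):
        self.frame_queue.put(None)          # signal the writer to stop
        self.thread.join(timeout=timeout)   # wait for all frames to be flushed
        return self.proc.wait(timeout=timeout)
```

Because encoding happens in the writer thread while the main loop keeps capturing, the per-frame cost on the capture path is just a queue put.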

Changes:

  • Add RealtimeVideoEncoder class in video_utils.py with queue-based frame buffering
  • Integrate encoder lifecycle management into LeRobotDataset (init, add_frame, finalize)
  • Add realtime_encoding and video_codec config parameters to lerobot_record.py
  • Skip PNG saving and image writer initialization when using real-time encoding
  • Update episode stats computation to handle missing image data during real-time encoding

How it was tested

  • Manual testing: Recorded datasets with real-time encoding enabled on macOS with multiple cameras
  • Codec testing: Verified that the libsvtav1 encoder works correctly
  • Backward compatibility: Confirmed traditional PNG+batch-encoding workflow still works (default behavior)

How to check out & try it (for the reviewer)

# Record a dataset with real-time encoding enabled
lerobot-record \
    --dataset.realtime_encoding=true

Add RealtimeVideoEncoder class that encodes video frames in real-time using
ffmpeg subprocess, eliminating post-episode encoding delays during data collection.

Changes:
- Add RealtimeVideoEncoder class in video_utils.py with hardware acceleration support
- Integrate real-time encoding into LeRobotDataset with encoder lifecycle management
- Add realtime_encoding and video_codec config options to lerobot_record.py
- Skip PNG saving and image writer when using real-time encoding
- Support multiple video codecs (libsvtav1, h264, hevc, videotoolbox variants)
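The codec options listed above might map to ffmpeg encoder names roughly as follows. This mapping is an assumption for illustration, not the PR's actual code; the videotoolbox variants use Apple's hardware encoders:

```python
# Assumed mapping from config codec names to real ffmpeg encoder names.
SUPPORTED_CODECS = {
    "libsvtav1": "libsvtav1",                  # software AV1 (SVT-AV1)
    "h264": "libx264",                         # software H.264
    "hevc": "libx265",                         # software HEVC
    "h264_videotoolbox": "h264_videotoolbox",  # macOS hardware H.264
    "hevc_videotoolbox": "hevc_videotoolbox",  # macOS hardware HEVC
}


def encoder_args(codec: str) -> list:
    """Return the ffmpeg arguments selecting the given encoder."""
    if codec not in SUPPORTED_CODECS:
        raise ValueError(f"Unsupported codec: {codec}")
    return ["-c:v", SUPPORTED_CODECS[codec]]
```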

The real-time encoder streams frames directly to ffmpeg via stdin using a background
thread and queue, enabling encoding to happen concurrently with data capture. This
significantly improves recording workflow efficiency.
Copilot AI review requested due to automatic review settings November 5, 2025 21:05

Copilot AI left a comment

Pull Request Overview

This PR introduces real-time video encoding during dataset recording to eliminate post-episode encoding delays. Instead of saving PNG frames and batch-encoding videos after each episode, frames are now streamed directly to ffmpeg during capture.

Key changes:

  • Adds RealtimeVideoEncoder class for streaming frames to ffmpeg subprocess with background threading
  • Integrates real-time encoding into LeRobotDataset as an opt-in feature via --dataset.realtime_encoding=true
  • Skips PNG file I/O when real-time encoding is enabled

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.

Files changed:

  • lerobot_record.py: Adds configuration parameters for real-time encoding and video codec selection
  • video_utils.py: Implements RealtimeVideoEncoder class with queue-based frame buffering and ffmpeg subprocess management
  • lerobot_dataset.py: Integrates encoder lifecycle management (init, add_frame, finalize) and modifies episode saving logic



def _start_writer_thread(self) -> None:
"""Start the background thread for writing frames."""
self.writer_thread = threading.Thread(target=self._writer_thread_func, daemon=True)

Copilot AI Nov 5, 2025

Using a daemon thread for the writer could cause data loss if the main thread exits before encoding completes. The thread should be non-daemon and explicitly joined during finalize() to ensure all frames are written before program termination.


@saggl saggl Nov 6, 2025

Thanks for the review! However, the thread is explicitly joined during finalize() (line 215):

  • finalize() waits for all frames to be written and for stdin to be closed by the writer thread
  • The daemon=True flag is intentional as a safety mechanism, not a bug
  • In all cases, finalize() signals the thread to stop (line 211), waits up to 10 seconds for it to finish writing (line 215), then waits for ffmpeg to complete (line 222). All frames are written.
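The author's point can be demonstrated with a toy version of the pattern (self-contained illustration, not the PR's code): daemon=True only matters on abnormal shutdown, while the explicit sentinel-plus-join guarantees every queued item is flushed before the program continues.

```python
import queue
import threading

q = queue.Queue()
written = []


def writer():
    # Drain the queue until the stop sentinel arrives, mirroring the
    # writer thread's shutdown protocol in finalize().
    while True:
        item = q.get()
        if item is None:  # stop sentinel
            return
        written.append(item)


t = threading.Thread(target=writer, daemon=True)  # safety net only
t.start()
for i in range(5):
    q.put(i)
q.put(None)          # signal the thread to stop
t.join(timeout=10.0)  # explicit join: all items are flushed before we proceed
```

After the join, `written` contains every queued item in order; the daemon flag never comes into play on the normal path.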

# stdin is already closed by writer thread, so we use wait() instead of communicate()
# to avoid "flush of closed file" error
try:
    returncode = self.ffmpeg_process.wait(timeout=30.0)

Copilot AI Nov 5, 2025

The 30-second timeout for ffmpeg process completion is a magic number. Consider making this configurable based on expected video length or defining it as a named constant (e.g., FFMPEG_FINALIZE_TIMEOUT_S = 30.0).
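A minimal illustration of this suggestion; the constant name is the reviewer's proposal and the helper function is invented, neither is code from the PR:

```python
# Named constant replacing the magic number, per the review suggestion.
FFMPEG_FINALIZE_TIMEOUT_S = 30.0


def finalize_wait(ffmpeg_process):
    # stdin was already closed by the writer thread, so wait() is sufficient.
    return ffmpeg_process.wait(timeout=FFMPEG_FINALIZE_TIMEOUT_S)
```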

Sagstetter, Christian (GDE-EDU7) added 4 commits November 6, 2025 10:41
Refactors realtime encoding to eliminate duplication and improve error handling.

Changes:
- Extract shared video file placement logic (_process_video_file_placement)
- Expose encoding params (video_crf, video_preset, max_queue_size) as config
- Fix stats computation by temporarily storing image arrays
- Add graceful fallback to PNG saving on encoder failures
- Fix phantom file paths (use empty strings instead)
- Return bool from _add_frame_to_realtime_encoder instead of raising

Eliminates ~80 lines of duplication and fixes training pipeline compatibility.
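The graceful-fallback idea from this commit message can be sketched as below. The names _add_frame_to_realtime_encoder, realtime_encoding, and start_image_writer mirror identifiers mentioned in this PR, but save_frame_as_png and the exact control flow are invented for illustration:

```python
def add_frame_with_fallback(dataset, frame):
    """Sketch: try real-time encoding first, fall back to PNGs on failure."""
    if dataset.realtime_encoding:
        # Per this PR's refactor, the encoder hook returns a bool
        # instead of raising on failure.
        if dataset._add_frame_to_realtime_encoder(frame):
            return
        # Encoder failed: permanently drop back to the traditional PNG path.
        dataset.realtime_encoding = False
        if dataset.image_writer is None:
            dataset.start_image_writer(num_processes=0, num_threads=4)
    dataset.save_frame_as_png(frame)  # hypothetical helper for the PNG path
```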
@saggl saggl requested a review from Copilot November 6, 2025 11:10

Copilot AI left a comment


Pull Request Overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.




def _start_writer_thread(self) -> None:
"""Start the background thread for writing frames."""
self.writer_thread = threading.Thread(target=self._writer_thread_func, daemon=True)

Copilot AI Nov 6, 2025

Using a daemon thread can cause data loss if the main program exits before all frames are written. Consider using a non-daemon thread and ensuring proper cleanup in the finalize method or use a context manager to guarantee thread completion.

self.realtime_encoding = False
# Initialize image writer if not already started
if self.image_writer is None:
    self.start_image_writer(num_processes=0, num_threads=4)

Copilot AI Nov 6, 2025

The fallback to image writer uses hardcoded values (num_processes=0, num_threads=4). Consider making these configurable or using the original configuration values that were passed to the dataset during initialization.

Comment on lines +1342 to +1344

# Transpose to (N, C, H, W) for stats computation
if image_array.ndim == 4 and image_array.shape[-1] == 3:
    image_array = np.transpose(image_array, (0, 3, 1, 2))

Copilot AI Nov 6, 2025

The transpose operation assumes images are in (N, H, W, C) format, but doesn't validate that the channel dimension is actually the last dimension. If images are already in (N, C, H, W) format, this will incorrectly transpose them. Add a check to ensure the array is in the expected format before transposing.

Suggested change

# Transpose to (N, C, H, W) for stats computation
if image_array.ndim == 4 and image_array.shape[-1] == 3:
    image_array = np.transpose(image_array, (0, 3, 1, 2))

becomes:

# Ensure image_array is in (N, C, H, W) format for stats computation
if image_array.ndim == 4:
    if image_array.shape[-1] == 3 and image_array.shape[1] != 3:
        # (N, H, W, C) -> (N, C, H, W)
        image_array = np.transpose(image_array, (0, 3, 1, 2))
    elif image_array.shape[1] == 3 and image_array.shape[-1] != 3:
        # Already (N, C, H, W), do nothing
        pass
    elif image_array.shape[-1] == 3 and image_array.shape[1] == 3:
        logging.warning(
            f"Ambiguous image array shape {image_array.shape} for key '{original_key}'. "
            "Assuming (N, H, W, C) and transposing, but please check your data."
        )
        image_array = np.transpose(image_array, (0, 3, 1, 2))
    else:
        raise ValueError(
            f"Unexpected image array shape {image_array.shape} for key '{original_key}'. "
            "Expected (N, H, W, C) or (N, C, H, W) with C=3."
        )

@saggl saggl requested a review from Copilot November 6, 2025 20:48

Copilot AI left a comment


Pull Request Overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.



)
except Exception as e:
    self.error = e
    logging.error(f"Failed to start ffmpeg: {e}")

Copilot AI Nov 6, 2025

The error message when ffmpeg fails to start should include the command that was attempted. This would help users diagnose issues with codec availability or ffmpeg installation. Consider: logging.error(f"Failed to start ffmpeg with command: {' '.join(cmd)}. Error: {e}")

Suggested change

logging.error(f"Failed to start ffmpeg: {e}")

becomes:

logging.error(f"Failed to start ffmpeg with command: {' '.join(cmd)}. Error: {e}")

try:
    self.frame_queue.put(frame, timeout=FRAME_QUEUE_PUT_TIMEOUT_S)
except queue.Full:
    logging.warning("Frame queue is full, frame encoding may be lagging behind capture")

Copilot AI Nov 6, 2025

The warning message should include actionable information. Consider adding: 'Consider increasing max_queue_size or using a faster codec/preset' to help users resolve the issue.

Suggested change

logging.warning("Frame queue is full, frame encoding may be lagging behind capture")

becomes:

logging.warning(
    "Frame queue is full, frame encoding may be lagging behind capture. "
    "Consider increasing max_queue_size or using a faster codec/preset to resolve this issue."
)

if not self.features[key].get("info", None):
    video_path = self.root / self.video_path.format(video_key=key, chunk_index=0, file_index=0)
    video_path = self.root / self.video_path.format(
        video_key=video_key, chunk_index=0, file_index=0

Copilot AI Nov 6, 2025

The variable video_key used in line 435 is incorrect. The parameter name is video_key but the method was called with video_key=key in line 432, where key is the loop variable. This should use key instead of video_key to match the loop variable: video_key=key.

Suggested change

video_key=video_key, chunk_index=0, file_index=0

becomes:

video_key=key, chunk_index=0, file_index=0

Comment on lines +1291 to +1294

temp_stat_keys = [k for k in list(episode_buffer.keys()) if k.endswith("_arrays_for_stats")]
for k in temp_stat_keys:
    del episode_buffer[k]


Copilot AI Nov 6, 2025

This code deletes temporary stat keys before validation, but the same cleanup is repeated at lines 1358-1360. The first deletion appears premature since the temporary arrays are needed for computing episode stats later (lines 1341-1355). This first deletion should be removed to avoid redundant cleanup and ensure stats can be computed correctly.

Suggested change (remove these lines):

temp_stat_keys = [k for k in list(episode_buffer.keys()) if k.endswith("_arrays_for_stats")]
for k in temp_stat_keys:
    del episode_buffer[k]

Comment on lines +1358 to +1360

keys_to_remove = [k for k in list(episode_buffer.keys()) if k.endswith("_arrays_for_stats")]
for k in keys_to_remove:
    del episode_buffer[k]

Copilot AI Nov 6, 2025

The variable keys_to_remove is redundant since temp_stat_keys was already computed at line 1291 with the same logic and could be reused here if the earlier deletion at lines 1291-1293 is removed.

Suggested change (remove these lines):

keys_to_remove = [k for k in list(episode_buffer.keys()) if k.endswith("_arrays_for_stats")]
for k in keys_to_remove:
    del episode_buffer[k]

Comment on lines +1341 to +1355

for k in episode_buffer:
    if k.endswith("_arrays_for_stats"):
        original_key = k.replace("_arrays_for_stats", "")
        # Stack arrays into (N, H, W, C) format
        image_array = np.stack(episode_buffer[k])
        # Transpose to (N, C, H, W) for stats computation
        if image_array.ndim == 4 and image_array.shape[-1] == 3:
            image_array = np.transpose(image_array, (0, 3, 1, 2))
        # Compute stats with axis=(0,2,3) for per-channel stats
        ep_stats[original_key] = get_feature_stats(image_array, axis=(0, 2, 3), keepdims=True)
        # Normalize to [0,1] range like in compute_episode_stats
        ep_stats[original_key] = {
            k: v if k == "count" else np.squeeze(v / 255.0, axis=0)
            for k, v in ep_stats[original_key].items()
        }

Copilot AI Nov 6, 2025

This loop iterates over all keys in episode_buffer to find stats keys, but these keys were already identified at line 1291 as temp_stat_keys. Reuse that list instead: for k in temp_stat_keys: to avoid redundant iteration and string matching.

Suggested change

for k in episode_buffer:
    if k.endswith("_arrays_for_stats"):
        original_key = k.replace("_arrays_for_stats", "")
        # Stack arrays into (N, H, W, C) format
        image_array = np.stack(episode_buffer[k])
        # Transpose to (N, C, H, W) for stats computation
        if image_array.ndim == 4 and image_array.shape[-1] == 3:
            image_array = np.transpose(image_array, (0, 3, 1, 2))
        # Compute stats with axis=(0,2,3) for per-channel stats
        ep_stats[original_key] = get_feature_stats(image_array, axis=(0, 2, 3), keepdims=True)
        # Normalize to [0,1] range like in compute_episode_stats
        ep_stats[original_key] = {
            k: v if k == "count" else np.squeeze(v / 255.0, axis=0)
            for k, v in ep_stats[original_key].items()
        }

becomes:

for k in temp_stat_keys:
    original_key = k.replace("_arrays_for_stats", "")
    # Stack arrays into (N, H, W, C) format
    image_array = np.stack(episode_buffer[k])
    # Transpose to (N, C, H, W) for stats computation
    if image_array.ndim == 4 and image_array.shape[-1] == 3:
        image_array = np.transpose(image_array, (0, 3, 1, 2))
    # Compute stats with axis=(0,2,3) for per-channel stats
    ep_stats[original_key] = get_feature_stats(image_array, axis=(0, 2, 3), keepdims=True)
    # Normalize to [0,1] range like in compute_episode_stats
    ep_stats[original_key] = {
        k: v if k == "count" else np.squeeze(v / 255.0, axis=0)
        for k, v in ep_stats[original_key].items()
    }
