Adds real-time video encoding during recording #2390
base: main
Conversation
Add a `RealtimeVideoEncoder` class that encodes video frames in real time via an ffmpeg subprocess, eliminating post-episode encoding delays during data collection.

Changes:
- Add `RealtimeVideoEncoder` class in `video_utils.py` with hardware acceleration support
- Integrate real-time encoding into `LeRobotDataset` with encoder lifecycle management
- Add `realtime_encoding` and `video_codec` config options to `lerobot_record.py`
- Skip PNG saving and the image writer when real-time encoding is used
- Support multiple video codecs (libsvtav1, h264, hevc, videotoolbox variants)

The real-time encoder streams frames directly to ffmpeg via stdin using a background thread and queue, so encoding happens concurrently with data capture. This significantly improves recording workflow efficiency.
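For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of the queue-plus-writer-thread approach described above. It is not the PR's `RealtimeVideoEncoder` implementation; the class name `StreamingEncoderSketch`, the queue size, and the exact ffmpeg arguments are illustrative assumptions.

```python
import queue
import subprocess
import threading

import numpy as np


class StreamingEncoderSketch:
    """Sketch: pipe raw RGB frames to an ffmpeg subprocess from a background thread."""

    def __init__(self, output_path: str, width: int, height: int, fps: int, codec: str = "libsvtav1"):
        cmd = [
            "ffmpeg", "-y",
            "-f", "rawvideo", "-pix_fmt", "rgb24",
            "-s", f"{width}x{height}", "-r", str(fps),
            "-i", "-",                      # read raw frames from stdin
            "-c:v", codec,
            output_path,
        ]
        self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE)
        self.frame_queue: queue.Queue = queue.Queue(maxsize=64)
        self.writer_thread = threading.Thread(target=self._drain_queue)
        self.writer_thread.start()

    def _drain_queue(self) -> None:
        # Pull frames off the queue and write them to ffmpeg's stdin until a None sentinel arrives.
        while True:
            frame = self.frame_queue.get()
            if frame is None:
                break
            self.process.stdin.write(np.ascontiguousarray(frame, dtype=np.uint8).tobytes())
        self.process.stdin.close()

    def add_frame(self, frame: np.ndarray) -> None:
        # Hand the frame to the writer thread; only blocks if the queue is full.
        self.frame_queue.put(frame)

    def finalize(self, timeout: float = 30.0) -> int:
        self.frame_queue.put(None)          # sentinel: no more frames
        self.writer_thread.join(timeout=timeout)
        return self.process.wait(timeout=timeout)
```

Usage would follow the lifecycle described in the PR: construct the encoder when an episode starts, call `add_frame` from the capture loop, and call `finalize` when the episode ends.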
Pull Request Overview
This PR introduces real-time video encoding during dataset recording to eliminate post-episode encoding delays. Instead of saving PNG frames and batch-encoding videos after each episode, frames are now streamed directly to ffmpeg during capture.
Key changes:
- Adds a `RealtimeVideoEncoder` class for streaming frames to an ffmpeg subprocess with background threading
- Integrates real-time encoding into `LeRobotDataset` as an opt-in feature via `--dataset.realtime_encoding=true`
- Skips PNG file I/O when real-time encoding is enabled
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| lerobot_record.py | Adds configuration parameters for real-time encoding and video codec selection |
| video_utils.py | Implements RealtimeVideoEncoder class with queue-based frame buffering and ffmpeg subprocess management |
| lerobot_dataset.py | Integrates encoder lifecycle management (init, add_frame, finalize) and modifies episode saving logic |
```python
def _start_writer_thread(self) -> None:
    """Start the background thread for writing frames."""
    self.writer_thread = threading.Thread(target=self._writer_thread_func, daemon=True)
```
Copilot AI · Nov 5, 2025
Using a daemon thread for the writer could cause data loss if the main thread exits before encoding completes. The thread should be non-daemon and explicitly joined during finalize() to ensure all frames are written before program termination.
Thanks for the review! However, the thread is explicitly joined during finalize() (line 215):
- Line 215 waits for all frames to be written and for stdin to be closed by the writer thread.
- The daemon=True flag is intentional as a safety mechanism, not a bug.
- In all cases, finalize() signals the thread to stop (line 211), waits up to 10 seconds for it to finish writing (line 215), then waits for ffmpeg to complete (line 222). All frames are written.
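For readers following the thread, a condensed sketch of the finalize flow this reply describes; the attribute names (`stop_event`, `writer_thread`, `ffmpeg_process`) are assumptions standing in for whatever the PR actually uses, and the line numbers refer to the reply above:

```python
def finalize(self) -> None:
    # Signal the writer thread to stop once it has drained the remaining frames (line 211).
    self.stop_event.set()
    # Wait up to 10 s for the writer thread to flush all frames and close ffmpeg's stdin (line 215).
    self.writer_thread.join(timeout=10.0)
    # Wait for ffmpeg to finish writing the output file (line 222).
    self.ffmpeg_process.wait(timeout=30.0)
```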
src/lerobot/datasets/video_utils.py
Outdated
```python
# stdin is already closed by writer thread, so we use wait() instead of communicate()
# to avoid "flush of closed file" error
try:
    returncode = self.ffmpeg_process.wait(timeout=30.0)
```
Copilot AI · Nov 5, 2025
The 30-second timeout for ffmpeg process completion is a magic number. Consider making this configurable based on expected video length or defining it as a named constant (e.g., FFMPEG_FINALIZE_TIMEOUT_S = 30.0).
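A minimal illustration of the suggested constant, using the name proposed in the review comment; the use site mirrors the hunk above and is shown commented out because it belongs inside `finalize()`:

```python
# Named constant replacing the magic number (name proposed in the review).
FFMPEG_FINALIZE_TIMEOUT_S = 30.0

# Hypothetical use inside finalize(), mirroring the hunk above:
# returncode = self.ffmpeg_process.wait(timeout=FFMPEG_FINALIZE_TIMEOUT_S)
```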
Refactors real-time encoding to eliminate duplication and improve error handling.

Changes:
- Extract shared video file placement logic (`_process_video_file_placement`)
- Expose encoding params (`video_crf`, `video_preset`, `max_queue_size`) as config
- Fix stats computation by temporarily storing image arrays
- Add graceful fallback to PNG saving on encoder failures
- Fix phantom file paths (use empty strings instead)
- Return bool from `_add_frame_to_realtime_encoder` instead of raising

Eliminates ~80 lines of duplication and fixes training pipeline compatibility.
…better maintainability
Pull Request Overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
```python
def _start_writer_thread(self) -> None:
    """Start the background thread for writing frames."""
    self.writer_thread = threading.Thread(target=self._writer_thread_func, daemon=True)
```
Copilot AI · Nov 6, 2025
Using a daemon thread can cause data loss if the main program exits before all frames are written. Consider using a non-daemon thread and ensuring proper cleanup in the finalize method or use a context manager to guarantee thread completion.
```python
self.realtime_encoding = False
# Initialize image writer if not already started
if self.image_writer is None:
    self.start_image_writer(num_processes=0, num_threads=4)
```
Copilot AI · Nov 6, 2025
The fallback to image writer uses hardcoded values (num_processes=0, num_threads=4). Consider making these configurable or using the original configuration values that were passed to the dataset during initialization.
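One possible shape for that fix, sketched with hypothetical attribute names (`image_writer_processes`, `image_writer_threads`) standing in for whatever values the dataset was originally constructed with:

```python
# Fall back to the image writer using the dataset's original configuration rather than
# hardcoded values (attribute names here are illustrative, not the PR's actual fields).
if self.image_writer is None:
    self.start_image_writer(
        num_processes=getattr(self, "image_writer_processes", 0),
        num_threads=getattr(self, "image_writer_threads", 4),
    )
```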
```python
# Transpose to (N, C, H, W) for stats computation
if image_array.ndim == 4 and image_array.shape[-1] == 3:
    image_array = np.transpose(image_array, (0, 3, 1, 2))
```
Copilot AI · Nov 6, 2025
The transpose operation assumes images are in (N, H, W, C) format, but doesn't validate that the channel dimension is actually the last dimension. If images are already in (N, C, H, W) format, this will incorrectly transpose them. Add a check to ensure the array is in the expected format before transposing.
Suggested change:

```diff
-# Transpose to (N, C, H, W) for stats computation
-if image_array.ndim == 4 and image_array.shape[-1] == 3:
-    image_array = np.transpose(image_array, (0, 3, 1, 2))
+# Ensure image_array is in (N, C, H, W) format for stats computation
+if image_array.ndim == 4:
+    if image_array.shape[-1] == 3 and image_array.shape[1] != 3:
+        # (N, H, W, C) -> (N, C, H, W)
+        image_array = np.transpose(image_array, (0, 3, 1, 2))
+    elif image_array.shape[1] == 3 and image_array.shape[-1] != 3:
+        # Already (N, C, H, W), do nothing
+        pass
+    elif image_array.shape[-1] == 3 and image_array.shape[1] == 3:
+        logging.warning(
+            f"Ambiguous image array shape {image_array.shape} for key '{original_key}'. "
+            "Assuming (N, H, W, C) and transposing, but please check your data."
+        )
+        image_array = np.transpose(image_array, (0, 3, 1, 2))
+    else:
+        raise ValueError(
+            f"Unexpected image array shape {image_array.shape} for key '{original_key}'. "
+            "Expected (N, H, W, C) or (N, C, H, W) with C=3."
+        )
```
…r for cleaner data handling
Pull Request Overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.
```python
    )
except Exception as e:
    self.error = e
    logging.error(f"Failed to start ffmpeg: {e}")
```
Copilot AI · Nov 6, 2025
The error message when ffmpeg fails to start should include the command that was attempted. This would help users diagnose issues with codec availability or ffmpeg installation. Consider: `logging.error(f"Failed to start ffmpeg with command: {' '.join(cmd)}. Error: {e}")`
Suggested change:

```diff
-    logging.error(f"Failed to start ffmpeg: {e}")
+    logging.error(f"Failed to start ffmpeg with command: {' '.join(cmd)}. Error: {e}")
```
```python
try:
    self.frame_queue.put(frame, timeout=FRAME_QUEUE_PUT_TIMEOUT_S)
except queue.Full:
    logging.warning("Frame queue is full, frame encoding may be lagging behind capture")
```
Copilot AI · Nov 6, 2025
The warning message should include actionable information. Consider adding: 'Consider increasing max_queue_size or using a faster codec/preset' to help users resolve the issue.
Suggested change:

```diff
-    logging.warning("Frame queue is full, frame encoding may be lagging behind capture")
+    logging.warning(
+        "Frame queue is full, frame encoding may be lagging behind capture. "
+        "Consider increasing max_queue_size or using a faster codec/preset to resolve this issue."
+    )
```
```python
if not self.features[key].get("info", None):
    video_path = self.root / self.video_path.format(video_key=key, chunk_index=0, file_index=0)
    video_path = self.root / self.video_path.format(
        video_key=video_key, chunk_index=0, file_index=0
```
Copilot AI · Nov 6, 2025
The variable video_key used in line 435 is incorrect. The parameter name is video_key but the method was called with video_key=key in line 432, where key is the loop variable. This should use key instead of video_key to match the loop variable: video_key=key.
Suggested change:

```diff
-        video_key=video_key, chunk_index=0, file_index=0
+        video_key=key, chunk_index=0, file_index=0
```
```python
temp_stat_keys = [k for k in list(episode_buffer.keys()) if k.endswith("_arrays_for_stats")]
for k in temp_stat_keys:
    del episode_buffer[k]
```
Copilot AI · Nov 6, 2025
This code deletes temporary stat keys before validation, but the same cleanup is repeated at lines 1358-1360. The first deletion appears premature since the temporary arrays are needed for computing episode stats later (lines 1341-1355). This first deletion should be removed to avoid redundant cleanup and ensure stats can be computed correctly.
Suggested change (remove these lines):

```diff
-temp_stat_keys = [k for k in list(episode_buffer.keys()) if k.endswith("_arrays_for_stats")]
-for k in temp_stat_keys:
-    del episode_buffer[k]
```
```python
keys_to_remove = [k for k in list(episode_buffer.keys()) if k.endswith("_arrays_for_stats")]
for k in keys_to_remove:
    del episode_buffer[k]
```
Copilot AI · Nov 6, 2025
The variable keys_to_remove is redundant since temp_stat_keys was already computed at line 1291 with the same logic and could be reused here if the earlier deletion at lines 1291-1293 is removed.
Suggested change (remove these lines, reusing `temp_stat_keys` instead):

```diff
-keys_to_remove = [k for k in list(episode_buffer.keys()) if k.endswith("_arrays_for_stats")]
-for k in keys_to_remove:
-    del episode_buffer[k]
```
```python
for k in episode_buffer:
    if k.endswith("_arrays_for_stats"):
        original_key = k.replace("_arrays_for_stats", "")
        # Stack arrays into (N, H, W, C) format
        image_array = np.stack(episode_buffer[k])
        # Transpose to (N, C, H, W) for stats computation
        if image_array.ndim == 4 and image_array.shape[-1] == 3:
            image_array = np.transpose(image_array, (0, 3, 1, 2))
        # Compute stats with axis=(0,2,3) for per-channel stats
        ep_stats[original_key] = get_feature_stats(image_array, axis=(0, 2, 3), keepdims=True)
        # Normalize to [0,1] range like in compute_episode_stats
        ep_stats[original_key] = {
            k: v if k == "count" else np.squeeze(v / 255.0, axis=0)
            for k, v in ep_stats[original_key].items()
        }
```
Copilot AI · Nov 6, 2025
This loop iterates over all keys in episode_buffer to find stats keys, but these keys were already identified at line 1291 as temp_stat_keys. Reuse that list instead: for k in temp_stat_keys: to avoid redundant iteration and string matching.
Suggested change:

```diff
-for k in episode_buffer:
-    if k.endswith("_arrays_for_stats"):
-        original_key = k.replace("_arrays_for_stats", "")
-        # Stack arrays into (N, H, W, C) format
-        image_array = np.stack(episode_buffer[k])
-        # Transpose to (N, C, H, W) for stats computation
-        if image_array.ndim == 4 and image_array.shape[-1] == 3:
-            image_array = np.transpose(image_array, (0, 3, 1, 2))
-        # Compute stats with axis=(0,2,3) for per-channel stats
-        ep_stats[original_key] = get_feature_stats(image_array, axis=(0, 2, 3), keepdims=True)
-        # Normalize to [0,1] range like in compute_episode_stats
-        ep_stats[original_key] = {
-            k: v if k == "count" else np.squeeze(v / 255.0, axis=0)
-            for k, v in ep_stats[original_key].items()
-        }
+for k in temp_stat_keys:
+    original_key = k.replace("_arrays_for_stats", "")
+    # Stack arrays into (N, H, W, C) format
+    image_array = np.stack(episode_buffer[k])
+    # Transpose to (N, C, H, W) for stats computation
+    if image_array.ndim == 4 and image_array.shape[-1] == 3:
+        image_array = np.transpose(image_array, (0, 3, 1, 2))
+    # Compute stats with axis=(0,2,3) for per-channel stats
+    ep_stats[original_key] = get_feature_stats(image_array, axis=(0, 2, 3), keepdims=True)
+    # Normalize to [0,1] range like in compute_episode_stats
+    ep_stats[original_key] = {
+        k: v if k == "count" else np.squeeze(v / 255.0, axis=0)
+        for k, v in ep_stats[original_key].items()
+    }
```
What this does
Adds real-time video encoding capability to LeRobot dataset recording, eliminating post-episode encoding delays and significantly improving data collection workflow efficiency.
This PR introduces a new `RealtimeVideoEncoder` class that streams frames directly to ffmpeg during recording instead of saving PNGs and batch-encoding videos after each episode.

Key improvements:
- Opt-in via `--dataset.realtime_encoding=true`

Changes:
- `RealtimeVideoEncoder` class in `video_utils.py` with queue-based frame buffering
- Encoder lifecycle management in `LeRobotDataset` (init, add_frame, finalize)
- `realtime_encoding` and `video_codec` config parameters in `lerobot_record.py`

How it was tested
How to checkout & try? (for the reviewer)
```bash
# Record a dataset with real-time encoding enabled
lerobot-record \
    --dataset.realtime_encoding=true
```