Status: Closed
Labels: area/lerobot (LeRobot dataset format), priority/high (High priority), size/L (Large: 1-2 weeks), type/feature (New feature or functionality)
Description
Summary
Implement a frame-level checkpoint system that uses TiKV for state persistence, enabling resume from any point, including mid-multipart upload. This is essential for Spot Instance tolerance and large-file processing.
Parent Epic
- #9 [Epic] Distributed Roboflow with Alibaba Cloud (OSS + ACK): Distributed Roboflow with TiKV Coordination
Dependencies
- Depends on: #29 (Distributed Lock Manager), #31 (Worker Loop)
- Enables: #33 ([Phase 2.3] retry logic and error handling for cloud operations), #34 (multipart upload for large files, #12), #35 ([Phase 2.4] cached storage backend with local buffer)
Architecture Change
Previous Design (Episode-level):
- Checkpoint after each episode completes
- Lost work if interrupted mid-episode
- File-based checkpoint storage
New Design (Frame-level):
- Checkpoint every N frames or N seconds
- Resume from exact frame position
- TiKV-based checkpoint storage
- Multipart upload state preserved
Checkpoint Schema
CheckpointState (stored in TiKV at /state/{job_hash})
- job_id: String
- byte_offset: u64 # Position in source file
- frame_idx: u64 # Last completed frame
- episode_idx: u64 # Current episode index
- video_uploads: Map<camera_id, VideoUploadState>
  - upload_id: String
  - parts: Vec<{part_num, etag, size}>
  - last_keyframe_offset: u64
- parquet_upload: ParquetUploadState
  - upload_id: String
  - parts: Vec<{part_num, etag, size}>
  - rows_written: u64
  - buffer_state: Vec<u8> # Serialized partial buffer (optional)
- last_updated: Timestamp
Tasks
5.1 Define Checkpoint Schema
- Create `src/distributed/checkpoint/schema.rs`
- Define `CheckpointState` struct with all fields (see the sketch after this list)
- Define `VideoUploadState` and `ParquetUploadState`
- Implement serde serialization (JSON or bincode)
- Add schema versioning for future migrations
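A minimal sketch of these structs, assuming serde with JSON or bincode for serialization. Field names follow the schema above; the `UploadPart` helper, the `Option` wrappers, and the `version` field are illustrative choices for the versioning and optional-buffer requirements, not existing code.

```rust
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

/// One completed part of a multipart upload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UploadPart {
    pub part_num: u32,
    pub etag: String,
    pub size: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VideoUploadState {
    pub upload_id: String,
    pub parts: Vec<UploadPart>,
    pub last_keyframe_offset: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParquetUploadState {
    pub upload_id: String,
    pub parts: Vec<UploadPart>,
    pub rows_written: u64,
    /// Serialized partial buffer (only used if Option B from 5.6 is adopted).
    pub buffer_state: Option<Vec<u8>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointState {
    /// Schema version, to allow future migrations.
    pub version: u32,
    pub job_id: String,
    /// Position in the source file.
    pub byte_offset: u64,
    /// Last completed frame.
    pub frame_idx: u64,
    /// Current episode index.
    pub episode_idx: u64,
    /// Keyed by camera_id.
    pub video_uploads: HashMap<String, VideoUploadState>,
    pub parquet_upload: Option<ParquetUploadState>,
    /// Unix timestamp (seconds) of the last save.
    pub last_updated: u64,
}
```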
5.2 Implement CheckpointManager
- Create `src/distributed/checkpoint/manager.rs`
- Define `CheckpointManager` struct with a TiKV client reference (see the sketch after this list)
- Implement `load(job_id) -> Option<CheckpointState>`:
  - Read from `/state/{job_hash}`
  - Deserialize and validate
- Implement `save(checkpoint) -> Result<()>`:
  - Serialize checkpoint
  - Write to TiKV with heartbeat in a single transaction
- Implement `delete(job_id)`:
  - Remove checkpoint after job completion
- Add async variants for non-blocking saves
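A rough sketch of the manager, assuming the tikv-client crate's transactional API and the `CheckpointState` type sketched above. The key layout matches the schema section; validation, retries, and the combined heartbeat write (see 5.8) are simplified or omitted.

```rust
use anyhow::Result;
use tikv_client::TransactionClient;

pub struct CheckpointManager {
    client: TransactionClient,
}

impl CheckpointManager {
    fn key(job_hash: &str) -> String {
        format!("/state/{job_hash}")
    }

    /// Read and deserialize the checkpoint, or None if the job has none yet.
    pub async fn load(&self, job_hash: &str) -> Result<Option<CheckpointState>> {
        let mut txn = self.client.begin_optimistic().await?;
        let raw = txn.get(Self::key(job_hash)).await?;
        txn.commit().await?;
        Ok(match raw {
            Some(bytes) => Some(serde_json::from_slice(&bytes)?),
            None => None,
        })
    }

    /// Serialize and write the checkpoint (heartbeat combination is in 5.8).
    pub async fn save(&self, job_hash: &str, state: &CheckpointState) -> Result<()> {
        let mut txn = self.client.begin_optimistic().await?;
        txn.put(Self::key(job_hash), serde_json::to_vec(state)?).await?;
        txn.commit().await?;
        Ok(())
    }

    /// Remove the checkpoint after the job completes successfully.
    pub async fn delete(&self, job_hash: &str) -> Result<()> {
        let mut txn = self.client.begin_optimistic().await?;
        txn.delete(Self::key(job_hash)).await?;
        txn.commit().await?;
        Ok(())
    }
}
```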
5.3 Checkpoint Frequency Configuration
- Add to config:
  - `checkpoint_interval_frames: u64` (default: 100)
  - `checkpoint_interval_seconds: u64` (default: 10)
  - `checkpoint_async: bool` (default: true)
- Implement checkpoint trigger logic (see the sketch after this list):
  - Check after each frame batch
  - Trigger if frames >= interval OR time >= interval
  - For async mode: queue the checkpoint, don't block the pipeline
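A small sketch of the config and trigger check under these defaults; the struct and field names are illustrative, not the project's existing config types.

```rust
use std::time::Instant;

pub struct CheckpointConfig {
    pub checkpoint_interval_frames: u64,
    pub checkpoint_interval_seconds: u64,
    pub checkpoint_async: bool,
}

impl Default for CheckpointConfig {
    fn default() -> Self {
        Self {
            checkpoint_interval_frames: 100,
            checkpoint_interval_seconds: 10,
            checkpoint_async: true,
        }
    }
}

pub struct CheckpointTrigger {
    pub frames_since_last: u64,
    pub last_saved: Instant,
}

impl CheckpointTrigger {
    /// True once either the frame-count interval or the wall-clock interval
    /// has elapsed since the last saved checkpoint.
    pub fn should_checkpoint(&self, cfg: &CheckpointConfig) -> bool {
        self.frames_since_last >= cfg.checkpoint_interval_frames
            || self.last_saved.elapsed().as_secs() >= cfg.checkpoint_interval_seconds
    }
}
```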
5.4 Resume Logic - Source Stream
- On job start, load the checkpoint
- If `byte_offset > 0` (see the sketch after this list):
  - Seek the source stream to the offset (requires #33)
  - Or: read and discard bytes until the offset is reached
- If `frame_idx > 0`:
  - Skip frames from the parser until `frame_idx` is reached
- Validate that the source file hasn't changed (size, modification time)
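A sketch of the byte-offset resume path, assuming a tokio `AsyncRead + AsyncSeek` source; for backends that cannot seek, the range-request support from #33 or the read-and-discard fallback would apply instead.

```rust
use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, SeekFrom};

/// Position the source stream at the checkpointed byte offset before
/// re-attaching the frame parser.
pub async fn resume_source<S>(source: &mut S, byte_offset: u64) -> std::io::Result<()>
where
    S: AsyncRead + AsyncSeek + Unpin,
{
    if byte_offset > 0 {
        // Jump straight to the last checkpointed byte; a non-seekable backend
        // would instead read and discard bytes up to this offset.
        source.seek(SeekFrom::Start(byte_offset)).await?;
    }
    Ok(())
}
```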
5.5 Resume Logic - Multipart Uploads
- If `video_uploads` is not empty:
  - For each camera, resume the upload with the existing `upload_id`
  - Start from the next part number
  - Handle expired uploads (>7 days): abort and restart (see the sketch after this list)
- If `parquet_upload` is not empty:
  - Resume the parquet upload similarly
  - Track new parts, append them to the checkpoint
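A sketch of the resume-or-restart decision, assuming the upload start time is available (e.g. tracked alongside `upload_id` or derived from `last_updated`); the 7-day expiry mirrors typical OSS/S3 multipart-upload lifetimes, and the names are illustrative.

```rust
use std::time::{Duration, SystemTime};

/// Multipart uploads are typically cleaned up by lifecycle rules after about
/// a week, so anything older is restarted from scratch.
const UPLOAD_EXPIRY: Duration = Duration::from_secs(7 * 24 * 3600);

pub enum ResumeDecision {
    /// Continue the existing upload_id, starting at this part number.
    Continue { next_part: u32 },
    /// Upload is too old (or the clock is unreliable): abort and start over.
    Restart,
}

pub fn plan_resume(completed_parts: &[u32], started_at: SystemTime) -> ResumeDecision {
    let expired = started_at
        .elapsed()
        .map(|age| age > UPLOAD_EXPIRY)
        .unwrap_or(true); // clock error: restart rather than risk a stale upload
    if expired {
        ResumeDecision::Restart
    } else {
        let next_part = completed_parts.iter().copied().max().unwrap_or(0) + 1;
        ResumeDecision::Continue { next_part }
    }
}
```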
5.6 Resume Logic - Buffers
- Option A (Simple): Discard partial buffers on resume
  - Re-process frames since the last checkpoint
  - Slight duplication but simpler
- Option B (Complex): Serialize buffer state
  - Store the partial parquet buffer in the checkpoint
  - Restore it on resume
  - More complex, larger checkpoint
- Recommendation: Start with Option A
5.7 Integration with Pipeline
- Add `CheckpointContext` to the pipeline context
- After each batch completion (see the sketch after this list):
  - Update the checkpoint state
  - If the trigger condition is met: save to TiKV
- On pipeline completion:
  - Write a final checkpoint (for verification)
  - Delete the checkpoint after success
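A sketch of the per-batch hook, reusing the `CheckpointState`, `CheckpointConfig`, `CheckpointTrigger`, and `CheckpointManager` types sketched above. The `CheckpointContext` fields, the inline (non-async) save, and using `job_id` directly as the key hash are all illustrative simplifications.

```rust
pub struct CheckpointContext {
    pub state: CheckpointState,
    pub config: CheckpointConfig,
    pub trigger: CheckpointTrigger,
    pub manager: CheckpointManager,
}

impl CheckpointContext {
    /// Called by the pipeline after each completed frame batch.
    pub async fn on_batch_complete(
        &mut self,
        frames_in_batch: u64,
        new_byte_offset: u64,
    ) -> anyhow::Result<()> {
        self.state.frame_idx += frames_in_batch;
        self.state.byte_offset = new_byte_offset;
        self.trigger.frames_since_last += frames_in_batch;

        if self.trigger.should_checkpoint(&self.config) {
            // Async mode would enqueue the save instead of awaiting it here.
            self.manager.save(&self.state.job_id, &self.state).await?;
            self.trigger.frames_since_last = 0;
            self.trigger.last_saved = std::time::Instant::now();
        }
        Ok(())
    }
}
```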
5.8 Heartbeat Integration
- Combine the checkpoint save with the heartbeat update
- Single TiKV transaction (see the sketch after this list):
  - Write `/state/{job_hash}` with the checkpoint
  - Write `/heartbeat/{pod_id}` with a timestamp
- Reduces TiKV round trips
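A sketch of the combined write, again assuming the tikv-client transactional API; the key layout follows the schema section, and the plain-text timestamp encoding is an assumption.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::Result;
use tikv_client::TransactionClient;

/// Write the checkpoint and the worker heartbeat in one transaction, so both
/// keys commit (or fail) together and a fresh heartbeat can never sit next to
/// a stale checkpoint.
pub async fn save_with_heartbeat(
    client: &TransactionClient,
    job_hash: &str,
    pod_id: &str,
    checkpoint_bytes: Vec<u8>,
) -> Result<()> {
    let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
    let mut txn = client.begin_optimistic().await?;
    txn.put(format!("/state/{job_hash}"), checkpoint_bytes).await?;
    txn.put(format!("/heartbeat/{pod_id}"), now.to_string().into_bytes()).await?;
    txn.commit().await?;
    Ok(())
}
```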
Acceptance Criteria
- `CheckpointState` schema defined and serializable
- `CheckpointManager` implements load/save/delete
- Checkpoint saved at configured intervals
- Source stream resume from byte offset works
- Multipart upload resume works (video and parquet)
- Checkpoint + heartbeat in single transaction
- Job completes correctly after simulated failure
- No data loss or duplication on resume
- Expired uploads handled gracefully
- Unit tests for checkpoint manager
- Integration test: kill and resume mid-job
Files to Create
- `src/distributed/checkpoint/mod.rs`
- `src/distributed/checkpoint/schema.rs`
- `src/distributed/checkpoint/manager.rs`
Files to Modify
- `src/distributed/worker.rs` (load checkpoint on job start)
- `src/dataset/lerobot/writer.rs` (add checkpoint hooks)
- `src/storage/multipart.rs` (add resume support)
Testing Strategy
- Unit tests for serialization/deserialization
- Unit tests for checkpoint manager with mock TiKV
- Integration test: Process file, kill at 50%, resume, verify output
- Stress test: Random kills during processing
- Edge cases: Expired uploads, corrupted checkpoints, config changes