[Phase 5] Frame-level checkpoint with TiKV and multipart resume #19

@zhexuany

Description

Summary

Implement a frame-level checkpoint system using TiKV for state persistence, enabling resume from any point, including mid-multipart uploads. This is essential for Spot Instance tolerance and large-file processing.

Parent Epic

Dependencies

Architecture Change

Previous Design (Episode-level):

  • Checkpoint after each episode completes
  • Lost work if interrupted mid-episode
  • File-based checkpoint storage

New Design (Frame-level):

  • Checkpoint every N frames or N seconds
  • Resume from exact frame position
  • TiKV-based checkpoint storage
  • Multipart upload state preserved

Checkpoint Schema

CheckpointState (stored in TiKV at /state/{job_hash})

job_id: String
byte_offset: u64              # Position in source file
frame_idx: u64                # Last completed frame
episode_idx: u64              # Current episode index

video_uploads: Map<camera_id, VideoUploadState>
  - upload_id: String
  - parts: Vec<{part_num, etag, size}>
  - last_keyframe_offset: u64

parquet_upload: ParquetUploadState
  - upload_id: String
  - parts: Vec<{part_num, etag, size}>
  - rows_written: u64
  - buffer_state: Vec<u8>     # Serialized partial buffer (optional)

last_updated: Timestamp
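The schema above maps to Rust types roughly as follows. This is a sketch: the field names mirror the schema, but the serde derives and the concrete timestamp type are left out to keep it dependency-free (the real `schema.rs` would add `#[derive(Serialize, Deserialize)]` per task 5.1):

```rust
use std::collections::HashMap;

/// One completed part of a multipart upload.
#[derive(Debug, Clone, PartialEq)]
pub struct UploadPart {
    pub part_num: u32,
    pub etag: String,
    pub size: u64,
}

/// Per-camera video multipart upload state.
#[derive(Debug, Clone)]
pub struct VideoUploadState {
    pub upload_id: String,
    pub parts: Vec<UploadPart>,
    pub last_keyframe_offset: u64,
}

/// Parquet multipart upload state.
#[derive(Debug, Clone)]
pub struct ParquetUploadState {
    pub upload_id: String,
    pub parts: Vec<UploadPart>,
    pub rows_written: u64,
    pub buffer_state: Option<Vec<u8>>, // serialized partial buffer (Option B in 5.6)
}

/// Checkpoint stored in TiKV at /state/{job_hash}.
#[derive(Debug, Clone)]
pub struct CheckpointState {
    pub schema_version: u32, // for future migrations (task 5.1)
    pub job_id: String,
    pub byte_offset: u64,  // position in source file
    pub frame_idx: u64,    // last completed frame
    pub episode_idx: u64,  // current episode index
    pub video_uploads: HashMap<String, VideoUploadState>, // keyed by camera_id
    pub parquet_upload: Option<ParquetUploadState>,
    pub last_updated: u64, // unix timestamp
}
```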

Tasks

5.1 Define Checkpoint Schema

  1. Create src/distributed/checkpoint/schema.rs
  2. Define CheckpointState struct with all fields
  3. Define VideoUploadState and ParquetUploadState
  4. Implement serde serialization (JSON or bincode)
  5. Add schema versioning for future migrations

5.2 Implement CheckpointManager

  1. Create src/distributed/checkpoint/manager.rs
  2. Define CheckpointManager struct with TiKV client reference
  3. Implement load(job_id) -> Option<CheckpointState>:
    • Read from /state/{job_hash}
    • Deserialize and validate
  4. Implement save(checkpoint) -> Result<()>:
    • Serialize checkpoint
    • Write to TiKV with heartbeat in single transaction
  5. Implement delete(job_id):
    • Remove checkpoint after job completion
  6. Add async variants for non-blocking saves
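The load/save/delete surface could look like the sketch below. An in-memory `HashMap` stands in for the TiKV client, a pipe-delimited string stands in for serde JSON/bincode, and the `Checkpoint` here is a minimal stand-in for the full schema; none of this is the final implementation:

```rust
use std::collections::HashMap;

/// Minimal stand-in for the checkpoint (full schema lives in schema.rs).
#[derive(Debug, Clone, PartialEq)]
pub struct Checkpoint {
    pub job_id: String,
    pub byte_offset: u64,
    pub frame_idx: u64,
}

impl Checkpoint {
    // Toy serialization; the real manager would use serde JSON or bincode.
    fn to_bytes(&self) -> Vec<u8> {
        format!("{}|{}|{}", self.job_id, self.byte_offset, self.frame_idx).into_bytes()
    }
    fn from_bytes(b: &[u8]) -> Option<Checkpoint> {
        let s = String::from_utf8(b.to_vec()).ok()?;
        let mut it = s.split('|');
        Some(Checkpoint {
            job_id: it.next()?.to_string(),
            byte_offset: it.next()?.parse().ok()?,
            frame_idx: it.next()?.parse().ok()?,
        })
    }
}

/// CheckpointManager over an in-memory KV that mocks the TiKV client.
pub struct CheckpointManager {
    kv: HashMap<String, Vec<u8>>,
}

impl CheckpointManager {
    pub fn new() -> Self {
        Self { kv: HashMap::new() }
    }

    fn key(job_hash: &str) -> String {
        format!("/state/{}", job_hash)
    }

    /// Serialize and write the checkpoint to /state/{job_hash}.
    pub fn save(&mut self, job_hash: &str, cp: &Checkpoint) {
        self.kv.insert(Self::key(job_hash), cp.to_bytes());
    }

    /// Read and deserialize; None if absent or invalid.
    pub fn load(&self, job_hash: &str) -> Option<Checkpoint> {
        self.kv.get(&Self::key(job_hash)).and_then(|b| Checkpoint::from_bytes(b))
    }

    /// Remove the checkpoint after job completion.
    pub fn delete(&mut self, job_hash: &str) {
        self.kv.remove(&Self::key(job_hash));
    }
}
```

The real `save` would also bundle the heartbeat write into one transaction (task 5.8) and offer an async, non-blocking variant.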

5.3 Checkpoint Frequency Configuration

  1. Add to config:
    • checkpoint_interval_frames: u64 (default: 100)
    • checkpoint_interval_seconds: u64 (default: 10)
    • checkpoint_async: bool (default: true)
  2. Implement checkpoint trigger logic:
    • Check after each frame batch
    • Trigger if frames >= interval OR time >= interval
  3. For async mode: Queue checkpoint, don't block pipeline
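The trigger condition is a pure either/or check against the configured thresholds, something like:

```rust
/// Checkpoint frequency configuration (defaults from the task list).
pub struct CheckpointConfig {
    pub interval_frames: u64,  // default: 100
    pub interval_seconds: u64, // default: 10
}

impl Default for CheckpointConfig {
    fn default() -> Self {
        Self { interval_frames: 100, interval_seconds: 10 }
    }
}

/// Trigger if EITHER threshold has been reached since the last checkpoint.
pub fn should_checkpoint(cfg: &CheckpointConfig, frames_since: u64, secs_since: u64) -> bool {
    frames_since >= cfg.interval_frames || secs_since >= cfg.interval_seconds
}
```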

5.4 Resume Logic - Source Stream

  1. On job start, load checkpoint
  2. If byte_offset > 0:
    • Seek source reader to byte_offset
  3. If frame_idx > 0:
    • Skip frames from parser until frame_idx reached
  4. Validate source file hasn't changed (size, modification time)
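The startup decision could be a pure function over the checkpoint and a fingerprint of the source file. `SourceFingerprint` and `ResumePlan` are hypothetical names introduced for this sketch:

```rust
/// Metadata captured when the job first opened the source file.
#[derive(Debug, Clone, PartialEq)]
pub struct SourceFingerprint {
    pub size: u64,
    pub mtime: u64, // unix modification time
}

/// What the reader should do on startup.
#[derive(Debug, PartialEq)]
pub enum ResumePlan {
    FromScratch,
    /// Seek to byte_offset, then skip parsed frames until frame_idx is reached.
    Seek { byte_offset: u64, skip_until_frame: u64 },
}

/// Decide how to resume; reject the checkpoint if the source file changed.
pub fn plan_resume(
    checkpoint: Option<(u64 /* byte_offset */, u64 /* frame_idx */, SourceFingerprint)>,
    current: &SourceFingerprint,
) -> Result<ResumePlan, String> {
    match checkpoint {
        None => Ok(ResumePlan::FromScratch),
        Some((_, _, ref fp)) if fp != current => {
            Err("source file changed since checkpoint (size/mtime mismatch)".to_string())
        }
        Some((byte_offset, frame_idx, _)) => Ok(ResumePlan::Seek {
            byte_offset,
            skip_until_frame: frame_idx,
        }),
    }
}
```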

5.5 Resume Logic - Multipart Uploads

  1. If video_uploads not empty:
    • For each camera, resume upload with existing upload_id
    • Start from next part number
    • Handle expired uploads (>7 days): abort and restart
  2. If parquet_upload not empty:
    • Resume parquet upload similarly
  3. Track new parts, append to checkpoint
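The per-upload resume decision (continue with the existing upload_id at the next part number, or abort an expired upload and restart) might look like this sketch; the names are illustrative:

```rust
/// What to do with an in-flight multipart upload found in a checkpoint.
#[derive(Debug, PartialEq)]
pub enum UploadResume {
    /// Continue with the existing upload_id, starting at this part number.
    Continue { next_part_num: u32 },
    /// Upload is older than the expiry window: abort it and start over.
    AbortAndRestart,
}

const EXPIRY_SECS: u64 = 7 * 24 * 3600; // multipart uploads expire after 7 days

/// `part_nums` are the parts already acknowledged in the checkpoint;
/// `age_secs` is the time since the upload was started.
pub fn resume_upload(part_nums: &[u32], age_secs: u64) -> UploadResume {
    if age_secs > EXPIRY_SECS {
        return UploadResume::AbortAndRestart;
    }
    let next = part_nums.iter().copied().max().unwrap_or(0) + 1;
    UploadResume::Continue { next_part_num: next }
}
```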

5.6 Resume Logic - Buffers

  1. Option A (Simple): Discard partial buffers on resume
    • Re-process frames since last checkpoint
    • Slight duplication but simpler
  2. Option B (Complex): Serialize buffer state
    • Store partial parquet buffer in checkpoint
    • Restore on resume
    • More complex, larger checkpoint
  3. Recommendation: Start with Option A

5.7 Integration with Pipeline

  1. Add CheckpointContext to pipeline context
  2. After each batch completion:
    • Update checkpoint state
    • If trigger condition met: save to TiKV
  3. On pipeline completion:
    • Final checkpoint (for verification)
    • Delete checkpoint after success

5.8 Heartbeat Integration

  1. Combine checkpoint save with heartbeat update
  2. Single TiKV transaction:
    • Write /state/{job_hash} with checkpoint
    • Write /heartbeat/{pod_id} with timestamp
  3. Reduces TiKV round trips
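Shape-wise, the combined write stages both key/value pairs and applies them together. Here a `HashMap` mocks the atomic TiKV transaction (the real code would use the TiKV client's begin/put/commit):

```rust
use std::collections::HashMap;

/// Write checkpoint and heartbeat as one batch. The HashMap mocks a single
/// TiKV transaction: in the real implementation the two puts run between
/// begin and commit so they land (or fail) together.
pub fn save_checkpoint_with_heartbeat(
    kv: &mut HashMap<String, Vec<u8>>,
    job_hash: &str,
    pod_id: &str,
    checkpoint_bytes: Vec<u8>,
    now_unix: u64,
) {
    let writes = vec![
        (format!("/state/{}", job_hash), checkpoint_bytes),
        (format!("/heartbeat/{}", pod_id), now_unix.to_be_bytes().to_vec()),
    ];
    for (k, v) in writes {
        kv.insert(k, v);
    }
}
```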

Acceptance Criteria

  • CheckpointState schema defined and serializable
  • CheckpointManager implements load/save/delete
  • Checkpoint saved at configured intervals
  • Source stream resume from byte offset works
  • Multipart upload resume works (video and parquet)
  • Checkpoint + heartbeat in single transaction
  • Job completes correctly after simulated failure
  • No data loss or duplication on resume
  • Expired uploads handled gracefully
  • Unit tests for checkpoint manager
  • Integration test: kill and resume mid-job

Files to Create

  • src/distributed/checkpoint/mod.rs
  • src/distributed/checkpoint/schema.rs
  • src/distributed/checkpoint/manager.rs

Files to Modify

  • src/distributed/worker.rs (load checkpoint on job start)
  • src/dataset/lerobot/writer.rs (add checkpoint hooks)
  • src/storage/multipart.rs (add resume support)

Testing Strategy

  1. Unit tests for serialization/deserialization
  2. Unit tests for checkpoint manager with mock TiKV
  3. Integration test: Process file, kill at 50%, resume, verify output
  4. Stress test: Random kills during processing
  5. Edge cases: Expired uploads, corrupted checkpoints, config changes
