[Phase 7.1] Integrate pipeline with checkpoint hooks [PARTIALLY COMPLETE] #47

@zhexuany

Summary

Integrate the existing pipeline with checkpoint hooks for distributed mode. Add distributed context and checkpoint triggers at stage boundaries.

Status: PARTIALLY COMPLETE ✅🔧

✅ Completed (via #72, #73)

  • DistributedPipelineContext equivalent - Worker.process_job() now integrates with TiKV
  • Source stage hook - ProgressCallback with byte offset tracking (via messages_processed)
  • Parser stage hook - Frame index tracking via frame counting
  • Checkpoint trigger - Configurable by frame count and time interval
  • Resume from checkpoint - Worker checks for existing checkpoints on job start
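The "configurable by frame count and time interval" trigger could look roughly like the sketch below. This is only an illustration of the idea, not the code from #72/#73: `CheckpointTrigger`, its field names, and the default values are all hypothetical.

```python
import time
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class CheckpointTrigger:
    """Hypothetical trigger: fires after N frames or T seconds, whichever comes first."""

    frame_interval: int = 100        # assumed default, not taken from the issue
    time_interval_s: float = 10.0    # assumed default, not taken from the issue
    clock: Callable[[], float] = time.monotonic  # injectable so tests can fake time
    _last_frame: int = field(default=0, init=False)
    _last_time: float = field(default=0.0, init=False)

    def __post_init__(self) -> None:
        # Start the time window when the trigger is created.
        self._last_time = self.clock()

    def should_checkpoint(self, frame_idx: int) -> bool:
        """Return True when either threshold has been crossed since the last save."""
        frames_due = frame_idx - self._last_frame >= self.frame_interval
        time_due = self.clock() - self._last_time >= self.time_interval_s
        return frames_due or time_due

    def mark_saved(self, frame_idx: int) -> None:
        """Record that a checkpoint was written at frame_idx."""
        self._last_frame = frame_idx
        self._last_time = self.clock()
```

A worker loop would call `should_checkpoint(frame_idx)` after each frame and `mark_saved(frame_idx)` once the checkpoint write succeeds, so a failed write is retried on the next frame.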

🔄 Remaining Enhancements (Optional)

These are optional refinements to the current implementation:

  1. Processor stage hook (video encoder state)

  2. Uploader stage hook (detailed multipart state)

    • Current: Upload coordinator tracks completion at episode level
    • Enhancement: Track in-progress multipart upload IDs for part-by-part resume
    • Priority: Medium - useful for large video uploads
    • Note: See TODO in WorkerCheckpointCallback about episode-level tracking
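For discussion, part-by-part resume could persist something like the record below for each in-progress upload. This is a sketch under assumptions: `MultipartUploadState` and its fields are hypothetical, and it assumes an S3-style API where every uploaded part returns an ETag, which the issue does not confirm.

```python
from dataclasses import dataclass, field


@dataclass
class MultipartUploadState:
    """Hypothetical checkpoint record for one in-progress multipart upload."""

    upload_id: str      # ID returned when the multipart upload was started
    total_parts: int
    # part number (1-based) -> ETag returned by the object store (assumed S3-style)
    completed_parts: dict[int, str] = field(default_factory=dict)

    def record_part(self, part_number: int, etag: str) -> None:
        """Remember a part that finished uploading."""
        self.completed_parts[part_number] = etag

    def remaining_parts(self) -> list[int]:
        """Parts still to upload after a resume, in ascending order."""
        return [n for n in range(1, self.total_parts + 1)
                if n not in self.completed_parts]

    def is_complete(self) -> bool:
        """True once every part has been recorded."""
        return len(self.completed_parts) == self.total_parts
```

On resume, a worker would upload only `remaining_parts()` under the saved `upload_id` instead of restarting the whole episode upload.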

Recommendation

This issue can be closed with the understanding that:

  1. Frame-level checkpointing works ([Phase 1] Add checkpoint save during pipeline processing #73)
  2. Resume from checkpoint works
  3. Upload state tracking is simplified (completion-based, not multipart-detailed)

If detailed multipart upload resume is needed, create a new issue specifically for that feature.

Original Issue Content Below


Integrate the existing pipeline with checkpoint hooks for distributed mode. Add distributed context and checkpoint triggers at stage boundaries.

Pipeline Context

Add DistributedPipelineContext to carry:

  • TiKV client reference
  • Job ID and file hash
  • Pod ID
  • Checkpoint manager
  • Initial checkpoint state (for resume)
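A minimal sketch of how these fields might be grouped follows. It is in Python purely for illustration. The field names and types are assumptions, and the TiKV client and checkpoint manager are typed as `Any` because their real interfaces are not described in this issue.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class DistributedPipelineContext:
    """Illustrative container for the fields listed above (names are assumptions)."""

    tikv_client: Any                # TiKV client reference (real type not specified here)
    job_id: str
    file_hash: str
    pod_id: str
    checkpoint_manager: Any         # checkpoint manager (real type not specified here)
    initial_checkpoint: Optional[dict] = None  # set only when resuming a job

    @property
    def is_resume(self) -> bool:
        """True when the job starts from a previously saved checkpoint."""
        return self.initial_checkpoint is not None
```

Making the context immutable (`frozen=True`) is a design choice for this sketch: stages can share it freely, and mutable progress lives in the checkpoint manager instead.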

Checkpoint Hooks

Inject at stage boundaries:

Source → [checkpoint: byte_offset] → Parser
Parser → [checkpoint: frame_idx] → Processor
Processor → [checkpoint: encoder_state] → Uploader
Uploader → [checkpoint: parts[]] → Complete
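One way to picture the state these four hooks produce is a single record with one field per stage boundary. The sketch below is hypothetical: `PipelineCheckpoint`, the field types, and the JSON encoding are assumptions, not the format actually stored in TiKV.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Optional


@dataclass
class PipelineCheckpoint:
    """Hypothetical record with one field per stage boundary."""

    byte_offset: int = 0                 # Source -> Parser
    frame_idx: int = 0                   # Parser -> Processor
    encoder_state: Optional[str] = None  # Processor -> Uploader (optional)
    parts: list[str] = field(default_factory=list)  # Uploader -> Complete (optional)

    def to_bytes(self) -> bytes:
        """Serialize to bytes so the record can be stored as a key-value entry."""
        return json.dumps(asdict(self), sort_keys=True).encode("utf-8")

    @classmethod
    def from_bytes(cls, raw: bytes) -> "PipelineCheckpoint":
        """Rebuild a checkpoint from its serialized form."""
        return cls(**json.loads(raw.decode("utf-8")))
```

Keeping all stage positions in one record means a resume reads a single value and every stage restarts from a mutually consistent point.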

Acceptance Criteria

  • DistributedPipelineContext defined
  • Pipeline wrapper with hooks
  • Source stage tracks byte offset
  • Parser stage tracks frame index
  • Processor stage tracks encoder state (optional)
  • Uploader stage tracks parts (optional)
  • Checkpoint trigger at intervals
  • Resume works from any checkpoint
  • Completion handler works
  • No data loss or duplication
  • Integration test: interrupt and resume (basic test passes)
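The "interrupt and resume" and "no data loss or duplication" criteria can be illustrated with a toy loop. This is a simplified sketch, not the project's integration test: `process_frames` and `fail_at` are hypothetical, and it assumes the checkpoint is updated after every frame. With interval-based checkpoints, output written after the last checkpoint would also need to be idempotent or rolled back.

```python
from typing import Iterable, Optional


def process_frames(
    frames: Iterable[str],
    output: list[str],
    resume_from: int = 0,
    fail_at: Optional[int] = None,
) -> int:
    """Process frames starting at resume_from; return the next frame index to process.

    fail_at simulates an interruption just before that frame index is processed.
    """
    next_idx = resume_from
    for idx, frame in enumerate(frames):
        if idx < resume_from:
            continue  # already handled before the interruption
        if fail_at is not None and idx == fail_at:
            break  # simulated crash
        output.append(frame)
        next_idx = idx + 1  # value that would be saved as the checkpoint
    return next_idx
```

Running it once with `fail_at` set and then again with `resume_from` equal to the returned index should leave `output` identical to an uninterrupted run.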
