
feat: add FFmpeg streaming write support via pipe protocol #89

@zhexuany


Summary

Add support for streaming video data to FFmpeg via stdin and reading the encoded output from stdout (pipe protocol). This enables video encoding without intermediate files on local disk.

Problem

Currently, video encoding in roboflow has limitations:

  • No streaming write: Output must be written to local files before FFmpeg can process it
  • Disk overhead: Full intermediate files consume disk space
  • No direct S3 integration: Can't stream directly from S3 → FFmpeg → S3

Solution

Implement FFmpeg streaming using the pipe protocol:

┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│   S3/OSS    │ ───▶ │   Roboflow  │ ───▶ │   FFmpeg    │
│  (input)    │      │  (parser)   │      │ (encoder)   │
└─────────────┘      └─────────────┘      └─────────────┘
                            │                     │
                            │ pipe:               │ pipe:
                            │ raw frames          │ encoded video
                            ▼                     ▼
                     ┌────────────────────────────┐
                     │     No intermediate files   │
                     └────────────────────────────┘
                            │
                            ▼
                     ┌─────────────┐
                     │   S3/OSS    │
                     │  (output)   │
                     └─────────────┘

Implementation Tasks

1. Create FFmpeg Pipe Writer Module

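// crates/roboflow-pipeline/src/ffmpeg/pipe.rs

use tokio::process::{Child, ChildStdin, ChildStdout};

pub struct PipeEncoder {
    /// Running FFmpeg process, kept so its exit status can be checked
    /// (a `Command` is only the builder and cannot be waited on)
    child: Child,
    /// Raw frames in; dropped by `finish` so FFmpeg sees EOF
    stdin: Option<ChildStdin>,
    /// Encoded output; handed out by `take_stdout` to a concurrent reader
    stdout: Option<ChildStdout>,
}

impl PipeEncoder {
    /// Spawn FFmpeg with pipe protocol (`-i pipe:0 ... pipe:1`)
    pub fn spawn(config: &PipeEncoderConfig) -> Result<Self> { todo!() }

    /// Write raw frame data to stdin; awaiting a full pipe gives backpressure
    pub async fn write_frame(&mut self, data: &[u8]) -> Result<()> { todo!() }

    /// Hand stdout to a reader that runs concurrently with `write_frame`.
    /// Reading only after all frames are written deadlocks once FFmpeg's
    /// stdout pipe buffer is full.
    pub fn take_stdout(&mut self) -> Option<ChildStdout> { todo!() }

    /// Close stdin, wait for FFmpeg to exit, and turn a non-zero exit code into an error
    pub async fn finish(self) -> Result<()> { todo!() }
}

2. Integrate with Existing Storage

Leverage existing roboflow-storage S3/OSS streaming:

use tokio::io::{AsyncReadExt, AsyncWriteExt};

pub async fn encode_streaming(
    storage: &dyn Storage,
    input_url: &str,
    output_url: &str,
    config: &PipeEncoderConfig,
) -> Result<()> {
    // Open the S3/OSS input and output streams
    let mut reader = storage.open(input_url).await?;
    let mut writer = storage.create(output_url).await?;

    // Spawn FFmpeg with pipes; stdout gets its own reader
    let mut encoder = PipeEncoder::spawn(config)?;
    let mut stdout = encoder.take_stdout().expect("FFmpeg stdout is piped");
    let chunk_size = config.pipe_buffer_size;

    // Feed input → stdin, then close stdin and check FFmpeg's exit code
    let feed = async move {
        let mut buf = vec![0u8; chunk_size];
        loop {
            let n = reader.read(&mut buf).await?;
            if n == 0 {
                break;
            }
            encoder.write_frame(&buf[..n]).await?;
        }
        encoder.finish().await
    };

    // Drain stdout → storage at the same time: two sequential copies would
    // deadlock, and stdout only reaches EOF after stdin has been closed
    let drain = async move {
        tokio::io::copy(&mut stdout, &mut writer).await?;
        writer.shutdown().await?; // flush/finalize the upload
        Result::<()>::Ok(())
    };

    tokio::try_join!(feed, drain)?;
    Ok(())
}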
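Draining stdout while stdin is still being written is what keeps a pipe-based encoder from deadlocking: once FFmpeg's stdout pipe buffer fills, FFmpeg stops reading stdin and the writer blocks forever. A minimal std-only sketch of that pattern follows: a helper thread feeds stdin and then drops it (EOF), the calling thread drains stdout, and the exit status is checked last. `run_piped` is a hypothetical helper, and `cat` stands in for FFmpeg (which would be invoked with `-i pipe:0 ... pipe:1`), so the sketch runs on any Unix system without FFmpeg installed.

```rust
use std::io::{self, Read, Write};
use std::process::{Command, Stdio};
use std::thread;

/// Hypothetical helper: stream `input` through `program`'s stdin and collect
/// its stdout, checking the exit status at the end.
fn run_piped(program: &str, args: &[&str], input: Vec<u8>) -> io::Result<Vec<u8>> {
    let mut child = Command::new(program)
        .args(args)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;

    let mut stdin = child.stdin.take().expect("stdin is piped");
    let mut stdout = child.stdout.take().expect("stdout is piped");

    // Feed stdin on a separate thread; `stdin` is dropped when the closure
    // returns, which closes the pipe so the child sees EOF and flushes.
    let feeder = thread::spawn(move || -> io::Result<()> {
        for chunk in input.chunks(64 * 1024) {
            stdin.write_all(chunk)?;
        }
        Ok(())
    });

    // Drain stdout concurrently on this thread.
    let mut output = Vec::new();
    stdout.read_to_end(&mut output)?;

    // Surface write errors (e.g. broken pipe if the child crashed).
    feeder.join().expect("feeder thread panicked")?;

    // Propagate the child's exit status instead of trusting partial output.
    let status = child.wait()?;
    if !status.success() {
        return Err(io::Error::new(
            io::ErrorKind::Other,
            format!("{program} exited with {status}"),
        ));
    }
    Ok(output)
}

fn main() -> io::Result<()> {
    // 1 MiB is far larger than a typical 64 KiB pipe buffer, so writing
    // everything before reading anything would block forever.
    let input: Vec<u8> = (0..1024 * 1024).map(|i| (i % 251) as u8).collect();
    let output = run_piped("cat", &[], input.clone())?;
    assert_eq!(output, input);
    println!("round-tripped {} bytes", output.len());
    Ok(())
}
```

With tokio the same structure maps onto `tokio::process` handles and two futures joined with `tokio::try_join!`.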

3. Add Configuration

pub struct PipeEncoderConfig {
    /// Video codec (h264, h265, etc.)
    pub codec: String,
    
    /// FFmpeg arguments (preset, crf, etc.)
    pub args: Vec<String>,
    
    /// Buffer size for stdin/stdout pipes
    pub pipe_buffer_size: usize,  // default: 64KB
}
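The issue does not say where `pipe_buffer_size` is applied; on Linux the kernel pipe itself defaults to 64 KiB. One plausible reading, assumed in the minimal sketch below, is that it sizes the user-space buffer wrapped around FFmpeg's stdin. The `Default` values (`h264` with `-preset fast -crf 22`) are assumptions borrowed from the FFmpeg command example in this issue, and `output_args` and `buffered` are hypothetical helpers.

```rust
use std::io::{BufWriter, Write};

/// Mirrors the `PipeEncoderConfig` fields proposed in this issue.
#[derive(Debug, Clone)]
pub struct PipeEncoderConfig {
    pub codec: String,
    pub args: Vec<String>,
    pub pipe_buffer_size: usize,
}

impl Default for PipeEncoderConfig {
    fn default() -> Self {
        Self {
            // Assumed defaults, taken from the FFmpeg command example
            codec: "h264".to_string(),
            args: ["-preset", "fast", "-crf", "22"]
                .iter()
                .map(|s| s.to_string())
                .collect(),
            pipe_buffer_size: 64 * 1024, // 64 KiB
        }
    }
}

impl PipeEncoderConfig {
    /// Output-side FFmpeg arguments: `-c:v <codec>` followed by the extra args.
    pub fn output_args(&self) -> Vec<String> {
        let mut out = vec!["-c:v".to_string(), self.codec.clone()];
        out.extend(self.args.iter().cloned());
        out
    }

    /// Wrap a pipe writer (e.g. FFmpeg's stdin) in a buffer of the configured size.
    pub fn buffered<W: Write>(&self, pipe: W) -> BufWriter<W> {
        BufWriter::with_capacity(self.pipe_buffer_size, pipe)
    }
}

fn main() {
    let cfg = PipeEncoderConfig::default();
    assert_eq!(cfg.output_args(), ["-c:v", "h264", "-preset", "fast", "-crf", "22"]);
    let w = cfg.buffered(Vec::new());
    assert_eq!(w.capacity(), 65_536);
}
```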

4. Add to KPS Video Writer

Integrate with existing crates/roboflow-dataset/src/kps/video_encoder.rs:

pub enum VideoEncoderBackend {
    FfmpegPipe(PipeEncoder),     // NEW: streaming
    FfmpegFile(Process),         // EXISTING: file-based
    Nvenc(...),                  // EXISTING: GPU
}
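How a `VideoEncoderBackend` gets selected is left open here. A minimal sketch of one possible policy, assuming remote `s3://` and `oss://` outputs go to the new pipe backend while local paths stay on the existing file backend; `BackendKind` and `choose_backend` are hypothetical, the URL schemes are assumptions, and the `Nvenc` variant is left out because its payload is unspecified.

```rust
/// Simplified stand-in for `VideoEncoderBackend`: variant tags only,
/// without the encoder handles the real variants would carry.
#[derive(Debug, PartialEq, Eq)]
enum BackendKind {
    FfmpegPipe,
    FfmpegFile,
}

/// Hypothetical policy: object-store outputs stream through the pipe backend
/// (no local temp file); plain local paths keep the file-based backend.
fn choose_backend(output_url: &str) -> BackendKind {
    let remote = ["s3://", "oss://"]
        .iter()
        .any(|scheme| output_url.starts_with(scheme));
    if remote {
        BackendKind::FfmpegPipe
    } else {
        BackendKind::FfmpegFile
    }
}

fn main() {
    assert_eq!(choose_backend("s3://bucket/episode_0.ts"), BackendKind::FfmpegPipe);
    assert_eq!(choose_backend("oss://bucket/episode_0.ts"), BackendKind::FfmpegPipe);
    assert_eq!(choose_backend("/tmp/episode_0.mp4"), BackendKind::FfmpegFile);
}
```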

5. Add Tests

  • Unit test: PipeEncoder spawn and basic I/O
  • Integration test: Small video through full pipeline
  • Error handling: FFmpeg crash detection
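For the crash-detection test, a minimal sketch of mapping a failed run to an error is shown below, assuming the error should carry FFmpeg's exit code and the last few lines of stderr (FFmpeg writes its diagnostics there). `check_ffmpeg_output` is a hypothetical name, and `sh` simulates a crashing encoder so the test needs no FFmpeg binary. `Command::output` reads stdout and stderr concurrently; in the streaming path stderr also has to be drained or redirected while the encoder runs, since a full stderr pipe can stall FFmpeg just like stdout.

```rust
use std::io;
use std::process::{Command, Output};

/// Hypothetical check for a finished FFmpeg run: a non-zero exit, or death
/// by signal (where `code()` is `None`), becomes an error carrying the last
/// lines of stderr.
fn check_ffmpeg_output(out: &Output) -> Result<(), String> {
    if out.status.success() {
        return Ok(());
    }
    let stderr = String::from_utf8_lossy(&out.stderr);
    let lines: Vec<&str> = stderr.lines().collect();
    let tail = &lines[lines.len().saturating_sub(5)..];
    Err(format!(
        "ffmpeg failed (exit code {:?}): {}",
        out.status.code(),
        tail.join(" | ")
    ))
}

fn main() -> io::Result<()> {
    // `sh` simulates a crashing encoder: it writes to stderr and exits with 3.
    let out = Command::new("sh")
        .args(["-c", "echo 'simulated encoder failure' >&2; exit 3"])
        .output()?;
    let err = check_ffmpeg_output(&out).unwrap_err();
    assert!(err.contains("exit code Some(3)"));
    assert!(err.contains("simulated encoder failure"));
    Ok(())
}
```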

FFmpeg Command Examples

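# Input from stdin pipe, output to stdout pipe as MP4. A plain MP4 muxer
# writes the moov atom at the end and must seek back, which a pipe cannot
# do, so write fragmented MP4 instead. -pix_fmt yuv420p keeps x264 from
# choosing yuv444p for RGB input, which many players cannot decode.
ffmpeg -f rawvideo -pix_fmt rgb24 -s 1280x720 -r 30 -i pipe:0 \
       -c:v h264 -preset fast -crf 22 -pix_fmt yuv420p \
       -f mp4 -movflags frag_keyframe+empty_moov pipe:1

# Alternative: MPEG-TS is designed for streaming and needs no seeking
ffmpeg -f rawvideo -pix_fmt rgb24 -s 1280x720 -r 30 -i pipe:0 \
       -c:v h264 -preset fast -crf 22 -pix_fmt yuv420p -f mpegts pipe:1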
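A minimal sketch of building an MPEG-TS command along the lines of the example with `std::process::Command`, plus the frame-size arithmetic the rawvideo demuxer depends on: with `-pix_fmt rgb24` each frame is exactly width × height × 3 bytes, so the bytes written to stdin must add up to whole frames. `ffmpeg_rawvideo_to_mpegts` and `rgb24_frame_len` are hypothetical helpers, and the sketch only builds the command; it does not run FFmpeg.

```rust
use std::process::{Command, Stdio};

/// Hypothetical builder: raw RGB24 frames on stdin (`pipe:0`),
/// H.264 in MPEG-TS on stdout (`pipe:1`).
fn ffmpeg_rawvideo_to_mpegts(width: u32, height: u32, fps: u32) -> Command {
    let mut cmd = Command::new("ffmpeg");
    cmd.args(["-f", "rawvideo", "-pix_fmt", "rgb24"])
        .arg("-s")
        .arg(format!("{width}x{height}"))
        .arg("-r")
        .arg(fps.to_string())
        .args(["-i", "pipe:0"])
        .args(["-c:v", "h264", "-preset", "fast", "-crf", "22"])
        .args(["-f", "mpegts", "pipe:1"])
        .stdin(Stdio::piped())
        .stdout(Stdio::piped());
    cmd
}

/// Bytes per frame for rgb24 (3 bytes per pixel).
fn rgb24_frame_len(width: u32, height: u32) -> usize {
    width as usize * height as usize * 3
}

fn main() {
    let cmd = ffmpeg_rawvideo_to_mpegts(1280, 720, 30);
    let args: Vec<String> = cmd
        .get_args()
        .map(|a| a.to_string_lossy().into_owned())
        .collect();
    assert_eq!(
        &args[..8],
        &["-f", "rawvideo", "-pix_fmt", "rgb24", "-s", "1280x720", "-r", "30"]
    );
    assert_eq!(args.last().map(String::as_str), Some("pipe:1"));
    assert_eq!(rgb24_frame_len(1280, 720), 2_764_800);
}
```

At 30 fps, 2,764,800 bytes per frame is about 83 MB/s of raw input, which is why pipe throughput and backpressure matter for this design.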

Design Decisions

Question          Decision
Output format     MPEG-TS (streaming-friendly) or fragmented MP4; +faststart needs a seekable output, so it cannot be used with pipe:1
Error handling    Propagate FFmpeg exit codes
Buffer size       64KB pipes (configurable)
Backpressure      Async I/O with bounded queues
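The bounded-queue backpressure decision can be illustrated with a minimal sketch. It uses std threads and `std::sync::mpsc::sync_channel` in place of async tasks (`tokio::sync::mpsc::channel` is the bounded async counterpart); `pump_frames` is a hypothetical name, and a `Vec<u8>` stands in for FFmpeg's stdin. The producer blocks on `send` whenever `capacity` frames are already queued, so a slow encoder throttles the parser instead of letting frames pile up in memory.

```rust
use std::io::{self, Write};
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical producer/consumer pump with a bounded queue of `capacity` frames.
fn pump_frames<W: Write>(frames: Vec<Vec<u8>>, capacity: usize, mut sink: W) -> io::Result<W> {
    let (tx, rx) = sync_channel::<Vec<u8>>(capacity);

    // Stand-in for the parser: sends frames, blocking while the queue is full.
    let producer = thread::spawn(move || {
        for frame in frames {
            if tx.send(frame).is_err() {
                break; // consumer hung up (e.g. the encoder failed)
            }
        }
        // `tx` is dropped here, which ends the consumer loop after the last frame.
    });

    // Stand-in for the encoder writer (e.g. FFmpeg's stdin).
    for frame in rx {
        sink.write_all(&frame)?;
    }
    sink.flush()?;
    producer.join().expect("producer panicked");
    Ok(sink)
}

fn main() -> io::Result<()> {
    let frames: Vec<Vec<u8>> = (0..10u8).map(|i| vec![i; 1000]).collect();
    let out = pump_frames(frames, 2, Vec::new())?;
    assert_eq!(out.len(), 10_000);
    assert_eq!(out[9_999], 9);
    Ok(())
}
```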

Performance Benefits

Metric        Before (file-based)   After (pipe)     Expected improvement (estimate)
Disk I/O      2x file size          Minimal          ~100%
Latency       Write + Read          Direct           ~50%
Disk space    2x required           Temporary only   ~50%

Related

References

Acceptance Criteria

  • PipeEncoder module implemented
  • Integration with roboflow-storage for streaming
  • Works with S3 and OSS backends
  • Error handling for FFmpeg failures
  • Tests pass (unit + integration)
  • Documentation updated
