Labels
area/pipeline (Pipeline processing), priority/medium (Medium priority), size/M (Medium: 3-5 days), type/feature (New feature or functionality)
Description
Summary
Add support for streaming video data to FFmpeg via stdin and reading encoded output via stdout (pipe protocol). This enables video encoding without writing intermediate files to disk.
Problem
Currently, video encoding in roboflow has limitations:
- No streaming write: output must be written to local files before FFmpeg can process it
- Disk overhead: Full intermediate files consume disk space
- No direct S3 integration: Can't stream directly from S3 → FFmpeg → S3
Solution
Implement FFmpeg streaming using the pipe protocol:
┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│   S3/OSS    │ ───▶ │  Roboflow   │ ───▶ │   FFmpeg    │
│   (input)   │      │  (parser)   │      │  (encoder)  │
└─────────────┘      └─────────────┘      └─────────────┘
                            │                    │
                            │ pipe:              │ pipe:
                            │ raw frames         │ encoded video
                            ▼                    ▼
                     ┌────────────────────────────┐
                     │   No intermediate files    │
                     └────────────────────────────┘
                                   │
                                   ▼
                            ┌─────────────┐
                            │   S3/OSS    │
                            │  (output)   │
                            └─────────────┘
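At the process level, the pipe protocol amounts to spawning FFmpeg with both stdin and stdout redirected to pipes. The sketch below is a minimal, self-contained illustration of that idea (it assumes ffmpeg is on PATH and hard-codes the raw-frame format and MPEG-TS container from the command examples further down); the PipeEncoder proposed in the tasks below would wrap this.
use std::process::{Child, Command, Stdio};
// Spawn FFmpeg reading raw RGB frames from stdin (pipe:0) and writing an
// MPEG-TS stream to stdout (pipe:1). Frame size, rate, and codec settings
// are illustrative only.
fn spawn_ffmpeg() -> std::io::Result<Child> {
    Command::new("ffmpeg")
        .args([
            "-f", "rawvideo", "-pix_fmt", "rgb24", "-s", "1280x720", "-r", "30",
            "-i", "pipe:0",
            "-c:v", "h264", "-preset", "fast", "-crf", "22",
            "-f", "mpegts", "pipe:1",
        ])
        .stdin(Stdio::piped())   // we write raw frames here
        .stdout(Stdio::piped())  // we read encoded video here
        .stderr(Stdio::inherit())
        .spawn()
}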
Implementation Tasks
1. Create FFmpeg Pipe Writer Module
// crates/roboflow-pipeline/src/ffmpeg/pipe.rs
// (ChildStdin/ChildStdout are assumed to be the tokio::process variants so the
// handles work with async I/O in the streaming integration below.)
pub struct PipeEncoder {
    ffmpeg_cmd: Command,
    stdin: ChildStdin,
    stdout: ChildStdout,
}
impl PipeEncoder {
    /// Spawn FFmpeg with pipe protocol
    pub fn spawn(config: &EncoderConfig) -> Result<Self>;
    /// Write raw frame data to stdin
    pub fn write_frame(&mut self, data: &[u8]) -> Result<()>;
    /// Read encoded output from stdout (async)
    pub async fn read_output(&mut self) -> Result<Vec<u8>>;
    /// Flush and close stdin, finish encoding
    pub fn finish(&mut self) -> Result<()>;
}
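For orientation, a hypothetical call site could look like the sketch below; EncoderConfig, the in-memory frame list, and returning the whole bitstream at once are placeholders chosen to show the intended write → finish → read flow, not a final API.
// Hypothetical usage of the API above. Buffering all frames and all output in
// memory only works for small clips; for real inputs the write and read sides
// must run concurrently (see the streaming integration in task 2).
async fn encode_clip(frames: &[Vec<u8>], config: &EncoderConfig) -> Result<Vec<u8>> {
    let mut encoder = PipeEncoder::spawn(config)?;
    for frame in frames {
        encoder.write_frame(frame)?; // raw RGB bytes for one frame
    }
    encoder.finish()?;               // close stdin so FFmpeg flushes its output
    encoder.read_output().await      // collect the encoded bitstream
}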
2. Integrate with Existing Storage
Leverage existing roboflow-storage S3/OSS streaming:
pub async fn encode_streaming(
    storage: &dyn Storage,
    input_url: &str,
    output_url: &str,
    config: &EncoderConfig,
) -> Result<()> {
    use tokio::io::AsyncWriteExt;
    // Stream input from S3/OSS and open the output object
    let mut reader = storage.open(input_url).await?;
    let mut writer = storage.create(output_url).await?;
    // Spawn FFmpeg with stdin/stdout pipes
    let mut encoder = PipeEncoder::spawn(config)?;
    let mut stdin = encoder.stdin;
    // Copy reader → FFmpeg stdin, then close stdin so FFmpeg sees EOF and flushes
    let feed_input = async {
        tokio::io::copy(&mut reader, &mut stdin).await?;
        stdin.shutdown().await
    };
    // Copy FFmpeg stdout → storage writer
    let drain_output = tokio::io::copy(&mut encoder.stdout, &mut writer);
    // The two copies must run concurrently (async I/O provides backpressure);
    // draining the input before reading any output would deadlock once FFmpeg
    // fills the stdout pipe buffer.
    tokio::try_join!(feed_input, drain_output)?;
    Ok(())
}
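Crash detection (see the error-handling row in the design decisions and the test plan) would additionally reap the FFmpeg process after both copies finish and propagate a non-zero exit code. A rough sketch of that tail end, assuming PipeEncoder also keeps the spawned tokio::process::Child as a child field and the crate's error type gains an FfmpegExited variant (both hypothetical):
// Hypothetical continuation at the end of encode_streaming: reap FFmpeg and
// surface a failed encode as its own error instead of a truncated output.
let status = encoder.child.wait().await?;
if !status.success() {
    return Err(Error::FfmpegExited(status.code()).into());
}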
3. Add Configuration
pub struct PipeEncoderConfig {
    /// Video codec (h264, h265, etc.)
    pub codec: String,
    /// FFmpeg arguments (preset, crf, etc.)
    pub args: Vec<String>,
    /// Buffer size for stdin/stdout pipes
    pub pipe_buffer_size: usize, // default: 64KB
}
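How the config maps onto an FFmpeg argument list could look roughly like this; build_args and the hard-coded rawvideo input parameters (pixel format, size, rate) are illustrative, since in practice they would come from the parsed stream. pipe_buffer_size is not an FFmpeg flag and is not part of the argv; it would size the stdin/stdout pipe buffers on the Rust side.
// Hypothetical mapping from PipeEncoderConfig to the FFmpeg argument list.
fn build_args(config: &PipeEncoderConfig) -> Vec<String> {
    let mut args = vec![
        "-f", "rawvideo", "-pix_fmt", "rgb24", "-s", "1280x720", "-r", "30",
        "-i", "pipe:0",
        "-c:v", config.codec.as_str(),
    ]
    .into_iter()
    .map(String::from)
    .collect::<Vec<String>>();
    args.extend(config.args.iter().cloned()); // e.g. ["-preset", "fast", "-crf", "22"]
    args.extend(["-f", "mpegts", "pipe:1"].map(String::from));
    args
}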
4. Add to KPS Video Writer
Integrate with existing crates/roboflow-dataset/src/kps/video_encoder.rs:
pub enum VideoEncoderBackend {
    FfmpegPipe(PipeEncoder), // NEW: streaming
    FfmpegFile(Process),     // EXISTING: file-based
    Nvenc(...),              // EXISTING: GPU
}
5. Add Tests
- Unit test: PipeEncoder spawn and basic I/O (see the sketch after this list)
- Integration test: Small video through full pipeline
- Error handling: FFmpeg crash detection
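A possible shape for the basic-I/O unit test, assuming the PipeEncoder API from task 1 and an EncoderConfig::default() constructor (neither exists yet):
// Possible shape of the spawn + basic I/O unit test; all named items from the
// proposed API are assumptions, not existing code.
#[tokio::test]
async fn pipe_encoder_roundtrips_a_single_frame() -> Result<()> {
    let config = EncoderConfig::default();
    let mut encoder = PipeEncoder::spawn(&config)?;
    // One black 1280x720 RGB frame.
    let frame = vec![0u8; 1280 * 720 * 3];
    encoder.write_frame(&frame)?;
    encoder.finish()?;
    // Even a single frame should produce a non-empty encoded bitstream.
    let encoded = encoder.read_output().await?;
    assert!(!encoded.is_empty());
    Ok(())
}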
FFmpeg Command Examples
# Input from stdin pipe, output to stdout pipe
ffmpeg -f rawvideo -pix_fmt rgb24 -s 1280x720 -r 30 -i pipe:0 \
-c:v h264 -preset fast -crf 22 -f mp4 pipe:1
# MP4 needs a moov atom, which the muxer normally writes by seeking back at the
# end - not possible on a pipe. Alternatives: fragmented MP4
# (-movflags frag_keyframe+empty_moov) or MPEG-TS, which needs no seeking:
ffmpeg -f rawvideo -pix_fmt rgb24 -s 1280x720 -r 30 -i pipe:0 \
-c:v h264 -preset fast -crf 22 -f mpegts pipe:1
Design Decisions
| Question | Decision |
|---|---|
| Output format | MPEG-TS (streaming-friendly) or fragmented MP4 (frag_keyframe+empty_moov) |
| Error handling | Propagate FFmpeg exit codes |
| Buffer size | 64KB pipes (configurable) |
| Backpressure | Async I/O with bounded queues |
Performance Benefits
| Metric | Before (file-based) | After (pipe) | Improvement |
|---|---|---|---|
| Disk I/O | 2x file size | Minimal | ~100% |
| Latency | Write + Read | Direct | ~50% |
| Disk space | 2x required | Temporary only | ~50% |
Related
- Epic: [Epic] Distributed Roboflow with Alibaba Cloud (OSS + ACK) #9
- Depends on: roboflow-storage streaming (already implemented)
- Enables: Direct S3/OSS → FFmpeg → S3/OSS pipelines
References
- FFmpeg Pipe Protocol
- ffmpeg-s3 review - Node.js wrapper (not production-ready)
- Discussion: FFmpeg with native S3 via libav (compile-time option)
Acceptance Criteria
- PipeEncoder module implemented
- Integration with roboflow-storage for streaming
- Works with S3 and OSS backends
- Error handling for FFmpeg failures
- Tests pass (unit + integration)
- Documentation updated