A Python worker that processes video files using OpenAI APIs and stores results in PostgreSQL. The worker implements a 6-stage pipeline that transforms raw video uploads into searchable, AI-analyzed content.
The stages run sequentially:
```
Input:  uploads/{id}_{name}.mp4
  │
  ├─▶ [1. NORMALIZE]   → processed/{id}/normalized.mp4
  │                     → processed/{id}/audio.wav
  │
  ├─▶ [2. TRANSCRIBE]  → transcript_segments table
  │                     → subs/{id}.srt
  │
  ├─▶ [3. SCENES]      → scenes table (t_start, t_end)
  │
  ├─▶ [4. FRAMES]      → frames/{id}/scene_*.jpg
  │                     → frames table (phash, path)
  │
  ├─▶ [5. VISION]      → frame_captions table (caption, entities)
  │
  └─▶ [6. EMBEDDINGS]  → UPDATE embeddings (1536-dim vectors)

Output: video.status = 'ready'
```
### Stage 1: Normalize
Purpose: Convert video to standard format and extract audio
- Input: Original video file (any format)
- Output: 720p/30fps video + 16 kHz mono audio
- Tools: FFmpeg
- Database: Updates `videos.normalized_path`, `videos.duration_sec`
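A minimal sketch of this stage, assuming the worker shells out to FFmpeg via `subprocess` (the flags and the `normalize` helper are illustrative, not the worker's actual code):

```python
import subprocess
from pathlib import Path

def normalize(src: Path, out_dir: Path) -> tuple[Path, Path]:
    """Re-encode to 720p/30fps and extract 16 kHz mono audio."""
    out_dir.mkdir(parents=True, exist_ok=True)
    video_out = out_dir / "normalized.mp4"
    audio_out = out_dir / "audio.wav"
    # Scale to 720p (width auto, kept even) and force 30 fps.
    subprocess.run([
        "ffmpeg", "-y", "-i", str(src),
        "-vf", "scale=-2:720,fps=30",
        "-c:v", "libx264", "-c:a", "aac",
        str(video_out),
    ], check=True)
    # Drop video, downmix to mono, resample to 16 kHz for transcription.
    subprocess.run([
        "ffmpeg", "-y", "-i", str(src),
        "-vn", "-ac", "1", "-ar", "16000",
        str(audio_out),
    ], check=True)
    return video_out, audio_out
```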
### Stage 2: Transcribe
Purpose: Generate accurate transcript from audio
- Input: 16 kHz mono audio file
- Output: Timestamped transcript segments
- Tools: OpenAI Whisper API
- Database: Inserts `transcript_segments` records
- Files: Generates SRT subtitle file
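A sketch of the transcription call using the official `openai` Python SDK (v1+); `verbose_json` returns the per-segment timestamps that map onto `transcript_segments`:

```python
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def transcribe(audio_path: str) -> list[dict]:
    with open(audio_path, "rb") as f:
        result = client.audio.transcriptions.create(
            model="whisper-1",
            file=f,
            response_format="verbose_json",  # includes segment timing
        )
    # Each segment carries start/end times in seconds plus its text.
    return [
        {"t_start": seg.start, "t_end": seg.end, "text": seg.text}
        for seg in result.segments
    ]
```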
### Stage 3: Scenes
Purpose: Detect scene boundaries for frame extraction
- Input: Normalized video file
- Output: Scene time boundaries
- Tools: PySceneDetect 0.6.7+ (new API with `AdaptiveDetector`)
- Database: Inserts `scenes` records with `t_start`, `t_end`
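With the 0.6 API, detection reduces to a single call; a sketch (the shape of the returned dicts is illustrative):

```python
from scenedetect import detect, AdaptiveDetector

def detect_scenes(video_path: str) -> list[dict]:
    # detect() returns a list of (start, end) FrameTimecode pairs.
    scene_list = detect(video_path, AdaptiveDetector())
    return [
        {"t_start": start.get_seconds(), "t_end": end.get_seconds()}
        for start, end in scene_list
    ]
```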
### Stage 4: Frames
Purpose: Extract representative frames and deduplicate
- Input: Normalized video + scene boundaries
- Output: Frame images with perceptual hashes
- Tools: FFmpeg + imagehash
- Database: Inserts `frames` records with `phash` for deduplication
- Files: Saves frame images to `frames/{video_id}/`
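Deduplication with perceptual hashes might look like the following; the Hamming-distance threshold of 6 is an illustrative choice, not a documented worker setting:

```python
from PIL import Image
import imagehash

def dedupe_frames(paths: list[str], max_distance: int = 6) -> list[str]:
    """Keep only frames whose pHash differs from every kept frame."""
    kept: list[tuple[str, imagehash.ImageHash]] = []
    for path in paths:
        h = imagehash.phash(Image.open(path))
        # Subtracting two ImageHash values yields their Hamming distance.
        if all(h - kh > max_distance for _, kh in kept):
            kept.append((path, h))
    return [p for p, _ in kept]
```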
### Stage 5: Vision
Purpose: Analyze frames with AI vision (parallel processing)
- Input: Frame images
- Output: Structured captions, controls, text detection
- Tools: OpenAI GPT-4o Vision API with parallel processing
- Database: Inserts `frame_captions` records with JSONB entities
- Performance: Concurrent API calls with semaphore limiting
- Features: Structured output for consistent data
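A sketch of semaphore-limited concurrency with the async OpenAI client; the prompt text and the hard-coded limit (normally wired to `VISION_MAX_CONCURRENT`) are simplified:

```python
import asyncio
import base64
from openai import AsyncOpenAI

client = AsyncOpenAI()
semaphore = asyncio.Semaphore(5)  # VISION_MAX_CONCURRENT

async def caption_frame(path: str) -> str:
    with open(path, "rb") as f:
        b64 = base64.b64encode(f.read()).decode()
    async with semaphore:  # cap concurrent in-flight API calls
        resp = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": [
                    {"type": "text",
                     "text": "Describe this frame: caption, UI controls, visible text."},
                    {"type": "image_url",
                     "image_url": {"url": f"data:image/jpeg;base64,{b64}"}},
                ],
            }],
        )
    return resp.choices[0].message.content

async def caption_all(paths: list[str]) -> list[str]:
    return await asyncio.gather(*(caption_frame(p) for p in paths))
```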
### Stage 6: Embeddings
Purpose: Generate searchable vector embeddings
- Input: Transcript text + frame captions
- Output: 1536-dimensional vectors
- Tools: OpenAI text-embedding-3-small
- Database: Updates embedding columns in `transcript_segments` and `frame_captions`
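Batching keeps the number of embedding requests low; `text-embedding-3-small` returns 1536-dimensional vectors by default:

```python
from openai import OpenAI

client = OpenAI()

def embed(texts: list[str]) -> list[list[float]]:
    # One request can embed a whole batch; results keep input order.
    resp = client.embeddings.create(
        model="text-embedding-3-small",
        input=texts,
    )
    return [item.embedding for item in resp.data]  # 1536 floats each
```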
The worker is tightly coupled to the PostgreSQL schema:
Reads:
- `videos.original_path` - input video location
- `jobs` table - job queue polling
- Existing records - idempotency checks

Inserts:
- `scenes` - scene boundaries
- `frames` - extracted frames with hashes
- `transcript_segments` - audio transcription
- `frame_captions` - vision analysis results

Updates:
- `videos.status` - processing status
- `videos.normalized_path` - processed video location
- `videos.duration_sec` - video duration
- `jobs.status` - job completion status
The worker follows strict ID patterns for consistency:
```python
# Scene ID: "{video_id}_scene_{idx:03d}"
scene_id = f"{video_id}_scene_{i:03d}"

# Frame ID: "{video_id}_frame_{idx:03d}"
frame_id = f"{video_id}_frame_{i:03d}"

# Segment ID: "{video_id}_segment_{idx:03d}"
segment_id = f"{video_id}_segment_{i:03d}"

# Caption ID: "{frame_id}_caption"
caption_id = f"{frame_id}_caption"
```

The worker polls the `jobs` table for work:
- Interval: 1.5 seconds (configurable via `WORKER_POLL_MS`)
- Strategy: `FOR UPDATE SKIP LOCKED` for atomic job claiming (sketched below)
- Backoff: Exponential backoff when no jobs are available
- Retry: Up to 3 attempts per job (configurable via `WORKER_MAX_ATTEMPTS`)
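A sketch of atomic claiming with `FOR UPDATE SKIP LOCKED`, assuming a psycopg-style connection; the `created_at` ordering column is an assumption about the schema:

```python
CLAIM_SQL = """
    UPDATE jobs
    SET status = 'processing'
    WHERE id = (
        SELECT id FROM jobs
        WHERE status = 'pending'
        ORDER BY created_at
        FOR UPDATE SKIP LOCKED
        LIMIT 1
    )
    RETURNING id, video_id;
"""

def claim_job(conn):
    # SKIP LOCKED lets concurrent workers each grab a different row
    # without blocking on one another's transactions.
    with conn.cursor() as cur:
        cur.execute(CLAIM_SQL)
        row = cur.fetchone()
    conn.commit()
    return row  # None when no pending jobs exist
```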
Job lifecycle: `pending → processing → done/failed`
- Job failures: Marked as `failed` with an error message
- Video failures: Status remains `processing` until retry
- Logging: Comprehensive error logging with stack traces
- Recovery: Jobs can be retried manually (see the sketch after this list)
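A sketch of the failure path, assuming an `attempts` counter and `error` column on `jobs` (both names are illustrative):

```python
def mark_failed(conn, job_id: str, error: str, max_attempts: int = 3) -> None:
    # Re-queue the job until it exhausts its retry budget, then fail it.
    with conn.cursor() as cur:
        cur.execute(
            """
            UPDATE jobs
            SET attempts = attempts + 1,
                error = %s,
                status = CASE
                    WHEN attempts + 1 >= %s THEN 'failed'
                    ELSE 'pending'
                END
            WHERE id = %s
            """,
            (error, max_attempts, job_id),
        )
    conn.commit()
```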
| Variable | Required | Default | Description |
|---|---|---|---|
| `DATABASE_URL` | ✅ | - | PostgreSQL connection string |
| `OPENAI_API_KEY` | ✅ | - | OpenAI API key |
| `DATA_DIR` | ❌ | `/app/data` | Data directory path |
| `WORKER_POLL_MS` | ❌ | `1500` | Polling interval (milliseconds) |
| `WORKER_MAX_ATTEMPTS` | ❌ | `3` | Max retry attempts |
| `LOG_LEVEL` | ❌ | `INFO` | Logging level |
| `WORKER_DEV_HTTP` | ❌ | `false` | Enable HTTP endpoints |
| `WORKER_HTTP_PORT` | ❌ | `8000` | HTTP server port |
| `VISION_MAX_CONCURRENT` | ❌ | `5` | Max concurrent vision API calls |
When `WORKER_DEV_HTTP=true`, the worker exposes development-only HTTP endpoints:

| Endpoint | Method | Purpose | Response |
|---|---|---|---|
| `/healthz` | GET | Health check | `{ok: true, status: "healthy"}` |
| `/jobs/peek` | GET | View pending jobs | `{pending_jobs: number, jobs: [...]}` |
| `/stats` | GET | Processing statistics | `{jobs: {...}, videos: {...}, processing: {...}}` |
Example log line:

```
2024-01-15 10:30:00 - video_worker - INFO - [run.py:75] - CLAIMED: Processing job abc123 for video def456
```
- CLAIMED: Job claimed for processing
- NORMALIZED: Video normalization complete
- TRANSCRIBED: Audio transcription complete
- SCENES: Scene detection complete
- FRAMES: Frame extraction complete
- VISION: Vision analysis complete
- EMBEDDINGS: Embedding generation complete
- READY: Pipeline completed successfully
- FAILED: Pipeline failed with error
- Location: `{DATA_DIR}/worker/log.log`
- Rotation: 5 MB max size, 3 backup files (see the sketch below)
- Format: Structured logging with timestamps
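This rotation policy matches Python's stdlib `RotatingFileHandler`; a sketch of equivalent configuration (not necessarily the worker's exact code):

```python
import logging
from logging.handlers import RotatingFileHandler

def setup_logging(data_dir: str, level: str = "INFO") -> None:
    handler = RotatingFileHandler(
        f"{data_dir}/worker/log.log",
        maxBytes=5 * 1024 * 1024,  # rotate at 5 MB
        backupCount=3,             # keep log.log.1 .. log.log.3
    )
    handler.setFormatter(logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
    ))
    logging.basicConfig(level=level, handlers=[handler])
```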
| Video Length | Normalize | Transcribe | Scenes | Frames | Vision | Embeddings | Total |
|---|---|---|---|---|---|---|---|
| 1 minute | 5s | 10s | 2s | 3s | 8s* | 5s | 33s |
| 5 minutes | 15s | 30s | 5s | 10s | 30s* | 20s | 2min |
| 30 minutes | 60s | 3min | 20s | 45s | 2min* | 2min | ~9min |
*Vision processing times reduced with parallel API calls (3-5x improvement)
- CPU: High during FFmpeg operations and parallel AI API calls
- Memory: Moderate (image processing, embeddings, concurrent requests)
- Storage: 2-3x original video size
- Network: Concurrent OpenAI API calls for transcription and vision
- Database: Connection pooling for concurrent operations
To run the worker locally:

```bash
# Install dependencies
pip install -r requirements.txt

# Set environment variables
export DATABASE_URL="postgresql://user:pass@localhost:5432/videoqa"
export OPENAI_API_KEY="your-key"
export DATA_DIR="/path/to/data"
export WORKER_DEV_HTTP=true

# Run worker
python -m worker.run

# Insert test job
psql $DATABASE_URL -c "INSERT INTO jobs (id, video_id) VALUES ('test-job', 'test-video');"

# Monitor logs
tail -f data/worker/log.log

# Check worker health
curl http://localhost:8000/healthz

# View pending jobs
curl http://localhost:8000/jobs/peek

# Check processing stats
curl http://localhost:8000/stats
```

Example Dockerfile:

```dockerfile
FROM python:3.11-slim
RUN apt-get update && apt-get install -y ffmpeg tesseract-ocr
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY worker ./worker
CMD ["python", "-m", "worker.run"]worker:
image: videoqa-worker:0.0.17
environment:
- DATABASE_URL=postgresql://postgres:postgres@postgres:5432/videoqa
- OPENAI_API_KEY=${OPENAI_KEY}
- DATA_DIR=/app/data
volumes:
- ./data:/app/data
depends_on:
- postgres-
"Video path not found"
- Check
DATA_DIRenvironment variable - Verify file exists at resolved path
- Check database
videos.original_pathvalue
- Check
-
"OpenAI API error"
- Verify
OPENAI_API_KEYis valid - Check API key has sufficient credits
- Monitor API rate limits
- Verify
-
"Database connection failed"
- Check
DATABASE_URLformat - Verify PostgreSQL is running
- Check network connectivity
- Check
-
"FFmpeg not found"
- Ensure FFmpeg is installed in container
- Check Dockerfile includes FFmpeg installation
```bash
# Check worker logs
docker-compose logs worker

# Check database connection
docker-compose exec worker python -c "from worker.db import Database; db = Database('$DATABASE_URL'); db.connect()"

# Test OpenAI API
curl -H "Authorization: Bearer $OPENAI_API_KEY" https://api.openai.com/v1/models
```