A modular pipeline for:

- Reading JSONL video samples (`path` field)
- Building a Torch `Dataset` that reads video with `qwen_vl_utils.process_vision_info` inside `__getitem__`
- Running vLLM offline inference (batched)
- Multi-process GPU inference
- Streaming JSONL output with resume/restart
Recommended (editable install):

```bash
# ensure your torch + vllm are installed properly for your CUDA
pip install -e .
```

Input is JSONL. Each line is a JSON object containing a video path:
{"path": "/abs/path/to/video.mp4", "meta": {...}}
{"path": "/abs/path/to/video2.mp4"}Example configs/describe.yaml
Example `run.sh`:
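Similarly, a hypothetical launch script (the module entrypoint and the `--rank`/`--world-size` flags are assumptions, not the repo's documented CLI):

```bash
#!/usr/bin/env bash
# Hypothetical sketch: one inference process per GPU; each process
# writes its own rank-sharded JSONL (output.describe.rank<N>.jsonl).
NUM_GPUS=2
for RANK in $(seq 0 $((NUM_GPUS - 1))); do
  CUDA_VISIBLE_DEVICES=$RANK \
    python -m video_pipeline --config configs/describe.yaml \
      --rank "$RANK" --world-size "$NUM_GPUS" &
done
wait
```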
Output will be sharded automatically by rank:

- `output.describe.rank0.jsonl`
- `output.describe.rank1.jsonl`
- ...
Automatic consolidation: After the pipeline completes, all rank-sharded JSONL files are automatically consolidated into a single output file (e.g., output.describe.jsonl). The individual rank files are removed by default after successful consolidation.
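Conceptually, consolidation is just concatenating the rank shards and deleting them on success; a minimal sketch (the pipeline's actual implementation may differ in ordering and error handling):

```python
import glob
import os

def consolidate(prefix: str) -> None:
    """Merge <prefix>.rank*.jsonl into <prefix>.jsonl, then remove the shards."""
    shards = sorted(glob.glob(f"{prefix}.rank*.jsonl"))
    with open(f"{prefix}.jsonl", "w") as out:
        for shard in shards:
            with open(shard) as f:
                out.writelines(f)
    for shard in shards:  # rank files are removed only after a successful merge
        os.remove(shard)

consolidate("output.describe")
```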
Resume is enabled via:

```yaml
data:
  resume: true
```

Each rank reads its own output file to collect `__key` and skips completed samples.
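A minimal sketch of that resume check, assuming each output line is a JSON object carrying the sample's `__key`:

```python
import json
import os

def completed_keys(output_path: str) -> set:
    """Collect the __key values already written to this rank's output file."""
    done = set()
    if os.path.exists(output_path):
        with open(output_path) as f:
            for line in f:
                try:
                    done.add(json.loads(line)["__key"])
                except (json.JSONDecodeError, KeyError):
                    continue  # tolerate a partial/corrupt trailing line
    return done

# Filter this rank's samples before inference:
done = completed_keys("output.describe.rank0.jsonl")
# samples = [s for s in samples if s["__key"] not in done]
```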
- Create a new file `video_pipeline/tasks/my_task.py`
- Implement a `Task` class and register it.
Example:

```python
from video_pipeline.tasks.base import Task
from video_pipeline.tasks.registry import register_task


@register_task
class MyTask(Task):
    name = "my_task"
    # Choose a dataset implementation for this task
    dataset_name = "qwen_video"

    def build_messages(self, sample):
        return [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": [
                {"type": "text", "text": "What happens in this video?"},
                {"type": "video", "video": sample["__video_uri"]},
            ]},
        ]

    def parse(self, generated_text, sample):
        return {"answer": generated_text.strip()}
```

- Make sure the module is imported somewhere (so registration happens).
A simple approach is to import tasks in `video_pipeline/tasks/__init__.py`:

```python
from . import describe, structured_caption, my_task  # noqa: F401
```

Then set:
```yaml
run:
  task: my_task
```

Datasets are selected by tasks via `Task.dataset_name`.
We provide a dataset registry under `video_pipeline.data.registry`.
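The registry pattern is the same as for tasks: a name-to-class mapping populated by a decorator. A minimal sketch of what `register_dataset` might look like (the repo's actual implementation may differ):

```python
# Sketch only -- mirrors how @register_dataset("frames") is used below.
_DATASETS = {}

def register_dataset(name):
    def decorator(cls):
        _DATASETS[name] = cls
        return cls
    return decorator

def get_dataset(name):
    try:
        return _DATASETS[name]
    except KeyError:
        raise KeyError(f"unknown dataset {name!r}; registered: {sorted(_DATASETS)}")
```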
- Create `video_pipeline/data/dataset_frames.py`
- Register it:

```python
from video_pipeline.data.base import BaseDataset
from video_pipeline.data.registry import register_dataset


@register_dataset("frames")
class FramesDataset(BaseDataset):
    def __len__(self):
        return len(self.samples)

    def __getitem__(self, i):
        # return dict with __key, raw, llm_input
        ...
```

- Import it so the registry is populated (e.g. in `video_pipeline/data/__init__.py`):

```python
from . import dataset_qwen_video, dataset_frames  # noqa: F401
```

- In your task:
```python
class MyTask(Task):
    dataset_name = "frames"
```

A dataset item should return:
```python
{
    "__key": "unique-id",
    "raw": {...},  # the original JSON object from the input line
    "llm_input": {
        "prompt": "...",
        "multi_modal_data": {...},
        "mm_processor_kwargs": {...},
    },
}
```
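As a concrete (hypothetical) illustration, a `__getitem__` producing this shape with `qwen_vl_utils` might look like the following; the `self.task` hook and `apply_chat_template` helper are assumptions, not the repo's actual API:

```python
from qwen_vl_utils import process_vision_info

from video_pipeline.data.base import BaseDataset
from video_pipeline.data.registry import register_dataset


@register_dataset("frames")
class FramesDataset(BaseDataset):
    def __len__(self):
        return len(self.samples)

    def __getitem__(self, i):
        sample = self.samples[i]
        # Assumed hook: the task that selected this dataset builds the messages.
        messages = self.task.build_messages(sample)
        # Decode/sample the video referenced in the messages; with
        # return_video_kwargs=True, qwen_vl_utils also returns kwargs
        # (e.g. fps) to pass through to the vLLM processor.
        _, video_inputs, video_kwargs = process_vision_info(
            messages, return_video_kwargs=True
        )
        return {
            "__key": sample["__key"],
            "raw": sample,
            "llm_input": {
                # Assumed helper: render messages with the model's chat template.
                "prompt": self.apply_chat_template(messages),
                "multi_modal_data": {"video": video_inputs},
                "mm_processor_kwargs": video_kwargs,
            },
        }
```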