ls-kelvin/VideoCaption
# video-pipeline

A modular pipeline for:

- Reading JSONL video samples (`path` field)
- Building a Torch `Dataset` that loads video with `qwen_vl_utils.process_vision_info` inside `__getitem__`
- Running vLLM offline (batch) inference
- Multi-process GPU inference
- Streaming JSONL output with resume/restart support

## Install

Recommended (editable install):

```shell
# ensure torch and vllm are installed and matched to your CUDA version first
pip install -e .
```

## Input Format

Input is JSONL. Each line is a JSON object containing a video path:

```json
{"path": "/abs/path/to/video.mp4", "meta": {...}}
{"path": "/abs/path/to/video2.mp4"}
```
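A reader for this format can be sketched as below; `read_jsonl_samples` is a hypothetical helper, not part of the package API:

```python
import json

def read_jsonl_samples(path):
    """Yield one sample dict per non-empty JSONL line.

    Each sample must carry the required 'path' field; any extra
    fields (e.g. 'meta') are passed through untouched.
    """
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines
            sample = json.loads(line)
            if "path" not in sample:
                raise ValueError(f"sample missing required 'path' field: {sample}")
            yield sample
```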

## Quick Start (torchrun data-parallel)

1. Prepare a YAML config — see the example `configs/describe.yaml`.
2. Run the pipeline — see the example `run.sh`.

Output is sharded automatically by rank:

- `output.describe.rank0.jsonl`
- `output.describe.rank1.jsonl`
- ...
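Under `torchrun`, each worker can derive its shard path from the `RANK` environment variable that the launcher sets. A minimal sketch (the helper name is an assumption, not the package's actual code):

```python
import os

def shard_output_path(base: str, task: str) -> str:
    """Derive this rank's output file, e.g. output.describe.rank0.jsonl.

    torchrun exports RANK for every worker; a plain single-process run
    (no RANK set) falls back to rank 0.
    """
    rank = int(os.environ.get("RANK", 0))
    return f"{base}.{task}.rank{rank}.jsonl"
```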

**Automatic consolidation:** after the pipeline completes, all rank-sharded JSONL files are consolidated into a single output file (e.g. `output.describe.jsonl`). By default, the individual rank files are removed after successful consolidation.
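The consolidation step amounts to concatenating the rank shards and optionally deleting them; a sketch under that assumption (`consolidate` is a hypothetical name):

```python
import glob
import os

def consolidate(base: str, task: str, remove_shards: bool = True) -> str:
    """Merge output.<task>.rank*.jsonl shards into output.<task>.jsonl."""
    merged = f"{base}.{task}.jsonl"
    shards = sorted(glob.glob(f"{base}.{task}.rank*.jsonl"))
    with open(merged, "w", encoding="utf-8") as out:
        for shard in shards:
            with open(shard, encoding="utf-8") as f:
                for line in f:
                    if line.strip():
                        out.write(line if line.endswith("\n") else line + "\n")
    if remove_shards:
        for shard in shards:  # mirror the default cleanup behaviour
            os.remove(shard)
    return merged
```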

## Resume / Restart

Resume is enabled via:

```yaml
data:
  resume: true
```

On startup, each rank reads its own output file to collect the `__key` values already written and skips those samples.
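That skip logic can be sketched as follows; `load_completed_keys` is an illustrative helper, not necessarily the package's internal name:

```python
import json
import os

def load_completed_keys(output_path: str) -> set:
    """Collect __key values already present in a rank's output file."""
    done = set()
    if not os.path.exists(output_path):
        return done  # fresh run: nothing to skip
    with open(output_path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                done.add(json.loads(line)["__key"])
            except (json.JSONDecodeError, KeyError):
                continue  # tolerate a truncated last line from an interrupted run
    return done
```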

## How to Add a New Task

1. Create a new file `video_pipeline/tasks/my_task.py`.
2. Implement a `Task` subclass and register it:

```python
from video_pipeline.tasks.base import Task
from video_pipeline.tasks.registry import register_task

@register_task
class MyTask(Task):
    name = "my_task"

    # Choose a dataset implementation for this task
    dataset_name = "qwen_video"

    def build_messages(self, sample):
        return [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": [
                {"type": "text", "text": "What happens in this video?"},
                {"type": "video", "video": sample["__video_uri"]},
            ]},
        ]

    def parse(self, generated_text, sample):
        return {"answer": generated_text.strip()}
```

3. Make sure the module is imported somewhere so registration happens. A simple approach is to import tasks in `video_pipeline/tasks/__init__.py`:

```python
from . import describe, structured_caption, my_task  # noqa: F401
```

Then set:

```yaml
run:
  task: my_task
```
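The registry pattern behind `@register_task` can be sketched in a few lines; this is a minimal illustration of the mechanism, not the package's actual `video_pipeline.tasks.registry` source:

```python
# Hypothetical internals of a name-keyed task registry.
_TASKS = {}

def register_task(cls):
    """Class decorator: index a Task subclass by its `name` attribute."""
    _TASKS[cls.name] = cls
    return cls  # return unchanged so normal class usage still works

def get_task(name):
    """Resolve the class for the `run.task` value from the config."""
    try:
        return _TASKS[name]
    except KeyError:
        raise KeyError(f"unknown task {name!r}; registered: {sorted(_TASKS)}")
```

This is why each task module must be imported somewhere: the decorator only runs, and the entry only appears in the registry, when the module is loaded.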

## How to Add a New Dataset Type

Datasets are selected by tasks via `Task.dataset_name`. We provide a dataset registry under `video_pipeline.data.registry`.

1. Create `video_pipeline/data/dataset_frames.py` and register the dataset:

```python
from video_pipeline.data.base import BaseDataset
from video_pipeline.data.registry import register_dataset

@register_dataset("frames")
class FramesDataset(BaseDataset):
    def __len__(self):
        return len(self.samples)

    def __getitem__(self, i):
        # return a dict with __key, raw, llm_input
        ...
```

2. Import it so the registry is populated (e.g. in `video_pipeline/data/__init__.py`):

```python
from . import dataset_qwen_video, dataset_frames  # noqa: F401
```

3. Point your task at it:

```python
class MyTask(Task):
    dataset_name = "frames"
```

## Dataset Contract

A dataset item should return:

```json
{
  "__key": "unique-id",
  "raw": {... original json object ...},
  "llm_input": {
     "prompt": "...",
     "multi_modal_data": {...},
     "mm_processor_kwargs": {...}
  }
}
```
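When writing a new dataset, it can help to assert this contract in `__getitem__` before items reach inference. A minimal sketch, assuming only the shape shown above (`validate_dataset_item` is a hypothetical helper):

```python
def validate_dataset_item(item: dict) -> dict:
    """Check a dataset item against the pipeline's contract.

    Required: top-level __key / raw / llm_input, and a prompt
    inside llm_input. Other llm_input fields (multi_modal_data,
    mm_processor_kwargs) are optional here.
    """
    for key in ("__key", "raw", "llm_input"):
        if key not in item:
            raise KeyError(f"dataset item missing {key!r}")
    if "prompt" not in item["llm_input"]:
        raise KeyError("llm_input must contain a 'prompt'")
    return item
```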
