1 change: 1 addition & 0 deletions .vale/styles/config/vocabularies/Data/accept.txt
@@ -22,6 +22,7 @@ Modin
[Mm]ultiget(s)?
ndarray(s)?
NLP
[Oo]mni
[Oo]utqueue(s)?
PDFs
PIL
66 changes: 64 additions & 2 deletions doc/source/data/doc_code/working-with-llms/basic_llm_example.py
@@ -4,6 +4,8 @@
"""

# __basic_llm_example_start__
import os
import shutil
import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_processor

@@ -125,6 +127,66 @@
)
# __s3_config_example_end__

base_dir = "/tmp/llm_checkpoint_demo"
input_path = os.path.join(base_dir, "input")
output_path = os.path.join(base_dir, "output")
checkpoint_path = os.path.join(base_dir, "checkpoint")

# Reset directories
for path in (input_path, output_path, checkpoint_path):
shutil.rmtree(path, ignore_errors=True)
os.makedirs(path)

# __row_level_fault_tolerance_config_example_start__
# Row-level fault tolerance configuration
config = vLLMEngineProcessorConfig(
model_source="unsloth/Llama-3.1-8B-Instruct",
concurrency=1,
batch_size=64,
should_continue_on_error=True,
)
# __row_level_fault_tolerance_config_example_end__

# __checkpoint_config_setup_example_start__
from ray.data.checkpoint import CheckpointConfig

ctx = ray.data.DataContext.get_current()
ctx.checkpoint_config = CheckpointConfig(
id_column="id",
checkpoint_path=checkpoint_path,
delete_checkpoint_on_success=False,
)
# __checkpoint_config_setup_example_end__

# __checkpoint_usage_example_start__
processor_config = vLLMEngineProcessorConfig(
model_source="unsloth/Llama-3.1-8B-Instruct",
concurrency=1,
batch_size=16,
)

processor = build_processor(
processor_config,
preprocess=lambda row: dict(
id=row["id"], # Preserve the ID column for checkpointing
prompt=row["prompt"],
sampling_params=dict(
temperature=0.3,
max_tokens=10,
),
),
postprocess=lambda row: {
"id": row["id"], # Preserve the ID column for checkpointing
"answer": row.get("generated_text"),
},
)

ds = ray.data.read_parquet(input_path)
ds = processor(ds)
ds.write_parquet(output_path)
# __checkpoint_usage_example_end__

Checkpoint demo runs during module import

High Severity

The new checkpoint example executes at import time: it deletes and recreates /tmp/llm_checkpoint_demo/*, sets global ray.data.DataContext checkpoint config, then calls ray.data.read_parquet(input_path) and write_parquet(output_path) without creating any input data. This can fail CI/docs builds and introduces unexpected filesystem and global state side effects.
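One way to address this, sketched below rather than taken from the PR, is to give the demo its own sample input and run everything behind a main guard, so importing the module has no side effects. The helper name run_checkpoint_demo and the sample prompts are hypothetical; the configuration values mirror the ones added in this diff.

# Sketch only: run_checkpoint_demo and the sample rows are illustrative, not part of the PR.
import os
import shutil

import ray
from ray.data.checkpoint import CheckpointConfig
from ray.data.llm import vLLMEngineProcessorConfig, build_processor


def run_checkpoint_demo(base_dir: str = "/tmp/llm_checkpoint_demo") -> None:
    """Run the checkpoint demo without import-time side effects."""
    input_path = os.path.join(base_dir, "input")
    output_path = os.path.join(base_dir, "output")
    checkpoint_path = os.path.join(base_dir, "checkpoint")

    # Reset directories only when the demo actually runs.
    for path in (input_path, output_path, checkpoint_path):
        shutil.rmtree(path, ignore_errors=True)
        os.makedirs(path)

    # Write a small sample dataset so read_parquet below has input to consume.
    sample_rows = [{"id": i, "prompt": f"What is {i} + {i}?"} for i in range(32)]
    ray.data.from_items(sample_rows).write_parquet(input_path)

    # Scope the global checkpoint configuration to this run.
    ctx = ray.data.DataContext.get_current()
    ctx.checkpoint_config = CheckpointConfig(
        id_column="id",
        checkpoint_path=checkpoint_path,
        delete_checkpoint_on_success=False,
    )

    processor_config = vLLMEngineProcessorConfig(
        model_source="unsloth/Llama-3.1-8B-Instruct",
        concurrency=1,
        batch_size=16,
    )
    processor = build_processor(
        processor_config,
        preprocess=lambda row: dict(
            id=row["id"],  # Preserve the ID column for checkpointing
            prompt=row["prompt"],
            sampling_params=dict(temperature=0.3, max_tokens=10),
        ),
        postprocess=lambda row: {
            "id": row["id"],  # Preserve the ID column for checkpointing
            "answer": row.get("generated_text"),
        },
    )

    ds = ray.data.read_parquet(input_path)
    ds = processor(ds)
    ds.write_parquet(output_path)


if __name__ == "__main__":
    run_checkpoint_demo()

Keeping the directory reset and the DataContext mutation inside the function means docs builds that merely import the module never touch /tmp or global state.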




# __gpu_memory_config_example_start__
# GPU memory management configuration
# If you encounter CUDA out of memory errors, try these optimizations:
@@ -199,8 +261,8 @@ def create_embedding_processor():
),
batch_size=8,
concurrency=1,
- apply_chat_template=False,
- detokenize=False,
+ chat_template_stage=False,
+ detokenize_stage=False,
)


@@ -29,8 +29,8 @@
),
batch_size=8,
concurrency=1,
- apply_chat_template=False,
- detokenize=False,
+ chat_template_stage=False,
+ detokenize_stage=False,
)

classification_processor = build_processor(
@@ -53,9 +53,9 @@
"enabled": True,
"chat_template_content_format": "openai",
},
- chat_template_stage={"enabled": True},
- tokenize_stage={"enabled": True},
- detokenize_stage={"enabled": True},
+ chat_template_stage=True,
+ tokenize_stage=True,
+ detokenize_stage=True,
)
# __omni_audio_config_example_end__

@@ -112,6 +112,7 @@ def audio_postprocess(row: dict) -> dict:


def load_audio_dataset():
# __omni_audio_load_dataset_example_start__
"""
Load audio dataset from MRSAudio Hugging Face dataset.
"""
@@ -151,6 +152,7 @@ def load_audio_dataset():
except Exception as e:
print(f"Error loading dataset: {e}")
return None
# __omni_audio_load_dataset_example_end__


def create_omni_audio_config():
@@ -169,13 +171,13 @@ def create_omni_audio_config():
"enabled": True,
"chat_template_content_format": "openai",
},
- chat_template_stage={"enabled": True},
- tokenize_stage={"enabled": True},
- detokenize_stage={"enabled": True},
+ chat_template_stage=True,
+ tokenize_stage=True,
+ detokenize_stage=True,
)


def run_omni_audio_example():
# __omni_audio_run_example_start__
"""Run the complete Omni audio example workflow."""
config = create_omni_audio_config()
audio_dataset = load_audio_dataset()
@@ -191,7 +193,7 @@ def run_omni_audio_example():
print(f"Has multimodal support: {config.prepare_multimodal_stage.get('enabled', False)}")
result = processor(audio_dataset).take_all()
return config, processor, result
- # __omni_audio_run_example_end__
+ # __omni_audio_run_example_end__
return None, None, None


16 changes: 4 additions & 12 deletions doc/source/data/doc_code/working-with-llms/vlm_image_example.py
@@ -48,27 +48,17 @@
fs = HfFileSystem()
vision_dataset = ray.data.read_parquet(path, filesystem=fs)

HF_TOKEN = "your-hf-token-here" # Replace with actual token if needed

# __vlm_config_example_start__
vision_processor_config = vLLMEngineProcessorConfig(
model_source="Qwen/Qwen2.5-VL-3B-Instruct",
engine_kwargs=dict(
tensor_parallel_size=1,
pipeline_parallel_size=1,
max_model_len=4096,
enable_chunked_prefill=True,
max_num_batched_tokens=2048,
trust_remote_code=True,
limit_mm_per_prompt={"image": 1},
),
runtime_env=dict(
env_vars=dict(
VLLM_USE_V1="1",
),
),
batch_size=16,
accelerator_type="L4",
concurrency=1,
prepare_multimodal_stage={"enabled": True},
)
@@ -146,6 +136,7 @@ def vision_postprocess(row: dict) -> dict:


def load_vision_dataset():
# __vlm_image_load_dataset_example_start__
"""
Load vision dataset from Hugging Face.

@@ -171,6 +162,7 @@ def load_vision_dataset():
except Exception as e:
print(f"Error loading dataset: {e}")
return None
# __vlm_image_load_dataset_example_end__


def create_vlm_config():
@@ -185,13 +177,13 @@ def create_vlm_config():
limit_mm_per_prompt={"image": 1},
),
batch_size=1,
accelerator_type="L4",
concurrency=1,
prepare_multimodal_stage={"enabled": True},
)


def run_vlm_example():
# __vlm_run_example_start__
"""Run the complete VLM example workflow."""
config = create_vlm_config()
vision_dataset = load_vision_dataset()
@@ -207,7 +199,7 @@ def run_vlm_example():
print(f"Has multimodal support: {config.prepare_multimodal_stage.get('enabled', False)}")
result = processor(vision_dataset).take_all()
return config, processor, result
- # __vlm_run_example_end__
+ # __vlm_run_example_end__
return None, None, None


18 changes: 10 additions & 8 deletions doc/source/data/doc_code/working-with-llms/vlm_video_example.py
@@ -58,9 +58,9 @@
allowed_local_media_path="/tmp",
),
},
- chat_template_stage={"enabled": True},
- tokenize_stage={"enabled": True},
- detokenize_stage={"enabled": True},
+ chat_template_stage=True,
+ tokenize_stage=True,
+ detokenize_stage=True,
)
# __vlm_video_config_example_end__

@@ -123,6 +123,7 @@ def video_postprocess(row: dict) -> dict:


def load_video_dataset():
# __vlm_video_load_dataset_example_start__
"""
Load video dataset from ShareGPTVideo Hugging Face dataset.
"""
@@ -166,7 +167,7 @@ def load_video_dataset():
except Exception as e:
print(f"Error loading dataset: {e}")
return None

# __vlm_video_load_dataset_example_end__

def create_vlm_video_config():
"""Create VLM video configuration."""
@@ -188,13 +189,14 @@ def create_vlm_video_config():
allowed_local_media_path="/tmp",
),
},
- chat_template_stage={"enabled": True},
- tokenize_stage={"enabled": True},
- detokenize_stage={"enabled": True},
+ chat_template_stage=True,
+ tokenize_stage=True,
+ detokenize_stage=True,
)


def run_vlm_video_example():
# __vlm_video_run_example_start__
"""Run the complete VLM video example workflow."""
config = create_vlm_video_config()
video_dataset = load_video_dataset()
@@ -210,7 +212,7 @@ def run_vlm_video_example():
print(f"Has multimodal support: {config.prepare_multimodal_stage.get('enabled', False)}")
result = processor(video_dataset).take_all()
return config, processor, result
- # __vlm_video_run_example_end__
+ # __vlm_video_run_example_end__
return None, None, None

