Skip to content

[Multi Modal] Add an env var for message queue max chunk bytes #19242

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions vllm/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
VLLM_MAX_TOKENS_PER_EXPERT_FP4_MOE: int = 163840
VLLM_TOOL_PARSE_REGEX_TIMEOUT_SECONDS: int = 1
VLLM_SLEEP_WHEN_IDLE: bool = False
VLLM_MQ_MAX_CHUNK_BYTES_MB: int = 16


def get_default_cache_root():
Expand Down Expand Up @@ -847,6 +848,12 @@ def get_vllm_port() -> Optional[int]:
# latency penalty when a request eventually comes.
"VLLM_SLEEP_WHEN_IDLE":
lambda: bool(int(os.getenv("VLLM_SLEEP_WHEN_IDLE", "0"))),

# Control the max chunk bytes (in MB) for the rpc message queue.
# Objects larger than this threshold will be broadcast to worker
# processes via zmq.
"VLLM_MQ_MAX_CHUNK_BYTES_MB":
lambda: int(os.getenv("VLLM_MQ_MAX_CHUNK_BYTES_MB", "16")),
}

# --8<-- [end:env-vars-definition]
Expand Down
6 changes: 5 additions & 1 deletion vllm/v1/executor/multiproc_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import cloudpickle

import vllm.envs as envs
from vllm.config import VllmConfig
from vllm.distributed import (destroy_distributed_environment,
destroy_model_parallel)
Expand Down Expand Up @@ -72,7 +73,10 @@ def _init_executor(self) -> None:

# Initialize worker and set up message queues for SchedulerOutputs
# and ModelRunnerOutputs
self.rpc_broadcast_mq = MessageQueue(self.world_size, self.world_size)
max_chunk_bytes = envs.VLLM_MQ_MAX_CHUNK_BYTES_MB * 1024 * 1024
self.rpc_broadcast_mq = MessageQueue(self.world_size,
self.world_size,
max_chunk_bytes=max_chunk_bytes)
scheduler_output_handle = self.rpc_broadcast_mq.export_handle()

# Create workers
Expand Down