[ray.serve.llm] OSS LLM Serving (#50643)
This PR adds all the components required to run an LLM serving application.
Main components added are:
- `build_vllm_deployment`
- `build_openai_app`
- `VLLMDeployment`
- `LLMModelRouterDeployment`


Signed-off-by: Gene Su <e870252314@gmail.com>
GeneDer authored Feb 19, 2025
1 parent 094fde6 commit ee2ed35
Showing 67 changed files with 9,945 additions and 740 deletions.
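
For orientation, here is a minimal, hedged sketch of how the exported pieces below might be combined into a Serve application. The import path and exported names (`LLMConfig`, `build_openai_app`) are taken from the diff; the `LLMConfig` constructor fields, the model id, and the dict shape passed to `build_openai_app` are illustrative assumptions rather than APIs confirmed by this commit.

from ray import serve
from ray.llm._internal.serve import LLMConfig, build_openai_app

llm_config = LLMConfig(
    # Hypothetical field values for illustration only; exact constructor
    # fields are not shown in this diff.
    model_loading_config=dict(model_id="my-model"),
    deployment_config=dict(autoscaling_config=dict(min_replicas=1, max_replicas=2)),
)

# build_openai_app validates its argument with LLMServingArgs.model_validate,
# so a dict with an "llm_configs" list is one plausible input shape.
app = build_openai_app({"llm_configs": [llm_config]})
serve.run(app)
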
34 changes: 32 additions & 2 deletions python/ray/llm/_internal/serve/__init__.py
@@ -1,3 +1,33 @@
-from ray.llm._internal.serve.configs import LLMConfig, ModelLoadingConfig
+from ray.llm._internal.serve.observability import setup_observability
+from ray.llm._internal.serve.observability.logging.setup import (
+    disable_vllm_custom_ops_logger_on_cpu_nodes,
+)
+from ray.llm._internal.serve.configs import (
+    LLMConfig,
+    ModelLoadingConfig,
+    LLMServingArgs,
+)
+from ray.llm._internal.serve.deployments import VLLMDeployment
+from ray.llm._internal.serve.deployments import LLMModelRouterDeployment
+from ray.llm._internal.serve.builders import build_vllm_deployment, build_openai_app
 
-__all__ = ["LLMConfig", "ModelLoadingConfig"]
+# Set up observability
+disable_vllm_custom_ops_logger_on_cpu_nodes()
+setup_observability()
+
+
+def _worker_process_setup_hook():
+    """Noop setup hook used for ENABLE_WORKER_PROCESS_SETUP_HOOK
+    (see python/ray/llm/_internal/serve/configs/constants.py)."""
+    pass
+
+
+__all__ = [
+    "LLMConfig",
+    "ModelLoadingConfig",
+    "LLMServingArgs",
+    "VLLMDeployment",
+    "LLMModelRouterDeployment",
+    "build_vllm_deployment",
+    "build_openai_app",
+]
6 changes: 6 additions & 0 deletions python/ray/llm/_internal/serve/builders/__init__.py
@@ -0,0 +1,6 @@
from ray.llm._internal.serve.builders.application_builders import (
    build_vllm_deployment,
    build_openai_app,
)

__all__ = ["build_vllm_deployment", "build_openai_app"]
152 changes: 152 additions & 0 deletions python/ray/llm/_internal/serve/builders/application_builders.py
@@ -0,0 +1,152 @@
from typing import List, Optional, Sequence

import ray
from ray.serve.deployment import Application
from ray.serve.handle import DeploymentHandle

from ray.llm._internal.serve.observability.logging import get_logger
from ray.llm._internal.serve.deployments.llm.vllm.vllm_deployment import VLLMDeployment
from ray.llm._internal.serve.configs.server_models import (
    LLMConfig,
    LLMServingArgs,
    LLMEngine,
)
from ray.llm._internal.serve.deployments.routers.router import LLMModelRouterDeployment
from ray.llm._internal.serve.configs.constants import (
    ENABLE_WORKER_PROCESS_SETUP_HOOK,
)

logger = get_logger(__name__)


def _set_deployment_placement_options(llm_config: LLMConfig) -> dict:
    deployment_config = llm_config.deployment_config.model_copy(deep=True).model_dump()
    engine_config = llm_config.get_engine_config()

    ray_actor_options = deployment_config["ray_actor_options"] or {}
    deployment_config["ray_actor_options"] = ray_actor_options

    replica_actor_resources = {
        "CPU": ray_actor_options.get("num_cpus", 1),
        "GPU": ray_actor_options.get("num_gpus", 0),
        **ray_actor_options.get("resources", {}),
    }
    if "memory" in ray_actor_options:
        replica_actor_resources["memory"] = ray_actor_options["memory"]

    if (
        "placement_group_bundles" in deployment_config
        or "placement_group_strategy" in deployment_config
    ):
        raise ValueError(
            "placement_group_bundles and placement_group_strategy must not be specified in deployment_config. "
            "Use scaling_config to configure the replica placement group."
        )

    # TODO (Kourosh): There is some test code leakage happening here that should be removed.
    try:
        # resources.mock_resource is a special key we use in tests to skip placement
        # groups on GPU nodes.
        if "mock_resource" in ray_actor_options.get("resources", {}):
            bundles = []
        else:
            bundles = engine_config.placement_bundles
    except ValueError:
        # May happen if all bundles are empty.
        bundles = []

    bundles = [replica_actor_resources] + bundles
    deployment_config.update(
        {
            "placement_group_bundles": bundles,
            "placement_group_strategy": engine_config.placement_strategy,
        }
    )

    return deployment_config


def _get_deployment_name(llm_config: LLMConfig, name_prefix: str):
    unsanitized_deployment_name = name_prefix + llm_config.model_id
    return unsanitized_deployment_name.replace("/", "--").replace(".", "_")


def get_serve_deployment_args(
    llm_config: LLMConfig,
    *,
    name_prefix: str,
    default_runtime_env: Optional[dict] = None,
):
    deployment_config = _set_deployment_placement_options(llm_config)

    if default_runtime_env:
        ray_actor_options = deployment_config.get("ray_actor_options", {})
        ray_actor_options["runtime_env"] = {
            **default_runtime_env,
            # Existing runtime_env should take precedence over the default.
            **ray_actor_options.get("runtime_env", {}),
            **(llm_config.runtime_env if llm_config.runtime_env else {}),
        }
        deployment_config["ray_actor_options"] = ray_actor_options

    # Set the name of the deployment config to map to the model ID.
    deployment_config["name"] = _get_deployment_name(llm_config, name_prefix)
    return deployment_config


def build_vllm_deployment(
    llm_config: LLMConfig,
    deployment_kwargs: Optional[dict] = None,
) -> Application:
    if deployment_kwargs is None:
        deployment_kwargs = {}

    default_runtime_env = ray.get_runtime_context().runtime_env
    if ENABLE_WORKER_PROCESS_SETUP_HOOK:
        default_runtime_env[
            "worker_process_setup_hook"
        ] = "ray.llm._internal.serve._worker_process_setup_hook"

    deployment_options = get_serve_deployment_args(
        llm_config,
        name_prefix="VLLMDeployment:",
        default_runtime_env=default_runtime_env,
    )

    return VLLMDeployment.options(**deployment_options).bind(
        llm_config=llm_config, **deployment_kwargs
    )


def _get_llm_deployments(
    llm_base_models: Optional[Sequence[LLMConfig]] = None,
    deployment_kwargs: Optional[dict] = None,
) -> List[DeploymentHandle]:
    llm_deployments = []
    for llm_config in llm_base_models:
        if llm_config.llm_engine == LLMEngine.VLLM:
            llm_deployments.append(build_vllm_deployment(llm_config, deployment_kwargs))
        else:
            # Note (genesu): This should never happen because we validate the engine
            # in the config.
            raise ValueError(f"Unsupported engine: {llm_config.llm_engine}")

    return llm_deployments


def build_openai_app(llm_serving_args: LLMServingArgs) -> Application:
    rayllm_args = LLMServingArgs.model_validate(llm_serving_args).parse_args()

    llm_configs = rayllm_args.llm_configs
    model_ids = {m.model_id for m in llm_configs}
    if len(model_ids) != len(llm_configs):
        raise ValueError("Duplicate models found. Make sure model ids are unique.")

    if len(llm_configs) == 0:
        logger.error(
            "List of models is empty. Maybe some parameters cannot be parsed into the LLMConfig."
        )

    llm_deployments = _get_llm_deployments(llm_configs)

    return LLMModelRouterDeployment.bind(llm_deployments=llm_deployments)
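
For a single model served without the OpenAI-compatible router, `build_vllm_deployment` can be bound and run on its own. A short sketch under the same assumptions about the `LLMConfig` constructor fields as above:

from ray import serve
from ray.llm._internal.serve import LLMConfig, build_vllm_deployment

# Hypothetical config; the constructor fields are assumptions, not confirmed by this diff.
llm_config = LLMConfig(model_loading_config=dict(model_id="my-model"))

# The builder derives the deployment name ("VLLMDeployment:" + sanitized model id),
# placement bundles, and runtime_env from the config, then binds VLLMDeployment.
vllm_app = build_vllm_deployment(llm_config)
serve.run(vllm_app)
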
8 changes: 6 additions & 2 deletions python/ray/llm/_internal/serve/configs/__init__.py
@@ -1,3 +1,7 @@
-from ray.llm._internal.serve.configs.models import LLMConfig, ModelLoadingConfig
+from ray.llm._internal.serve.configs.server_models import (
+    LLMConfig,
+    ModelLoadingConfig,
+    LLMServingArgs,
+)
 
-__all__ = ["LLMConfig", "ModelLoadingConfig"]
+__all__ = ["LLMConfig", "ModelLoadingConfig", "LLMServingArgs"]
46 changes: 46 additions & 0 deletions python/ray/llm/_internal/serve/configs/constants.py
@@ -21,3 +21,49 @@
DEFAULT_TARGET_ONGOING_REQUESTS = 16

FALLBACK_MAX_ONGOING_REQUESTS = 64

# If true, a default runtime_env will be injected to import rayllm on worker startup.
# This is a startup time optimization to avoid the latency penalty of sequentially
# importing rayllm in multiple layers of worker processes.
ENABLE_WORKER_PROCESS_SETUP_HOOK = (
    os.environ.get("RAYLLM_ENABLE_WORKER_PROCESS_SETUP_HOOK", "1") == "1"
)


CLOUD_OBJECT_MISSING_EXPIRE_S = 30
CLOUD_OBJECT_EXISTS_EXPIRE_S = 60 * 60

# Name of the LoRA adapter config file.
LORA_ADAPTER_CONFIG_NAME = "adapter_config.json"

DEFAULT_HEALTH_CHECK_PERIOD_S = int(
    os.getenv("RAYLLM_DEFAULT_HEALTH_CHECK_PERIOD_S", "10")
)
DEFAULT_HEALTH_CHECK_TIMEOUT_S = int(
    os.getenv("RAYLLM_DEFAULT_HEALTH_CHECK_TIMEOUT_S", "10")
)
ENGINE_START_TIMEOUT_S = int(os.getenv("RAYLLM_ENGINE_START_TIMEOUT_S", str(60 * 60)))

MIN_NUM_TOPLOGPROBS_ALLOWED = 0
MAX_NUM_TOPLOGPROBS_ALLOWED = 5
MODEL_RESPONSE_BATCH_TIMEOUT_MS = float(
    os.getenv("RAYLLM_MODEL_RESPONSE_BATCH_TIMEOUT_MS", "200")
)
RAYLLM_ENABLE_REQUEST_PROMPT_LOGS = (
    os.environ.get("RAYLLM_ENABLE_REQUEST_PROMPT_LOGS", "1") == "1"
)
RAYLLM_GUIDED_DECODING_BACKEND = os.environ.get(
    "RAYLLM_GUIDED_DECODING_BACKEND", "xgrammar"
)

MAX_NUM_STOPPING_SEQUENCES = int(os.getenv("RAYLLM_MAX_NUM_STOPPING_SEQUENCES", "8"))
ENV_VARS_TO_PROPAGATE = {
    "HUGGING_FACE_HUB_TOKEN",
    "HF_TOKEN",
}
# The router HTTP timeout is 10 minutes; streaming responses can take longer than 3 minutes.
RAYLLM_ROUTER_HTTP_TIMEOUT = float(os.environ.get("RAYLLM_ROUTER_HTTP_TIMEOUT", 600))

ENABLE_VERBOSE_TELEMETRY = bool(int(os.getenv("RAYLLM_ENABLE_VERBOSE_TELEMETRY", "0")))

RAYLLM_VLLM_ENGINE_CLS_ENV = "RAYLLM_VLLM_ENGINE_CLS"
90 changes: 90 additions & 0 deletions python/ray/llm/_internal/serve/configs/error_handling.py
@@ -0,0 +1,90 @@
# TODO (genesu): revisit these data structures
from pydantic import ValidationError as PydanticValidationError
from abc import ABC, abstractmethod


class ValidationError(ValueError):
    status_code = 400


class ValidationErrorWithPydantic(ValidationError):
    """Wraps a PydanticValidationError to be used as a ValidationError.
    This is necessary as pydantic.ValidationError cannot be subclassed,
    which causes errors when Ray tries to wrap it in a
    RayTaskError/RayActorError."""

    def __init__(self, exc: PydanticValidationError) -> None:
        self.exc = exc
        # BaseException implements a __reduce__ method that returns
        # a tuple with the type and the value of self.args.
        # https://stackoverflow.com/a/49715949/2213289
        self.args = (exc,)

    def __getattr__(self, name):
        return getattr(self.exc, name)

    def __repr__(self) -> str:
        return self.exc.__repr__()

    def __str__(self) -> str:
        return self.exc.__str__()


class PromptTooLongError(ValidationError):
    pass


class TooManyStoppingSequencesError(ValidationError):
    pass


class ErrorReason(ABC):
    @abstractmethod
    def get_message(self) -> str:
        raise NotImplementedError

    def __str__(self) -> str:
        return self.get_message()

    @property
    @abstractmethod
    def exception(self) -> Exception:
        raise NotImplementedError

    def raise_exception(self) -> Exception:
        raise self.exception


class InputTooLong(ErrorReason):
    def __init__(self, num_tokens: int, max_num_tokens: int) -> None:
        self.num_tokens = num_tokens
        self.max_num_tokens = max_num_tokens

    def get_message(self) -> str:
        if self.num_tokens < 0:
            return f"Input too long. The maximum input length is {self.max_num_tokens} tokens."
        return f"Input too long. Received {self.num_tokens} tokens, but the maximum input length is {self.max_num_tokens} tokens."

    @property
    def exception(self) -> Exception:
        return PromptTooLongError(self.get_message())


class TooManyStoppingSequences(ErrorReason):
    def __init__(
        self, num_stopping_sequences: int, max_num_stopping_sequences: int
    ) -> None:
        self.num_stopping_sequences = num_stopping_sequences
        self.max_num_stopping_sequences = max_num_stopping_sequences

    def get_message(self) -> str:
        return (
            f"Too many stopping sequences. Received {self.num_stopping_sequences} stopping sequences, "
            f"but the maximum is {self.max_num_stopping_sequences}. Please reduce the number of provided stopping sequences."
        )

    @property
    def exception(self) -> Exception:
        return TooManyStoppingSequencesError(self.get_message())
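
A small usage sketch for the error-reason classes above; the token limit is chosen only for illustration and is not a default from this commit:

from ray.llm._internal.serve.configs.error_handling import InputTooLong, PromptTooLongError

# Build an error reason and render its message.
reason = InputTooLong(num_tokens=5000, max_num_tokens=4096)
print(reason.get_message())
# -> "Input too long. Received 5000 tokens, but the maximum input length is 4096 tokens."

try:
    reason.raise_exception()
except PromptTooLongError as e:
    # ValidationError subclasses carry status_code = 400, which callers can map to an HTTP 400 response.
    print(e.status_code, e)
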
