Skip to content

Commit

Permalink
[serve] Simplify application build codepath (#48218)
Browse files Browse the repository at this point in the history
Stacked on: #48223

Reimplements the `.bind()` codepath to avoid using the Ray DAG codepath
which adds a lot of complexity for little benefit.

The DAG traversal code is much simpler, around 100 lines of
self-contained code in `build_app.py`. I've also added unit tests for
it.

There should be no behavior change here.

---------

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
  • Loading branch information
edoakes authored Oct 29, 2024
1 parent cd4c34a commit cb52699
Show file tree
Hide file tree
Showing 19 changed files with 573 additions and 604 deletions.
10 changes: 5 additions & 5 deletions python/ray/serve/_private/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,16 +240,16 @@ def serve_start(
return client


def call_app_builder_with_args_if_necessary(
def call_user_app_builder_with_args_if_necessary(
builder: Union[Application, FunctionType],
args: Dict[str, Any],
) -> Application:
"""Builds a Serve application from an application builder function.
"""Calls a user-provided function that returns Serve application.
If a pre-built application is passed, this is a no-op.
If an Application object is passed, this is a no-op.
Else, we validate the signature of the builder, convert the args dictionary to
the user-annotated Pydantic model if provided, and call the builder function.
Else, we validate the signature of the function, convert the args dictionary to
the user-annotated Pydantic model if provided, and call the function.
The output of the function is returned (must be an Application).
"""
Expand Down
15 changes: 8 additions & 7 deletions python/ray/serve/_private/application_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ray import cloudpickle
from ray._private.utils import import_attr
from ray.exceptions import RuntimeEnvSetupError
from ray.serve._private.build_app import BuiltApplication, build_app
from ray.serve._private.common import (
DeploymentID,
DeploymentStatus,
Expand Down Expand Up @@ -1141,20 +1142,20 @@ def build_serve_application(
)

try:
from ray.serve._private.api import call_app_builder_with_args_if_necessary
from ray.serve._private.deployment_graph_build import build as pipeline_build
from ray.serve._private.api import call_user_app_builder_with_args_if_necessary

# Import and build the application.
args_info_str = f" with arguments {args}" if args else ""
logger.info(f"Importing application '{name}'{args_info_str}.")

app = call_app_builder_with_args_if_necessary(import_attr(import_path), args)
deployments = pipeline_build(app._get_internal_dag_node(), name)
ingress = deployments[-1]
app = call_user_app_builder_with_args_if_necessary(
import_attr(import_path), args
)

deploy_args_list = []
for deployment in deployments:
is_ingress = deployment.name == ingress.name
built_app: BuiltApplication = build_app(app, name=name)
for deployment in built_app.deployments:
is_ingress = deployment.name == built_app.ingress_deployment_name
deploy_args_list.append(
get_deploy_args(
name=deployment._name,
Expand Down
164 changes: 164 additions & 0 deletions python/ray/serve/_private/build_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import logging
from dataclasses import dataclass
from typing import Generic, List, TypeVar

from ray.dag.py_obj_scanner import _PyObjScanner
from ray.serve._private.constants import SERVE_LOGGER_NAME
from ray.serve.deployment import Application, Deployment
from ray.serve.handle import DeploymentHandle

logger = logging.getLogger(SERVE_LOGGER_NAME)

K = TypeVar("K")
V = TypeVar("V")


class IDDict(dict, Generic[K, V]):
"""Dictionary that uses id() for keys instead of hash().
This is necessary because Application objects aren't hashable and we want each
instance to map to a unique key.
"""

def __getitem__(self, key: K) -> V:
return super().__getitem__(id(key))

def __setitem__(self, key: K, value: V):
return super().__setitem__(id(key), value)

def __delitem__(self, key: K):
return super().__delitem__(id(key))

def __contains__(self, key: object):
return super().__contains__(id(key))


@dataclass(frozen=True)
class BuiltApplication:
# Name of the application.
name: str
# Name of the application's 'ingress' deployment
# (the one exposed over gRPC/HTTP/handle).
ingress_deployment_name: str
# List of unique deployments comprising the app.
deployments: List[Deployment]


def build_app(
app: Application,
*,
name: str,
) -> BuiltApplication:
"""Builds the application into a list of finalized deployments.
The following transformations are made:
- Application objects in constructor args/kwargs are converted to
DeploymentHandles for injection at runtime.
- Name conflicts from deployments that use the same class are handled
by appending a monotonically increasing suffix (e.g., SomeClass_1).
Returns: BuiltApplication
"""
deployments = _build_app_recursive(
app,
app_name=name,
handles=IDDict(),
deployment_names=IDDict(),
)
return BuiltApplication(
name=name,
ingress_deployment_name=app._bound_deployment.name,
deployments=deployments,
)


def _build_app_recursive(
app: Application,
*,
app_name: str,
deployment_names: IDDict[Application, str],
handles: IDDict[Application, DeploymentHandle],
) -> List[Deployment]:
"""Recursively traverses the graph of Application objects.
Each Application will have an associated DeploymentHandle created that will replace
it in any occurrences in other Applications' args or kwargs.
Also collects a list of the unique Applications encountered and returns them as
deployable Deployment objects.
"""
# This application has already been encountered.
# There's no need to recurse into its child args and we don't want to create
# a duplicate entry for it in the list of deployments.
if app in handles:
return []

# Create the DeploymentHandle that will be used to replace this application
# in the arguments of its parent(s).
handles[app] = DeploymentHandle(
_get_unique_deployment_name_memoized(app, deployment_names),
app_name=app_name,
)

deployments = []
scanner = _PyObjScanner(source_type=Application)
try:
# Recursively traverse any Application objects bound to init args/kwargs.
child_apps = scanner.find_nodes(
(app._bound_deployment.init_args, app._bound_deployment.init_kwargs)
)
for child_app in child_apps:
deployments.extend(
_build_app_recursive(
child_app,
app_name=app_name,
handles=handles,
deployment_names=deployment_names,
)
)

# Replace Application objects with their corresponding DeploymentHandles.
new_init_args, new_init_kwargs = scanner.replace_nodes(handles)
deployments.append(
app._bound_deployment.options(
name=_get_unique_deployment_name_memoized(app, deployment_names),
_init_args=new_init_args,
_init_kwargs=new_init_kwargs,
)
)
return deployments
finally:
scanner.clear()


def _get_unique_deployment_name_memoized(
app: Application, deployment_names: IDDict[Application, str]
) -> str:
"""Generates a name for the deployment.
This is used to handle collisions when the user does not specify a name
explicitly, so typically we'd use the class name as the default.
In that case, we append a monotonically increasing suffix to the name, e.g.,
Deployment, then Deployment_1, then Deployment_2, ...
Names are memoized in the `deployment_names` dict, which should be passed to
subsequent calls to this function.
"""
if app in deployment_names:
return deployment_names[app]

idx = 1
name = app._bound_deployment.name
while name in deployment_names.values():
name = f"{app._bound_deployment.name}_{idx}"
idx += 1

if idx != 1:
logger.warning(
"There are multiple deployments with the same name "
f"'{app._bound_deployment.name}'. Renaming one to '{name}'."
)

deployment_names[app] = name
return name
63 changes: 42 additions & 21 deletions python/ray/serve/_private/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import ray
from ray.actor import ActorHandle
from ray.serve._private.application_state import StatusOverview
from ray.serve._private.build_app import BuiltApplication
from ray.serve._private.common import (
DeploymentID,
DeploymentStatus,
Expand All @@ -23,6 +24,7 @@
from ray.serve._private.controller import ServeController
from ray.serve._private.deploy_utils import get_deploy_args
from ray.serve._private.deployment_info import DeploymentInfo
from ray.serve._private.utils import get_random_string
from ray.serve.config import HTTPOptions
from ray.serve.exceptions import RayServeException
from ray.serve.generated.serve_pb2 import DeploymentArgs, DeploymentRoute
Expand Down Expand Up @@ -246,24 +248,26 @@ def _wait_for_application_running(self, name: str, timeout_s: int = -1):
@_ensure_connected
def deploy_application(
self,
name,
deployments: List[Dict],
_blocking: bool = True,
):
ingress_route_prefix = None
built_app: BuiltApplication,
*,
blocking: bool,
route_prefix: Optional[str],
logging_config: Optional[Union[Dict, LoggingConfig]],
) -> DeploymentHandle:
deployment_args_list = []
for deployment in deployments:
if deployment["ingress"]:
ingress_route_prefix = deployment["route_prefix"]
for deployment in built_app.deployments:
if deployment.logging_config is None and logging_config:
deployment = deployment.options(logging_config=logging_config)

is_ingress = deployment.name == built_app.ingress_deployment_name
deployment_args = get_deploy_args(
deployment["name"],
replica_config=deployment["replica_config"],
ingress=deployment["ingress"],
deployment_config=deployment["deployment_config"],
version=deployment["version"],
route_prefix=deployment["route_prefix"],
docs_path=deployment["docs_path"],
deployment.name,
ingress=is_ingress,
replica_config=deployment._replica_config,
deployment_config=deployment._deployment_config,
version=deployment._version or get_random_string(),
route_prefix=route_prefix if is_ingress else None,
docs_path=deployment._docs_path,
)

deployment_args_proto = DeploymentArgs()
Expand All @@ -283,14 +287,31 @@ def deploy_application(

deployment_args_list.append(deployment_args_proto.SerializeToString())

ray.get(self._controller.deploy_application.remote(name, deployment_args_list))
if _blocking:
self._wait_for_application_running(name)
if ingress_route_prefix is not None:
url_part = " at " + self._root_url + ingress_route_prefix
ray.get(
self._controller.deploy_application.remote(
built_app.name, deployment_args_list
)
)

# The deployment state is not guaranteed to be created after
# deploy_application returns; the application state manager will
# need another reconcile iteration to create it.
self._wait_for_deployment_created(
built_app.ingress_deployment_name, built_app.name
)
handle = self.get_handle(
built_app.ingress_deployment_name, built_app.name, check_exists=False
)

if blocking:
self._wait_for_application_running(built_app.name)
if route_prefix is not None:
url_part = " at " + self._root_url + route_prefix
else:
url_part = ""
logger.info(f"Application '{name}' is ready{url_part}.")
logger.info(f"Application '{built_app.name}' is ready{url_part}.")

return handle

@_ensure_connected
def deploy_apps(
Expand Down
Loading

0 comments on commit cb52699

Please sign in to comment.