Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Deployment Graph] Move Deployment creation outside to build function #26129

Merged
merged 5 commits into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
initial commit with tests working
  • Loading branch information
jiaodong committed Jun 27, 2022
commit cb34c942197536ff4b58140c6f45df12672a108a
92 changes: 88 additions & 4 deletions python/ray/serve/deployment_graph_build.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import inspect
import json
from typing import List
from collections import OrderedDict

from ray.serve.deployment import Deployment
from ray.serve.deployment import Deployment, schema_to_deployment
from ray.serve.deployment_graph import RayServeDAGHandle
from ray.serve.deployment_method_node import DeploymentMethodNode
from ray.serve.deployment_node import DeploymentNode
Expand All @@ -11,6 +12,9 @@
from ray.serve.deployment_method_executor_node import DeploymentMethodExecutorNode
from ray.serve.deployment_function_executor_node import DeploymentFunctionExecutorNode
from ray.serve.json_serde import DAGNodeEncoder
from ray.serve.handle import RayServeLazySyncHandle
from ray.serve.schema import DeploymentSchema
from ray.serve.config import DeploymentConfig


from ray.dag import (
Expand Down Expand Up @@ -133,13 +137,93 @@ def transform_ray_dag_to_serve_dag(
"""
if isinstance(dag_node, ClassNode):
deployment_name = node_name_generator.get_node_name(dag_node)

# Deployment can be passed into other DAGNodes as init args. This is
# supported pattern in ray DAG that user can instantiate and pass class
# instances as init args to others.

# However in ray serve we send init args via .remote() that requires
# pickling, and all DAGNode types are not picklable by design.

# Thus we need convert all DeploymentNode used in init args into
# deployment handles (executable and picklable) in ray serve DAG to make
# serve DAG end to end executable.
def replace_with_handle(node):
if isinstance(node, DeploymentNode):
return RayServeLazySyncHandle(node._deployment.name)
elif isinstance(node, DeploymentExecutorNode):
return node._deployment_handle

(
replaced_deployment_init_args,
replaced_deployment_init_kwargs,
) = dag_node.apply_functional(
[dag_node.get_args(), dag_node.get_kwargs()],
predictate_fn=lambda node: isinstance(
node,
# We need to match and replace all DAGNodes even though they
# could be None, because no DAGNode replacement should run into
# re-resolved child DAGNodes, otherwise with KeyError
(
DeploymentNode,
DeploymentMethodNode,
DeploymentFunctionNode,
DeploymentExecutorNode,
DeploymentFunctionExecutorNode,
DeploymentMethodExecutorNode,
),
),
apply_fn=replace_with_handle,
)

if "deployment_schema" in dag_node._bound_other_args_to_resolve:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need the conditionals? the else case only covers @ray.remote decorated classes, which we don't support mixing in right now

deployment_schema: DeploymentSchema = dag_node._bound_other_args_to_resolve[
"deployment_schema"
]
deployment_shell = schema_to_deployment(deployment_schema)

# Prefer user specified name to override the generated one.
if (
inspect.isclass(dag_node._body)
and deployment_shell.name != dag_node._body.__name__
):
deployment_name = deployment_shell.name

# Set the route prefix, prefer the one user supplied,
# otherwise set it to /deployment_name
if (
deployment_shell.route_prefix is None
or deployment_shell.route_prefix != f"/{deployment_shell.name}"
):
route_prefix = deployment_shell.route_prefix
else:
route_prefix = f"/{deployment_name}"

deployment = deployment_shell.options(
func_or_class=dag_node._body,
name=deployment_name,
init_args=replaced_deployment_init_args,
init_kwargs=replaced_deployment_init_kwargs,
route_prefix=route_prefix,
)
else:
# ClassNode is created via bind on ray.remote decorated class with
# no serve specific configs.
deployment: Deployment = Deployment(
dag_node._body,
deployment_name,
DeploymentConfig(),
init_args=replaced_deployment_init_args,
init_kwargs=replaced_deployment_init_kwargs,
ray_actor_options=dag_node.get_options(),
_internal=True,
)

return DeploymentNode(
dag_node._body,
deployment_name,
deployment,
dag_node.get_args(),
dag_node.get_kwargs(),
dag_node.get_options(),
# TODO: (jiaodong) Support .options(metadata=xxx) for deployment
other_args_to_resolve=dag_node.get_other_args_to_resolve(),
)

Expand Down
103 changes: 5 additions & 98 deletions python/ray/serve/deployment_node.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,12 @@
import inspect
from typing import Any, Callable, Dict, Optional, List, Tuple, Union
from typing import Any, Dict, Optional, List, Tuple

from ray.dag import DAGNode
from ray.serve.deployment_executor_node import DeploymentExecutorNode
from ray.serve.deployment_function_executor_node import (
DeploymentFunctionExecutorNode,
)
from ray.serve.deployment_method_executor_node import (
DeploymentMethodExecutorNode,
)
from ray.serve.handle import RayServeLazySyncHandle

from ray.dag.constants import PARENT_CLASS_NODE_KEY
from ray.dag.format_utils import get_dag_node_str
from ray.serve.deployment_method_node import DeploymentMethodNode
from ray.serve.deployment_function_node import DeploymentFunctionNode
from ray.serve.deployment import Deployment, schema_to_deployment
from ray.serve.config import DeploymentConfig
from ray.serve.schema import DeploymentSchema
from ray.serve.deployment import Deployment


class DeploymentNode(DAGNode):
Expand All @@ -27,8 +16,7 @@ def __init__(
self,
# For serve structured deployment, deployment body can be import path
# to the class or function instead.
func_or_class: Union[Callable, str],
deployment_name: str,
deployment: Deployment,
deployment_init_args: Tuple[Any],
deployment_init_kwargs: Dict[str, Any],
ray_actor_options: Dict[str, Any],
Expand All @@ -41,87 +29,7 @@ def __init__(
ray_actor_options,
other_args_to_resolve=other_args_to_resolve,
)
# Deployment can be passed into other DAGNodes as init args. This is
# supported pattern in ray DAG that user can instantiate and pass class
# instances as init args to others.

# However in ray serve we send init args via .remote() that requires
# pickling, and all DAGNode types are not picklable by design.

# Thus we need convert all DeploymentNode used in init args into
# deployment handles (executable and picklable) in ray serve DAG to make
# serve DAG end to end executable.
# TODO(jiaodong): This part does some magic for DAGDriver and will throw
# error with weird pickle replace table error. Move this out.
def replace_with_handle(node):
if isinstance(node, DeploymentNode):
return RayServeLazySyncHandle(node._deployment.name)
elif isinstance(node, DeploymentExecutorNode):
return node._deployment_handle

(
replaced_deployment_init_args,
replaced_deployment_init_kwargs,
) = self.apply_functional(
[deployment_init_args, deployment_init_kwargs],
predictate_fn=lambda node: isinstance(
node,
# We need to match and replace all DAGNodes even though they
# could be None, because no DAGNode replacement should run into
# re-resolved child DAGNodes, otherwise with KeyError
(
DeploymentNode,
DeploymentMethodNode,
DeploymentFunctionNode,
DeploymentExecutorNode,
DeploymentFunctionExecutorNode,
DeploymentMethodExecutorNode,
),
),
apply_fn=replace_with_handle,
)

if "deployment_schema" in self._bound_other_args_to_resolve:
deployment_schema: DeploymentSchema = self._bound_other_args_to_resolve[
"deployment_schema"
]
deployment_shell = schema_to_deployment(deployment_schema)

# Prefer user specified name to override the generated one.
if (
inspect.isclass(func_or_class)
and deployment_shell.name != func_or_class.__name__
):
deployment_name = deployment_shell.name

# Set the route prefix, prefer the one user supplied,
# otherwise set it to /deployment_name
if (
deployment_shell.route_prefix is None
or deployment_shell.route_prefix != f"/{deployment_shell.name}"
):
route_prefix = deployment_shell.route_prefix
else:
route_prefix = f"/{deployment_name}"

self._deployment = deployment_shell.options(
func_or_class=func_or_class,
name=deployment_name,
init_args=replaced_deployment_init_args,
init_kwargs=replaced_deployment_init_kwargs,
route_prefix=route_prefix,
)
else:
self._deployment: Deployment = Deployment(
func_or_class,
deployment_name,
# TODO: (jiaodong) Support deployment config from user input
DeploymentConfig(),
init_args=replaced_deployment_init_args,
init_kwargs=replaced_deployment_init_kwargs,
ray_actor_options=ray_actor_options,
_internal=True,
)
self._deployment = deployment
self._deployment_handle = RayServeLazySyncHandle(self._deployment.name)

def _copy_impl(
Expand All @@ -132,8 +40,7 @@ def _copy_impl(
new_other_args_to_resolve: Dict[str, Any],
):
return DeploymentNode(
self._deployment.func_or_class,
self._deployment.name,
self._deployment,
new_args,
new_kwargs,
new_options,
Expand Down