Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Serve] Allow multiple HTTP servers. #9523

Merged
merged 19 commits into from
Jul 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions ci/long_running_tests/workloads/serve_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ def __init__(self, kill_period_s=1):

def _get_all_serve_actors(self):
master = serve.api._get_controller()
[router] = ray.get(master.get_router.remote())
[http_proxy] = ray.get(master.get_http_proxy.remote())
all_handles = [master, router, http_proxy]
routers = ray.get(master.get_router.remote())
all_handles = routers + [master]
worker_handle_dict = ray.get(master.get_all_worker_handles.remote())
for _, replica_dict in worker_handle_dict.items():
all_handles.extend(list(replica_dict.values()))
Expand Down
8 changes: 8 additions & 0 deletions python/ray/serve/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ py_test(
)


py_test(
name = "test_scaling",
size = "small",
srcs = serve_tests_srcs,
tags = ["exclusive"],
deps = [":serve_lib"],
)

py_test(
name = "test_util",
size = "small",
Expand Down
44 changes: 28 additions & 16 deletions python/ray/serve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ def __call__(self, *, python_arg=None):
return f


def init(name=None,
http_host=DEFAULT_HTTP_HOST,
http_port=DEFAULT_HTTP_PORT,
metric_exporter=InMemoryExporter):
def init(
name=None,
http_host=DEFAULT_HTTP_HOST,
http_port=DEFAULT_HTTP_PORT,
metric_exporter=InMemoryExporter,
):
"""Initialize or connect to a serve cluster.

If serve cluster is already initialized, this function will just return.
Expand All @@ -71,11 +73,12 @@ def init(name=None,
name (str): A unique name for this serve instance. This allows
multiple serve instances to run on the same ray cluster. Must be
specified in all subsequent serve.init() calls.
http_host (str): Host for HTTP server. Default to "0.0.0.0".
http_port (int): Port for HTTP server. Default to 8000.
http_host (str): Host for HTTP servers. Defaults to "0.0.0.0". Serve
starts one HTTP server per node in the Ray cluster.
http_port (int, List[int]): Port for the HTTP servers. Defaults to 8000.
metric_exporter(ExporterInterface): The class aggregates metrics from
all RayServe actors and optionally exports them to external
services. RayServe has two options built in: InMemoryExporter and
services. Ray Serve has two options built in: InMemoryExporter and
PrometheusExporter
"""
if name is not None and not isinstance(name, str):
Expand All @@ -94,19 +97,27 @@ def init(name=None,
except ValueError:
pass

# TODO(edoakes): for now, always start the HTTP proxy on the node that
# serve.init() was run on. We should consider making this configurable
# in the future.
http_node_id = ray.state.current_node_id()
controller = ServeController.options(
name=controller_name,
max_restarts=-1,
max_task_retries=-1,
).remote(name, http_node_id, http_host, http_port, metric_exporter)
).remote(
name,
http_host,
http_port,
metric_exporter,
)

block_until_http_ready(
"http://{}:{}/-/routes".format(http_host, http_port),
timeout=HTTP_PROXY_TIMEOUT)
futures = []
for node_id in ray.state.node_ids():
future = block_until_http_ready.options(
num_cpus=0, resources={
node_id: 0.01
}).remote(
"http://{}:{}/-/routes".format(http_host, http_port),
timeout=HTTP_PROXY_TIMEOUT)
futures.append(future)
ray.get(futures)
Comment on lines +112 to +120
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, this is pretty clever :)



@_ensure_connected
Expand All @@ -122,6 +133,7 @@ def shutdown():
controller = None


@_ensure_connected
edoakes marked this conversation as resolved.
Show resolved Hide resolved
def create_endpoint(endpoint_name,
*,
backend=None,
Expand Down Expand Up @@ -356,7 +368,7 @@ def get_handle(endpoint_name,
assert endpoint_name in ray.get(controller.get_all_endpoints.remote())

return RayServeHandle(
ray.get(controller.get_http_proxy.remote())[0],
ray.get(controller.get_router.remote())[0],
endpoint_name,
relative_slo_ms,
absolute_slo_ms,
Expand Down
153 changes: 97 additions & 56 deletions python/ray/serve/controller.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from collections import defaultdict, namedtuple
from itertools import groupby
import os
import random
import time
Expand Down Expand Up @@ -87,8 +88,8 @@ class ServeController:
requires all implementations here to be idempotent.
"""

async def __init__(self, instance_name, http_node_id, http_proxy_host,
http_proxy_port, metric_exporter_class):
async def __init__(self, instance_name, http_proxy_host, http_proxy_port,
metric_exporter_class):
# Unique name of the serve instance managed by this actor. Used to
# namespace child actors and checkpoints.
self.instance_name = instance_name
Expand Down Expand Up @@ -121,15 +122,13 @@ async def __init__(self, instance_name, http_node_id, http_proxy_host,
self.write_lock = asyncio.Lock()

# Cached handles to actors in the system.
self.router = None
self.http_proxy = None
self.routers = []
self.metric_exporter = None

# If starting the actor for the first time, starts up the other system
# components. If recovering, fetches their actor handles.
self._get_or_start_metric_exporter(metric_exporter_class)
self._get_or_start_http_proxy(http_node_id, http_proxy_host,
http_proxy_port)
self._get_or_start_routers(http_proxy_host, http_proxy_port)

# NOTE(edoakes): unfortunately, we can't completely recover from a
# checkpoint in the constructor because we block while waiting for
Expand All @@ -151,40 +150,42 @@ async def __init__(self, instance_name, http_node_id, http_proxy_host,
asyncio.get_event_loop().create_task(
self._recover_from_checkpoint(checkpoint))

def _get_or_start_http_proxy(self, node_id, host, port):
def _get_or_start_routers(self, host, port):
"""Get the HTTP proxy belonging to this serve instance.

If an HTTP proxy does not already exist for a node, it will be started.
"""
proxy_name = format_actor_name(SERVE_PROXY_NAME, self.instance_name)
try:
self.http_proxy = ray.get_actor(proxy_name)
except ValueError:
logger.info(
"Starting HTTP proxy with name '{}' on node '{}'".format(
proxy_name, node_id))
self.http_proxy = HTTPProxyActor.options(
name=proxy_name,
max_concurrency=ASYNC_CONCURRENCY,
max_restarts=-1,
max_task_retries=-1,
resources={
node_id: 0.01
},
).remote(
host, port, instance_name=self.instance_name)

# Since the router is merged with the HTTP proxy actor, the router will be
# proxied via the HTTP actor. Even though the two variable names are
# pointing to the same object, their semantic differences make the code
# more readable. (e.g. http_proxy.set_route_table, router.add_worker)
self.router = self.http_proxy

def get_http_proxy(self):
# TODO(simon): We don't handle nodes being added/removed. To do that,
# we should implement some sort of control loop in master actor.
for _, node_id_group in groupby(sorted(ray.state.node_ids())):
for index, node_id in enumerate(node_id_group):
proxy_name = format_actor_name(SERVE_PROXY_NAME,
self.instance_name)
proxy_name += "-{}-{}".format(node_id, index)
try:
router = ray.get_actor(proxy_name)
except ValueError:
logger.info(
"Starting HTTP proxy with name '{}' on node '{}' "
"listening on port {}".format(proxy_name, node_id,
port))
router = HTTPProxyActor.options(
name=proxy_name,
max_concurrency=ASYNC_CONCURRENCY,
max_restarts=-1,
max_task_retries=-1,
resources={
node_id: 0.01
},
).remote(
host, port, instance_name=self.instance_name)
self.routers.append(router)

def get_router(self):
"""Returns a handle to the HTTP proxy managed by this actor."""
return [self.http_proxy]
return self.routers

def get_http_proxy_config(self):
def get_router_config(self):
"""Called by the HTTP proxy on startup to fetch required state."""
return self.routes

Expand Down Expand Up @@ -267,20 +268,31 @@ async def _recover_from_checkpoint(self, checkpoint_bytes):
# Push configuration state to the router.
# TODO(edoakes): should we make this a pull-only model for simplicity?
for endpoint, traffic_policy in self.traffic_policies.items():
await self.router.set_traffic.remote(endpoint, traffic_policy)
await asyncio.gather(*[
router.set_traffic.remote(endpoint, traffic_policy)
for router in self.routers
])

for backend_tag, replica_dict in self.workers.items():
for replica_tag, worker in replica_dict.items():
await self.router.add_new_worker.remote(
backend_tag, replica_tag, worker)
await asyncio.gather(*[
router.add_new_worker.remote(backend_tag, replica_tag,
worker)
for router in self.routers
])

for backend, info in self.backends.items():
await self.router.set_backend_config.remote(
backend, info.backend_config)
await asyncio.gather(*[
router.set_backend_config.remote(backend, info.backend_config)
for router in self.routers
])
await self.broadcast_backend_config(backend)

# Push configuration state to the HTTP proxy.
await self.http_proxy.set_route_table.remote(self.routes)
await asyncio.gather(*[
router.set_route_table.remote(self.routes)
for router in self.routers
])

# Start/stop any pending backend replicas.
await self._start_pending_replicas()
Expand Down Expand Up @@ -353,8 +365,11 @@ async def _start_replica(self, backend_tag, replica_tag):
self.workers[backend_tag][replica_tag] = worker_handle

# Register the worker with the router.
await self.router.add_new_worker.remote(backend_tag, replica_tag,
worker_handle)
await asyncio.gather(*[
router.add_new_worker.remote(backend_tag, replica_tag,
worker_handle)
for router in self.routers
])

async def _start_pending_replicas(self):
"""Starts the pending backend replicas in self.replicas_to_start.
Expand Down Expand Up @@ -392,8 +407,10 @@ async def _stop_pending_replicas(self):
continue

# Remove the replica from router. This call is idempotent.
await self.router.remove_worker.remote(backend_tag,
replica_tag)
await asyncio.gather(*[
router.remove_worker.remote(backend_tag, replica_tag)
for router in self.routers
])

# TODO(edoakes): this logic isn't ideal because there may be
# pending tasks still executing on the replica. However, if we
Expand All @@ -410,7 +427,10 @@ async def _remove_pending_backends(self):
Clears self.backends_to_remove.
"""
for backend_tag in self.backends_to_remove:
await self.router.remove_backend.remote(backend_tag)
await asyncio.gather(*[
router.remove_backend.remote(backend_tag)
for router in self.routers
])
self.backends_to_remove.clear()

async def _remove_pending_endpoints(self):
Expand All @@ -419,7 +439,10 @@ async def _remove_pending_endpoints(self):
Clears self.endpoints_to_remove.
"""
for endpoint_tag in self.endpoints_to_remove:
await self.router.remove_endpoint.remote(endpoint_tag)
await asyncio.gather(*[
router.remove_endpoint.remote(endpoint_tag)
for router in self.routers
])
self.endpoints_to_remove.clear()

def _scale_replicas(self, backend_tag, num_replicas):
Expand Down Expand Up @@ -533,7 +556,10 @@ async def _set_traffic(self, endpoint_name, traffic_dict):
# update to avoid inconsistent state if we crash after pushing the
# update.
self._checkpoint()
await self.router.set_traffic.remote(endpoint_name, traffic_policy)
await asyncio.gather(*[
router.set_traffic.remote(endpoint_name, traffic_policy)
for router in self.routers
])

async def set_traffic(self, endpoint_name, traffic_dict):
"""Sets the traffic policy for the specified endpoint."""
Expand All @@ -560,8 +586,12 @@ async def shadow_traffic(self, endpoint_name, backend_tag, proportion):
# update to avoid inconsistent state if we crash after pushing the
# update.
self._checkpoint()
await self.router.set_traffic.remote(
endpoint_name, self.traffic_policies[endpoint_name])
await asyncio.gather(*[
router.set_traffic.remote(
endpoint_name,
self.traffic_policies[endpoint_name],
) for router in self.routers
])

async def create_endpoint(self, endpoint, traffic_dict, route, methods):
"""Create a new endpoint with the specified route and methods.
Expand Down Expand Up @@ -600,7 +630,10 @@ async def create_endpoint(self, endpoint, traffic_dict, route, methods):

# NOTE(edoakes): checkpoint is written in self._set_traffic.
await self._set_traffic(endpoint, traffic_dict)
await self.http_proxy.set_route_table.remote(self.routes)
await asyncio.gather(*[
router.set_route_table.remote(self.routes)
for router in self.routers
])

async def delete_endpoint(self, endpoint):
"""Delete the specified endpoint.
Expand Down Expand Up @@ -635,7 +668,10 @@ async def delete_endpoint(self, endpoint):

# Update the HTTP proxy first to ensure no new requests for the
# endpoint are sent to the router.
await self.http_proxy.set_route_table.remote(self.routes)
await asyncio.gather(*[
router.set_route_table.remote(self.routes)
for router in self.routers
])
await self._remove_pending_endpoints()

async def create_backend(self, backend_tag, backend_config,
Expand All @@ -660,8 +696,10 @@ async def create_backend(self, backend_tag, backend_config,

# Set the backend config inside the router
# (particularly for max-batch-size).
await self.router.set_backend_config.remote(
backend_tag, backend_config)
await asyncio.gather(*[
router.set_backend_config.remote(backend_tag, backend_config)
for router in self.routers
])
await self.broadcast_backend_config(backend_tag)

async def delete_backend(self, backend_tag):
Expand Down Expand Up @@ -717,8 +755,10 @@ async def update_backend_config(self, backend_tag, config_options):

# Inform the router about change in configuration
# (particularly for setting max_batch_size).
await self.router.set_backend_config.remote(
backend_tag, backend_config)
await asyncio.gather(*[
router.set_backend_config.remote(backend_tag, backend_config)
for router in self.routers
])

await self._start_pending_replicas()
await self._stop_pending_replicas()
Expand Down Expand Up @@ -748,7 +788,8 @@ def get_backend_config(self, backend_tag):
async def shutdown(self):
"""Shuts down the serve instance completely."""
async with self.write_lock:
ray.kill(self.http_proxy, no_restart=True)
for router in self.routers:
ray.kill(router, no_restart=True)
ray.kill(self.metric_exporter, no_restart=True)
for replica_dict in self.workers.values():
for replica in replica_dict.values():
Expand Down
Loading