Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update named actor API #8559

Merged
merged 3 commits into from
May 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions doc/source/rllib-training.rst
Original file line number Diff line number Diff line change
Expand Up @@ -467,12 +467,10 @@ For even finer-grained control over training, you can use RLlib's lower-level `b

Global Coordination
~~~~~~~~~~~~~~~~~~~
Sometimes, it is necessary to coordinate between pieces of code that live in different processes managed by RLlib. For example, it can be useful to maintain a global average of a certain variable, or centrally control a hyperparameter used by policies. Ray provides a general way to achieve this through *named actors* (learn more about Ray actors `here <actors.html>`__). As an example, consider maintaining a shared global counter that is incremented by environments and read periodically from your driver program:
Sometimes, it is necessary to coordinate between pieces of code that live in different processes managed by RLlib. For example, it can be useful to maintain a global average of a certain variable, or centrally control a hyperparameter used by policies. Ray provides a general way to achieve this through *detached actors* (learn more about Ray actors `here <actors.html>`__). These actors are assigned a global name and handles to them can be retrieved using these names. As an example, consider maintaining a shared global counter that is incremented by environments and read periodically from your driver program:

.. code-block:: python

from ray.util import named_actors

@ray.remote
class Counter:
def __init__(self):
Expand All @@ -483,12 +481,11 @@ Sometimes, it is necessary to coordinate between pieces of code that live in dif
return self.count

# on the driver
counter = Counter.remote()
named_actors.register_actor("global_counter", counter)
counter = Counter.options(name="global_counter").remote()
print(ray.get(counter.get.remote())) # get the latest count

# in your envs
counter = named_actors.get_actor("global_counter")
counter = ray.get_actor("global_counter")
counter.inc.remote(1) # async call to increment the global count

Ray actors provide high levels of performance, so in more complex cases they can be used to implement communication patterns such as parameter servers and allreduce.
Expand Down
2 changes: 2 additions & 0 deletions python/ray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
connect,
disconnect,
get,
get_actor,
get_gpu_ids,
get_resource_ids,
get_webui_url,
Expand Down Expand Up @@ -126,6 +127,7 @@
"connect",
"disconnect",
"get",
"get_actor",
"get_gpu_ids",
"get_resource_ids",
"get_webui_url",
Expand Down
40 changes: 17 additions & 23 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,7 @@ def _remote(self,
asyncio execution. Note that the execution order is not
guaranteed when max_concurrency > 1.
name: The globally unique name for the actor.
detached: Whether the actor should be kept alive after driver
exits.
detached: DEPRECATED.

Returns:
A handle to the newly created actor.
Expand Down Expand Up @@ -469,18 +468,16 @@ def _remote(self,
raise RuntimeError("Actors cannot be created before ray.init() "
"has been called.")

if detached and name is None:
raise ValueError("Detached actors must be named. "
"Please use Actor._remote(name='some_name') "
"to associate the name.")
if detached:
logger.warning("The detached flag is deprecated. To create a "
"detached actor, use the name parameter.")

if name and not detached:
raise ValueError("Only detached actors can be named. "
"Please use Actor._remote(detached=True, "
"name='some_name').")

if name == "":
raise ValueError("Actor name cannot be an empty string.")
if name is not None:
if not isinstance(name, str):
raise TypeError("name must be None or a string, "
"got: '{}'.".format(type(name)))
if name == "":
raise ValueError("Actor name cannot be an empty string.")

# Check whether the name is already taken.
# TODO(edoakes): this check has a race condition because two drivers
Expand All @@ -489,14 +486,17 @@ def _remote(self,
# async call.
if name is not None:
try:
ray.util.get_actor(name)
ray.get_actor(name)
except ValueError: # Name is not taken.
pass
else:
raise ValueError(
"The name {name} is already taken. Please use "
"a different name or get existing actor using "
"ray.util.get_actor('{name}')".format(name=name))
"a different name or get the existing actor using "
"ray.get_actor('{name}')".format(name=name))
detached = True
else:
detached = False

# Set the actor's default resources if not already set. First three
# conditions are to check that no resources were specified in the
Expand Down Expand Up @@ -583,7 +583,7 @@ def _remote(self,
original_handle=True)

if name is not None and not gcs_actor_service_enabled():
ray.util.register_actor(name, actor_handle)
ray.util.named_actors._register_actor(name, actor_handle)

return actor_handle

Expand Down Expand Up @@ -762,12 +762,6 @@ def __repr__(self):
self._ray_actor_creation_function_descriptor.class_name,
self._actor_id.hex())

def __ray_kill__(self):
"""Deprecated - use ray.kill() instead."""
logger.warning("actor.__ray_kill__() is deprecated and will be removed"
" in the near future. Use ray.kill(actor) instead.")
ray.kill(self)

@property
def _actor_id(self):
return self._ray_actor_id
Expand Down
3 changes: 1 addition & 2 deletions python/ray/serve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def init(cluster_name=None,
global master_actor
master_actor_name = format_actor_name(SERVE_MASTER_NAME, cluster_name)
try:
master_actor = ray.util.get_actor(master_actor_name)
master_actor = ray.get_actor(master_actor_name)
return
except ValueError:
pass
Expand All @@ -124,7 +124,6 @@ def init(cluster_name=None,
# in the future.
http_node_id = ray.state.current_node_id()
master_actor = ServeMaster.options(
detached=True,
name=master_actor_name,
max_restarts=-1,
).remote(cluster_name, start_server, http_node_id, http_host, http_port,
Expand Down
20 changes: 7 additions & 13 deletions python/ray/serve/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,10 @@ def _get_or_start_router(self):
"""
router_name = format_actor_name(SERVE_ROUTER_NAME, self.cluster_name)
try:
self.router = ray.util.get_actor(router_name)
self.router = ray.get_actor(router_name)
except ValueError:
logger.info("Starting router with name '{}'".format(router_name))
self.router = async_retryable(ray.remote(Router)).options(
detached=True,
name=router_name,
max_concurrency=ASYNC_CONCURRENCY,
max_restarts=-1,
Expand All @@ -148,13 +147,12 @@ def _get_or_start_http_proxy(self, node_id, host, port):
"""
proxy_name = format_actor_name(SERVE_PROXY_NAME, self.cluster_name)
try:
self.http_proxy = ray.util.get_actor(proxy_name)
self.http_proxy = ray.get_actor(proxy_name)
except ValueError:
logger.info(
"Starting HTTP proxy with name '{}' on node '{}'".format(
proxy_name, node_id))
self.http_proxy = async_retryable(HTTPProxyActor).options(
detached=True,
name=proxy_name,
max_concurrency=ASYNC_CONCURRENCY,
max_restarts=-1,
Expand All @@ -180,12 +178,11 @@ def _get_or_start_metric_exporter(self, metric_exporter_class):
metric_sink_name = format_actor_name(SERVE_METRIC_SINK_NAME,
self.cluster_name)
try:
self.metric_exporter = ray.util.get_actor(metric_sink_name)
self.metric_exporter = ray.get_actor(metric_sink_name)
except ValueError:
logger.info("Starting metric exporter with name '{}'".format(
metric_sink_name))
self.metric_exporter = MetricExporterActor.options(
detached=True,
name=metric_sink_name).remote(metric_exporter_class)

def get_metric_exporter(self):
Expand Down Expand Up @@ -246,7 +243,7 @@ async def _recover_from_checkpoint(self, checkpoint_bytes):
for replica_tag in replica_tags:
replica_name = format_actor_name(replica_tag,
self.cluster_name)
self.workers[backend_tag][replica_tag] = ray.util.get_actor(
self.workers[backend_tag][replica_tag] = ray.get_actor(
replica_name)

# Push configuration state to the router.
Expand Down Expand Up @@ -311,7 +308,6 @@ async def _start_backend_worker(self, backend_tag, replica_tag):

replica_name = format_actor_name(replica_tag, self.cluster_name)
worker_handle = async_retryable(ray.remote(backend_worker)).options(
detached=True,
name=replica_name,
max_restarts=-1,
**replica_config.ray_actor_options).remote(
Expand All @@ -328,7 +324,7 @@ async def _start_replica(self, backend_tag, replica_tag):
# failed after creating them but before writing a
# checkpoint.
try:
worker_handle = ray.util.get_actor(replica_tag)
worker_handle = ray.get_actor(replica_tag)
except ValueError:
worker_handle = await self._start_backend_worker(
backend_tag, replica_tag)
Expand Down Expand Up @@ -371,7 +367,7 @@ async def _stop_pending_replicas(self):
# NOTE(edoakes): the replicas may already be stopped if we
# failed after stopping them but before writing a checkpoint.
try:
replica = ray.util.get_actor(replica_tag)
replica = ray.get_actor(replica_tag)
except ValueError:
continue

Expand All @@ -384,9 +380,7 @@ async def _stop_pending_replicas(self):
# use replica.__ray_terminate__, we may send it while the
# replica is being restarted and there's no way to tell if it
# successfully killed the worker or not.
worker = ray.worker.global_worker
# Kill the actor with no_restart=True.
worker.core_worker.kill_actor(replica._ray_actor_id, True)
ray.kill(replica, no_restart=True)

self.replicas_to_stop.clear()

Expand Down
18 changes: 9 additions & 9 deletions python/ray/serve/tests/test_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def function():
response = request_with_retries("/master_failure", timeout=30)
assert response.text == "hello1"

ray.kill(serve.api._get_master_actor())
ray.kill(serve.api._get_master_actor(), no_restart=False)

for _ in range(10):
response = request_with_retries("/master_failure", timeout=30)
Expand All @@ -44,7 +44,7 @@ def function():
def function():
return "hello2"

ray.kill(serve.api._get_master_actor())
ray.kill(serve.api._get_master_actor(), no_restart=False)

serve.create_backend("master_failure:v2", function)
serve.set_traffic("master_failure", {"master_failure:v2": 1.0})
Expand All @@ -56,11 +56,11 @@ def function():
def function():
return "hello3"

ray.kill(serve.api._get_master_actor())
ray.kill(serve.api._get_master_actor(), no_restart=False)
serve.create_endpoint("master_failure_2", "/master_failure_2")
ray.kill(serve.api._get_master_actor())
ray.kill(serve.api._get_master_actor(), no_restart=False)
serve.create_backend("master_failure_2", function)
ray.kill(serve.api._get_master_actor())
ray.kill(serve.api._get_master_actor(), no_restart=False)
serve.set_traffic("master_failure_2", {"master_failure_2": 1.0})

for _ in range(10):
Expand All @@ -73,7 +73,7 @@ def function():
def _kill_http_proxy():
[http_proxy] = ray.get(
serve.api._get_master_actor().get_http_proxy.remote())
ray.kill(http_proxy)
ray.kill(http_proxy, no_restart=False)


def test_http_proxy_failure(serve_instance):
Expand Down Expand Up @@ -107,7 +107,7 @@ def function():

def _kill_router():
[router] = ray.get(serve.api._get_master_actor().get_router.remote())
ray.kill(router)
ray.kill(router, no_restart=False)


def test_router_failure(serve_instance):
Expand Down Expand Up @@ -169,7 +169,7 @@ def __call__(self):
# Kill the worker.
handles = _get_worker_handles("worker_failure:v1")
assert len(handles) == 1
ray.kill(handles[0])
ray.kill(handles[0], no_restart=False)

# Wait until the worker is killed and a new one is started.
start = time.time()
Expand Down Expand Up @@ -227,7 +227,7 @@ def __call__(self):
# Kill one of the replicas.
handles = _get_worker_handles("replica_failure")
assert len(handles) == 2
ray.kill(handles[0])
ray.kill(handles[0], no_restart=False)

# Check that the other replica still serves requests.
for _ in range(10):
Expand Down
Loading