Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
299 changes: 24 additions & 275 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,19 @@
import copy
import hashlib
import inspect
import json
import traceback

import ray.cloudpickle as pickle
from ray.function_manager import FunctionActorManager
import ray.local_scheduler
import ray.ray_constants as ray_constants
import ray.signature as signature
import ray.worker
from ray.utils import (
decode,
_random_string,
check_oversized_pickle,
is_cython,
push_error_to_driver,
)
from ray.utils import _random_string

DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS = 1


def is_classmethod(f):
    """Return True if the given method is bound as a classmethod."""
    # A classmethod accessed through its class carries a non-None __self__
    # (the class itself); plain functions have no __self__ attribute.
    return getattr(f, "__self__", None) is not None


def compute_actor_handle_id(actor_handle_id, num_forks):
"""Deterministically compute an actor handle ID.

Expand Down Expand Up @@ -96,24 +84,6 @@ def compute_actor_creation_function_id(class_id):
return ray.ObjectID(class_id)


def compute_actor_method_function_id(class_name, attr):
    """Deterministically derive the function ID for an actor method.

    Args:
        class_name (str): The actor's class name.
        attr (str): The name of the method attribute.

    Returns:
        The function ID (an ObjectID) corresponding to the method.
    """
    # The ID is the SHA-1 digest of the class name followed by the method
    # name, so every worker computes the same ID for the same method.
    hasher = hashlib.sha1(class_name.encode("ascii"))
    hasher.update(attr.encode("ascii"))
    digest = hasher.digest()
    assert len(digest) == ray_constants.ID_SIZE
    return ray.ObjectID(digest)


def set_actor_checkpoint(worker, actor_id, checkpoint_index, checkpoint,
frontier):
"""Set the most recent checkpoint associated with a given actor ID.
Expand All @@ -134,28 +104,6 @@ def set_actor_checkpoint(worker, actor_id, checkpoint_index, checkpoint,
})


def get_actor_checkpoint(worker, actor_id):
    """Get the most recent checkpoint associated with a given actor ID.

    Args:
        worker: The worker to use to get the checkpoint.
        actor_id: The actor ID of the actor to get the checkpoint for.

    Returns:
        If a checkpoint exists, this returns a tuple of the number of tasks
        included in the checkpoint, the saved checkpoint state, and the
        task frontier at the time of the checkpoint. If no checkpoint
        exists, all objects are set to None. The checkpoint index is the
        number of tasks executed on the actor before the checkpoint was
        made.
    """
    # Checkpoint state lives in a Redis hash keyed by the actor ID.
    actor_key = b"Actor:" + actor_id
    checkpoint_index, checkpoint, frontier = worker.redis_client.hmget(
        actor_key, ["checkpoint_index", "checkpoint", "frontier"])
    # Redis returns raw bytes (or None for missing fields); convert the
    # index to an int when it is present.
    if checkpoint_index is not None:
        checkpoint_index = int(checkpoint_index)
    return checkpoint_index, checkpoint, frontier


def save_and_log_checkpoint(worker, actor):
"""Save a checkpoint on the actor and log any errors.

Expand Down Expand Up @@ -205,219 +153,26 @@ def restore_and_log_checkpoint(worker, actor):
return checkpoint_resumed


def make_actor_method_executor(worker, method_name, method, actor_imported):
"""Make an executor that wraps a user-defined actor method.

The wrapped method updates the worker's internal state and performs any
necessary checkpointing operations.
def get_actor_checkpoint(worker, actor_id):
"""Get the most recent checkpoint associated with a given actor ID.

Args:
worker (Worker): The worker that is executing the actor.
method_name (str): The name of the actor method.
method (instancemethod): The actor method to wrap. This should be a
method defined on the actor class and should therefore take an
instance of the actor as the first argument.
actor_imported (bool): Whether the actor has been imported.
Checkpointing operations will not be run if this is set to False.
worker: The worker to use to get the checkpoint.
actor_id: The actor ID of the actor to get the checkpoint for.

Returns:
A function that executes the given actor method on the worker's stored
instance of the actor. The function also updates the worker's
internal state to record the executed method.
"""

def actor_method_executor(dummy_return_id, actor, *args):
# Update the actor's task counter to reflect the task we're about to
# execute.
worker.actor_task_counter += 1

# If this is the first task to execute on the actor, try to resume from
# a checkpoint.
if actor_imported and worker.actor_task_counter == 1:
checkpoint_resumed = restore_and_log_checkpoint(worker, actor)
if checkpoint_resumed:
# NOTE(swang): Since we did not actually execute the __init__
# method, this will put None as the return value. If the
# __init__ method is supposed to return multiple values, an
# exception will be logged.
return

# Determine whether we should checkpoint the actor.
checkpointing_on = (actor_imported
and worker.actor_checkpoint_interval > 0)
# We should checkpoint the actor if user checkpointing is on, we've
# executed checkpoint_interval tasks since the last checkpoint, and the
# method we're about to execute is not a checkpoint.
save_checkpoint = (
checkpointing_on and
(worker.actor_task_counter % worker.actor_checkpoint_interval == 0
and method_name != "__ray_checkpoint__"))

# Execute the assigned method and save a checkpoint if necessary.
try:
if is_classmethod(method):
method_returns = method(*args)
else:
method_returns = method(actor, *args)
except Exception:
# Save the checkpoint before allowing the method exception to be
# thrown.
if save_checkpoint:
save_and_log_checkpoint(worker, actor)
raise
else:
# Save the checkpoint before returning the method's return values.
if save_checkpoint:
save_and_log_checkpoint(worker, actor)
return method_returns

return actor_method_executor


def fetch_and_register_actor(actor_class_key, worker):
    """Import an actor.

    This will be called by the worker's import thread when the worker receives
    the actor_class export, assuming that the worker is an actor for that
    class.

    Args:
        actor_class_key: The key in Redis to use to fetch the actor.
        worker: The worker to use.
    """
    actor_id_str = worker.actor_id
    # Fetch the serialized class and its metadata from Redis in one call.
    (driver_id, class_id, class_name, module, pickled_class,
     checkpoint_interval, actor_method_names) = worker.redis_client.hmget(
         actor_class_key, [
             "driver_id", "class_id", "class_name", "module", "class",
             "checkpoint_interval", "actor_method_names"
         ])

    # Redis returns raw bytes; decode the metadata fields into Python values.
    class_name = decode(class_name)
    module = decode(module)
    checkpoint_interval = int(checkpoint_interval)
    actor_method_names = json.loads(decode(actor_method_names))

    # Create a temporary actor with some temporary methods so that if the actor
    # fails to be unpickled, the temporary actor can be used (just to produce
    # error messages and to prevent the driver from hanging).
    class TemporaryActor(object):
        pass

    worker.actors[actor_id_str] = TemporaryActor()
    worker.actor_checkpoint_interval = checkpoint_interval

    def temporary_actor_method(*xs):
        # Placeholder that surfaces the import failure to any caller.
        raise Exception("The actor with name {} failed to be imported, and so "
                        "cannot execute this method".format(class_name))

    # Register the actor method executors. These are placeholders until the
    # real class is successfully unpickled below.
    for actor_method_name in actor_method_names:
        function_id = compute_actor_method_function_id(class_name,
                                                       actor_method_name).id()
        temporary_executor = make_actor_method_executor(
            worker,
            actor_method_name,
            temporary_actor_method,
            actor_imported=False)
        worker.function_execution_info[driver_id][function_id] = (
            ray.worker.FunctionExecutionInfo(
                function=temporary_executor,
                function_name=actor_method_name,
                max_calls=0))
        worker.num_task_executions[driver_id][function_id] = 0

    try:
        unpickled_class = pickle.loads(pickled_class)
        worker.actor_class = unpickled_class
    except Exception:
        # If an exception was thrown when the actor was imported, we record the
        # traceback and notify the scheduler of the failure.
        traceback_str = ray.utils.format_error_message(traceback.format_exc())
        # Log the error message.
        push_error_to_driver(
            worker,
            ray_constants.REGISTER_ACTOR_PUSH_ERROR,
            traceback_str,
            driver_id,
            data={"actor_id": actor_id_str})
        # TODO(rkn): In the future, it might make sense to have the worker exit
        # here. However, currently that would lead to hanging if someone calls
        # ray.get on a method invoked on the actor.
    else:
        # TODO(pcm): Why is the below line necessary?
        unpickled_class.__module__ = module
        # Instantiate without running __init__; the __init__ task will be
        # executed like any other actor method.
        worker.actors[actor_id_str] = unpickled_class.__new__(unpickled_class)

        def pred(x):
            # Accept plain functions, bound methods, and Cython functions.
            return (inspect.isfunction(x) or inspect.ismethod(x)
                    or is_cython(x))

        # Replace the temporary executors registered above with real ones
        # bound to the unpickled class's methods.
        actor_methods = inspect.getmembers(unpickled_class, predicate=pred)
        for actor_method_name, actor_method in actor_methods:
            function_id = compute_actor_method_function_id(
                class_name, actor_method_name).id()
            executor = make_actor_method_executor(
                worker, actor_method_name, actor_method, actor_imported=True)
            worker.function_execution_info[driver_id][function_id] = (
                ray.worker.FunctionExecutionInfo(
                    function=executor,
                    function_name=actor_method_name,
                    max_calls=0))
            # We do not set worker.function_properties[driver_id][function_id]
            # because we currently do need the actor worker to submit new tasks
            # for the actor.


def publish_actor_class_to_key(key, actor_class_info, worker):
    """Push an actor class definition to Redis.

    This is factored out as a separate function because it is also called
    on cached actor class definitions when a worker connects for the first
    time.

    Args:
        key: The key to store the actor class info at.
        actor_class_info: Information about the actor class.
        worker: The worker to use to connect to Redis.
    """
    # We set the driver ID here because it may not have been available when the
    # actor class was defined.
    actor_class_info["driver_id"] = worker.task_driver_id.id()
    worker.redis_client.hmset(key, actor_class_info)
    # Announce the export so other processes pick up the new definition.
    worker.redis_client.rpush("Exports", key)


def export_actor_class(class_id, Class, actor_method_names,
                       checkpoint_interval, worker):
    """Export an actor class definition so that actor workers can import it.

    Args:
        class_id: The ID of the actor class.
        Class: The actor class to pickle and export.
        actor_method_names: The names of the methods defined on the class.
        checkpoint_interval: The actor's checkpoint interval (stored
            verbatim in the exported info).
        worker: The worker to use to connect to Redis.
    """
    key = b"ActorClass:" + class_id
    actor_class_info = {
        "class_name": Class.__name__,
        "module": Class.__module__,
        "class": pickle.dumps(Class),
        "checkpoint_interval": checkpoint_interval,
        "actor_method_names": json.dumps(list(actor_method_names))
    }

    # Fail early with a clear error if the pickled class is too large to
    # export.
    check_oversized_pickle(actor_class_info["class"],
                           actor_class_info["class_name"], "actor", worker)

    if worker.mode is None:
        # This means that 'ray.init()' has not been called yet and so we must
        # cache the actor class definition and export it when 'ray.init()' is
        # called.
        assert worker.cached_remote_functions_and_actors is not None
        worker.cached_remote_functions_and_actors.append(
            ("actor", (key, actor_class_info)))
        # This caching code path is currently not used because we only export
        # actor class definitions lazily when we instantiate the actor for the
        # first time.
        assert False, "This should be unreachable."
    else:
        publish_actor_class_to_key(key, actor_class_info, worker)
# TODO(rkn): Currently we allow actor classes to be defined within tasks.
# I tried to disable this, but it may be necessary because of
# https://github.com/ray-project/ray/issues/1146.
actor_key = b"Actor:" + actor_id
checkpoint_index, checkpoint, frontier = worker.redis_client.hmget(
actor_key, ["checkpoint_index", "checkpoint", "frontier"])
if checkpoint_index is not None:
checkpoint_index = int(checkpoint_index)
return checkpoint_index, checkpoint, frontier


def method(*args, **kwargs):
Expand Down Expand Up @@ -518,13 +273,8 @@ def __init__(self, modified_class, class_id, checkpoint_interval, num_cpus,
self._actor_method_cpus = actor_method_cpus
self._exported = False

# Get the actor methods of the given class.
def pred(x):
return (inspect.isfunction(x) or inspect.ismethod(x)
or is_cython(x))

self._actor_methods = inspect.getmembers(
self._modified_class, predicate=pred)
self._modified_class, ray.utils.is_function_or_method)
# Extract the signatures of each of the methods. This will be used
# to catch some errors if the methods are called with inappropriate
# arguments.
Expand All @@ -537,7 +287,7 @@ def pred(x):
# don't support, there may not be much the user can do about it.
signature.check_signature_supported(method, warn=True)
self._method_signatures[method_name] = signature.extract_signature(
method, ignore_first=not is_classmethod(method))
method, ignore_first=not ray.utils.is_class_method(method))

# Set the default number of return values for this method.
if hasattr(method, "__ray_num_return_vals__"):
Expand Down Expand Up @@ -614,9 +364,9 @@ def _submit(self,
else:
# Export the actor.
if not self._exported:
export_actor_class(self._class_id, self._modified_class,
self._actor_method_names,
self._checkpoint_interval, worker)
worker.function_actor_manager.export_actor_class(
self._class_id, self._modified_class,
self._actor_method_names, self._checkpoint_interval)
self._exported = True

resources = ray.utils.resources_from_resource_arguments(
Expand Down Expand Up @@ -801,8 +551,8 @@ def _actor_method_call(self,
else:
actor_handle_id = self._ray_actor_handle_id

function_id = compute_actor_method_function_id(self._ray_class_name,
method_name)
function_id = FunctionActorManager.compute_actor_method_function_id(
self._ray_class_name, method_name)
object_ids = worker.submit_task(
function_id,
args,
Expand Down Expand Up @@ -1068,5 +818,4 @@ def __ray_checkpoint_restore__(self):
resources, actor_method_cpus)


ray.worker.global_worker.fetch_and_register_actor = fetch_and_register_actor
ray.worker.global_worker.make_actor = make_actor
Loading