Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype distributed actor handles #1137

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Deterministically compute actor handle ID
  • Loading branch information
stephanie-wang committed Oct 19, 2017
commit 39555f494c90b9da10a6474c1192edce8ec9793e
56 changes: 34 additions & 22 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,26 @@ def random_actor_class_id():
return random_string()


def random_actor_handle_id():
return ray.local_scheduler.ObjectID(random_string())
def compute_actor_handle_id(actor_handle_id, num_forks):
"""Deterministically comopute an actor handle ID.

A new actor handle ID is generated when it is forked from another actor
handle. The new handle ID is computed as hash(old_handle_id || num_forks).

Args:
actor_handle_id (common.ObjectID): The original actor handle ID.
num_forks: The number of times the original actor handle has been
forked so far.

Returns:
An object ID for the new actor handle.
"""
handle_id_hash = hashlib.sha1()
handle_id_hash.update(actor_handle_id.id())
handle_id_hash.update(str(num_forks))
handle_id = handle_id_hash.digest()
assert len(handle_id) == 20
return ray.local_scheduler.ObjectID(handle_id)


def get_actor_method_function_id(actor_id, attr):
Expand Down Expand Up @@ -310,11 +328,11 @@ def export_actor_class(class_id, Class, actor_method_names,
checkpoint_interval, worker):
key = b"ActorClass:" + class_id
actor_class_info = {
"class_name": Class.__name__,
"module": Class.__module__,
"class": pickle.dumps(Class),
"checkpoint_interval": checkpoint_interval,
"actor_method_names": json.dumps(list(actor_method_names))}
"class_name": Class.__name__,
"module": Class.__module__,
"class": pickle.dumps(Class),
"checkpoint_interval": checkpoint_interval,
"actor_method_names": json.dumps(list(actor_method_names))}

if worker.mode is None:
# This means that 'ray.init()' has not been called yet and so we must
Expand Down Expand Up @@ -436,15 +454,18 @@ def wrap_actor_handle(actor_handle):
if actor_handle._ray_checkpoint_interval > 0:
raise Exception("Checkpointing not yet supported for distributed "
"actor handles.")
return ActorHandleWrapper(
wrapper = ActorHandleWrapper(
actor_handle._ray_actor_id,
random_actor_handle_id(), # Generate a new handle ID.
compute_actor_handle_id(actor_handle._ray_actor_handle_id,
actor_handle._ray_actor_forks),
actor_handle._ray_actor_cursor,
0, # Reset the actor counter.
actor_handle._ray_actor_method_names,
actor_handle._ray_method_signatures,
actor_handle._ray_checkpoint_interval,
actor_handle._ray_class_name)
actor_handle._ray_actor_forks += 1
return wrapper


def unwrap_actor_handle(worker, wrapper):
Expand Down Expand Up @@ -506,6 +527,7 @@ def _manual_init(self, actor_id, actor_handle_id, actor_cursor,
self._ray_method_signatures = method_signatures
self._ray_checkpoint_interval = checkpoint_interval
self._ray_class_name = class_name
self._ray_actor_forks = 0

def _actor_method_call(self, method_name, args=None, kwargs=None,
dependency=None):
Expand Down Expand Up @@ -583,15 +605,8 @@ def __dir__(self):
return self._ray_actor_method_names

def __getattribute__(self, attr):
# The following is needed so we can still access actor methods.
if attr in ["_manual_init", "_ray_actor_id",
"_ray_actor_handle_id", "_ray_actor_counter",
"_ray_actor_cursor", "_ray_actor_method_names",
"_actor_method_invokers", "_ray_method_signatures",
"_actor_method_call", "_ray_checkpoint_interval",
"_ray_class_name"]:
return object.__getattribute__(self, attr)
if attr in self._ray_actor_method_names:
if (hasattr(self, "_ray_actor_method_names") and attr in
object.__getattribute__(self, "_ray_actor_method_names")):
# We create the ActorMethod on the fly here so that the
# ActorHandle doesn't need a reference to the ActorMethod. The
# ActorMethod has a reference to the ActorHandle and this was
Expand All @@ -603,9 +618,7 @@ def __getattribute__(self, attr):
actor_method_cls = ActorMethod
return actor_method_cls(self, attr)
else:
# There is no method with this name, so raise an exception.
raise AttributeError("'{}' Actor object has no attribute '{}'"
.format(class_name, attr))
return object.__getattribute__(self, attr)

def __repr__(self):
return "Actor(" + self._ray_actor_id.hex() + ")"
Expand Down Expand Up @@ -684,7 +697,6 @@ def remote(cls, *args, **kwargs):
actor_method_names, num_cpus,
num_gpus, ray.worker.global_worker)


# Instantiate the actor handle.
actor_object = cls.__new__(cls)
actor_object._manual_init(actor_id, actor_handle_id, actor_cursor,
Expand Down