Skip to content

Commit

Permalink
Prototype distributed actor handles (#1137)
Browse files Browse the repository at this point in the history
* Add actor handle ID to the task spec

* Local scheduler dispatches actor tasks according to a task counter per handle

* Fix python test

* Allow passing actor handles into tasks. Not completely working yet. Also this is very messy.

* Fixes, should be roughly working now.

* Refactor actor handle wrapper

* Fix __init__ tests

* Terminate actor when the original handle goes out of scope

* TODO and a couple test cases

* Make tests for unsupported cases

* Fix Python mode tests

* Linting.

* Cache actor definitions that occur before ray.init() is called.

* Fix export actor class

* Deterministically compute actor handle ID

* Fix __getattribute__

* Fix string encoding for python3

* doc

* Add comment and assertion.
  • Loading branch information
stephanie-wang authored and robertnishihara committed Oct 20, 2017
1 parent 2f45ac9 commit af47737
Show file tree
Hide file tree
Showing 11 changed files with 797 additions and 404 deletions.
693 changes: 447 additions & 246 deletions python/ray/actor.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions python/ray/global_scheduler/test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def test_task_default_resources(self):
task2 = local_scheduler.Task(random_driver_id(), random_function_id(),
[random_object_id()], 0, random_task_id(),
0, local_scheduler.ObjectID(NIL_ACTOR_ID),
local_scheduler.ObjectID(NIL_ACTOR_ID),
0, 0, [1.0, 2.0, 0.0])
self.assertEqual(task2.required_resources(), [1.0, 2.0, 0.0])

Expand Down
73 changes: 47 additions & 26 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,12 @@ class Worker(object):
connected (bool): True if Ray has been started and False otherwise.
mode: The mode of the worker. One of SCRIPT_MODE, PYTHON_MODE,
SILENT_MODE, and WORKER_MODE.
cached_remote_functions (List[Tuple[str, str]]): A list of pairs
representing the remote functions that were defined before the
worker called connect. The first element is the name of the remote
function, and the second element is the serialized remote function.
When the worker eventually does call connect, if it is a driver, it
will export these functions to the scheduler. If
cached_remote_functions is None, that means that connect has been
called already.
cached_remote_functions_and_actors: A list of information for exporting
remote functions and actor classes definitions that were defined
before the worker called connect. When the worker eventually does
call connect, if it is a driver, it will export these functions and
actors. If cached_remote_functions_and_actors is None, that means
that connect has been called already.
cached_functions_to_run (List): A list of functions to run on all of
the workers that should be exported as soon as connect is called.
"""
Expand Down Expand Up @@ -221,7 +219,7 @@ def __init__(self):
self.num_task_executions = collections.defaultdict(lambda: {})
self.connected = False
self.mode = None
self.cached_remote_functions = []
self.cached_remote_functions_and_actors = []
self.cached_functions_to_run = []
self.fetch_and_register_actor = None
self.make_actor = None
Expand Down Expand Up @@ -454,7 +452,8 @@ def get_object(self, object_ids):
assert len(final_results) == len(object_ids)
return final_results

def submit_task(self, function_id, args, actor_id=None, actor_counter=0,
def submit_task(self, function_id, args, actor_id=None,
actor_handle_id=None, actor_counter=0,
is_actor_checkpoint_method=False):
"""Submit a remote task to the scheduler.
Expand All @@ -474,14 +473,21 @@ def submit_task(self, function_id, args, actor_id=None, actor_counter=0,
"""
with log_span("ray:submit_task", worker=self):
check_main_thread()
actor_id = (ray.local_scheduler.ObjectID(NIL_ACTOR_ID)
if actor_id is None else actor_id)
if actor_id is None:
assert actor_handle_id is None
actor_id = ray.local_scheduler.ObjectID(NIL_ACTOR_ID)
actor_handle_id = ray.local_scheduler.ObjectID(NIL_ACTOR_ID)
else:
assert actor_handle_id is not None
# Put large or complex arguments that are passed by value in the
# object store first.
args_for_local_scheduler = []
for arg in args:
if isinstance(arg, ray.local_scheduler.ObjectID):
args_for_local_scheduler.append(arg)
elif isinstance(arg, ray.actor.ActorHandleParent):
args_for_local_scheduler.append(put(
ray.actor.wrap_actor_handle(arg)))
elif ray.local_scheduler.check_simple_value(arg):
args_for_local_scheduler.append(arg)
else:
Expand All @@ -500,6 +506,7 @@ def submit_task(self, function_id, args, actor_id=None, actor_counter=0,
self.current_task_id,
self.task_index,
actor_id,
actor_handle_id,
actor_counter,
is_actor_checkpoint_method,
[function_properties.num_cpus, function_properties.num_gpus,
Expand Down Expand Up @@ -655,6 +662,8 @@ def _get_arguments_for_execution(self, function_name, serialized_args):
# created this object failed, and we should propagate the
# error message here.
raise RayGetArgumentError(function_name, i, arg, argument)
elif isinstance(argument, ray.actor.ActorHandleWrapper):
argument = ray.actor.unwrap_actor_handle(self, argument)
else:
# pass the argument by value
argument = arg
Expand Down Expand Up @@ -1098,6 +1107,8 @@ def _deserialize_callback_pandas(data):
_register_class(type(lambda: 0), use_pickle=True)
# Tell Ray to serialize types with pickle.
_register_class(type(int), use_pickle=True)
# Ray can serialize actor handles that have been wrapped.
_register_class(ray.actor.ActorHandleWrapper)


def get_address_info_from_redis_helper(redis_address, node_ip_address):
Expand Down Expand Up @@ -1704,7 +1715,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
error_message = "Perhaps you called ray.init twice by accident?"
assert not worker.connected, error_message
assert worker.cached_functions_to_run is not None, error_message
assert worker.cached_remote_functions is not None, error_message
assert worker.cached_remote_functions_and_actors is not None, error_message
# Initialize some fields.
worker.worker_id = random_string()
worker.actor_id = actor_id
Expand Down Expand Up @@ -1840,6 +1851,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
worker.current_task_id,
worker.task_index,
ray.local_scheduler.ObjectID(NIL_ACTOR_ID),
ray.local_scheduler.ObjectID(NIL_ACTOR_ID),
nil_actor_counter,
False,
[0, 0, 0])
Expand Down Expand Up @@ -1913,23 +1925,32 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
for function in worker.cached_functions_to_run:
worker.run_function_on_all_workers(function)
# Export cached remote functions to the workers.
for info in worker.cached_remote_functions:
(function_id, func_name, func,
func_invoker, function_properties) = info
export_remote_function(function_id, func_name, func, func_invoker,
function_properties, worker)
for cached_type, info in worker.cached_remote_functions_and_actors:
if cached_type == "remote_function":
(function_id, func_name, func,
func_invoker, function_properties) = info
export_remote_function(function_id, func_name, func,
func_invoker, function_properties,
worker)
elif cached_type == "actor":
(key, actor_class_info) = info
ray.actor.publish_actor_class_to_key(key, actor_class_info,
worker)
else:
assert False, "This code should be unreachable."
worker.cached_functions_to_run = None
worker.cached_remote_functions = None
worker.cached_remote_functions_and_actors = None


def disconnect(worker=global_worker):
"""Disconnect this worker from the scheduler and object store."""
# Reset the list of cached remote functions so that if more remote
# functions are defined and then connect is called again, the remote
# functions will be exported. This is mostly relevant for the tests.
# Reset the list of cached remote functions and actors so that if more
# remote functions or actors are defined and then connect is called again,
# the remote functions will be exported. This is mostly relevant for the
# tests.
worker.connected = False
worker.cached_functions_to_run = []
worker.cached_remote_functions = []
worker.cached_remote_functions_and_actors = []
worker.serialization_context = pyarrow.SerializationContext()


Expand Down Expand Up @@ -2381,9 +2402,9 @@ def func_invoker(*args, **kwargs):
export_remote_function(function_id, func_name, func,
func_invoker, function_properties)
elif worker.mode is None:
worker.cached_remote_functions.append((function_id, func_name,
func, func_invoker,
function_properties))
worker.cached_remote_functions_and_actors.append(
("remote_function", (function_id, func_name, func,
func_invoker, function_properties)))
return func_invoker

return remote_decorator
Expand Down
3 changes: 3 additions & 0 deletions src/common/format/common.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ table TaskInfo {
// Actor ID of the task. This is the actor that this task is executed on
// or NIL_ACTOR_ID if the task is just a normal task.
actor_id: string;
// The ID of the handle that was used to submit the task. This should be
// unique across handles with the same actor_id.
actor_handle_id: string;
// Number of tasks that have been submitted to this actor so far.
actor_counter: int;
// True if this task is an actor checkpoint task and false otherwise.
Expand Down
16 changes: 10 additions & 6 deletions src/common/lib/python/common_extension.cc
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
UniqueID driver_id;
/* ID of the actor this task should run on. */
UniqueID actor_id = NIL_ACTOR_ID;
/* ID of the actor handle used to submit this task. */
UniqueID actor_handle_id = NIL_ACTOR_ID;
/* How many tasks have been launched on the actor so far? */
int actor_counter = 0;
/* True if this is an actor checkpoint task and false otherwise. */
Expand All @@ -287,12 +289,13 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
int parent_counter;
/* Resource vector of the required resources to execute this task. */
PyObject *resource_vector = NULL;
if (!PyArg_ParseTuple(args, "O&O&OiO&i|O&iOO", &PyObjectToUniqueID,
if (!PyArg_ParseTuple(args, "O&O&OiO&i|O&O&iOO", &PyObjectToUniqueID,
&driver_id, &PyObjectToUniqueID, &function_id,
&arguments, &num_returns, &PyObjectToUniqueID,
&parent_task_id, &parent_counter, &PyObjectToUniqueID,
&actor_id, &actor_counter,
&is_actor_checkpoint_method_object, &resource_vector)) {
&actor_id, &PyObjectToUniqueID, &actor_handle_id,
&actor_counter, &is_actor_checkpoint_method_object,
&resource_vector)) {
return -1;
}

Expand All @@ -304,9 +307,10 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {

Py_ssize_t size = PyList_Size(arguments);
/* Construct the task specification. */
TaskSpec_start_construct(
g_task_builder, driver_id, parent_task_id, parent_counter, actor_id,
actor_counter, is_actor_checkpoint_method, function_id, num_returns);
TaskSpec_start_construct(g_task_builder, driver_id, parent_task_id,
parent_counter, actor_id, actor_handle_id,
actor_counter, is_actor_checkpoint_method,
function_id, num_returns);
/* Add the task arguments. */
for (Py_ssize_t i = 0; i < size; ++i) {
PyObject *arg = PyList_GetItem(arguments, i);
Expand Down
17 changes: 14 additions & 3 deletions src/common/task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class TaskBuilder {
TaskID parent_task_id,
int64_t parent_counter,
ActorID actor_id,
ActorID actor_handle_id,
int64_t actor_counter,
bool is_actor_checkpoint_method,
FunctionID function_id,
Expand All @@ -46,6 +47,7 @@ class TaskBuilder {
parent_task_id_ = parent_task_id;
parent_counter_ = parent_counter;
actor_id_ = actor_id;
actor_handle_id_ = actor_handle_id;
actor_counter_ = actor_counter;
is_actor_checkpoint_method_ = is_actor_checkpoint_method;
function_id_ = function_id;
Expand Down Expand Up @@ -107,7 +109,8 @@ class TaskBuilder {
auto message = CreateTaskInfo(
fbb, to_flatbuf(fbb, driver_id_), to_flatbuf(fbb, task_id),
to_flatbuf(fbb, parent_task_id_), parent_counter_,
to_flatbuf(fbb, actor_id_), actor_counter_, is_actor_checkpoint_method_,
to_flatbuf(fbb, actor_id_), to_flatbuf(fbb, actor_handle_id_),
actor_counter_, is_actor_checkpoint_method_,
to_flatbuf(fbb, function_id_), arguments, fbb.CreateVector(returns),
fbb.CreateVector(resource_vector_));
/* Finish the TaskInfo. */
Expand All @@ -130,6 +133,7 @@ class TaskBuilder {
TaskID parent_task_id_;
int64_t parent_counter_;
ActorID actor_id_;
ActorID actor_handle_id_;
int64_t actor_counter_;
bool is_actor_checkpoint_method_;
FunctionID function_id_;
Expand Down Expand Up @@ -172,13 +176,14 @@ void TaskSpec_start_construct(TaskBuilder *builder,
TaskID parent_task_id,
int64_t parent_counter,
ActorID actor_id,
ActorID actor_handle_id,
int64_t actor_counter,
bool is_actor_checkpoint_method,
FunctionID function_id,
int64_t num_returns) {
builder->Start(driver_id, parent_task_id, parent_counter, actor_id,
actor_counter, is_actor_checkpoint_method, function_id,
num_returns);
actor_handle_id, actor_counter, is_actor_checkpoint_method,
function_id, num_returns);
}

uint8_t *TaskSpec_finish_construct(TaskBuilder *builder, int64_t *size) {
Expand Down Expand Up @@ -221,6 +226,12 @@ ActorID TaskSpec_actor_id(TaskSpec *spec) {
return from_flatbuf(message->actor_id());
}

ActorID TaskSpec_actor_handle_id(TaskSpec *spec) {
CHECK(spec);
auto message = flatbuffers::GetRoot<TaskInfo>(spec);
return from_flatbuf(message->actor_handle_id());
}

bool TaskSpec_is_actor_task(TaskSpec *spec) {
return !ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID);
}
Expand Down
12 changes: 12 additions & 0 deletions src/common/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ void free_task_builder(TaskBuilder *builder);
* the parent task prior to this one.
* @param actor_id The ID of the actor that this task is for. If it is not an
* actor task, then this if NIL_ACTOR_ID.
* @param actor_handle_id The ID of the actor handle that this task was
* submitted through. If it is not an actor task, or if this is the
* original handle, then this is NIL_ACTOR_ID.
* @param actor_counter A counter indicating how many tasks have been submitted
* to the same actor before this one.
* @param is_actor_checkpoint_method True if this is an actor checkpoint method
Expand All @@ -102,6 +105,7 @@ void TaskSpec_start_construct(TaskBuilder *B,
TaskID parent_task_id,
int64_t parent_counter,
UniqueID actor_id,
UniqueID actor_handle_id,
int64_t actor_counter,
bool is_actor_checkpoint_method,
FunctionID function_id,
Expand Down Expand Up @@ -133,6 +137,14 @@ FunctionID TaskSpec_function(TaskSpec *spec);
*/
UniqueID TaskSpec_actor_id(TaskSpec *spec);

/**
* Return the actor handle ID of the task.
*
* @param spec The task_spec in question.
* @return The ID of the actor handle that the task was submitted through.
*/
UniqueID TaskSpec_actor_handle_id(TaskSpec *spec);

/**
* Return whether this task is for an actor.
*
Expand Down
3 changes: 2 additions & 1 deletion src/common/test/example_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ static inline TaskSpec *example_task_spec_with_args(int64_t num_args,
TaskID parent_task_id = globally_unique_id();
FunctionID func_id = globally_unique_id();
TaskSpec_start_construct(g_task_builder, NIL_ID, parent_task_id, 0,
NIL_ACTOR_ID, 0, false, func_id, num_returns);
NIL_ACTOR_ID, NIL_ACTOR_ID, 0, false, func_id,
num_returns);
for (int64_t i = 0; i < num_args; ++i) {
ObjectID arg_id;
if (arg_ids == NULL) {
Expand Down
Loading

0 comments on commit af47737

Please sign in to comment.