Skip to content

Convert actor dummy objects to task execution edges. #1281

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,6 @@ def make_actor_method_executor(worker, method_name, method):

def actor_method_executor(dummy_return_id, task_counter, actor,
*args):
# An actor task's dependency on the previous task is represented by
# a dummy argument. Remove this argument before invocation.
args = args[:-1]
if method_name == "__ray_checkpoint__":
# Execute the checkpoint task.
actor_checkpoint_failed, error = method(actor, *args)
Expand Down Expand Up @@ -582,9 +579,11 @@ def _actor_method_call(self, method_name, args=None, kwargs=None,
ray.worker.global_worker.actors[self._ray_actor_id],
method_name)(*copy.deepcopy(args))

# Add the dummy argument that represents dependency on a preceding
# task.
args.append(dependency)
# Add the execution dependency.
if dependency is None:
execution_dependencies = []
else:
execution_dependencies = [dependency]

is_actor_checkpoint_method = (method_name == "__ray_checkpoint__")

Expand All @@ -594,7 +593,8 @@ def _actor_method_call(self, method_name, args=None, kwargs=None,
function_id, args, actor_id=self._ray_actor_id,
actor_handle_id=self._ray_actor_handle_id,
actor_counter=self._ray_actor_counter,
is_actor_checkpoint_method=is_actor_checkpoint_method)
is_actor_checkpoint_method=is_actor_checkpoint_method,
execution_dependencies=execution_dependencies)
# Update the actor counter and cursor to reflect the most recent
# invocation.
self._ray_actor_counter += 1
Expand Down
21 changes: 12 additions & 9 deletions python/ray/common/redis_module/runtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ def testInvalidTaskTableAdd(self):
with self.assertRaises(redis.ResponseError):
# Should not be able to update a non-existent task.
self.redis.execute_command("RAY.TASK_TABLE_UPDATE", "task_id", 10,
"node_id")
"node_id", b"")

def testTaskTableAddAndLookup(self):
TASK_STATUS_WAITING = 1
Expand All @@ -321,7 +321,8 @@ def testTaskTableAddAndLookup(self):
p.psubscribe("{prefix}*:*".format(prefix=TASK_PREFIX))

def check_task_reply(message, task_args, updated=False):
task_status, local_scheduler_id, task_spec = task_args
(task_status, local_scheduler_id, execution_dependencies_string,
task_spec) = task_args
task_reply_object = TaskReply.GetRootAsTaskReply(message, 0)
self.assertEqual(task_reply_object.State(), task_status)
self.assertEqual(task_reply_object.LocalSchedulerId(),
Expand All @@ -330,15 +331,15 @@ def check_task_reply(message, task_args, updated=False):
self.assertEqual(task_reply_object.Updated(), updated)

# Check that task table adds, updates, and lookups work correctly.
task_args = [TASK_STATUS_WAITING, b"node_id", b"task_spec"]
task_args = [TASK_STATUS_WAITING, b"node_id", b"", b"task_spec"]
response = self.redis.execute_command("RAY.TASK_TABLE_ADD", "task_id",
*task_args)
response = self.redis.execute_command("RAY.TASK_TABLE_GET", "task_id")
check_task_reply(response, task_args)

task_args[0] = TASK_STATUS_SCHEDULED
self.redis.execute_command("RAY.TASK_TABLE_UPDATE", "task_id",
*task_args[:2])
*task_args[:3])
response = self.redis.execute_command("RAY.TASK_TABLE_GET", "task_id")
check_task_reply(response, task_args)

Expand Down Expand Up @@ -407,17 +408,19 @@ def check_task_reply(message, task_args, updated=False):

def check_task_subscription(self, p, scheduling_state, local_scheduler_id):
task_args = [b"task_id", scheduling_state,
local_scheduler_id.encode("ascii"), b"task_spec"]
local_scheduler_id.encode("ascii"), b"", b"task_spec"]
self.redis.execute_command("RAY.TASK_TABLE_ADD", *task_args)
# Receive the data.
message = get_next_message(p)["data"]
# Check that the notification object is correct.
notification_object = TaskReply.GetRootAsTaskReply(message, 0)
self.assertEqual(notification_object.TaskId(), b"task_id")
self.assertEqual(notification_object.State(), scheduling_state)
self.assertEqual(notification_object.TaskId(), task_args[0])
self.assertEqual(notification_object.State(), task_args[1])
self.assertEqual(notification_object.LocalSchedulerId(),
local_scheduler_id.encode("ascii"))
self.assertEqual(notification_object.TaskSpec(), b"task_spec")
task_args[2])
self.assertEqual(notification_object.ExecutionDependencies(),
task_args[3])
self.assertEqual(notification_object.TaskSpec(), task_args[4])

def testTaskTableSubscribe(self):
scheduling_state = 1
Expand Down
2 changes: 2 additions & 0 deletions python/ray/experimental/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ def _task_table(self, task_id):
return {"State": task_table_message.State(),
"LocalSchedulerID": binary_to_hex(
task_table_message.LocalSchedulerId()),
"ExecutionDependenciesString":
task_table_message.ExecutionDependencies(),
"TaskSpec": task_spec_info}

def task_table(self, task_id=None):
Expand Down
4 changes: 2 additions & 2 deletions python/ray/global_scheduler/test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def test_task_default_resources(self):
[random_object_id()], 0, random_task_id(),
0, local_scheduler.ObjectID(NIL_ACTOR_ID),
local_scheduler.ObjectID(NIL_ACTOR_ID),
0, 0, {"CPU": 1, "GPU": 2})
0, 0, [], {"CPU": 1, "GPU": 2})
self.assertEqual(task2.required_resources(), {"CPU": 1, "GPU": 2})

def test_redis_only_single_task(self):
Expand Down Expand Up @@ -268,7 +268,7 @@ def integration_many_tasks_helper(self, timesync=True):
self.local_scheduler_clients[0].submit(task)
# Check that there are the correct number of tasks in Redis and that
# they all get assigned to the local scheduler.
num_retries = 10
num_retries = 20
num_tasks_done = 0
while num_retries > 0:
task_entries = self.state.task_table()
Expand Down
3 changes: 2 additions & 1 deletion python/ray/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ def cleanup_task_table(self):
ok = self.state._execute_command(
key, "RAY.TASK_TABLE_UPDATE",
hex_to_binary(task_id),
ray.experimental.state.TASK_STATUS_LOST, NIL_ID)
ray.experimental.state.TASK_STATUS_LOST, NIL_ID,
task["ExecutionDependenciesString"])
if ok != b"OK":
log.warn("Failed to update lost task for dead scheduler.")
num_tasks_updated += 1
Expand Down
10 changes: 9 additions & 1 deletion python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,8 @@ def get_object(self, object_ids):

def submit_task(self, function_id, args, actor_id=None,
actor_handle_id=None, actor_counter=0,
is_actor_checkpoint_method=False):
is_actor_checkpoint_method=False,
execution_dependencies=None):
"""Submit a remote task to the scheduler.

Tell the scheduler to schedule the execution of the function with ID
Expand Down Expand Up @@ -527,6 +528,10 @@ def submit_task(self, function_id, args, actor_id=None,
else:
args_for_local_scheduler.append(put(arg))

# By default, there are no execution dependencies.
if execution_dependencies is None:
execution_dependencies = []

# Look up the various function properties.
function_properties = self.function_properties[
self.task_driver_id.id()][function_id.id()]
Expand All @@ -543,6 +548,7 @@ def submit_task(self, function_id, args, actor_id=None,
actor_handle_id,
actor_counter,
is_actor_checkpoint_method,
execution_dependencies,
function_properties.resources)
# Increment the worker's task index to track how many tasks have
# been submitted by the current task so far.
Expand Down Expand Up @@ -1871,13 +1877,15 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
ray.local_scheduler.ObjectID(NIL_ACTOR_ID),
nil_actor_counter,
False,
[],
{"CPU": 0})
global_state._execute_command(
driver_task.task_id(),
"RAY.TASK_TABLE_ADD",
driver_task.task_id().id(),
TASK_STATUS_RUNNING,
NIL_LOCAL_SCHEDULER_ID,
driver_task.execution_dependencies_string(),
ray.local_scheduler.task_to_string(driver_task))
# Set the driver's current task ID to the task ID assigned to the
# driver task.
Expand Down
21 changes: 21 additions & 0 deletions src/common/common_protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ ObjectID from_flatbuf(const flatbuffers::String &string) {
return object_id;
}

/// Convert a flatbuffer vector of strings to a vector of object IDs.
///
/// @param vector The flatbuffer vector whose entries each encode one object
///        ID.
/// @return The decoded object IDs, in the same order as the flatbuffer
///         entries.
const std::vector<ObjectID> from_flatbuf(
    const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>
        &vector) {
  /* Read the length once and keep it in the vector's own unsigned index type,
   * so the loop does not compare a signed counter against an unsigned
   * length. */
  const flatbuffers::uoffset_t num_object_ids = vector.Length();
  std::vector<ObjectID> object_ids;
  /* The final size is known up front, so allocate once instead of growing. */
  object_ids.reserve(num_object_ids);
  for (flatbuffers::uoffset_t i = 0; i < num_object_ids; ++i) {
    object_ids.push_back(from_flatbuf(*vector.Get(i)));
  }
  return object_ids;
}

flatbuffers::Offset<
flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
to_flatbuf(flatbuffers::FlatBufferBuilder &fbb,
Expand All @@ -25,6 +35,17 @@ to_flatbuf(flatbuffers::FlatBufferBuilder &fbb,
return fbb.CreateVector(results);
}

/// Convert a vector of object IDs to a flatbuffer vector of strings.
///
/// @param fbb Reference to the flatbuffer builder that the strings and the
///        resulting vector are written into.
/// @param object_ids Vector of object IDs.
/// @return Flatbuffer vector of strings, one per object ID, in input order.
flatbuffers::Offset<
    flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
to_flatbuf(flatbuffers::FlatBufferBuilder &fbb,
           const std::vector<ObjectID> &object_ids) {
  std::vector<flatbuffers::Offset<flatbuffers::String>> results;
  /* The number of offsets is known up front, so allocate once. */
  results.reserve(object_ids.size());
  /* Iterate by const reference so each ID is not copied into a loop
   * temporary before being serialized. */
  for (const auto &object_id : object_ids) {
    results.push_back(to_flatbuf(fbb, object_id));
  }
  return fbb.CreateVector(results);
}

/* Copy the bytes of a flatbuffer string into a std::string. The explicit
 * length is passed to the constructor, so the copy is not cut short at an
 * embedded NUL byte. */
std::string string_from_flatbuf(const flatbuffers::String &string) {
  const auto *bytes = string.data();
  const auto num_bytes = string.size();
  return std::string(bytes, num_bytes);
}
Expand Down
18 changes: 18 additions & 0 deletions src/common/common_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ flatbuffers::Offset<flatbuffers::String> to_flatbuf(
/// @return The object ID.
ObjectID from_flatbuf(const flatbuffers::String &string);

/// Convert a flatbuffer vector of strings to a vector of object IDs.
///
/// Each string in the flatbuffer vector is decoded with the single-string
/// from_flatbuf overload, and the results are returned in the same order.
///
/// @param vector The flatbuffer vector.
/// @return The vector of object IDs.
const std::vector<ObjectID> from_flatbuf(
const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>
&vector);

/// Convert an array of object IDs to a flatbuffer vector of strings.
///
/// @param fbb Reference to the flatbuffer builder.
Expand All @@ -36,6 +44,16 @@ to_flatbuf(flatbuffers::FlatBufferBuilder &fbb,
ObjectID object_ids[],
int64_t num_objects);

/// Convert a vector of object IDs to a flatbuffer vector of strings.
///
/// Each object ID is serialized into the builder as its own flatbuffer
/// string, and the resulting offsets are collected into one flatbuffer
/// vector in the same order as the input.
///
/// @param fbb Reference to the flatbuffer builder.
/// @param object_ids Vector of object IDs.
/// @return Flatbuffer vector of strings.
flatbuffers::Offset<
flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
to_flatbuf(flatbuffers::FlatBufferBuilder &fbb,
const std::vector<ObjectID> &object_ids);

/// Convert a flatbuffer string to a std::string.
///
/// @param fbb Reference to the flatbuffer builder.
Expand Down
10 changes: 10 additions & 0 deletions src/common/format/common.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ table ObjectInfo {

root_type TaskInfo;

// The object IDs that a task depends on at execution time. TaskReply carries
// a serialized instance of this table as a string of bytes.
table TaskExecutionDependencies {
// A list of object IDs representing this task's dependencies at execution
// time.
execution_dependencies: [string];
}

root_type TaskExecutionDependencies;

table SubscribeToNotificationsReply {
// The object ID of the object that the notification is about.
object_id: string;
Expand All @@ -89,6 +97,8 @@ table TaskReply {
state: long;
// A local scheduler ID.
local_scheduler_id: string;
// A string of bytes representing the task's TaskExecutionDependencies.
execution_dependencies: string;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be cleaner to store the list of strings here directly instead of wrapping and serializing them? Is there any reason this is not possible?

// A string of bytes representing the task specification.
task_spec: string;
// A boolean representing whether the update was successful. This field
Expand Down
51 changes: 45 additions & 6 deletions src/common/lib/python/common_extension.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "common.h"
#include "common_extension.h"
#include "common_protocol.h"
#include "task.h"

#include <string>
Expand Down Expand Up @@ -104,6 +105,8 @@ PyObject *PyTask_from_string(PyObject *self, PyObject *args) {
result = (PyTask *) PyObject_Init((PyObject *) result, &PyTaskType);
result->size = size;
result->spec = TaskSpec_copy((TaskSpec *) data, size);
/* The created task does not include any execution dependencies. */
result->execution_dependencies = new std::vector<ObjectID>();
/* TODO(pcm): Use flatbuffers validation here. */
return (PyObject *) result;
}
Expand Down Expand Up @@ -288,14 +291,18 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
TaskID parent_task_id;
/* The number of tasks that the parent task has called prior to this one. */
int parent_counter;
/* Arguments of the task that are execution-dependent. These must be
* PyObjectIDs). */
PyObject *execution_arguments = NULL;
/* Dictionary of resource requirements for this task. */
PyObject *resource_map = NULL;
if (!PyArg_ParseTuple(
args, "O&O&OiO&i|O&O&iOO", &PyObjectToUniqueID, &driver_id,
&PyObjectToUniqueID, &function_id, &arguments, &num_returns,
&PyObjectToUniqueID, &parent_task_id, &parent_counter,
&PyObjectToUniqueID, &actor_id, &PyObjectToUniqueID, &actor_handle_id,
&actor_counter, &is_actor_checkpoint_method_object, &resource_map)) {
if (!PyArg_ParseTuple(args, "O&O&OiO&i|O&O&iOOO", &PyObjectToUniqueID,
&driver_id, &PyObjectToUniqueID, &function_id,
&arguments, &num_returns, &PyObjectToUniqueID,
&parent_task_id, &parent_counter, &PyObjectToUniqueID,
&actor_id, &PyObjectToUniqueID, &actor_handle_id,
&actor_counter, &is_actor_checkpoint_method_object,
&execution_arguments, &resource_map)) {
return -1;
}

Expand Down Expand Up @@ -371,13 +378,31 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {

/* Compute the task ID and the return object IDs. */
self->spec = TaskSpec_finish_construct(g_task_builder, &self->size);

/* Set the task's execution dependencies. */
self->execution_dependencies = new std::vector<ObjectID>();
if (execution_arguments != NULL) {
size = PyList_Size(execution_arguments);
for (Py_ssize_t i = 0; i < size; ++i) {
PyObject *execution_arg = PyList_GetItem(execution_arguments, i);
if (!PyObject_IsInstance(execution_arg, (PyObject *) &PyObjectIDType)) {
PyErr_SetString(PyExc_TypeError,
"Execution arguments must be an ObjectID.");
return -1;
}
self->execution_dependencies->push_back(
((PyObjectID *) execution_arg)->object_id);
}
}

return 0;
}

/* Deallocator for PyTask objects: frees the task spec if one is attached,
 * deletes the execution dependency vector, and then releases the object
 * through its type's tp_free slot. */
static void PyTask_dealloc(PyTask *self) {
  /* The spec may be absent, so only free it when one is attached. */
  TaskSpec *owned_spec = self->spec;
  if (owned_spec) {
    TaskSpec_free(owned_spec);
  }
  /* Deleting a null pointer is a no-op, so no guard is needed here. */
  delete self->execution_dependencies;
  PyTypeObject *type = Py_TYPE(self);
  type->tp_free((PyObject *) self);
}

Expand Down Expand Up @@ -471,6 +496,15 @@ static PyObject *PyTask_returns(PyObject *self) {
return return_id_list;
}

/* Serialize this task's execution dependencies into a
 * TaskExecutionDependencies flatbuffer and return the finished buffer as a
 * Python bytes object. */
static PyObject *PyTask_execution_dependencies_string(PyTask *self) {
  flatbuffers::FlatBufferBuilder fbb;
  const std::vector<ObjectID> &dependencies = *self->execution_dependencies;
  auto dependency_ids = to_flatbuf(fbb, dependencies);
  auto message = CreateTaskExecutionDependencies(fbb, dependency_ids);
  fbb.Finish(message);
  /* Hand the raw flatbuffer bytes to Python with an explicit length, since
   * the buffer is binary data and not a NUL-terminated string. */
  const char *buffer = reinterpret_cast<const char *>(fbb.GetBufferPointer());
  return PyBytes_FromStringAndSize(buffer, fbb.GetSize());
}

static PyMethodDef PyTask_methods[] = {
{"function_id", (PyCFunction) PyTask_function_id, METH_NOARGS,
"Return the function ID for this task."},
Expand All @@ -492,6 +526,9 @@ static PyMethodDef PyTask_methods[] = {
"Return the resource vector of the task."},
{"returns", (PyCFunction) PyTask_returns, METH_NOARGS,
"Return the object IDs for the return values of the task."},
{"execution_dependencies_string",
(PyCFunction) PyTask_execution_dependencies_string, METH_NOARGS,
"Return the execution dependencies for the task as a string."},
{NULL} /* Sentinel */
};

Expand Down Expand Up @@ -543,6 +580,8 @@ PyObject *PyTask_make(TaskSpec *task_spec, int64_t task_size) {
result = (PyTask *) PyObject_Init((PyObject *) result, &PyTaskType);
result->spec = task_spec;
result->size = task_size;
/* The created task does not include any execution dependencies. */
result->execution_dependencies = new std::vector<ObjectID>();
return (PyObject *) result;
}

Expand Down
3 changes: 3 additions & 0 deletions src/common/lib/python/common_extension.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef COMMON_EXTENSION_H
#define COMMON_EXTENSION_H

#include <vector>

#include <Python.h>
#include "marshal.h"
#include "structmember.h"
Expand All @@ -22,6 +24,7 @@ typedef struct {
PyObject_HEAD
int64_t size;
TaskSpec *spec;
std::vector<ObjectID> *execution_dependencies;
} PyTask;
// clang-format on

Expand Down
Loading