Convert actor dummy objects to task execution edges. #1281

Merged

stephanie-wang
Contributor

This PR converts the actor "dummy" objects, which were used to indicate actor execution order rather than data dependencies explicitly specified by the user, into a separate, mutable field in the task spec.

What do these changes do?

This replaces all instances of the TaskSpec with the new TaskExecutionSpec, which comprises the immutable TaskSpec and an additional vector of execution dependencies. Execution dependencies are currently only used by actors, to determine what tasks should have executed on the actor before the new task can be scheduled. Currently, these execution dependencies are never changed in the GCS task table, but in the future, they may be used for deterministic actor replay by recording the exact order of initial execution.
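
A minimal sketch of the split described above, assuming a simplified layout: the serialized task spec is fixed at construction, while the execution dependencies stay mutable. `TaskExecutionSpecSketch` and the 20-byte `ObjectID` stand-in are hypothetical names for illustration, not the types introduced by this PR.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical stand-in for Ray's ObjectID (assumed here to be 20 bytes).
struct ObjectID {
  unsigned char id[20];
};

// Illustrative only: an immutable serialized task spec paired with a
// mutable list of execution dependencies.
struct TaskExecutionSpecSketch {
  TaskExecutionSpecSketch(const uint8_t *spec_data, int64_t spec_size)
      : spec(spec_data, spec_data + spec_size) {}

  // Serialized task spec; cannot change after construction.
  const std::vector<uint8_t> spec;
  // Execution-order edges; may be updated without touching the spec.
  std::vector<ObjectID> execution_dependencies;
};
```

Keeping the two parts in separate members means an update to the execution order never has to rewrite the spec bytes.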

@AmplabJenkins

Build finished. Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2628/

@AmplabJenkins

Build finished. Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2629/

Collaborator

@robertnishihara left a comment

Leaving a couple comments. I'm only part way through the PR.

@@ -13,6 +13,20 @@ ObjectID from_flatbuf(const flatbuffers::String *string) {
return object_id;
}

std::vector<ObjectID> from_flatbuf(
Collaborator

should the return be const?

@@ -13,6 +13,20 @@ ObjectID from_flatbuf(const flatbuffers::String *string) {
return object_id;
}

std::vector<ObjectID> from_flatbuf(
const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>
*vector) {
Collaborator

can we make this a ref instead of pointer? In #1236 (which unfortunately introduces a bunch of conflicts) I got rid of the pointers in this file.

size = PyList_Size(execution_arguments);
for (Py_ssize_t i = 0; i < size; ++i) {
PyObject *execution_arg = PyList_GetItem(execution_arguments, i);
CHECK(PyObject_IsInstance(execution_arg, (PyObject *) &PyObjectIDType));
Collaborator

We haven't been super consistent about this, but this should probably raise a TypeError instead of dying.

@@ -2,6 +2,7 @@
#define COMMON_EXTENSION_H

#include <Python.h>
#include <vector>
Collaborator

We should follow the Google C++ style guide here for include orders.

https://google.github.io/styleguide/cppguide.html#Names_and_Order_of_Includes

@@ -289,6 +289,17 @@ int64_t TaskSpec_num_args(TaskSpec *spec) {
return message->args()->size();
}

int64_t TaskSpec_num_args_by_ref(TaskSpec *spec) {
Collaborator

is this used anywhere?


TaskExecutionSpec *TaskExecutionSpec_alloc(std::vector<ObjectID> execution_dependencies, TaskSpec *spec, int64_t task_spec_size) {
int64_t size = sizeof(TaskExecutionSpec) - sizeof(TaskSpec) + task_spec_size;
TaskExecutionSpec *copy = (TaskExecutionSpec *) malloc(size);
Collaborator

Shouldn't we use new/delete instead of malloc/free? Especially given that this has a std::vector inside which presumably needs to have its constructor called?

Contributor Author

Yeah, I wanted this too, but ended up just copying the current Task_alloc to deal with the variable-sized TaskSpec. I think we can fix this with a unique_ptr.
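
One way the `unique_ptr` idea could look, as a hedged sketch rather than the code that was merged: allocating the holder with `new` runs the `std::vector` constructor, and the variable-sized spec lives in a separately owned buffer. `ExecutionSpecHolder`, `MakeExecutionSpec`, and the `ObjectID` stand-in are hypothetical names.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <vector>

// Hypothetical stand-in for Ray's ObjectID (assumed here to be 20 bytes).
struct ObjectID {
  unsigned char id[20];
};

// Illustrative holder: the variable-sized spec is kept in its own buffer,
// so the struct itself has a fixed size and can be created with `new`.
struct ExecutionSpecHolder {
  std::vector<ObjectID> execution_dependencies;
  int64_t task_spec_size;
  std::unique_ptr<uint8_t[]> task_spec;
};

std::unique_ptr<ExecutionSpecHolder> MakeExecutionSpec(
    const std::vector<ObjectID> &execution_dependencies,
    const uint8_t *spec,
    int64_t task_spec_size) {
  // `new` constructs the std::vector member; malloc would leave it
  // uninitialized.
  std::unique_ptr<ExecutionSpecHolder> holder(new ExecutionSpecHolder());
  holder->execution_dependencies = execution_dependencies;
  holder->task_spec_size = task_spec_size;
  holder->task_spec.reset(new uint8_t[task_spec_size]);
  // The spec is treated as raw bytes here, so memcpy is appropriate.
  std::memcpy(holder->task_spec.get(), spec,
              static_cast<std::size_t>(task_spec_size));
  return holder;
}
```

Both the holder and the spec buffer are released automatically when the returned pointer goes out of scope, so no matching `free` call is needed.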

TaskExecutionSpec_task_spec_size(spec));
}

TaskExecutionSpec *TaskExecutionSpec_alloc(std::vector<ObjectID> execution_dependencies, TaskSpec *spec, int64_t task_spec_size) {
Collaborator

pass execution_dependencies as a const ref

int64_t TaskExecutionSpec_task_spec_size(TaskExecutionSpec *spec) {
return spec->task_spec_size;
}
TaskSpec *TaskExecutionSpec_task_spec(TaskExecutionSpec *spec) {
Collaborator

would probably be good to be consistent about using execution_spec as the variable name for TaskExecutionSpecs

}

/* TASK INSTANCES */

 Task *Task_alloc(TaskSpec *spec,
                  int64_t task_spec_size,
                  int state,
-                 DBClientID local_scheduler_id) {
+                 DBClientID local_scheduler_id,
+                 std::vector<ObjectID> execution_dependencies) {
Collaborator

const ref

@AmplabJenkins

Merged build finished. Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2634/

@stephanie-wang stephanie-wang changed the title [WIP] Convert actor dummy objects to task execution edges. Convert actor dummy objects to task execution edges. Dec 7, 2017
@AmplabJenkins

Merged build finished. Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2661/

@AmplabJenkins

Merged build finished. Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2665/

@AmplabJenkins

Merged build finished. Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2682/

@AmplabJenkins

Merged build finished. Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2763/

@AmplabJenkins

Merged build finished. Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2765/

@AmplabJenkins

Merged build finished. Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2777/

-return TaskSpec_finish_construct(g_task_builder, task_spec_size);
+int64_t task_spec_size;
+TaskSpec *spec = TaskSpec_finish_construct(g_task_builder, &task_spec_size);
+std::vector<ObjectID> execution_dependencies = std::vector<ObjectID>();
Collaborator

This is unnecessarily verbose, I think. You can just do

std::vector<ObjectID> execution_dependencies;

@@ -104,6 +108,8 @@ PyObject *PyTask_from_string(PyObject *self, PyObject *args) {
result = (PyTask *) PyObject_Init((PyObject *) result, &PyTaskType);
result->size = size;
result->spec = TaskSpec_copy((TaskSpec *) data, size);
/* The created task does not include any execution dependencies. */
result->execution_dependencies = empty_execution_dependencies;
Collaborator

Interesting, so in this case we'd probably need to store ptr as a field inside of the PyTask struct, which is pretty weird, but maybe necessary. Another workaround would be to just have the PyTask contain a std::vector<ObjectID> * instead of std::vector<ObjectID>. Not sure if there would be a performance hit there.

@AmplabJenkins

Merged build finished. Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2782/

///
/// @return A vector of object IDs representing this task's execution
/// dependencies.
std::vector<ObjectID> ExecutionDependencies();
Collaborator

You've obviously given this more thought, but it seems cleaner to me to expose two methods ObjectIDDependencies and ExecutionDependencies.

There are some places in the local scheduler where we would have to iterate over both lists (e.g., when issuing reconstruction commands).

However, there are some places where we only want to iterate over a single list (such as when issuing fetch requests).

@pcmoritz thoughts about this?

Collaborator

E.g., in the future it could make sense to change the dummy object ID implementation and have execution dependencies actually be a vector of TaskIDs instead of ObjectIDs.

Contributor Author

Hmm, I'm not convinced that it would be cleaner to have a vector of TaskIDs instead of ObjectIDs. It would be nice to keep reconstruction and fetching on relatively the same path. If you're worried about the latency of having to go to the result table, I think it makes more sense to cache information for actor tasks at the local scheduler.

Contributor

I agree it would be cleaner to separate ObjectIDDependencies from ExecutionDependencies. It would probably also make sense to use TaskIDs instead of ObjectIDs for the latter, but we can think more about that and do it in a follow-up PR.

execution_dependencies_ = execution_dependencies;
task_spec_size_ = task_spec_size;
TaskSpec *spec_copy = new TaskSpec[task_spec_size_];
memcpy(spec_copy, spec, task_spec_size);
Contributor

By the way, I would suggest switching to std::copy or std::copy_n, since that's the way to copy memory in C++. This code will break if at any time the data you are copying becomes nontrivial to copy (such as if the data type gains any members with constructors or destructors).
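
To illustrate the point about nontrivial types, here is a small sketch under assumed names: `Record` is a hypothetical element type with a `std::string` member, which would not be safe to duplicate with `memcpy`. `std::copy_n` goes through each element's copy assignment instead.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical element type with a non-trivial member; copying it with
// memcpy would duplicate the string's internal pointer.
struct Record {
  std::string name;
};

// Copies `count` elements using each element's copy assignment operator.
std::vector<Record> CopyRecords(const Record *source, std::size_t count) {
  std::vector<Record> copy(count);
  std::copy_n(source, count, copy.begin());
  return copy;
}
```

For trivially copyable element types, standard library implementations typically lower `std::copy_n` to a bulk memory copy, so the safer call should not cost extra in the common case.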

@AmplabJenkins

Merged build finished. Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2790/

auto string = vector.Get(i);
CHECK(string->size() == sizeof(object_id.id));
memcpy(&object_id.id[0], string->data(), sizeof(object_id.id));
object_ids.push_back(object_id);
Contributor

can we just do object_ids.push_back(from_flatbuf(vector.Get(i))) here?

@@ -89,6 +97,8 @@ table TaskReply {
state: long;
// A local scheduler ID.
local_scheduler_id: string;
// A string of bytes representing the task's TaskExecutionDependencies.
execution_dependencies: string;
Contributor

wouldn't it be cleaner to store the list of strings here directly instead of wrapping and serializing them? Any reason this is not possible?

@AmplabJenkins

Merged build finished. Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2796/

@robertnishihara merged commit 12fdb3f into ray-project:master Dec 15, 2017
@robertnishihara deleted the actor-nondeterminism branch December 15, 2017 04:47
@robertnishihara
Collaborator

robertnishihara commented Dec 15, 2017

There are a couple remaining cleanups, but we can fix those in follow up PRs.

}

std::vector<ObjectID> TaskExecutionSpec::ExecutionDependencies() {
return execution_dependencies_;
Contributor

I believe this makes a copy of a vector of ObjectIDs each time you call ExecutionDependencies(). For readonly callers, why not return a const ref? Callers who need their own copy of the vector can make one.
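
A sketch of the const-ref accessor being suggested, using a hypothetical `ExecutionSpecSketch` class and a stand-in `ObjectID`. Read-only callers bind a reference to the stored vector, and a caller that wants its own copy simply assigns the result to a value.

```cpp
#include <utility>
#include <vector>

// Hypothetical stand-in for Ray's ObjectID (assumed here to be 20 bytes).
struct ObjectID {
  unsigned char id[20];
};

class ExecutionSpecSketch {
 public:
  explicit ExecutionSpecSketch(std::vector<ObjectID> dependencies)
      : execution_dependencies_(std::move(dependencies)) {}

  // Returns a read-only view of the stored vector; no copy is made here.
  const std::vector<ObjectID> &ExecutionDependencies() const {
    return execution_dependencies_;
  }

 private:
  std::vector<ObjectID> execution_dependencies_;
};
```

The returned reference is only valid while the owning object is alive, which is the usual trade-off against returning by value.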

@@ -359,45 +456,60 @@ bool TaskSpec_is_dependent_on(TaskSpec *spec, ObjectID object_id) {
}
}
}
// Iterate through the execution dependencies to see if it contains object_id.
for (auto dependency_id : execution_dependencies_) {
Contributor

this makes a copy of each ObjectID element of the execution dependencies vector. Why not

for (const auto &dep_id : exe_deps) {

ObjectID_equal is a readonly consumer (which, incidentally, we should be enforcing in the function prototype -- I can create a PR for that separately).

Contributor

a side note (less important for now), if we ever envision large numbers of execution dependencies, we should probably make it a hashmap. We had to eliminate linear searches in the past for performance reasons.
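
If the linear search ever did become a bottleneck, a hash-based lookup could look roughly like the sketch below. Everything here is an assumption for illustration: the `ObjectID` stand-in, its `operator==`, the FNV-1a-style `ObjectIDHash`, and the free function `DependsOn` are not taken from the Ray codebase.

```cpp
#include <cstddef>
#include <cstring>
#include <unordered_set>

// Hypothetical stand-in for Ray's ObjectID (assumed here to be 20 bytes).
struct ObjectID {
  unsigned char id[20];
};

bool operator==(const ObjectID &lhs, const ObjectID &rhs) {
  return std::memcmp(lhs.id, rhs.id, sizeof(lhs.id)) == 0;
}

// Illustrative FNV-1a-style hash over the ID bytes (not Ray's own hash).
struct ObjectIDHash {
  std::size_t operator()(const ObjectID &object_id) const {
    std::size_t hash = 1469598103934665603ULL;
    for (unsigned char byte : object_id.id) {
      hash ^= byte;
      hash *= 1099511628211ULL;
    }
    return hash;
  }
};

using ObjectIDSet = std::unordered_set<ObjectID, ObjectIDHash>;

// Average constant-time membership test instead of a linear scan.
bool DependsOn(const ObjectIDSet &dependencies, const ObjectID &object_id) {
  return dependencies.count(object_id) > 0;
}
```

A set does not preserve insertion order, so this would only fit if the order of execution dependencies is tracked elsewhere or does not matter for the lookup path.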

}
}

bool TaskExecutionSpec::DependsOn(ObjectID object_id) {
Contributor

another instance where we should really be passing a const ref to the object id. Does a few useful things:

  • establishes a contract with the caller that the object_id is not going to be mutated
  • eliminates a 20 byte copy when calling this function

@@ -359,45 +456,60 @@ bool TaskSpec_is_dependent_on(TaskSpec *spec, ObjectID object_id) {
}
}
}
// Iterate through the execution dependencies to see if it contains object_id.
for (auto dependency_id : execution_dependencies_) {
if (ObjectID_equal(dependency_id, object_id)) {
Contributor

we have operator== overloaded for object ID equality testing. I think we should deprecate the use of ObjectID_equal (with pass by value ObjectIDs) in favor of the equality operator. Perhaps all new code should be using the new C++ equality operator for object equality testing.
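
Putting the const-ref and equality-operator suggestions together, the dependency check might be sketched as follows. The `ObjectID` stand-in and its `operator==` are defined locally so the example is self-contained; they are assumptions, not the existing overload mentioned above, and `DependsOn` is written as a free function for brevity.

```cpp
#include <cstring>
#include <vector>

// Hypothetical stand-in for Ray's ObjectID (assumed here to be 20 bytes).
struct ObjectID {
  unsigned char id[20];
};

// Stand-in equality operator taking both operands by const reference.
bool operator==(const ObjectID &lhs, const ObjectID &rhs) {
  return std::memcmp(lhs.id, rhs.id, sizeof(lhs.id)) == 0;
}

// Neither the vector, its elements, nor object_id is copied or mutated.
bool DependsOn(const std::vector<ObjectID> &execution_dependencies,
               const ObjectID &object_id) {
  for (const auto &dependency_id : execution_dependencies) {
    if (dependency_id == object_id) {
      return true;
    }
  }
  return false;
}
```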

 task_id.id, sizeof(task_id.id), state, local_scheduler_id.id,
-sizeof(local_scheduler_id.id), spec, Task_task_spec_size(task));
+sizeof(local_scheduler_id.id), fbb.GetBufferPointer(),
+(size_t) fbb.GetSize(), spec, execution_spec->SpecSize());
Contributor

Why not send the whole TaskExecutionSpec, replacing TaskSpec and its size? That way, any expansion of the TaskExecutionSpec class doesn't/won't affect this redis logic (provided we generate the flatbuffers serialization logic for it). It seems odd that only one field of the TaskExecutionSpec is extracted and flatbuffered. For any new fields (like spillback) we need to update this redis function and its callback counterpart.

Contributor Author

That would make it more expensive to implement task_table_update, where we don't want to rewrite the whole TaskSpec.

6 participants