Convert actor dummy objects to task execution edges. #1281
Build finished. Test FAILed. |
Test FAILed. |
Build finished. Test PASSed. |
Test PASSed. |
aae6bf0 to 4b62ebe
Leaving a couple comments. I'm only part way through the PR.
src/common/common_protocol.cc
Outdated
@@ -13,6 +13,20 @@ ObjectID from_flatbuf(const flatbuffers::String *string) {
  return object_id;
}

std::vector<ObjectID> from_flatbuf(
should the return be const?
src/common/common_protocol.cc
Outdated
@@ -13,6 +13,20 @@ ObjectID from_flatbuf(const flatbuffers::String *string) {
  return object_id;
}

std::vector<ObjectID> from_flatbuf(
    const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>
        *vector) {
can we make this a ref instead of pointer? In #1236 (which unfortunately introduces a bunch of conflicts) I got rid of the pointers in this file.
size = PyList_Size(execution_arguments);
for (Py_ssize_t i = 0; i < size; ++i) {
  PyObject *execution_arg = PyList_GetItem(execution_arguments, i);
  CHECK(PyObject_IsInstance(execution_arg, (PyObject *) &PyObjectIDType));
We haven't been super consistent about this, but this should probably raise a TypeError instead of dying.
@@ -2,6 +2,7 @@
#define COMMON_EXTENSION_H

#include <Python.h>
#include <vector>
We should follow the Google C++ style guide here for include orders.
https://google.github.io/styleguide/cppguide.html#Names_and_Order_of_Includes
@@ -289,6 +289,17 @@ int64_t TaskSpec_num_args(TaskSpec *spec) {
  return message->args()->size();
}

int64_t TaskSpec_num_args_by_ref(TaskSpec *spec) {
is this used anywhere?
src/common/task.cc
Outdated
TaskExecutionSpec *TaskExecutionSpec_alloc(std::vector<ObjectID> execution_dependencies, TaskSpec *spec, int64_t task_spec_size) {
  int64_t size = sizeof(TaskExecutionSpec) - sizeof(TaskSpec) + task_spec_size;
  TaskExecutionSpec *copy = (TaskExecutionSpec *) malloc(size);
Shouldn't we use new/delete instead of malloc/free? Especially given that this has a std::vector inside which presumably needs to have its constructor called?
Yeah, I wanted this too, but ended up just copying the current Task_alloc to deal with the variable-sized TaskSpec. I think we can fix this with a unique_ptr.
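The concern in this thread can be sketched as follows. This is a minimal illustration rather than the PR's code: ObjectID, ExecutionSpecSketch, and MakeExecutionSpec are hypothetical stand-ins. It assumes the variable-sized spec can be held in a separately allocated byte buffer, so the enclosing object has a fixed size and can be created with new, which runs the std::vector constructor that malloc would skip.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-in for the 20-byte object ID discussed in this review.
struct ObjectID {
  unsigned char id[20];
};

// Hypothetical class: the variable-sized spec bytes live in their own buffer,
// so the object itself has a fixed size and can be created with new.
class ExecutionSpecSketch {
 public:
  ExecutionSpecSketch(const std::vector<ObjectID> &execution_dependencies,
                      const uint8_t *spec, int64_t spec_size)
      : execution_dependencies_(execution_dependencies),
        spec_size_(spec_size) {
    assert(spec != nullptr && spec_size > 0);
    spec_.reset(new uint8_t[spec_size]);
    std::copy_n(spec, spec_size, spec_.get());
  }

  const std::vector<ObjectID> &ExecutionDependencies() const {
    return execution_dependencies_;
  }
  int64_t SpecSize() const { return spec_size_; }
  const uint8_t *Spec() const { return spec_.get(); }

 private:
  std::vector<ObjectID> execution_dependencies_;
  int64_t spec_size_;
  std::unique_ptr<uint8_t[]> spec_;  // Freed automatically with the object.
};

// The unique_ptr owns the object, so no manual free/delete is needed.
std::unique_ptr<ExecutionSpecSketch> MakeExecutionSpec(
    const std::vector<ObjectID> &execution_dependencies, const uint8_t *spec,
    int64_t spec_size) {
  return std::unique_ptr<ExecutionSpecSketch>(
      new ExecutionSpecSketch(execution_dependencies, spec, spec_size));
}
```

The trade-off in this sketch is one extra allocation for the spec buffer in exchange for ordinary C++ construction and destruction of the vector member.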
src/common/task.cc
Outdated
      TaskExecutionSpec_task_spec_size(spec));
}

TaskExecutionSpec *TaskExecutionSpec_alloc(std::vector<ObjectID> execution_dependencies, TaskSpec *spec, int64_t task_spec_size) {
pass execution_dependencies as const ref
src/common/task.cc
Outdated
int64_t TaskExecutionSpec_task_spec_size(TaskExecutionSpec *spec) {
  return spec->task_spec_size;
}
TaskSpec *TaskExecutionSpec_task_spec(TaskExecutionSpec *spec) {
would probably be good to be consistent about using execution_spec as the variable name for TaskExecutionSpecs
src/common/task.cc
Outdated
}

/* TASK INSTANCES */

Task *Task_alloc(TaskSpec *spec,
                 int64_t task_spec_size,
                 int state,
                 DBClientID local_scheduler_id) {
                 DBClientID local_scheduler_id,
                 std::vector<ObjectID> execution_dependencies) {
const ref
Merged build finished. Test FAILed. |
Test FAILed. |
4b62ebe to 36b98da
Merged build finished. Test FAILed. |
Test FAILed. |
Merged build finished. Test FAILed. |
Test FAILed. |
b7b6ab2 to 42bf669
Merged build finished. Test PASSed. |
Test PASSed. |
42bf669 to c512c83
Merged build finished. Test FAILed. |
Test FAILed. |
Merged build finished. Test PASSed. |
Test PASSed. |
Merged build finished. Test PASSed. |
Test PASSed. |
src/common/test/example_task.h
Outdated
return TaskSpec_finish_construct(g_task_builder, task_spec_size);
int64_t task_spec_size;
TaskSpec *spec = TaskSpec_finish_construct(g_task_builder, &task_spec_size);
std::vector<ObjectID> execution_dependencies = std::vector<ObjectID>();
This is unnecessarily verbose, I think. You can just do
std::vector<ObjectID> execution_dependencies;
@@ -104,6 +108,8 @@ PyObject *PyTask_from_string(PyObject *self, PyObject *args) {
  result = (PyTask *) PyObject_Init((PyObject *) result, &PyTaskType);
  result->size = size;
  result->spec = TaskSpec_copy((TaskSpec *) data, size);
  /* The created task does not include any execution dependencies. */
  result->execution_dependencies = empty_execution_dependencies;
Interesting, so in this case we'd probably need to store ptr as a field inside of the PyTask struct, which is pretty weird, but maybe necessary. Another workaround would be to just have the PyTask contain a std::vector<ObjectID> * instead of std::vector<ObjectID>. Not sure if there would be a performance hit there.
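The second workaround might look roughly like the sketch below. PyTaskSketch and its alloc/free helpers are hypothetical stand-ins, not the real PyTask or the CPython allocation path. The assumption is that the struct's memory comes from a C-style allocator that never runs C++ constructors, so the vector is created and destroyed explicitly through a pointer member.

```cpp
#include <cassert>
#include <cstdlib>
#include <vector>

// Hypothetical stand-in for the 20-byte object ID.
struct ObjectID {
  unsigned char id[20];
};

// Hypothetical stand-in for a C-allocated object. It only holds trivial
// members, so raw allocation is safe; the vector lives behind a pointer.
struct PyTaskSketch {
  long size;
  std::vector<ObjectID> *execution_dependencies;
};

PyTaskSketch *PyTaskSketch_alloc() {
  PyTaskSketch *task =
      static_cast<PyTaskSketch *>(std::malloc(sizeof(PyTaskSketch)));
  assert(task != nullptr);
  task->size = 0;
  // new runs the std::vector constructor, which malloc alone would not.
  task->execution_dependencies = new std::vector<ObjectID>();
  return task;
}

void PyTaskSketch_free(PyTaskSketch *task) {
  // delete runs the std::vector destructor before the raw memory is freed.
  delete task->execution_dependencies;
  std::free(task);
}
```

The cost of this layout is one extra heap allocation per task and a pointer indirection on each access to the dependencies; whether that matters in practice would need measuring.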
Merged build finished. Test PASSed. |
Test PASSed. |
///
/// @return A vector of object IDs representing this task's execution
/// dependencies.
std::vector<ObjectID> ExecutionDependencies();
You've obviously given this more thought, but it seems cleaner to me to expose two methods, ObjectIDDependencies and ExecutionDependencies.
There are some places in the local scheduler where we would have to iterate over both lists (e.g., when issuing reconstruction commands).
However, there are some places where we only want to iterate over a single list (such as when issuing fetch requests).
@pcmoritz thoughts about this?
E.g., in the future it could make sense to change the dummy object ID implementation and have execution dependencies actually be a vector of TaskIDs instead of ObjectIDs.
Hmm, I'm not convinced that it would be cleaner to have a vector of TaskIDs instead of ObjectIDs. It would be nice to keep reconstruction and fetching on relatively the same path. If you're worried about the latency of having to go to the result table, I think it makes more sense to cache information for actor tasks at the local scheduler.
I agree it would be cleaner to separate ObjectIDDependencies from ExecutionDependencies. Probably also having TaskIDs instead of ObjectIDs for the latter but we can think more about that and do it in a followup PR.
execution_dependencies_ = execution_dependencies;
task_spec_size_ = task_spec_size;
TaskSpec *spec_copy = new TaskSpec[task_spec_size_];
memcpy(spec_copy, spec, task_spec_size);
By the way, I would suggest switching to std::copy or std::copy_n, since that's the way to copy memory in C++. This code will break if at any time the data you are copying becomes nontrivial to copy (such as if the data type gains any members with constructors or destructors).
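A minimal sketch of the suggested change, assuming the spec can be treated as a run of raw bytes (CopySpecBytes is a hypothetical helper name, not a function from this PR):

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>

// Hypothetical helper: copy size bytes of a serialized spec into a freshly
// allocated buffer owned by the returned unique_ptr.
std::unique_ptr<uint8_t[]> CopySpecBytes(const uint8_t *spec,
                                         std::size_t size) {
  assert(spec != nullptr);
  std::unique_ptr<uint8_t[]> copy(new uint8_t[size]);
  // std::copy_n copies element by element through assignment, so it stays
  // correct if the element type later becomes nontrivial to copy, unlike
  // memcpy, which is only valid for trivially copyable types.
  std::copy_n(spec, size, copy.get());
  return copy;
}
```

For byte-sized trivially copyable elements, standard library implementations commonly lower std::copy_n to the same memmove-style copy, so the type-safe form should not cost anything here.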
dfe49ad to df92acb
Merged build finished. Test PASSed. |
Test PASSed. |
src/common/common_protocol.cc
Outdated
auto string = vector.Get(i);
CHECK(string->size() == sizeof(object_id.id));
memcpy(&object_id.id[0], string->data(), sizeof(object_id.id));
object_ids.push_back(object_id);
can we just do object_ids.push_back(from_flatbuf(vector.Get(i))) here?
@@ -89,6 +97,8 @@ table TaskReply {
  state: long;
  // A local scheduler ID.
  local_scheduler_id: string;
  // A string of bytes representing the task's TaskExecutionDependencies.
  execution_dependencies: string;
wouldn't it be cleaner to store the list of strings here directly instead of wrapping and serializing them? Any reason this is not possible?
Merged build finished. Test PASSed. |
Test PASSed. |
There are a couple remaining cleanups, but we can fix those in follow up PRs.
}

std::vector<ObjectID> TaskExecutionSpec::ExecutionDependencies() {
  return execution_dependencies_;
I believe this makes a copy of a vector of ObjectIDs each time you call ExecutionDependencies(). For readonly callers, why not return a const ref? Callers who need their own copy of the vector can make one.
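A sketch of the const-ref accessor described in this comment. ExecutionSpecSketch and CopyDependencies are hypothetical names used only for illustration; the real class carries more state.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical stand-in for the 20-byte object ID.
struct ObjectID {
  unsigned char id[20];
};

class ExecutionSpecSketch {
 public:
  explicit ExecutionSpecSketch(std::vector<ObjectID> execution_dependencies)
      : execution_dependencies_(std::move(execution_dependencies)) {}

  // Read-only callers get a reference to the member; no vector is copied.
  const std::vector<ObjectID> &ExecutionDependencies() const {
    return execution_dependencies_;
  }

 private:
  std::vector<ObjectID> execution_dependencies_;
};

// A caller that needs its own mutable copy makes one explicitly.
std::vector<ObjectID> CopyDependencies(const ExecutionSpecSketch &spec) {
  std::vector<ObjectID> copy = spec.ExecutionDependencies();
  assert(copy.size() == spec.ExecutionDependencies().size());
  return copy;
}
```

The returned reference is only valid while the owning object is alive, which is the usual caveat with this pattern.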
@@ -359,45 +456,60 @@ bool TaskSpec_is_dependent_on(TaskSpec *spec, ObjectID object_id) {
      }
    }
  }
  // Iterate through the execution dependencies to see if it contains object_id.
  for (auto dependency_id : execution_dependencies_) {
this makes a copy of each ObjectID element of the execution dependencies vector. Why not
for (const auto &dep_id : exe_deps) {
ObjectID_equal is a readonly consumer (which, incidentally, we should be enforcing in the function prototype -- I can create a PR for that separately).
a side note (less important for now), if we ever envision large numbers of execution dependencies, we should probably make it a hashmap. We had to eliminate linear searches in the past for performance reasons.
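If the linear search ever became a bottleneck, one option is a hash set keyed on the object ID. The sketch below is an assumption-laden illustration: ObjectID, its operator==, ObjectIDHash (FNV-1a over the 20 ID bytes), and the helper functions are all hypothetical stand-ins, not code from this repository.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <unordered_set>
#include <vector>

// Hypothetical stand-in for the 20-byte object ID.
struct ObjectID {
  unsigned char id[20];
};

bool operator==(const ObjectID &lhs, const ObjectID &rhs) {
  return std::memcmp(lhs.id, rhs.id, sizeof(lhs.id)) == 0;
}

// Hypothetical hash functor: 64-bit FNV-1a over the ID bytes.
struct ObjectIDHash {
  std::size_t operator()(const ObjectID &object_id) const {
    uint64_t hash = 0xcbf29ce484222325ULL;
    for (unsigned char byte : object_id.id) {
      hash ^= byte;
      hash *= 0x100000001b3ULL;
    }
    return static_cast<std::size_t>(hash);
  }
};

using ObjectIDSet = std::unordered_set<ObjectID, ObjectIDHash>;

// Build the set once; duplicate IDs collapse into a single entry.
ObjectIDSet BuildDependencySet(const std::vector<ObjectID> &dependencies) {
  ObjectIDSet set(dependencies.begin(), dependencies.end());
  assert(set.size() <= dependencies.size());
  return set;
}

// Average constant-time membership test instead of a linear scan.
bool DependsOn(const ObjectIDSet &dependencies, const ObjectID &object_id) {
  return dependencies.count(object_id) > 0;
}
```

For the handful of execution dependencies an actor task carries today, the vector scan is likely just as fast; the set only pays off for large dependency lists.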
  }
}

bool TaskExecutionSpec::DependsOn(ObjectID object_id) {
another instance where we should really be passing a const ref to the object id. Does a few useful things:
- establishes a contract with the caller that the object_id is not going to be mutated
- eliminates a 20 byte copy when calling this function
@@ -359,45 +456,60 @@ bool TaskSpec_is_dependent_on(TaskSpec *spec, ObjectID object_id) {
      }
    }
  }
  // Iterate through the execution dependencies to see if it contains object_id.
  for (auto dependency_id : execution_dependencies_) {
    if (ObjectID_equal(dependency_id, object_id)) {
we have operator== overloaded for object ID equality testing. I think we should deprecate the use of ObjectID_equal (with pass by value ObjectIDs) in favor of the equality operator. Perhaps all new code should be using the new C++ equality operator for object equality testing.
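Taken together, the suggestions in the last few comments (const-ref parameter, const auto & in the loop, and operator== for comparison) might look like the sketch below. The ObjectID type and its operator== are redefined here as hypothetical stand-ins so the example is self-contained; the project's own overload would be used instead.

```cpp
#include <cassert>
#include <cstring>
#include <vector>

// Hypothetical stand-in for the 20-byte object ID.
struct ObjectID {
  unsigned char id[20];
};

// Stand-in for the project's existing equality overload.
bool operator==(const ObjectID &lhs, const ObjectID &rhs) {
  return std::memcmp(lhs.id, rhs.id, sizeof(lhs.id)) == 0;
}

// Both parameters are const references: nothing is copied on the call, and
// the signature promises the caller that neither argument is mutated.
bool DependsOn(const std::vector<ObjectID> &execution_dependencies,
               const ObjectID &object_id) {
  // const auto & avoids copying each 20-byte element while iterating.
  for (const auto &dependency_id : execution_dependencies) {
    if (dependency_id == object_id) {
      return true;
    }
  }
  return false;
}

// Small usage example for the function above.
void DependsOnExample() {
  ObjectID first = ObjectID();
  ObjectID second = ObjectID();
  second.id[0] = 1;
  std::vector<ObjectID> dependencies{first};
  assert(DependsOn(dependencies, first));
  assert(!DependsOn(dependencies, second));
}
```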
task_id.id, sizeof(task_id.id), state, local_scheduler_id.id,
sizeof(local_scheduler_id.id), spec, Task_task_spec_size(task));
sizeof(local_scheduler_id.id), fbb.GetBufferPointer(),
(size_t) fbb.GetSize(), spec, execution_spec->SpecSize());
Why not send the whole TaskExecutionSpec, replacing TaskSpec and its size? That way, any expansion of the TaskExecutionSpec class won't affect this redis logic (provided we generate the flatbuffers serialization logic for it). It seems odd that only one field of the TaskExecutionSpec is extracted and flatbuffered. For any new fields (like spillback) we need to update this redis function and its callback counterpart.
That would make it more expensive to implement task_table_update, where we don't want to rewrite the whole TaskSpec.
This PR converts the actor "dummy" objects, which were used to indicate actor execution order rather than data dependencies explicitly specified by the user, into a separate, mutable field in the task spec.
What do these changes do?
This replaces all instances of the TaskSpec with the new TaskExecutionSpec, which comprises the immutable TaskSpec and an additional vector of execution dependencies. Execution dependencies are currently only used by actors, to determine what tasks should have executed on the actor before the new task can be scheduled. Currently, these execution dependencies are never changed in the GCS task table, but in the future, they may be used for deterministic actor replay by recording the exact order of initial execution.
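The shape described above can be sketched as a small class. This is an illustration under stated assumptions, not the class added by this PR: TaskExecutionSpecSketch, its byte-vector representation of the spec, and the SetExecutionDependencies setter are all hypothetical. It only shows the split between an immutable spec and a mutable list of execution dependencies.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical stand-in for the 20-byte object ID.
struct ObjectID {
  unsigned char id[20];
};

class TaskExecutionSpecSketch {
 public:
  TaskExecutionSpecSketch(std::vector<ObjectID> execution_dependencies,
                          std::vector<uint8_t> spec)
      : execution_dependencies_(std::move(execution_dependencies)),
        spec_(std::move(spec)) {
    assert(!spec_.empty());
  }

  // The serialized spec is fixed at construction and exposed read-only.
  const std::vector<uint8_t> &Spec() const { return spec_; }

  const std::vector<ObjectID> &ExecutionDependencies() const {
    return execution_dependencies_;
  }

  // Hypothetical setter: only the execution dependencies may change after
  // construction, for example to record a different execution order.
  void SetExecutionDependencies(std::vector<ObjectID> execution_dependencies) {
    execution_dependencies_ = std::move(execution_dependencies);
  }

 private:
  std::vector<ObjectID> execution_dependencies_;  // Mutable part.
  const std::vector<uint8_t> spec_;               // Immutable part.
};
```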