Convert actor dummy objects to task execution edges. #1281
robertnishihara merged 19 commits into ray-project:master
robertnishihara
left a comment
Leaving a couple comments. I'm only part way through the PR.
src/common/common_protocol.cc
should the return be const?
src/common/common_protocol.cc
can we make this a ref instead of pointer? In #1236 (which unfortunately introduces a bunch of conflicts) I got rid of the pointers in this file.
We haven't been super consistent about this, but this should probably raise a TypeError instead of dying.
We should follow the Google C++ style guide here for include orders.
https://google.github.io/styleguide/cppguide.html#Names_and_Order_of_Includes
src/common/task.cc
is this used anywhere?
src/common/task.cc
Shouldn't we use new/delete instead of malloc/free? Especially given that this has a std::vector inside which presumably needs to have its constructor called?
Yeah, I wanted this too, but ended up just copying the current Task_alloc to deal with the variable-sized TaskSpec. I think we can fix this with a unique_ptr.
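One way the unique_ptr idea above might look, as a rough sketch rather than the code in this PR: the task object itself is created with new so the std::vector constructor runs, while the variable-sized spec bytes stay in a malloc'd buffer that is released through a custom deleter. The names Task, FreeDeleter, SpecBuffer and AllocTask are hypothetical, and ObjectID is a stand-in 20-byte struct.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <new>
#include <vector>

// Stand-in for the real ID type (assumption: a fixed 20-byte identifier).
struct ObjectID {
  std::uint8_t id[20];
};

// Deleter that releases memory obtained from std::malloc.
struct FreeDeleter {
  void operator()(void *ptr) const { std::free(ptr); }
};
using SpecBuffer = std::unique_ptr<std::uint8_t, FreeDeleter>;

// Hypothetical task layout: a properly constructed vector plus an owned,
// variable-sized byte buffer holding the serialized spec.
struct Task {
  std::vector<ObjectID> execution_dependencies;
  std::size_t spec_size = 0;
  SpecBuffer spec;
};

// Allocate a task with new (so member constructors run) and copy the spec
// bytes into a malloc'd buffer owned by the task.
std::unique_ptr<Task> AllocTask(const std::uint8_t *spec,
                                std::size_t spec_size) {
  std::unique_ptr<Task> task(new Task());
  task->spec.reset(static_cast<std::uint8_t *>(std::malloc(spec_size)));
  if (spec_size > 0 && !task->spec) {
    throw std::bad_alloc();
  }
  if (spec_size > 0) {
    std::memcpy(task->spec.get(), spec, spec_size);
  }
  task->spec_size = spec_size;
  return task;
}
```

Both the task and its buffer are freed automatically when the returned unique_ptr goes out of scope, so no explicit Task_free call is needed in this sketch.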
src/common/task.cc
pass execution_dependencies as const ref
src/common/task.cc
would probably be good to be consistent about using execution_spec as the variable name for TaskExecutionSpecs
src/common/test/example_task.h
This is unnecessarily verbose, I think. You can just do
std::vector<ObjectID> execution_dependencies;
Interesting, so in this case we'd probably need to store ptr as a field inside of the PyTask struct, which is pretty weird, but maybe necessary. Another workaround would be to just have the PyTask contain a std::vector<ObjectID> * instead of std::vector<ObjectID>. Not sure if there would be a performance hit there.
src/common/task.h
You've obviously given this more thought, but it seems cleaner to me to expose two methods ObjectIDDependencies and ExecutionDependencies.
There are some places in the local scheduler where we would have to iterate over both lists (e.g., when issuing reconstruction commands).
However, there are some places where we only want to iterate over a single list, such as when issuing fetch requests.
@pcmoritz thoughts about this?
E.g., in the future it could make sense to change the dummy object ID implementation and have execution dependencies actually be a vector of TaskIDs instead of ObjectIDs.
Hmm, I'm not convinced that it would be cleaner to have a vector of TaskIDs instead of ObjectIDs. It would be nice to keep reconstruction and fetching on relatively the same path. If you're worried about the latency of having to go to the result table, I think it makes more sense to cache information for actor tasks at the local scheduler.
I agree it would be cleaner to separate ObjectIDDependencies from ExecutionDependencies. It probably also makes sense to use TaskIDs instead of ObjectIDs for the latter, but we can think more about that and do it in a follow-up PR.
src/common/task.cc
By the way, I would suggest switching to std::copy or std::copy_n, since that's the way to copy memory in C++. This code will break if at any time the data you are copying becomes nontrivial to copy (such as if the data type gains any members with constructors or destructors).
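As a small illustration of the std::copy_n suggestion, here is a sketch that copies a run of IDs into a vector. ObjectID is a stand-in 20-byte struct and CopyIds is a hypothetical helper, not a function from this PR.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Stand-in for the real ID type (assumption: a fixed 20-byte identifier).
struct ObjectID {
  std::uint8_t id[20];
};

// Copy `count` IDs starting at `src` into a new vector.
// std::copy_n assigns element by element, so it stays correct even if
// ObjectID later gains a nontrivial copy constructor or destructor,
// which a raw memcpy would silently bypass.
std::vector<ObjectID> CopyIds(const ObjectID *src, std::size_t count) {
  std::vector<ObjectID> dst(count);
  std::copy_n(src, count, dst.begin());
  return dst;
}
```

For trivially copyable types, compilers typically lower std::copy_n to the same memmove a hand-written memcpy would produce, so there should be little or no cost to the safer form.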
src/common/common_protocol.cc
auto string = vector.Get(i);
CHECK(string->size() == sizeof(object_id.id));
memcpy(&object_id.id[0], string->data(), sizeof(object_id.id));
object_ids.push_back(object_id);
can we just do object_ids.push_back(from_flatbuf(vector.Get(i))) here?
// A local scheduler ID.
local_scheduler_id: string;
// A string of bytes representing the task's TaskExecutionDependencies.
execution_dependencies: string;
wouldn't it be cleaner to store the list of strings here directly instead of wrapping and serializing them? Any reason this is not possible?
There are a couple of remaining cleanups, but we can fix those in follow-up PRs.
}

std::vector<ObjectID> TaskExecutionSpec::ExecutionDependencies() {
  return execution_dependencies_;
I believe this makes a copy of a vector of ObjectIDs each time you call ExecutionDependencies(). For readonly callers, why not return a const ref? Callers who need their own copy of the vector can make one.
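A minimal sketch of the const-reference accessor suggested here. The class is reduced to the one member under discussion, the constructor is hypothetical, and ObjectID is a stand-in 20-byte struct.

```cpp
#include <cstdint>
#include <utility>
#include <vector>

// Stand-in for the real ID type (assumption: a fixed 20-byte identifier).
struct ObjectID {
  std::uint8_t id[20];
};

class TaskExecutionSpec {
 public:
  // Hypothetical constructor, only here to make the sketch self-contained.
  explicit TaskExecutionSpec(std::vector<ObjectID> execution_dependencies)
      : execution_dependencies_(std::move(execution_dependencies)) {}

  // Returns a reference to the stored vector, so read-only callers pay
  // nothing. The reference is valid only while this object is alive.
  const std::vector<ObjectID> &ExecutionDependencies() const {
    return execution_dependencies_;
  }

 private:
  std::vector<ObjectID> execution_dependencies_;
};
```

A caller that needs its own copy can still write `std::vector<ObjectID> copy = spec.ExecutionDependencies();`, which makes the copy explicit at the call site.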
  }
}
// Iterate through the execution dependencies to see if it contains object_id.
for (auto dependency_id : execution_dependencies_) {
this makes a copy of each ObjectID element of the execution dependencies vector. Why not
for (const auto &dep_id : exe_deps) {
ObjectID_equal is a readonly consumer (which, incidentally, we should be enforcing in the function prototype -- I can create a PR for that separately).
a side note (less important for now), if we ever envision large numbers of execution dependencies, we should probably make it a hashmap. We had to eliminate linear searches in the past for performance reasons.
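If the linear search ever became a bottleneck, a hash-based lookup could look roughly like the sketch below. The ObjectID struct, its operator==, ObjectIDHasher, DependencySet and the free function DependsOn are all stand-ins defined here for illustration; the hash is a simple FNV-1a over the ID bytes, chosen only because it is short.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <unordered_set>

// Stand-in for the real ID type (assumption: a fixed 20-byte identifier).
struct ObjectID {
  std::uint8_t id[20];
};

// Byte-wise equality, required by std::unordered_set.
inline bool operator==(const ObjectID &lhs, const ObjectID &rhs) {
  return std::memcmp(lhs.id, rhs.id, sizeof(lhs.id)) == 0;
}

// FNV-1a hash over the raw ID bytes.
struct ObjectIDHasher {
  std::size_t operator()(const ObjectID &object_id) const {
    std::uint64_t hash = 14695981039346656037ULL;
    for (std::uint8_t byte : object_id.id) {
      hash ^= byte;
      hash *= 1099511628211ULL;
    }
    return static_cast<std::size_t>(hash);
  }
};

using DependencySet = std::unordered_set<ObjectID, ObjectIDHasher>;

// Average O(1) membership test instead of a scan over a vector.
inline bool DependsOn(const DependencySet &dependencies,
                      const ObjectID &object_id) {
  return dependencies.count(object_id) > 0;
}
```

For the handful of execution dependencies an actor task carries today, a vector scan is likely just as fast; the set only pays off once the list grows large.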
  }
}

bool TaskExecutionSpec::DependsOn(ObjectID object_id) {
another instance where we should really be passing a const ref to the object id. Does a few useful things:
- establishes a contract with the caller that the object_id is not going to be mutated
- eliminates a 20 byte copy when calling this function
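The two points above, sketched together with the earlier loop suggestion: the parameter and the loop variable are both const references, so no ObjectID is copied. This covers only the execution-dependency half of DependsOn; the check against the TaskSpec arguments is omitted. ObjectIdEqual and the constructor are hypothetical, and ObjectID is a stand-in 20-byte struct.

```cpp
#include <cstdint>
#include <cstring>
#include <utility>
#include <vector>

// Stand-in for the real ID type (assumption: a fixed 20-byte identifier).
struct ObjectID {
  std::uint8_t id[20];
};

// Hypothetical equality helper that takes both IDs by const reference.
inline bool ObjectIdEqual(const ObjectID &lhs, const ObjectID &rhs) {
  return std::memcmp(lhs.id, rhs.id, sizeof(lhs.id)) == 0;
}

class TaskExecutionSpec {
 public:
  explicit TaskExecutionSpec(std::vector<ObjectID> execution_dependencies)
      : execution_dependencies_(std::move(execution_dependencies)) {}

  // The const reference documents that object_id is not mutated and avoids
  // copying the 20-byte ID on every call. The method is const because it
  // does not modify the spec either.
  bool DependsOn(const ObjectID &object_id) const {
    for (const auto &dependency_id : execution_dependencies_) {
      if (ObjectIdEqual(dependency_id, object_id)) {
        return true;
      }
    }
    return false;
  }

 private:
  std::vector<ObjectID> execution_dependencies_;
};
```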
}
// Iterate through the execution dependencies to see if it contains object_id.
for (auto dependency_id : execution_dependencies_) {
  if (ObjectID_equal(dependency_id, object_id)) {
we have operator== overloaded for object ID equality testing. I think we should deprecate the use of ObjectID_equal (with pass by value ObjectIDs) in favor of the equality operator. Perhaps all new code should be using the new C++ equality operator for object equality testing.
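With an equality operator available, the search can be handed to the standard library. In this sketch the operator== is a stand-in defined locally (the codebase's own overload would be used instead), Contains is a hypothetical helper, and ObjectID is a stand-in 20-byte struct.

```cpp
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <vector>

// Stand-in for the real ID type (assumption: a fixed 20-byte identifier).
struct ObjectID {
  std::uint8_t id[20];
};

// Stand-in for the existing overload mentioned in the comment above.
inline bool operator==(const ObjectID &lhs, const ObjectID &rhs) {
  return std::memcmp(lhs.id, rhs.id, sizeof(lhs.id)) == 0;
}

// std::find compares elements with operator==, so no ObjectID_equal call
// and no by-value copies are needed.
inline bool Contains(const std::vector<ObjectID> &ids,
                     const ObjectID &target) {
  return std::find(ids.begin(), ids.end(), target) != ids.end();
}
```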
task_id.id, sizeof(task_id.id), state, local_scheduler_id.id,
sizeof(local_scheduler_id.id), spec, Task_task_spec_size(task));
sizeof(local_scheduler_id.id), fbb.GetBufferPointer(),
(size_t) fbb.GetSize(), spec, execution_spec->SpecSize());
Why not send the whole TaskExecutionSpec, replacing TaskSpec and its size? That way, any expansion of the TaskExecutionSpec class doesn't/won't affect this redis logic (provided we generate the flatbuffers serialization logic for it). It seems odd that only one field of the TaskExecutionSpec is extracted and flatbuffered. For any new fields (like spillback) we need to update this redis function and its callback counterpart.
That would make it more expensive to implement task_table_update, where we don't want to rewrite the whole TaskSpec.
This PR converts the actor "dummy" objects, which were used to indicate actor execution order rather than data dependencies explicitly specified by the user, into a separate, mutable field in the task spec.
What do these changes do?
This replaces all instances of the TaskSpec with the new TaskExecutionSpec, which comprises the immutable TaskSpec and an additional vector of execution dependencies. Execution dependencies are currently only used by actors, to determine what tasks should have executed on the actor before the new task can be scheduled. Currently, these execution dependencies are never changed in the GCS task table, but in the future, they may be used for deterministic actor replay by recording the exact order of initial execution.
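To make the shape described above concrete, here is a simplified sketch of such a class: an immutable blob of spec bytes alongside a mutable list of execution dependencies. It is not the class from this PR. The spec is modeled as a plain byte vector, SetExecutionDependencies and Spec are hypothetical names, and ObjectID is a stand-in 20-byte struct; only ExecutionDependencies and SpecSize appear in the reviewed code.

```cpp
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Stand-in for the real ID type (assumption: a fixed 20-byte identifier).
struct ObjectID {
  std::uint8_t id[20];
};

class TaskExecutionSpec {
 public:
  TaskExecutionSpec(std::vector<ObjectID> execution_dependencies,
                    std::vector<std::uint8_t> spec)
      : execution_dependencies_(std::move(execution_dependencies)),
        spec_(std::move(spec)) {}

  // Mutable part: which earlier results must exist before this task runs.
  const std::vector<ObjectID> &ExecutionDependencies() const {
    return execution_dependencies_;
  }
  void SetExecutionDependencies(std::vector<ObjectID> dependencies) {
    execution_dependencies_ = std::move(dependencies);
  }

  // Immutable part: the serialized task spec, fixed at construction.
  std::size_t SpecSize() const { return spec_.size(); }
  const std::uint8_t *Spec() const { return spec_.data(); }

 private:
  std::vector<ObjectID> execution_dependencies_;
  const std::vector<std::uint8_t> spec_;
};
```

Keeping the spec bytes const while allowing the dependency list to change mirrors the split in the description: the user-specified task never changes, but the execution order recorded around it can.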