-
Notifications
You must be signed in to change notification settings - Fork 6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[xray] Basic task reconstruction mechanism #2526
[xray] Basic task reconstruction mechanism #2526
Conversation
Test PASSed. |
1fc636c
to
801d4b6
Compare
src/ray/gcs/tables.h
Outdated
// since the lease entry itself contains the expiration period. In the | ||
// worst case, if the command fails, then a client that looks up the lease | ||
// entry will overestimate the expiration time. | ||
std::vector<std::string> args = {"PEXPIRE", "TASK_LEASE" + id.binary(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of hardcoding TASK_LEASE
, we can do something like EnumNameTablePrefix(TablePrefix::TASK_LEASE)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, thanks :)
const WriteCallback &done) override { | ||
RAY_RETURN_NOT_OK((Table<TaskID, TaskLeaseData>::Add(job_id, id, data, done))); | ||
// Mark the entry for expiration in Redis. It's okay if this command fails | ||
// since the lease entry itself contains the expiration period. In the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, is it actually ok if this fails? E.g., if it fails, then it will fail to extend the previous expiration, and so the key could actually expire too early, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to Redis documentation:
The timeout will only be cleared by commands that delete or overwrite the contents of the key, including DEL, SET, GETSET and all the *STORE commands.
I believe that the Add
command in the previous line will overwrite the contents.
src/ray/raylet/task_spec.h
Outdated
@@ -82,6 +82,8 @@ class TaskSpecification { | |||
/// \param string A serialized task specification flatbuffer. | |||
TaskSpecification(const flatbuffers::String &string); | |||
|
|||
TaskSpecification(const std::string &string); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please document, in particular it's worth saying that this requires the string to be in scope for at least as long as the TaskSpecification
, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll document. I don't think that is true though, since the constructor performs a copy of the string data.
// reconstruction first. This should suppress this node's first attempt at | ||
// reconstruction. | ||
auto task_reconstruction_data = std::make_shared<TaskReconstructionDataT>(); | ||
task_reconstruction_data->node_manager_id = ClientID::from_random().hex(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All UniqueIDs stored in flatbuffer objects should be binary not hex.
@@ -70,6 +71,8 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface { | |||
/// this timeout, then objects that the task creates may be reconstructed. | |||
void HandleTaskLeaseNotification(const TaskID &task_id, int64_t lease_timeout_ms); | |||
|
|||
void HandleReconstructionLogAppend(const TaskID &task_id, bool success); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Document
src/ray/raylet/node_manager.cc
Outdated
}); | ||
} | ||
|
||
void NodeManager::ResubmitTask(const Task &task) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this method is called, then we are guaranteed that task
is currently not in local_queues_
, right?
We are also guaranteed that this node manager has the task lease, right?
Might be helpful to document some of these constraints/assumptions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, task
should not be in local_queues_
. If it is, then there is a spurious reconstruction.
The node manager will acquire the task lease once it calls TaskPending
, which should happen once the task is accepted by the node (e.g., in EnqueuePlaceableTask).
Thanks, I'll document that.
src/ray/raylet/node_manager.cc
Outdated
[this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id) { | ||
const Task &task = lineage_cache_.GetTask(task_id); | ||
ResubmitTask(task); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are two callbacks here. Can you document what circumstances lead to each of the two callbacks firing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, thanks!
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test PASSed. |
Test FAILed. |
1ab9b6c
to
a8288d2
Compare
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
jenkins, retest this please |
Test FAILed. |
jenkins, retest this please |
Test PASSed. |
|
||
Status Add(const JobID &job_id, const TaskID &id, std::shared_ptr<TaskLeaseDataT> &data, | ||
const WriteCallback &done) override { | ||
RAY_RETURN_NOT_OK((Table<TaskID, TaskLeaseData>::Add(job_id, id, data, done))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: there's a pair of unnecessary parentheses.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I need the parentheses for it to build for some reason...I don't know exactly why, but the compiler seems to think that there are 2 arguments to the macro, maybe because of the templated argument?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the reason that you have to add Table<TaskID, TaskLeaseData>::
because the two methods have the same name? Would that go away if we just change this method name from Add
to something else like UpdateTaskLease
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just googled this, the reason is because the macro processor doesn't know about template, thus recognizes Table<TaskID
as a parameter.
Someone suggests this way:
#define COMMA ,
RAY_RETURN_NOT_OK(Table<TaskID COMMA TaskLeaseData>::(...))
It works. But I think adding an extra pair of parentheses is easier. Or as @robertnishihara suggested, use another function name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I decided against changing the function name since I believe it's nicer to have the TaskLeaseTable
still implement the Table<TaskID, TaskLeaseData>
interface.
src/ray/raylet/lineage_cache.cc
Outdated
@@ -436,6 +436,13 @@ void LineageCache::HandleEntryCommitted(const TaskID &task_id) { | |||
} | |||
} | |||
|
|||
const Task LineageCache::GetTask(const TaskID &task_id) const { | |||
auto entries = lineage_.GetEntries(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
auto -> auto& ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
src/ray/raylet/lineage_cache.cc
Outdated
@@ -436,6 +436,13 @@ void LineageCache::HandleEntryCommitted(const TaskID &task_id) { | |||
} | |||
} | |||
|
|||
const Task LineageCache::GetTask(const TaskID &task_id) const { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const Task & ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
} | ||
|
||
void NodeManager::ResubmitTask(const Task &task) { | ||
// Actor reconstruction is turned off by default right now. If this is an |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just curious: is this a stage approach - aka. actor reconstruction will be enabled later?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we're still thinking about how exactly actor reconstruction should be exposed to the user. I believe the current plan is to have actor reconstruction enabled by a flag from the frontend.
src/ray/raylet/node_manager.cc
Outdated
// The task was not in the GCS task table. It must therefore be in the | ||
// lineage cache. Use a copy of the cached task spec to re-execute the | ||
// task. | ||
const Task task = lineage_cache_.GetTask(task_id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const auto & ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually mean to copy the task here in case the lineage cache gets modified while resubmitting the task for execution.
Test FAILed. |
2c02ca9
to
9fb8f1e
Compare
e9f5905
to
351e42d
Compare
Test PASSed. |
Test PASSed. |
351e42d
to
bea11c4
Compare
src/ray/raylet/node_manager.cc
Outdated
const Task task = lineage_cache_.GetTask(task_id); | ||
ResubmitTask(task); | ||
// lineage cache. | ||
if (!lineage_cache_.ContainsTask(task_id)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is not the right way to solve it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stephanie-wang That's a nice solution also.
btw, does it still hang after ur this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope! Although there are many error messages (and will be for any job with tasks that take some time to complete) logged.
Test PASSed. |
Test FAILed. |
What do these changes do?
This implements basic task reconstruction in raylet. There are two parts to this PR:
TaskReconstructionLog
. This prevents two raylets from reconstructing the same task if they decide simultaneously (via the logic in [xray] Implement task lease table, logic for deciding when to reconstruct a task #2497) that reconstruction is necessary.Reconstruction is quite slow in this PR, especially for long chains of dependent tasks. This is mainly due to the lease table mechanism, where nodes may wait too long before trying to reconstruct a task. There are two ways to improve this:
PEXPIRE
. This is a WIP and I may include it in this PR.Since handling failures of other raylets is probably not yet complete in master, this only turns back on Python tests for reconstructing evicted objects.