
[xray] Basic task reconstruction mechanism #2526

Merged

Conversation

@stephanie-wang (Contributor) commented Jul 31, 2018

What do these changes do?

This implements basic task reconstruction in raylet. There are two parts to this PR:

  1. Reconstruction suppression through the TaskReconstructionLog. This prevents two raylets from reconstructing the same task if they both decide simultaneously (via the logic in #2497, [xray] Implement task lease table, logic for deciding when to reconstruct a task) that reconstruction is necessary. A sketch of this mechanism follows the list.
  2. Task resubmission once a raylet becomes responsible for reconstructing a task.
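
A minimal sketch of the suppression idea in item 1; the log member, the AppendAt call, and the callback shapes are illustrative placeholders, not the PR's actual API:

// Each reconstruction attempt for a task occupies one slot in an append-only log.
// Exactly one raylet's append at a given attempt index succeeds; raylets whose
// appends fail are suppressed and do not resubmit the task.
void AttemptReconstruction(const TaskID &task_id, int64_t attempt_index) {
  auto entry = std::make_shared<TaskReconstructionDataT>();
  entry->node_manager_id = client_id_.binary();  // this raylet's ID, stored as binary
  reconstruction_log_.AppendAt(                  // hypothetical append-at-index call
      task_id, attempt_index, entry,
      /*on_success=*/[this](const TaskID &id) { ResubmitTask(lineage_cache_.GetTask(id)); },
      /*on_failure=*/[](const TaskID &id) { /* another raylet appended first; do nothing */ });
}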

Reconstruction is quite slow in this PR, especially for long chains of dependent tasks. This is mainly due to the lease table mechanism, where nodes may wait too long before trying to reconstruct a task. There are two ways to improve this:

  1. Expire entries in the lease table using Redis PEXPIRE. This is a WIP and I may include it in this PR.
  2. Introduce a "fast path" for reconstructing dependencies of a re-executed task. Normally, we wait for an initial timeout before checking whether a task requires reconstruction. However, if a task requires reconstruction, then it's likely that its dependencies also require reconstruction. In this case, we could skip the initial timeout before checking the GCS to see whether reconstruction is necessary (e.g., if the object has been evicted).

Since handling failures of other raylets is probably not yet complete in master, this PR only re-enables the Python tests for reconstructing evicted objects.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7063/

// since the lease entry itself contains the expiration period. In the
// worst case, if the command fails, then a client that looks up the lease
// entry will overestimate the expiration time.
std::vector<std::string> args = {"PEXPIRE", "TASK_LEASE" + id.binary(),
Collaborator

Instead of hardcoding TASK_LEASE, we can do something like EnumNameTablePrefix(TablePrefix::TASK_LEASE).
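
For reference, the suggested change might look like this (the timeout argument below is a placeholder; EnumNameTablePrefix is the flatbuffers-generated name helper mentioned above):

std::vector<std::string> args = {"PEXPIRE",
                                 EnumNameTablePrefix(TablePrefix::TASK_LEASE) + id.binary(),
                                 std::to_string(timeout_ms)};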

Contributor Author

Oops, thanks :)

const WriteCallback &done) override {
RAY_RETURN_NOT_OK((Table<TaskID, TaskLeaseData>::Add(job_id, id, data, done)));
// Mark the entry for expiration in Redis. It's okay if this command fails
// since the lease entry itself contains the expiration period. In the
Collaborator

Hm, is it actually ok if this fails? E.g., if it fails, then it will fail to extend the previous expiration, and so the key could actually expire too early, right?

Contributor Author

According to Redis documentation:

The timeout will only be cleared by commands that delete or overwrite the contents of the key, including DEL, SET, GETSET and all the *STORE commands.

I believe that the Add command in the previous line will overwrite the contents.
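
The documented behavior is easy to check in redis-cli; a write that replaces the value clears any pending expiration (key and values here are arbitrary):

redis> SET mykey v1
redis> PEXPIRE mykey 1000
redis> SET mykey v2
redis> PTTL mykey
(integer) -1    # -1 means the key exists but has no associated expiration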

@@ -82,6 +82,8 @@ class TaskSpecification {
/// \param string A serialized task specification flatbuffer.
TaskSpecification(const flatbuffers::String &string);

TaskSpecification(const std::string &string);
Collaborator

Please document this. In particular, it's worth saying that the string must stay in scope for at least as long as the TaskSpecification, right?

Contributor Author

I'll document. I don't think that is true though, since the constructor performs a copy of the string data.
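
The requested documentation could read along these lines (the wording is a suggestion, not the PR's final comment):

/// Deserialize a task specification from a string.
///
/// \param string A serialized task specification flatbuffer. The data is
/// copied into this TaskSpecification, so the string does not need to
/// outlive the constructed object.
TaskSpecification(const std::string &string);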

// reconstruction first. This should suppress this node's first attempt at
// reconstruction.
auto task_reconstruction_data = std::make_shared<TaskReconstructionDataT>();
task_reconstruction_data->node_manager_id = ClientID::from_random().hex();
Collaborator

All UniqueIDs stored in flatbuffer objects should be binary not hex.
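
That is, store the raw bytes (a one-line sketch of the requested fix):

task_reconstruction_data->node_manager_id = ClientID::from_random().binary();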

@@ -70,6 +71,8 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface {
/// this timeout, then objects that the task creates may be reconstructed.
void HandleTaskLeaseNotification(const TaskID &task_id, int64_t lease_timeout_ms);

void HandleReconstructionLogAppend(const TaskID &task_id, bool success);
Collaborator

Document
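
A doc comment in the style of the surrounding header might read as follows; the description of when the append fails is inferred from the suppression mechanism in the PR summary:

/// Handle the result of an attempt to append an entry to the task
/// reconstruction log.
///
/// \param task_id The task whose reconstruction log was appended to.
/// \param success Whether the append succeeded. The append fails if another
/// node manager already appended an entry for this reconstruction attempt, in
/// which case this node's reconstruction is suppressed.
void HandleReconstructionLogAppend(const TaskID &task_id, bool success);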

});
}

void NodeManager::ResubmitTask(const Task &task) {
Collaborator

If this method is called, then we are guaranteed that task is currently not in local_queues_, right?

We are also guaranteed that this node manager has the task lease, right?

Might be helpful to document some of these constraints/assumptions.

Contributor Author

Yes, task should not be in local_queues_. If it is, then there is a spurious reconstruction.

The node manager will acquire the task lease once it calls TaskPending, which should happen once the task is accepted by the node (e.g., in EnqueuePlaceableTask).

Thanks, I'll document that.
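
Captured as a doc comment, the constraints discussed above might read (a sketch, not the PR's final wording):

/// Resubmit a task whose return values need to be reconstructed.
///
/// Preconditions: the task must not already be in local_queues_; if it is,
/// the reconstruction was spurious. The task lease is acquired once
/// TaskPending is called, e.g., when the task is accepted in
/// EnqueuePlaceableTask.
void ResubmitTask(const Task &task);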

[this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id) {
const Task &task = lineage_cache_.GetTask(task_id);
ResubmitTask(task);
});
Collaborator

There are two callbacks here. Can you document what circumstances lead to each of the two callbacks firing?

Contributor Author

Yep, thanks!
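
One possible way to document the two callbacks, inferred from the later discussion of the GCS task table lookup (an assumption, not the PR's actual comment):

// First callback: the task's data was found in the GCS task table; re-execute
// the task from the fetched task data.
// Second callback (shown above): the task was not in the GCS task table, so it
// must still be in the lineage cache; fetch it from there and resubmit.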

@stephanie-wang changed the title from "[WIP][xray] Basic task reconstruction mechanism" to "[xray] Basic task reconstruction mechanism" on Jul 31, 2018
@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7067/

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7066/

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7068/

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7070/

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7075/

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7096/

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7105/

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7114/

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7123/

@robertnishihara (Collaborator)

jenkins, retest this please

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7126/

@robertnishihara (Collaborator)

jenkins, retest this please

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7130/


Status Add(const JobID &job_id, const TaskID &id, std::shared_ptr<TaskLeaseDataT> &data,
const WriteCallback &done) override {
RAY_RETURN_NOT_OK((Table<TaskID, TaskLeaseData>::Add(job_id, id, data, done)));
Contributor

nit: there's a pair of unnecessary parentheses.

Contributor Author

Thanks!

Contributor Author

Actually, I need the parentheses for it to build for some reason... I don't know exactly why, but the compiler seems to think that there are 2 arguments to the macro, maybe because of the templated argument?

Collaborator

Is the reason that you have to add Table<TaskID, TaskLeaseData>:: because the two methods have the same name? Would that go away if we just change this method name from Add to something else like UpdateTaskLease?

Contributor

Just googled this: the reason is that the preprocessor doesn't know about templates, so it treats Table<TaskID as a separate macro argument. Someone suggests this workaround:

#define COMMA ,
RAY_RETURN_NOT_OK(Table<TaskID COMMA TaskLeaseData>::Add(job_id, id, data, done));

It works. But I think adding an extra pair of parentheses is easier. Or as @robertnishihara suggested, use another function name.
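
For illustration: the preprocessor only respects parentheses (not template angle brackets) when splitting macro arguments, so the comma inside Table<TaskID, TaskLeaseData> is taken as an argument separator unless the whole expression is wrapped in an extra pair of parentheses. A self-contained toy example, not Ray's actual macro:

#define RETURN_IF_ZERO(expr) if ((expr) == 0) return 1

template <typename K, typename V>
struct Table {
  static int Add() { return 42; }
};

int main() {
  // RETURN_IF_ZERO(Table<int, double>::Add());   // error: macro gets 2 arguments
  RETURN_IF_ZERO((Table<int, double>::Add()));     // extra parentheses hide the comma
  return 0;
}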

Contributor Author

Sounds good!

Contributor Author

I decided against changing the function name since I believe it's nicer to have the TaskLeaseTable still implement the Table<TaskID, TaskLeaseData> interface.

@@ -436,6 +436,13 @@ void LineageCache::HandleEntryCommitted(const TaskID &task_id) {
}
}

const Task LineageCache::GetTask(const TaskID &task_id) const {
auto entries = lineage_.GetEntries();
Contributor

auto -> auto& ?

Contributor Author

Thanks!

@@ -436,6 +436,13 @@ void LineageCache::HandleEntryCommitted(const TaskID &task_id) {
}
}

const Task LineageCache::GetTask(const TaskID &task_id) const {
Contributor

const Task & ?

Contributor Author

Thanks!
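
Combining the two suggestions, the accessor might end up roughly like this; the lookup body is a guess at the surrounding Lineage API, not the PR's actual code:

const Task &LineageCache::GetTask(const TaskID &task_id) const {
  const auto &entries = lineage_.GetEntries();
  auto it = entries.find(task_id);
  RAY_CHECK(it != entries.end());
  return it->second.TaskData();  // assumes the entry exposes its cached Task by const reference
}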

}

void NodeManager::ResubmitTask(const Task &task) {
// Actor reconstruction is turned off by default right now. If this is an
Contributor

Just curious: is this a staged approach, i.e., will actor reconstruction be enabled later?

Contributor Author

Yes, we're still thinking about how exactly actor reconstruction should be exposed to the user. I believe the current plan is to have actor reconstruction enabled by a flag from the frontend.

// The task was not in the GCS task table. It must therefore be in the
// lineage cache. Use a copy of the cached task spec to re-execute the
// task.
const Task task = lineage_cache_.GetTask(task_id);
Contributor

const auto & ?

Contributor Author

I actually mean to copy the task here in case the lineage cache gets modified while resubmitting the task for execution.
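
A short comment at the call site could record that intent (wording is a suggestion):

// Copy the task out of the lineage cache rather than holding a reference,
// since the cache may be modified while the task is resubmitted for execution.
const Task task = lineage_cache_.GetTask(task_id);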

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7254/

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7352/

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7353/

const Task task = lineage_cache_.GetTask(task_id);
ResubmitTask(task);
// lineage cache.
if (!lineage_cache_.ContainsTask(task_id)) {
@jovany-wang (Contributor) commented Aug 9, 2018

I think this is not the right way to solve it.

Contributor Author

I left a TODO explaining that we should revert this change once #2608 is fixed. I don't think we should block this PR on #2608 though.

Contributor

@stephanie-wang That's a nice solution too.
By the way, does it still hang after this change?

Contributor Author

Nope! Although many error messages are logged (and there will be for any job with tasks that take some time to complete).

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7367/

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/7375/

@robertnishihara robertnishihara merged commit d49b4be into ray-project:master Aug 9, 2018
@robertnishihara robertnishihara deleted the xray-reconstruction branch August 9, 2018 14:24