Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue tasks in the raylet in between async callbacks #4766

Merged
merged 5 commits into from
May 15, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Previous commit
updates
  • Loading branch information
stephanie-wang committed May 14, 2019
commit 8691a583a9280621061dd729081ab7e7995c34a0
19 changes: 10 additions & 9 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ void NodeManager::DispatchTasks(
}
}
}
// Move the assigned task to the SWAP queue so that we remember that we have
// Move the ASSIGNED task to the SWAP queue so that we remember that we have
// it queued locally. Once the GetTaskReply has been sent, the task will get
// re-queued, depending on whether the message succeeded or not.
local_queues_.MoveTasks(removed_task_ids, TaskState::READY, TaskState::SWAP);
Expand Down Expand Up @@ -1672,7 +1672,7 @@ bool NodeManager::AssignTask(const Task &task) {
worker->Connection()->WriteMessageAsync(
static_cast<int64_t>(protocol::MessageType::ExecuteTask), fbb.GetSize(),
fbb.GetBufferPointer(), [this, worker, task_id](ray::Status status) {
// Remove the task from the SWAP queue.
// Remove the ASSIGNED task from the SWAP queue.
TaskState state;
auto assigned_task = local_queues_.RemoveTask(task_id, &state);
RAY_CHECK(state == TaskState::SWAP);
Expand Down Expand Up @@ -2057,7 +2057,6 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task,
/// TODO(rkn): Should we check that the node manager is remote and not local?
/// TODO(rkn): Should we check if the remote node manager is known to be dead?
// Attempt to forward the task.
const auto &task_id = task.GetTaskSpecification().TaskId();
ForwardTask(task, node_manager_id, [this, node_manager_id](ray::Status error,
const Task &task) {
const TaskID task_id = task.GetTaskSpecification().TaskId();
Expand Down Expand Up @@ -2088,14 +2087,15 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task,
RAY_CHECK(!error);
RAY_LOG(INFO) << "Resubmitting task " << task_id
<< " because ForwardTask failed.";
// Remove the task from the SWAP queue.
// Remove the RESUBMITTED task from the SWAP queue.
TaskState state;
const auto task = local_queues_.RemoveTask(task_id, &state);
RAY_CHECK(state == TaskState::SWAP);
// Submit the task again.
SubmitTask(task, Lineage());
});
// Temporarily move the task to the SWAP queue while the timer is active.
// Temporarily move the RESUBMITTED task to the SWAP queue while the
// timer is active.
local_queues_.QueueTasks({task}, TaskState::SWAP);
// Remove the task from the lineage cache. The task will get added back
// once it is resubmitted.
Expand Down Expand Up @@ -2149,14 +2149,15 @@ void NodeManager::ForwardTask(
return;
}
auto &server_conn = it->second;
// Move the forwarded task to the SWAP queue so that we remember that we have
// it queued locally. Once the ForwardTaskRequest has been sent, the task
// will get re-queued, depending on whether the message succeeded or not.
// Move the FORWARDING task to the SWAP queue so that we remember that we
// have it queued locally. Once the ForwardTaskRequest has been sent, the
// task will get re-queued, depending on whether the message succeeded or
// not.
local_queues_.QueueTasks({task}, TaskState::SWAP);
server_conn->WriteMessageAsync(
static_cast<int64_t>(protocol::MessageType::ForwardTaskRequest), fbb.GetSize(),
fbb.GetBufferPointer(), [this, on_error, task_id, node_id](ray::Status status) {
// Remove the task from the SWAP queue.
// Remove the FORWARDING task from the SWAP queue.
TaskState state;
const auto task = local_queues_.RemoveTask(task_id, &state);
RAY_CHECK(state == TaskState::SWAP);
Expand Down