Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue tasks in the raylet in between async callbacks #4766

Merged
merged 5 commits into from
May 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 39 additions & 14 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,10 @@ void NodeManager::DispatchTasks(
}
}
}
local_queues_.RemoveTasks(removed_task_ids);
// Move the ASSIGNED task to the SWAP queue so that we remember that we have
// it queued locally. Once the GetTaskReply has been sent, the task will get
// re-queued, depending on whether the message succeeded or not.
local_queues_.MoveTasks(removed_task_ids, TaskState::READY, TaskState::SWAP);
}

void NodeManager::ProcessClientMessage(
Expand Down Expand Up @@ -1665,11 +1668,15 @@ bool NodeManager::AssignTask(const Task &task) {
auto message = protocol::CreateGetTaskReply(fbb, spec.ToFlatbuffer(fbb),
fbb.CreateVector(resource_id_set_flatbuf));
fbb.Finish(message);
// Give the callback a copy of the task so it can modify it.
Task assigned_task(task);
const auto &task_id = spec.TaskId();
worker->Connection()->WriteMessageAsync(
static_cast<int64_t>(protocol::MessageType::ExecuteTask), fbb.GetSize(),
fbb.GetBufferPointer(), [this, worker, assigned_task](ray::Status status) mutable {
fbb.GetBufferPointer(), [this, worker, task_id](ray::Status status) {
// Remove the ASSIGNED task from the SWAP queue.
TaskState state;
auto assigned_task = local_queues_.RemoveTask(task_id, &state);
RAY_CHECK(state == TaskState::SWAP);

if (status.ok()) {
auto spec = assigned_task.GetTaskSpecification();
// We successfully assigned the task to the worker.
Expand Down Expand Up @@ -2050,9 +2057,9 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task,
/// TODO(rkn): Should we check that the node manager is remote and not local?
/// TODO(rkn): Should we check if the remote node manager is known to be dead?
// Attempt to forward the task.
ForwardTask(task, node_manager_id, [this, task, node_manager_id](ray::Status error) {
ForwardTask(task, node_manager_id, [this, node_manager_id](ray::Status error,
const Task &task) {
const TaskID task_id = task.GetTaskSpecification().TaskId();

RAY_LOG(INFO) << "Failed to forward task " << task_id << " to node manager "
<< node_manager_id;

Expand All @@ -2074,14 +2081,22 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task,
RayConfig::instance().node_manager_forward_task_retry_timeout_milliseconds());
retry_timer->expires_from_now(retry_duration);
retry_timer->async_wait(
[this, task, task_id, retry_timer](const boost::system::error_code &error) {
[this, task_id, retry_timer](const boost::system::error_code &error) {
// Timer killing will receive the boost::asio::error::operation_aborted,
// we only handle the timeout event.
RAY_CHECK(!error);
RAY_LOG(INFO) << "Resubmitting task " << task_id
<< " because ForwardTask failed.";
// Remove the RESUBMITTED task from the SWAP queue.
TaskState state;
const auto task = local_queues_.RemoveTask(task_id, &state);
RAY_CHECK(state == TaskState::SWAP);
// Submit the task again.
SubmitTask(task, Lineage());
});
// Temporarily move the RESUBMITTED task to the SWAP queue while the
// timer is active.
local_queues_.QueueTasks({task}, TaskState::SWAP);
// Remove the task from the lineage cache. The task will get added back
// once it is resubmitted.
lineage_cache_.RemoveWaitingTask(task_id);
Expand All @@ -2094,8 +2109,9 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task,
});
}

void NodeManager::ForwardTask(const Task &task, const ClientID &node_id,
const std::function<void(const ray::Status &)> &on_error) {
void NodeManager::ForwardTask(
const Task &task, const ClientID &node_id,
const std::function<void(const ray::Status &, const Task &)> &on_error) {
const auto &spec = task.GetTaskSpecification();
auto task_id = spec.TaskId();

Expand Down Expand Up @@ -2129,16 +2145,25 @@ void NodeManager::ForwardTask(const Task &task, const ClientID &node_id,
if (it == remote_server_connections_.end()) {
// TODO(atumanov): caller must handle failure to ensure tasks are not lost.
RAY_LOG(INFO) << "No NodeManager connection found for GCS client id " << node_id;
on_error(ray::Status::IOError("NodeManager connection not found"));
on_error(ray::Status::IOError("NodeManager connection not found"), task);
return;
}

auto &server_conn = it->second;
// Move the FORWARDING task to the SWAP queue so that we remember that we
// have it queued locally. Once the ForwardTaskRequest has been sent, the
// task will get re-queued, depending on whether the message succeeded or
// not.
local_queues_.QueueTasks({task}, TaskState::SWAP);
server_conn->WriteMessageAsync(
static_cast<int64_t>(protocol::MessageType::ForwardTaskRequest), fbb.GetSize(),
fbb.GetBufferPointer(),
[this, on_error, task_id, node_id, spec](ray::Status status) {
fbb.GetBufferPointer(), [this, on_error, task_id, node_id](ray::Status status) {
// Remove the FORWARDING task from the SWAP queue.
TaskState state;
const auto task = local_queues_.RemoveTask(task_id, &state);
RAY_CHECK(state == TaskState::SWAP);

if (status.ok()) {
const auto &spec = task.GetTaskSpecification();
// If we were able to forward the task, remove the forwarded task from the
// lineage cache since the receiving node is now responsible for writing
// the task to the GCS.
Expand Down Expand Up @@ -2173,7 +2198,7 @@ void NodeManager::ForwardTask(const Task &task, const ClientID &node_id,
}
}
} else {
on_error(status);
on_error(status, task);
}
});
}
Expand Down
5 changes: 3 additions & 2 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,9 @@ class NodeManager {
/// \param task The task to forward.
/// \param node_id The ID of the node to forward the task to.
/// \param on_error Callback on run on non-ok status.
void ForwardTask(const Task &task, const ClientID &node_id,
const std::function<void(const ray::Status &)> &on_error);
void ForwardTask(
const Task &task, const ClientID &node_id,
const std::function<void(const ray::Status &, const Task &)> &on_error);

/// Dispatch locally scheduled tasks. This attempts the transition from "scheduled" to
/// "running" task state.
Expand Down
40 changes: 27 additions & 13 deletions src/ray/raylet/scheduling_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ namespace {

static constexpr const char *task_state_strings[] = {
"placeable", "waiting", "ready",
"running", "infeasible", "waiting_for_actor_creation"};
"running", "infeasible", "waiting for actor creation",
"swap"};
static_assert(sizeof(task_state_strings) / sizeof(const char *) ==
static_cast<int>(ray::raylet::TaskState::kNumTaskQueues),
"Must specify a TaskState name for every task queue");
Expand Down Expand Up @@ -172,6 +173,9 @@ void SchedulingQueue::FilterState(std::unordered_set<TaskID> &task_ids,
case TaskState::INFEASIBLE:
FilterStateFromQueue(task_ids, TaskState::INFEASIBLE);
break;
case TaskState::SWAP:
FilterStateFromQueue(task_ids, TaskState::SWAP);
break;
case TaskState::BLOCKED: {
const auto blocked_ids = GetBlockedTaskIds();
for (auto it = task_ids.begin(); it != task_ids.end();) {
Expand Down Expand Up @@ -230,7 +234,7 @@ std::vector<Task> SchedulingQueue::RemoveTasks(std::unordered_set<TaskID> &task_
// Try to find the tasks to remove from the queues.
for (const auto &task_state : {
TaskState::PLACEABLE, TaskState::WAITING, TaskState::READY, TaskState::RUNNING,
TaskState::INFEASIBLE, TaskState::WAITING_FOR_ACTOR_CREATION,
TaskState::INFEASIBLE, TaskState::WAITING_FOR_ACTOR_CREATION, TaskState::SWAP,
}) {
RemoveTasksFromQueue(task_state, task_ids, &removed_tasks);
}
Expand All @@ -245,7 +249,7 @@ Task SchedulingQueue::RemoveTask(const TaskID &task_id, TaskState *removed_task_
// Try to find the task to remove in the queues.
for (const auto &task_state : {
TaskState::PLACEABLE, TaskState::WAITING, TaskState::READY, TaskState::RUNNING,
TaskState::INFEASIBLE, TaskState::WAITING_FOR_ACTOR_CREATION,
TaskState::INFEASIBLE, TaskState::WAITING_FOR_ACTOR_CREATION, TaskState::SWAP,
}) {
RemoveTasksFromQueue(task_state, task_id_set, &removed_tasks);
if (task_id_set.empty()) {
Expand All @@ -260,7 +264,7 @@ Task SchedulingQueue::RemoveTask(const TaskID &task_id, TaskState *removed_task_
}

// Make sure we got the removed task.
RAY_CHECK(removed_tasks.size() == 1);
RAY_CHECK(removed_tasks.size() == 1) << task_id;
const auto &task = removed_tasks.front();
RAY_CHECK(task.GetTaskSpecification().TaskId() == task_id);
return task;
Expand All @@ -287,6 +291,9 @@ void SchedulingQueue::MoveTasks(std::unordered_set<TaskID> &task_ids, TaskState
case TaskState::INFEASIBLE:
RemoveTasksFromQueue(TaskState::INFEASIBLE, task_ids, &removed_tasks);
break;
case TaskState::SWAP:
RemoveTasksFromQueue(TaskState::SWAP, task_ids, &removed_tasks);
break;
default:
RAY_LOG(FATAL) << "Attempting to move tasks from unrecognized state "
<< static_cast<std::underlying_type<TaskState>::type>(src_state);
Expand All @@ -312,6 +319,9 @@ void SchedulingQueue::MoveTasks(std::unordered_set<TaskID> &task_ids, TaskState
case TaskState::INFEASIBLE:
QueueTasks(removed_tasks, TaskState::INFEASIBLE);
break;
case TaskState::SWAP:
QueueTasks(removed_tasks, TaskState::SWAP);
break;
default:
RAY_LOG(FATAL) << "Attempting to move tasks to unrecognized state "
<< static_cast<std::underlying_type<TaskState>::type>(dst_state);
Expand Down Expand Up @@ -348,8 +358,16 @@ std::unordered_set<TaskID> SchedulingQueue::GetTaskIdsForDriver(
std::unordered_set<TaskID> SchedulingQueue::GetTaskIdsForActor(
const ActorID &actor_id) const {
std::unordered_set<TaskID> task_ids;
int swap = static_cast<int>(TaskState::SWAP);
int i = 0;
for (const auto &task_queue : task_queues_) {
GetActorTasksFromQueue(*task_queue, actor_id, task_ids);
// This is a hack to make sure that we don't remove tasks from the SWAP
// queue, since these are always guaranteed to be removed and eventually
// resubmitted if necessary by the node manager.
if (i != swap) {
GetActorTasksFromQueue(*task_queue, actor_id, task_ids);
}
i++;
}
return task_ids;
}
Expand Down Expand Up @@ -385,10 +403,8 @@ const std::unordered_set<TaskID> &SchedulingQueue::GetDriverTaskIds() const {
std::string SchedulingQueue::DebugString() const {
std::stringstream result;
result << "SchedulingQueue:";
for (const auto &task_state : {
TaskState::PLACEABLE, TaskState::WAITING, TaskState::READY, TaskState::RUNNING,
TaskState::INFEASIBLE, TaskState::WAITING_FOR_ACTOR_CREATION,
}) {
for (size_t i = 0; i < static_cast<int>(ray::raylet::TaskState::kNumTaskQueues); i++) {
TaskState task_state = static_cast<TaskState>(i);
result << "\n- num " << GetTaskStateString(task_state)
<< " tasks: " << GetTaskQueue(task_state)->GetTasks().size();
}
Expand All @@ -397,10 +413,8 @@ std::string SchedulingQueue::DebugString() const {
}

void SchedulingQueue::RecordMetrics() const {
for (const auto &task_state : {
TaskState::PLACEABLE, TaskState::WAITING, TaskState::READY, TaskState::RUNNING,
TaskState::INFEASIBLE, TaskState::WAITING_FOR_ACTOR_CREATION,
}) {
for (size_t i = 0; i < static_cast<int>(ray::raylet::TaskState::kNumTaskQueues); i++) {
TaskState task_state = static_cast<TaskState>(i);
stats::SchedulingQueueStats().Record(
static_cast<double>(GetTaskQueue(task_state)->GetTasks().size()),
{{stats::ValueTypeKey,
Expand Down
9 changes: 8 additions & 1 deletion src/ray/raylet/scheduling_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ enum class TaskState {
// The task is an actor method and is waiting to learn where the actor was
// created.
WAITING_FOR_ACTOR_CREATION,
// Swap queue for tasks that are in between states. This can happen when a
// task is removed from one queue, and an async callback is responsible for
// re-queuing the task. For example, a READY task that has just been assigned
// to a worker will get moved to the SWAP queue while waiting for a response
// from the worker. If the worker accepts the task, the task will be added to
// the RUNNING queue, else it will be returned to READY.
SWAP,
// The number of task queues. All states that precede this enum must have an
// associated TaskQueue in SchedulingQueue. All states that succeed
// this enum do not have an associated TaskQueue, since the tasks
Expand Down Expand Up @@ -144,7 +151,7 @@ class SchedulingQueue {
for (const auto &task_state : {
TaskState::PLACEABLE, TaskState::WAITING, TaskState::READY,
TaskState::RUNNING, TaskState::INFEASIBLE,
TaskState::WAITING_FOR_ACTOR_CREATION,
TaskState::WAITING_FOR_ACTOR_CREATION, TaskState::SWAP,
}) {
if (task_state == TaskState::READY) {
task_queues_[static_cast<int>(task_state)] = ready_queue_;
Expand Down