Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[stability] Make task result for RemoveTask optional #5146

Merged
merged 7 commits into from
Jul 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 37 additions & 16 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,11 @@ void NodeManager::HeartbeatAdded(const ClientID &client_id,
std::unordered_set<TaskID> local_task_ids;
for (const auto &task_id : decision) {
// (See design_docs/task_states.rst for the state transition diagram.)
Task task;
TaskState state;
const auto task = local_queues_.RemoveTask(task_id, &state);
if (!local_queues_.RemoveTask(task_id, &task, &state)) {
return;
}
// Since we are spilling back from the ready and waiting queues, we need
// to unsubscribe the dependencies.
if (state != TaskState::INFEASIBLE) {
Expand Down Expand Up @@ -980,8 +983,10 @@ void NodeManager::ProcessDisconnectClientMessage(
// If the worker was an actor, the task was already cleaned up in
// `HandleDisconnectedActor`.
if (actor_id.IsNil()) {
const Task &task = local_queues_.RemoveTask(task_id);
TreatTaskAsFailed(task, ErrorType::WORKER_DIED);
Task task;
if (local_queues_.RemoveTask(task_id, &task)) {
TreatTaskAsFailed(task, ErrorType::WORKER_DIED);
}
}

if (!intentional_disconnect) {
Expand Down Expand Up @@ -1035,6 +1040,8 @@ void NodeManager::ProcessDisconnectClientMessage(
<< "job_id: " << worker->GetAssignedJobId();
}

client->Close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this part of the bug?

Copy link
Contributor Author

@pcmoritz pcmoritz Jul 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was another bug where we were running out of file descriptors.


// TODO(rkn): Tell the object manager that this client has disconnected so
// that it can clean up the wait requests for this client. Currently I think
// these can be leaked.
Expand Down Expand Up @@ -1315,10 +1322,12 @@ void NodeManager::ScheduleTasks(
} else {
// TODO(atumanov): need a better interface for task exit on forward.
// (See design_docs/task_states.rst for the state transition diagram.)
const auto task = local_queues_.RemoveTask(task_id);
// Attempt to forward the task. If this fails to forward the task,
// the task will be resubmit locally.
ForwardTaskOrResubmit(task, client_id);
Task task;
if (local_queues_.RemoveTask(task_id, &task)) {
// Attempt to forward the task. If this fails to forward the task,
// the task will be resubmit locally.
ForwardTaskOrResubmit(task, client_id);
}
}
}

Expand Down Expand Up @@ -1604,7 +1613,8 @@ void NodeManager::HandleTaskBlocked(const std::shared_ptr<LocalClientConnection>
// worker as blocked. This temporarily releases any resources that the
// worker holds while it is blocked.
if (!worker->IsBlocked() && current_task_id == worker->GetAssignedTaskId()) {
const auto task = local_queues_.RemoveTask(current_task_id);
Task task;
RAY_CHECK(local_queues_.RemoveTask(current_task_id, &task));
local_queues_.QueueTasks({task}, TaskState::RUNNING);
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
Expand Down Expand Up @@ -1653,7 +1663,8 @@ void NodeManager::HandleTaskUnblocked(
// the worker.
if (worker->IsBlocked() && current_task_id == worker->GetAssignedTaskId()) {
// (See design_docs/task_states.rst for the state transition diagram.)
const auto task = local_queues_.RemoveTask(current_task_id);
Task task;
RAY_CHECK(local_queues_.RemoveTask(current_task_id, &task));
local_queues_.QueueTasks({task}, TaskState::RUNNING);
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
Expand Down Expand Up @@ -1771,8 +1782,12 @@ bool NodeManager::AssignTask(const Task &task) {
static_cast<int64_t>(protocol::MessageType::ExecuteTask), fbb.GetSize(),
fbb.GetBufferPointer(), [this, worker, task_id](ray::Status status) {
// Remove the ASSIGNED task from the SWAP queue.
Task assigned_task;
TaskState state;
auto assigned_task = local_queues_.RemoveTask(task_id, &state);
if (!local_queues_.RemoveTask(task_id, &assigned_task, &state)) {
return;
}

RAY_CHECK(state == TaskState::SWAP);

if (status.ok()) {
Expand Down Expand Up @@ -1844,7 +1859,8 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
RAY_LOG(DEBUG) << "Finished task " << task_id;

// (See design_docs/task_states.rst for the state transition diagram.)
const auto task = local_queues_.RemoveTask(task_id);
Task task;
RAY_CHECK(local_queues_.RemoveTask(task_id, &task));

// Release task's resources. The worker's lifetime resources are still held.
auto const &task_resources = worker.GetTaskResourceIds();
Expand Down Expand Up @@ -2171,11 +2187,13 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task,
RAY_LOG(INFO) << "Resubmitting task " << task_id
<< " because ForwardTask failed.";
// Remove the RESUBMITTED task from the SWAP queue.
Task task;
TaskState state;
const auto task = local_queues_.RemoveTask(task_id, &state);
RAY_CHECK(state == TaskState::SWAP);
// Submit the task again.
SubmitTask(task, Lineage());
if (local_queues_.RemoveTask(task_id, &task, &state)) {
RAY_CHECK(state == TaskState::SWAP);
// Submit the task again.
SubmitTask(task, Lineage());
}
});
// Temporarily move the RESUBMITTED task to the SWAP queue while the
// timer is active.
Expand Down Expand Up @@ -2246,8 +2264,11 @@ void NodeManager::ForwardTask(
client->ForwardTask(request, [this, on_error, task_id, node_id](
Status status, const rpc::ForwardTaskReply &reply) {
// Remove the FORWARDING task from the SWAP queue.
Task task;
TaskState state;
const auto task = local_queues_.RemoveTask(task_id, &state);
if (!local_queues_.RemoveTask(task_id, &task, &state)) {
return;
}
RAY_CHECK(state == TaskState::SWAP);

if (status.ok()) {
Expand Down
16 changes: 11 additions & 5 deletions src/ray/raylet/scheduling_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ std::vector<Task> SchedulingQueue::RemoveTasks(std::unordered_set<TaskID> &task_
return removed_tasks;
}

Task SchedulingQueue::RemoveTask(const TaskID &task_id, TaskState *removed_task_state) {
bool SchedulingQueue::RemoveTask(const TaskID &task_id, Task *removed_task,
TaskState *removed_task_state) {
std::vector<Task> removed_tasks;
std::unordered_set<TaskID> task_id_set = {task_id};
// Try to find the task to remove in the queues.
Expand All @@ -273,10 +274,15 @@ Task SchedulingQueue::RemoveTask(const TaskID &task_id, TaskState *removed_task_
}

// Make sure we got the removed task.
RAY_CHECK(removed_tasks.size() == 1) << task_id;
const auto &task = removed_tasks.front();
RAY_CHECK(task.GetTaskSpecification().TaskId() == task_id);
return task;
if (removed_tasks.size() == 1) {
*removed_task = removed_tasks.front();
RAY_CHECK(removed_task->GetTaskSpecification().TaskId() == task_id);
return true;
}
RAY_LOG(DEBUG) << "Task " << task_id
<< " that is to be removed could not be found any more."
<< " Probably its driver was removed.";
return false;
}

void SchedulingQueue::MoveTasks(std::unordered_set<TaskID> &task_ids, TaskState src_state,
Expand Down
6 changes: 4 additions & 2 deletions src/ray/raylet/scheduling_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,12 @@ class SchedulingQueue {
///
/// \param task_id The task ID to remove from the queue. The corresponding
/// task must be contained in the queue.
/// \param task The removed task will be written here, if any.
/// \param task_state If this is not nullptr, then the state of the removed
/// task will be written here.
/// \return The task that was removed.
Task RemoveTask(const TaskID &task_id, TaskState *task_state = nullptr);
/// \return true if the task was removed, false if it is not in the queue.
bool RemoveTask(const TaskID &task_id, Task *removed_task,
TaskState *removed_task_state = nullptr);

/// Remove a driver task ID. This is an empty task used to represent a driver.
///
Expand Down
4 changes: 4 additions & 0 deletions src/ray/raylet/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ namespace raylet {
/// time.
class Task {
public:
/// Construct an empty task. This should only be used to pass a task
/// as an out parameter to a function or method.
Task() {}

/// Construct a `Task` object from a protobuf message.
///
/// \param message The protobuf message.
Expand Down
4 changes: 4 additions & 0 deletions src/ray/raylet/task_execution_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ using rpc::MessageWrapper;
/// Wrapper class of protobuf `TaskExecutionSpec`, see `common.proto` for details.
class TaskExecutionSpecification : public MessageWrapper<rpc::TaskExecutionSpec> {
public:
/// Construct an emtpy task execution specification. This should not be used
/// directly.
TaskExecutionSpecification() {}

/// Construct from a protobuf message object.
/// The input message will be **copied** into this object.
///
Expand Down
3 changes: 3 additions & 0 deletions src/ray/raylet/task_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ using rpc::TaskType;
/// Wrapper class of protobuf `TaskSpec`, see `common.proto` for details.
class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
public:
/// Construct an empty task specification. This should not be used directly.
TaskSpecification() {}

/// Construct from a protobuf message object.
/// The input message will be **copied** into this object.
///
Expand Down
3 changes: 3 additions & 0 deletions src/ray/rpc/message_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ namespace rpc {
template <class Message>
class MessageWrapper {
public:
/// Construct an empty message wrapper. This should not be used directly.
MessageWrapper() {}

/// Construct from a protobuf message object.
/// The input message will be **copied** into this object.
///
Expand Down