Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TaskCancellation #7669

Merged
merged 50 commits into from
Apr 25, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
169c540
Smol comment
ijrsvt Mar 19, 2020
682c5b5
Merge branch 'master' into TaskCancellation
ijrsvt Mar 25, 2020
2d020ba
WIP, not passing ray.init
ijrsvt Mar 25, 2020
1958e05
Fixed small problem
ijrsvt Mar 25, 2020
4fdeb5a
wip
ijrsvt Mar 31, 2020
40b2bb5
Pseudo interrupt things
ijrsvt Mar 31, 2020
d1295c3
Basic prototype operational
ijrsvt Mar 31, 2020
269a3b1
Merge branch 'master' of github.com:ijrsvt/ray into TaskCancellation
ijrsvt Apr 1, 2020
028d9f7
correct proc title
ijrsvt Apr 2, 2020
a4b58e5
Mostly done
ijrsvt Apr 7, 2020
33ad6a1
Cleanup
ijrsvt Apr 7, 2020
4f7eec7
cleaner raylet error
ijrsvt Apr 7, 2020
cc3ca28
Cleaning up a few loose ends
ijrsvt Apr 7, 2020
bd47066
Fixing Race Conds
ijrsvt Apr 7, 2020
c0b5ab4
Prelim testing
ijrsvt Apr 7, 2020
58c8bed
Fixing comments and adding second_check for kill
ijrsvt Apr 8, 2020
bae435f
Working_new_impl
ijrsvt Apr 9, 2020
9ab039d
demo_ready
ijrsvt Apr 9, 2020
d85496d
Fixing my english
ijrsvt Apr 10, 2020
d0ba816
Merge branch 'master' into TaskCancellation
ijrsvt Apr 10, 2020
652a0fe
Fixing a few problems
ijrsvt Apr 10, 2020
daac610
Small problems
ijrsvt Apr 10, 2020
b050b28
Cleaning up
ijrsvt Apr 10, 2020
b0457a3
Response to changes
ijrsvt Apr 15, 2020
18b3dbc
Fixing error passing
ijrsvt Apr 15, 2020
b813faf
Merge branch 'master' into TaskCancellation
ijrsvt Apr 15, 2020
112d7d8
Merged to master
ijrsvt Apr 15, 2020
ff8bbd3
fixing lock
ijrsvt Apr 15, 2020
af35898
Cleaning up print statements
ijrsvt Apr 15, 2020
b015c51
Format
ijrsvt Apr 15, 2020
616f487
Fixing Unit test build failure
ijrsvt Apr 16, 2020
2361273
mock_worker fix
ijrsvt Apr 16, 2020
9dba915
java_fix
ijrsvt Apr 16, 2020
9a43056
Cancel
ijrsvt Apr 16, 2020
68a6458
Switching to Cancel
ijrsvt Apr 17, 2020
46545e1
Responding to Review
ijrsvt Apr 21, 2020
7308225
FixFormatting
ijrsvt Apr 21, 2020
1f95492
Merge branch 'master' into TaskCancellation
ijrsvt Apr 21, 2020
9a0d120
Lease cancellation
ijrsvt Apr 22, 2020
82a6248
Final comments?
ijrsvt Apr 22, 2020
3270f92
Moving exist check to CoreWorker
ijrsvt Apr 23, 2020
794f146
Fix Actor Transport Test
ijrsvt Apr 23, 2020
e43ea33
Fixing task manager test
ijrsvt Apr 23, 2020
9beea80
changing clock repr
ijrsvt Apr 23, 2020
2a789f5
Fix build
ijrsvt Apr 24, 2020
8c75a83
fix white space
ijrsvt Apr 24, 2020
8f7bdfe
lint fix
ijrsvt Apr 24, 2020
6f1ef56
Updating to medium size
ijrsvt Apr 24, 2020
f7fb69f
Fixing Java test compilation issue
ijrsvt Apr 25, 2020
e8cd360
lengthen bad timeouts
ijrsvt Apr 25, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Working_new_impl
  • Loading branch information
ijrsvt committed Apr 9, 2020
commit bae435f804a0dc34c0c4a6b297e8476b47eecc3e
2 changes: 2 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,8 @@ cdef void async_plasma_callback(CObjectID object_id,

cdef c_bool kill_main_task() nogil:
with gil:
x = asyncio.get_event_loop()
print(x)
if setproctitle.getproctitle() == "ray::IDLE":
return False
_thread.interrupt_main()
Expand Down
45 changes: 29 additions & 16 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
client_call_manager_(new rpc::ClientCallManager(io_service_)),
death_check_timer_(io_service_),
internal_timer_(io_service_),
kill_retry_timer_(io_service_),
core_worker_server_(WorkerTypeString(worker_type), 0 /* let grpc choose a port */),
task_queue_length_(0),
num_executed_tasks_(0),
Expand Down Expand Up @@ -903,7 +902,8 @@ Status CoreWorker::KillTask(const ObjectID &object_id) {
if (task_manager_->IsTaskPending(task_id)) {
auto task_spec = task_manager_->GetTaskSpec(object_id.TaskId());
if (!task_spec.IsActorCreationTask())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's do a RAY_CHECK instead of if statement here because this is definitely checked by python code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it actually is.

return direct_task_submitter_->KillTask(task_spec);
RAY_RETURN_NOT_OK(direct_task_submitter_->KillTask(task_spec));
RAY_LOG(ERROR) << "YEET";
}
return Status::OK();
}
Expand Down Expand Up @@ -1096,6 +1096,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
{
absl::MutexLock lock(&mutex_);
current_task_ = task_spec;
TryKillTask();
}

RayFunction func{task_spec.GetLanguage(), task_spec.FunctionDescriptor()};
Expand Down Expand Up @@ -1402,23 +1403,18 @@ void CoreWorker::HandleWaitForRefRemoved(const rpc::WaitForRefRemovedRequest &re
owner_address, ref_removed_callback);
}

void CoreWorker::TryKillTask(const TaskID &task_id, int num_tries) {
if (num_tries == 0) {
RAY_LOG(WARNING) << "Failed to Kill Task: " << task_id;
void CoreWorker::TryKillTask() {
if (task_to_kill_.IsNil()) {
return;
}
{
absl::MutexLock lock(&mutex_);
if (main_thread_task_id_ == task_id) {
RAY_LOG(INFO) << "Interrupting main: " << main_thread_task_id_;
if (kill_main_thread_()) {
return;
}
if (main_thread_task_id_ == task_to_kill_) {
RAY_LOG(INFO) << "Killing worker: " << main_thread_task_id_;
RAY_IGNORE_EXPR(local_raylet_client_->Disconnect());
if (log_dir_ != "") {
RayLog::ShutDownRayLog();
}
exit(1);
}
kill_retry_timer_.expires_from_now(boost::asio::chrono::milliseconds(500));
kill_retry_timer_.async_wait(
boost::bind(&CoreWorker::TryKillTask, this, task_id, num_tries - 1));
}

void CoreWorker::HandleKillTask(const rpc::KillTaskRequest &request,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happen if the task is inside worker's scheduling queue and timeout expired?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For direct tasks, the task is never entered into the scheduling queue, it is immediately executed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about for actor tasks? Is that not supported at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct.

Expand All @@ -1427,7 +1423,24 @@ void CoreWorker::HandleKillTask(const rpc::KillTaskRequest &request,
TaskID task_id = TaskID::FromBinary(request.intended_task_id());
// Record the task to kill and try to kill it immediately. Recording it first is to
// avoid the situation where a cancellation RPC is processed before the execute RPC.
TryKillTask(task_id, 3);
absl::MutexLock lock(&mutex_);
task_to_kill_ = task_id;
TryKillTask();
// Exit(true);
// RAY_LOG(ERROR) << "Trying to kill";
// task_manager_->CancelTask(task_id);
// RAY_LOG(ERROR) << "Trying to disc";
// Disconnect();
// RAY_LOG(ERROR) << "Trying to shutdown";
// Shutdown();
// return;
// absl::MutexLock lock(&mutex_);
// if (main_thread_task_id_ == task_id) {
// RAY_LOG(INFO) << "Interrupting main: " << main_thread_task_id_;
// if (kill_main_thread_()) {
// return;
// }
// }
}

void CoreWorker::HandleKillActor(const rpc::KillActorRequest &request,
Expand Down
8 changes: 5 additions & 3 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
}

/// Try to kill a task multiple times.
void TryKillTask(const TaskID &task_id, int num_tries);
void TryKillTask() EXCLUSIVE_LOCKS_REQUIRED(mutex_);

/// Type of this worker (i.e., DRIVER or WORKER).
const WorkerType worker_type_;
Expand Down Expand Up @@ -734,6 +734,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// worker context.
TaskID main_thread_task_id_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add the GUARDED_BY(mutex_) annotation here as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to add this! Thanks!


/// Task to kill (this handles the situation where the cancellation RPC
/// reaches the remote worker before the task begins executing).
TaskID task_to_kill_ GUARDED_BY(mutex_) = TaskID::Nil();

// Flag indicating whether this worker has been shut down.
bool shutdown_ = false;

Expand All @@ -752,8 +756,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Timer for internal book-keeping.
boost::asio::steady_timer internal_timer_;

boost::asio::steady_timer kill_retry_timer_;

/// RPC server used to receive tasks to execute.
rpc::GrpcServer core_worker_server_;

Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ void ReferenceCounter::RemoveSubmittedTaskReferences(
<< argument_id;
return;
}
RAY_CHECK(it->second.submitted_task_ref_count > 0);
RAY_CHECK(it->second.submitted_task_ref_count > 0) << argument_id;
it->second.submitted_task_ref_count--;
if (release_lineage) {
if (it->second.lineage_ref_count > 0) {
Expand Down
15 changes: 10 additions & 5 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,19 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
ShutdownIfNeeded();
}

void TaskManager::CancelTask(const TaskID &task_id) {
void TaskManager::CancelTask(const TaskID &task_id, bool pending) {
{
absl::MutexLock lock(&mu_);
auto it = submissible_tasks_.find(task_id);
if (it != submissible_tasks_.end()) {
it->second.num_retries_left = 0;
if (it == submissible_tasks_.end()) {
return;
}
it->second.num_retries_left = 0;
it->second.canceled = true;
}
if (pending) {
MarkPendingTaskFailed(task_id, GetTaskSpec(task_id), rpc::ErrorType::TASK_CANCELLED);
}
PendingTaskFailed(task_id, rpc::ErrorType::TASK_CANCELLED);
}

void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
Expand All @@ -234,6 +238,7 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
if (num_retries_left == 0) {
submissible_tasks_.erase(it);
num_pending_tasks_--;
error_type = it->second.canceled ? rpc::ErrorType::TASK_CANCELLED : error_type;
} else {
RAY_CHECK(it->second.num_retries_left > 0);
it->second.num_retries_left--;
Expand Down Expand Up @@ -374,7 +379,7 @@ void TaskManager::RemoveLineageReference(const ObjectID &object_id,
void TaskManager::MarkPendingTaskFailed(const TaskID &task_id,
const TaskSpecification &spec,
rpc::ErrorType error_type) {
RAY_LOG(DEBUG) << "Treat task as failed. task_id: " << task_id
RAY_LOG(ERROR) << "Treat task as failed. task_id: " << task_id
<< ", error_type: " << ErrorType_Name(error_type);
int64_t num_returns = spec.NumReturns();
for (int i = 0; i < num_returns; i++) {
Expand Down
6 changes: 4 additions & 2 deletions src/ray/core_worker/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class TaskFinisherInterface {
virtual void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
Status *status = nullptr) = 0;

virtual void CancelTask(const TaskID &task_id) = 0;
virtual void CancelTask(const TaskID &task_id, bool pending) = 0;

virtual void OnTaskDependenciesInlined(
const std::vector<ObjectID> &inlined_dependency_ids,
Expand Down Expand Up @@ -113,7 +113,7 @@ class TaskManager : public TaskFinisherInterface {
/// Cancels a task by treating it as failed and removes any remaining retries.
///
/// \param[in] task_id The TaskId that is being cancelled
/// \param[in] pending If true, the task is also marked as a failed pending task
///            (its return objects are failed with TASK_CANCELLED).
void CancelTask(const TaskID &task_id);
void CancelTask(const TaskID &task_id, bool pending);

/// Return the spec for a pending task.
TaskSpecification GetTaskSpec(const TaskID &task_id) const;
Expand Down Expand Up @@ -178,6 +178,8 @@ class TaskManager : public TaskFinisherInterface {
// pending tasks and tasks that finished execution but that may be
// retried in the future.
absl::flat_hash_set<ObjectID> reconstructable_return_ids;

bool canceled = false;
};

/// Remove a lineage reference to this object ID. This should be called
Expand Down
69 changes: 47 additions & 22 deletions src/ray/core_worker/transport/direct_task_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
resolver_.ResolveDependencies(task_spec, [this, task_spec]() {
RAY_LOG(DEBUG) << "Task dependencies resolved " << task_spec.TaskId();
absl::MutexLock lock(&mu_);
if (canceled_resolving_tasks_.find(task_spec.TaskId()) !=
canceled_resolving_tasks_.end()) {
canceled_resolving_tasks_.erase(task_spec.TaskId());

return;
}
// Note that the dependencies in the task spec are mutated to only contain
// plasma dependencies after ResolveDependencies finishes.
const SchedulingKey scheduling_key(
Expand Down Expand Up @@ -234,37 +240,56 @@ Status CoreWorkerDirectTaskSubmitter::KillTask(TaskSpecification task_spec) {
const SchedulingKey scheduling_key(
task_spec.GetSchedulingClass(), task_spec.GetDependencies(),
task_spec.IsActorCreationTask() ? task_spec.ActorCreationId() : ActorID::Nil());
absl::MutexLock lock(&mu_);
auto scheduled_tasks = task_queues_.find(scheduling_key);

// See if task has not been shipped yet
if (scheduled_tasks != task_queues_.end()) {
for (auto spec = scheduled_tasks->second.begin();
spec != scheduled_tasks->second.end(); spec++) {
if (spec->TaskId() == task_spec.TaskId()) {
scheduled_tasks->second.erase(spec);
task_finisher_->CancelTask(task_spec.TaskId());

// Erase an empty queue
if (scheduled_tasks->second.size() == 0) {
task_queues_.erase(scheduling_key);
std::shared_ptr<rpc::CoreWorkerClientInterface> client = nullptr;
bool full_kill = false;
{
absl::MutexLock lock(&mu_);
auto scheduled_tasks = task_queues_.find(scheduling_key);
RAY_LOG(ERROR) << "First place";
// See if task has not been shipped yet
if (scheduled_tasks != task_queues_.end()) {
for (auto spec = scheduled_tasks->second.begin();
spec != scheduled_tasks->second.end(); spec++) {
if (spec->TaskId() == task_spec.TaskId()) {
scheduled_tasks->second.erase(spec);
RAY_LOG(ERROR) << "Canceling task here";
full_kill = true;
// Erase an empty queue
if (scheduled_tasks->second.empty()) {
task_queues_.erase(scheduling_key);
}
break;
}
return Status::OK();
}
} else {
auto rpc_client = sent_tasks_.find(task_spec.TaskId());
// No RPC handle for worker
if (rpc_client == sent_tasks_.end() ||
client_cache_.find(rpc_client->second) == client_cache_.end()) {
// Task has dependencies being resolved
canceled_resolving_tasks_.emplace(task_spec.TaskId());
} else {
client = client_cache_.find(rpc_client->second)->second;
}
}
}

auto rpc_client = sent_tasks_.find(task_spec.TaskId());
// No RPC handle for worker
if (rpc_client == sent_tasks_.end() ||
client_cache_.find(rpc_client->second) == client_cache_.end()) {
if (client == nullptr) {
if (full_kill) {
task_finisher_->CancelTask(task_spec.TaskId(), false);
task_finisher_->PendingTaskFailed(task_spec.TaskId(),
rpc::ErrorType::TASK_CANCELLED);
} else {
task_finisher_->CancelTask(task_spec.TaskId(), true);
}
return Status::OK();
}

auto client = client_cache_.find(rpc_client->second);
RAY_LOG(ERROR) << "Second place";
task_finisher_->CancelTask(task_spec.TaskId(), false);
RAY_LOG(ERROR) << "third place";
auto request = rpc::KillTaskRequest();
request.set_intended_task_id(task_spec.TaskId().Binary());
return client->second->KillTask(request, nullptr);
return client->KillTask(request, nullptr);
}

}; // namespace ray
3 changes: 3 additions & 0 deletions src/ray/core_worker/transport/direct_task_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ class CoreWorkerDirectTaskSubmitter {
absl::flat_hash_map<SchedulingKey, std::deque<TaskSpecification>> task_queues_
GUARDED_BY(mu_);

// Tasks that were canceled while being resolved.
absl::flat_hash_set<TaskID> canceled_resolving_tasks_ GUARDED_BY(mu_);

// Keeps track of where currently executing tasks are being run.
absl::flat_hash_map<TaskID, rpc::WorkerAddress> sent_tasks_ GUARDED_BY(mu_);
};
Expand Down