-
Notifications
You must be signed in to change notification settings - Fork 6.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TaskCancellation #7669
TaskCancellation #7669
Changes from 12 commits
169c540
682c5b5
2d020ba
1958e05
4fdeb5a
40b2bb5
d1295c3
269a3b1
028d9f7
a4b58e5
33ad6a1
4f7eec7
cc3ca28
bd47066
c0b5ab4
58c8bed
bae435f
9ab039d
d85496d
d0ba816
652a0fe
daac610
b050b28
b0457a3
18b3dbc
b813faf
112d7d8
ff8bbd3
af35898
b015c51
616f487
2361273
9dba915
9a43056
68a6458
46545e1
7308225
1f95492
9a0d120
82a6248
3270f92
794f146
e43ea33
9beea80
2a789f5
8c75a83
8f7bdfe
6f1ef56
f7fb69f
e8cd360
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1371,8 +1371,10 @@ def disconnect(exiting_interpreter=False): | |
@contextmanager | ||
def _changeproctitle(title, next_title): | ||
setproctitle.setproctitle(title) | ||
yield | ||
setproctitle.setproctitle(next_title) | ||
try: | ||
yield | ||
finally: | ||
setproctitle.setproctitle(next_title) | ||
|
||
|
||
def register_custom_serializer(cls, | ||
|
@@ -1655,7 +1657,7 @@ def wait(object_ids, num_returns=1, timeout=None): | |
return ready_ids, remaining_ids | ||
|
||
|
||
def kill(actor): | ||
def kill(id): | ||
"""Kill an actor forcefully. | ||
|
||
This will interrupt any running tasks on the actor, causing them to fail | ||
|
@@ -1668,15 +1670,22 @@ def kill(actor): | |
If this actor is reconstructable, it will be attempted to be reconstructed. | ||
|
||
Args: | ||
actor (ActorHandle): Handle to the actor to kill. | ||
id (ActorHandle or ObjectID): Handle for the actor to kill or ObjectID | ||
of the task to kill. | ||
ijrsvt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
if not isinstance(actor, ray.actor.ActorHandle): | ||
raise ValueError("ray.kill() only supported for actors. " | ||
"Got: {}.".format(type(actor))) | ||
|
||
worker = ray.worker.global_worker | ||
worker.check_connected() | ||
worker.core_worker.kill_actor(actor._ray_actor_id, False) | ||
|
||
if isinstance(id, ray.actor.ActorHandle): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll move this to a new PR later |
||
worker.core_worker.kill_actor(id._ray_actor_id, False) | ||
elif isinstance(id, ray.ObjectID): | ||
if id.task_id().actor_id().hex() in ray.actors().keys(): | ||
raise ValueError( | ||
"Please use ray.kill(ActorHandle) to kill an actor task") | ||
worker.core_worker.kill_task(id) | ||
else: | ||
raise ValueError("ray.kill() only supported for actors and objects. " | ||
"Got: {}.".format(type(id))) | ||
|
||
|
||
def _mode(worker=global_worker): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -84,12 +84,13 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, | |
std::function<Status()> check_signals, | ||
std::function<void()> gc_collect, | ||
std::function<void(std::string *)> get_lang_stack, | ||
bool ref_counting_enabled) | ||
bool ref_counting_enabled, std::function<void()> kill_main) | ||
: worker_type_(worker_type), | ||
language_(language), | ||
log_dir_(log_dir), | ||
ref_counting_enabled_(ref_counting_enabled), | ||
check_signals_(check_signals), | ||
kill_main_thread_(kill_main), | ||
gc_collect_(gc_collect), | ||
get_call_site_(RayConfig::instance().record_ref_creation_sites() ? get_lang_stack | ||
: nullptr), | ||
|
@@ -356,10 +357,10 @@ void CoreWorker::RunIOService() { | |
|
||
void CoreWorker::SetCurrentTaskId(const TaskID &task_id) { | ||
worker_context_.SetCurrentTaskId(task_id); | ||
main_thread_task_id_ = task_id; | ||
bool not_actor_task = false; | ||
{ | ||
absl::MutexLock lock(&mutex_); | ||
main_thread_task_id_ = task_id; | ||
not_actor_task = actor_id_.IsNil(); | ||
} | ||
if (not_actor_task && task_id.IsNil()) { | ||
|
@@ -374,7 +375,7 @@ void CoreWorker::SetCurrentTaskId(const TaskID &task_id) { | |
} | ||
|
||
void CoreWorker::CheckForRayletFailure() { | ||
// If the raylet fails, we will be reassigned to init (PID=1). | ||
// If the raylet fails, we will be reassigned to init (PID=1). | ||
if (getppid() == 1) { | ||
RAY_LOG(ERROR) << "Raylet failed. Shutting down."; | ||
Shutdown(); | ||
|
@@ -896,6 +897,16 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f | |
return status; | ||
} | ||
|
||
Status CoreWorker::KillTask(const ObjectID &object_id) { | ||
auto task_id = object_id.TaskId(); | ||
if (task_manager_->IsTaskPending(task_id)) { | ||
auto task_spec = task_manager_->GetTaskSpec(object_id.TaskId()); | ||
ijrsvt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (!task_spec.IsActorCreationTask()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's do a RAY_CHECK instead of an if statement here, because this is definitely checked by the Python code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think it actually is. |
||
return direct_task_submitter_->KillTask(task_spec); | ||
} | ||
return Status::OK(); | ||
} | ||
|
||
Status CoreWorker::KillActor(const ActorID &actor_id, bool force_kill, | ||
bool no_reconstruction) { | ||
ActorHandle *actor_handle = nullptr; | ||
|
@@ -1078,6 +1089,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, | |
if (resource_ids != nullptr) { | ||
resource_ids_ = resource_ids; | ||
} | ||
RAY_LOG(ERROR) << "Setting current task id: " << task_spec.TaskId(); | ||
worker_context_.SetCurrentTask(task_spec); | ||
SetCurrentTaskId(task_spec.TaskId()); | ||
|
||
|
@@ -1389,6 +1401,23 @@ void CoreWorker::HandleWaitForRefRemoved(const rpc::WaitForRefRemovedRequest &re | |
reference_counter_->SetRefRemovedCallback(object_id, contained_in_id, owner_id, | ||
owner_address, ref_removed_callback); | ||
} | ||
void CoreWorker::HandleKillTask(const rpc::KillTaskRequest &request, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if the task is inside the worker's scheduling queue and the timeout expires? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For direct tasks, the task is never entered into the scheduling queue; it is immediately executed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about for actor tasks? Is that not supported at all? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct. |
||
rpc::KillTaskReply *reply, | ||
rpc::SendReplyCallback send_reply_callback) { | ||
ijrsvt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
TaskID intended_task_id = TaskID::FromBinary(request.intended_task_id()); | ||
for (int i = 0; i < 3; i++) { | ||
{ | ||
absl::MutexLock lock(&mutex_); | ||
if (main_thread_task_id_ == intended_task_id) { | ||
RAY_LOG(ERROR) << "Calling Kill Main"; | ||
kill_main_thread_(); | ||
return; | ||
} | ||
} | ||
RAY_LOG(ERROR) << "Failed: " << i; | ||
usleep(5 * 1000); | ||
} | ||
} | ||
|
||
void CoreWorker::HandleKillActor(const rpc::KillActorRequest &request, | ||
rpc::KillActorReply *reply, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't this get raised outside of the try block?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I believe this can be raised on any line of Python code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can only be raised in regular Python code. When in compiled Cython or C++ code, the interrupts can be observed with `PyErr_CheckSignals` (which `check_signals` does). As long as I call `check_signals` before Python code (the next place it is called is in `store_task_output`), any lingering interrupts will be cleared.