Skip to content

Commit 720d9c1

Browse files
authored
[core] Preserve err type in case of task cancellation due to actor death (#57538)
Cherry-pick of #57095. Check whether a task cancellation is due to actor shutdown or to an explicit user cancellation: actor shutdown should raise RayActorError, not TaskCancelledError. Closes #57092. Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
1 parent 93e5a96 commit 720d9c1

File tree

4 files changed

+122
-28
lines changed

4 files changed

+122
-28
lines changed

python/ray/tests/test_actor_failures.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1183,18 +1183,14 @@ def ping(self):
11831183
a = RegressionAsync.remote()
11841184
a.f.remote()
11851185
refs = [a.ping.remote() for _ in range(10000)]
1186-
with pytest.raises(
1187-
(ray.exceptions.RayActorError, ray.exceptions.TaskCancelledError)
1188-
) as exc_info:
1186+
with pytest.raises(ray.exceptions.RayActorError) as exc_info:
11891187
ray.get(refs)
11901188
assert " Worker unexpectedly exits" not in str(exc_info.value)
11911189

11921190
# Test a sync case.
11931191
a = RegressionSync.remote()
11941192
a.f.remote()
1195-
with pytest.raises(
1196-
(ray.exceptions.RayActorError, ray.exceptions.TaskCancelledError)
1197-
) as exc_info:
1193+
with pytest.raises(ray.exceptions.RayActorError) as exc_info:
11981194
ray.get([a.ping.remote() for _ in range(10000)])
11991195
assert " Worker unexpectedly exits" not in str(exc_info.value)
12001196

src/ray/core_worker/task_execution/task_receiver.cc

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -164,12 +164,27 @@ void TaskReceiver::HandleTask(rpc::PushTaskRequest request,
164164
};
165165
};
166166

167-
auto cancel_callback = [reply](
167+
auto cancel_callback = [this, reply](
168168
const TaskSpecification &canceled_task_spec,
169169
const Status &status,
170170
const rpc::SendReplyCallback &canceled_send_reply_callback) {
171171
if (canceled_task_spec.IsActorTask()) {
172-
canceled_send_reply_callback(status, nullptr, nullptr);
172+
// If task cancelation is due to worker shutdown, propagate that information
173+
// to the submitter.
174+
bool is_worker_exiting = false;
175+
{
176+
absl::MutexLock lock(&stop_mu_);
177+
is_worker_exiting = stopping_;
178+
if (stopping_) {
179+
reply->set_worker_exiting(true);
180+
reply->set_was_cancelled_before_running(true);
181+
}
182+
}
183+
if (is_worker_exiting) {
184+
canceled_send_reply_callback(Status::OK(), nullptr, nullptr);
185+
} else {
186+
canceled_send_reply_callback(status, nullptr, nullptr);
187+
}
173188
} else {
174189
reply->set_was_cancelled_before_running(true);
175190
canceled_send_reply_callback(status, nullptr, nullptr);
@@ -178,17 +193,16 @@ void TaskReceiver::HandleTask(rpc::PushTaskRequest request,
178193

179194
{
180195
absl::MutexLock lock(&stop_mu_);
196+
task_spec = TaskSpecification(std::move(*request.mutable_task_spec()));
181197
if (stopping_) {
182-
RAY_LOG(INFO)
183-
<< "Rejecting PushTask due to worker shutdown: task will be cancelled";
184198
reply->set_was_cancelled_before_running(true);
185-
send_reply_callback(
186-
Status::SchedulingCancelled("Worker is shutting down"), nullptr, nullptr);
199+
if (task_spec.IsActorTask()) {
200+
reply->set_worker_exiting(true);
201+
}
202+
send_reply_callback(Status::OK(), nullptr, nullptr);
187203
return;
188204
}
189205

190-
task_spec = TaskSpecification(std::move(*request.mutable_task_spec()));
191-
192206
if (task_spec.IsActorCreationTask()) {
193207
SetupActor(task_spec.IsAsyncioActor(),
194208
task_spec.MaxActorConcurrency(),

src/ray/core_worker/task_submission/actor_task_submitter.cc

Lines changed: 86 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -663,23 +663,13 @@ void ActorTaskSubmitter::HandlePushTaskReply(const Status &status,
663663
/// Whether or not we will retry this actor task.
664664
auto will_retry = false;
665665

666-
if (status.ok() && !is_retryable_exception) {
666+
if ((status.ok() && reply.was_cancelled_before_running()) ||
667+
status.IsSchedulingCancelled()) {
668+
HandleTaskCancelledBeforeExecution(status, reply, task_spec);
669+
} else if (status.ok() && !is_retryable_exception) {
667670
// status.ok() means the worker completed the reply, either succeeded or with a
668671
// retryable failure (e.g. user exceptions). We complete only on non-retryable case.
669672
task_manager_.CompletePendingTask(task_id, reply, addr, reply.is_application_error());
670-
} else if (status.IsSchedulingCancelled()) {
671-
std::ostringstream stream;
672-
stream << "The task " << task_id << " is canceled from an actor " << actor_id
673-
<< " before it executes.";
674-
const auto &msg = stream.str();
675-
RAY_LOG(DEBUG) << msg;
676-
rpc::RayErrorInfo error_info;
677-
error_info.set_error_message(msg);
678-
error_info.set_error_type(rpc::ErrorType::TASK_CANCELLED);
679-
task_manager_.FailPendingTask(task_spec.TaskId(),
680-
rpc::ErrorType::TASK_CANCELLED,
681-
/*status*/ nullptr,
682-
&error_info);
683673
} else {
684674
bool is_actor_dead = false;
685675
bool fail_immediately = false;
@@ -781,6 +771,88 @@ void ActorTaskSubmitter::HandlePushTaskReply(const Status &status,
781771
}
782772
}
783773

774+
void ActorTaskSubmitter::HandleTaskCancelledBeforeExecution(
775+
const Status &status,
776+
const rpc::PushTaskReply &reply,
777+
const TaskSpecification &task_spec) {
778+
const auto task_id = task_spec.TaskId();
779+
const auto actor_id = task_spec.ActorId();
780+
781+
if (reply.worker_exiting()) {
782+
// Task cancelled due to actor shutdown - use ACTOR_DIED error.
783+
// If we have the death cause, use it immediately. Otherwise,
784+
// wait for it from GCS to provide an accurate error message.
785+
bool is_actor_dead = false;
786+
rpc::RayErrorInfo error_info;
787+
{
788+
absl::MutexLock lock(&mu_);
789+
auto queue_pair = client_queues_.find(actor_id);
790+
if (queue_pair != client_queues_.end()) {
791+
is_actor_dead = queue_pair->second.state_ == rpc::ActorTableData::DEAD;
792+
if (is_actor_dead) {
793+
const auto &death_cause = queue_pair->second.death_cause_;
794+
error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause);
795+
}
796+
}
797+
}
798+
799+
if (is_actor_dead) {
800+
CancelDependencyResolution(task_id);
801+
RAY_LOG(DEBUG) << "Task " << task_id << " cancelled due to actor " << actor_id
802+
<< " death";
803+
task_manager_.FailPendingTask(task_spec.TaskId(),
804+
error_info.error_type(),
805+
/*status*/ nullptr,
806+
&error_info);
807+
} else if (RayConfig::instance().timeout_ms_task_wait_for_death_info() != 0) {
808+
CancelDependencyResolution(task_id);
809+
810+
int64_t death_info_grace_period_ms =
811+
current_time_ms() + RayConfig::instance().timeout_ms_task_wait_for_death_info();
812+
813+
error_info.set_error_type(rpc::ErrorType::ACTOR_DIED);
814+
error_info.set_error_message(
815+
"The actor is dead because its worker process has died.");
816+
817+
{
818+
absl::MutexLock lock(&mu_);
819+
auto queue_pair = client_queues_.find(actor_id);
820+
RAY_CHECK(queue_pair != client_queues_.end());
821+
auto &queue = queue_pair->second;
822+
queue.wait_for_death_info_tasks_.push_back(
823+
std::make_shared<PendingTaskWaitingForDeathInfo>(
824+
death_info_grace_period_ms, task_spec, status, error_info));
825+
RAY_LOG(INFO).WithField(task_spec.TaskId())
826+
<< "Task cancelled during actor shutdown, waiting for death info from GCS"
827+
<< ", wait_queue_size=" << queue.wait_for_death_info_tasks_.size();
828+
}
829+
} else {
830+
CancelDependencyResolution(task_id);
831+
error_info.set_error_type(rpc::ErrorType::ACTOR_DIED);
832+
error_info.set_error_message(
833+
"The actor is dead because its worker process has died.");
834+
task_manager_.FailPendingTask(task_spec.TaskId(),
835+
rpc::ErrorType::ACTOR_DIED,
836+
/*status*/ nullptr,
837+
&error_info);
838+
}
839+
} else {
840+
// Explicit user cancellation - use TASK_CANCELLED error.
841+
std::ostringstream stream;
842+
stream << "The task " << task_id << " is canceled from an actor " << actor_id
843+
<< " before it executes.";
844+
const auto &msg = stream.str();
845+
RAY_LOG(DEBUG) << msg;
846+
rpc::RayErrorInfo error_info;
847+
error_info.set_error_message(msg);
848+
error_info.set_error_type(rpc::ErrorType::TASK_CANCELLED);
849+
task_manager_.FailPendingTask(task_spec.TaskId(),
850+
rpc::ErrorType::TASK_CANCELLED,
851+
/*status*/ nullptr,
852+
&error_info);
853+
}
854+
}
855+
784856
std::optional<rpc::ActorTableData::ActorState> ActorTaskSubmitter::GetLocalActorState(
785857
const ActorID &actor_id) const {
786858
absl::MutexLock lock(&mu_);

src/ray/core_worker/task_submission/actor_task_submitter.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,18 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface {
257257
timeout_error_info_(std::move(timeout_error_info)) {}
258258
};
259259

260+
/// Handle a task that was cancelled before it could execute.
261+
/// This method determines whether the cancellation was due to:
262+
/// 1. Actor shutdown (worker exiting): If so, raise RayActorError.
263+
/// 2. Explicit user cancellation: If so, raise TaskCancelledError.
264+
///
265+
/// \param status The RPC status from PushTask.
266+
/// \param reply The PushTaskReply message containing cancellation details.
267+
/// \param task_spec The specification of the task that was cancelled.
268+
void HandleTaskCancelledBeforeExecution(const Status &status,
269+
const rpc::PushTaskReply &reply,
270+
const TaskSpecification &task_spec);
271+
260272
struct ClientQueue {
261273
ClientQueue(bool allow_out_of_order_execution,
262274
int32_t max_pending_calls,

0 commit comments

Comments
 (0)