Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions python/ray/tests/test_actor_failures.py
Original file line number Diff line number Diff line change
Expand Up @@ -1183,18 +1183,14 @@ def ping(self):
a = RegressionAsync.remote()
a.f.remote()
refs = [a.ping.remote() for _ in range(10000)]
with pytest.raises(
(ray.exceptions.RayActorError, ray.exceptions.TaskCancelledError)
) as exc_info:
with pytest.raises(ray.exceptions.RayActorError) as exc_info:
ray.get(refs)
assert " Worker unexpectedly exits" not in str(exc_info.value)

# Test a sync case.
a = RegressionSync.remote()
a.f.remote()
with pytest.raises(
(ray.exceptions.RayActorError, ray.exceptions.TaskCancelledError)
) as exc_info:
with pytest.raises(ray.exceptions.RayActorError) as exc_info:
ray.get([a.ping.remote() for _ in range(10000)])
assert " Worker unexpectedly exits" not in str(exc_info.value)

Expand Down
30 changes: 22 additions & 8 deletions src/ray/core_worker/task_execution/task_receiver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,27 @@ void TaskReceiver::HandleTask(rpc::PushTaskRequest request,
};
};

auto cancel_callback = [reply](
auto cancel_callback = [this, reply](
const TaskSpecification &canceled_task_spec,
const Status &status,
const rpc::SendReplyCallback &canceled_send_reply_callback) {
if (canceled_task_spec.IsActorTask()) {
canceled_send_reply_callback(status, nullptr, nullptr);
// If task cancelation is due to worker shutdown, propagate that information
// to the submitter.
bool is_worker_exiting = false;
{
absl::MutexLock lock(&stop_mu_);
is_worker_exiting = stopping_;
if (stopping_) {
reply->set_worker_exiting(true);
reply->set_was_cancelled_before_running(true);
}
}
if (is_worker_exiting) {
canceled_send_reply_callback(Status::OK(), nullptr, nullptr);
} else {
canceled_send_reply_callback(status, nullptr, nullptr);
}
} else {
reply->set_was_cancelled_before_running(true);
canceled_send_reply_callback(status, nullptr, nullptr);
Expand All @@ -174,17 +189,16 @@ void TaskReceiver::HandleTask(rpc::PushTaskRequest request,

{
absl::MutexLock lock(&stop_mu_);
task_spec = TaskSpecification(std::move(*request.mutable_task_spec()));
if (stopping_) {
RAY_LOG(INFO)
<< "Rejecting PushTask due to worker shutdown: task will be cancelled";
reply->set_was_cancelled_before_running(true);
send_reply_callback(
Status::SchedulingCancelled("Worker is shutting down"), nullptr, nullptr);
if (task_spec.IsActorTask()) {
reply->set_worker_exiting(true);
}
send_reply_callback(Status::OK(), nullptr, nullptr);
return;
}

task_spec = TaskSpecification(std::move(*request.mutable_task_spec()));

if (task_spec.IsActorCreationTask()) {
SetupActor(task_spec.IsAsyncioActor(),
task_spec.MaxActorConcurrency(),
Expand Down
100 changes: 86 additions & 14 deletions src/ray/core_worker/task_submission/actor_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -662,23 +662,13 @@ void ActorTaskSubmitter::HandlePushTaskReply(const Status &status,
/// Whether or not we will retry this actor task.
auto will_retry = false;

if (status.ok() && !is_retryable_exception) {
if ((status.ok() && reply.was_cancelled_before_running()) ||
status.IsSchedulingCancelled()) {
HandleTaskCancelledBeforeExecution(status, reply, task_spec);
} else if (status.ok() && !is_retryable_exception) {
// status.ok() means the worker completed the reply, either succeeded or with a
// retryable failure (e.g. user exceptions). We complete only on non-retryable case.
task_manager_.CompletePendingTask(task_id, reply, addr, reply.is_application_error());
} else if (status.IsSchedulingCancelled()) {
std::ostringstream stream;
stream << "The task " << task_id << " is canceled from an actor " << actor_id
<< " before it executes.";
const auto &msg = stream.str();
RAY_LOG(DEBUG) << msg;
rpc::RayErrorInfo error_info;
error_info.set_error_message(msg);
error_info.set_error_type(rpc::ErrorType::TASK_CANCELLED);
task_manager_.FailPendingTask(task_spec.TaskId(),
rpc::ErrorType::TASK_CANCELLED,
/*status*/ nullptr,
&error_info);
} else {
bool is_actor_dead = false;
bool fail_immediately = false;
Expand Down Expand Up @@ -780,6 +770,88 @@ void ActorTaskSubmitter::HandlePushTaskReply(const Status &status,
}
}

void ActorTaskSubmitter::HandleTaskCancelledBeforeExecution(
const Status &status,
const rpc::PushTaskReply &reply,
const TaskSpecification &task_spec) {
const auto task_id = task_spec.TaskId();
const auto actor_id = task_spec.ActorId();

if (reply.worker_exiting()) {
// Task cancelled due to actor shutdown - use ACTOR_DIED error.
// If we have the death cause, use it immediately. Otherwise,
// wait for it from GCS to provide an accurate error message.
bool is_actor_dead = false;
rpc::RayErrorInfo error_info;
{
absl::MutexLock lock(&mu_);
auto queue_pair = client_queues_.find(actor_id);
if (queue_pair != client_queues_.end()) {
Comment on lines +788 to +789
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a RAY_CHECK failure in the case where the actor is not dead. I don't like adding more RAY_CHECKs, but I don't see how we can recover if we're handling a PushTaskReply and the actor doesn't have a client_queue_.

is_actor_dead = queue_pair->second.state_ == rpc::ActorTableData::DEAD;
if (is_actor_dead) {
const auto &death_cause = queue_pair->second.death_cause_;
error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause);
}
}
}

if (is_actor_dead) {
CancelDependencyResolution(task_id);
RAY_LOG(DEBUG) << "Task " << task_id << " cancelled due to actor " << actor_id
<< " death";
task_manager_.FailPendingTask(task_spec.TaskId(),
error_info.error_type(),
/*status*/ nullptr,
&error_info);
} else if (RayConfig::instance().timeout_ms_task_wait_for_death_info() != 0) {
CancelDependencyResolution(task_id);

int64_t death_info_grace_period_ms =
current_time_ms() + RayConfig::instance().timeout_ms_task_wait_for_death_info();

error_info.set_error_type(rpc::ErrorType::ACTOR_DIED);
error_info.set_error_message(
"The actor is dead because its worker process has died.");

{
absl::MutexLock lock(&mu_);
auto queue_pair = client_queues_.find(actor_id);
RAY_CHECK(queue_pair != client_queues_.end());
auto &queue = queue_pair->second;
queue.wait_for_death_info_tasks_.push_back(
std::make_shared<PendingTaskWaitingForDeathInfo>(
death_info_grace_period_ms, task_spec, status, error_info));
RAY_LOG(INFO).WithField(task_spec.TaskId())
<< "Task cancelled during actor shutdown, waiting for death info from GCS"
<< ", wait_queue_size=" << queue.wait_for_death_info_tasks_.size();
}
} else {
CancelDependencyResolution(task_id);
error_info.set_error_type(rpc::ErrorType::ACTOR_DIED);
error_info.set_error_message(
"The actor is dead because its worker process has died.");
task_manager_.FailPendingTask(task_spec.TaskId(),
rpc::ErrorType::ACTOR_DIED,
/*status*/ nullptr,
&error_info);
}
} else {
// Explicit user cancellation - use TASK_CANCELLED error.
std::ostringstream stream;
stream << "The task " << task_id << " is canceled from an actor " << actor_id
<< " before it executes.";
const auto &msg = stream.str();
RAY_LOG(DEBUG) << msg;
rpc::RayErrorInfo error_info;
error_info.set_error_message(msg);
error_info.set_error_type(rpc::ErrorType::TASK_CANCELLED);
task_manager_.FailPendingTask(task_spec.TaskId(),
rpc::ErrorType::TASK_CANCELLED,
/*status*/ nullptr,
&error_info);
}
}

std::optional<rpc::ActorTableData::ActorState> ActorTaskSubmitter::GetLocalActorState(
const ActorID &actor_id) const {
absl::MutexLock lock(&mu_);
Expand Down
12 changes: 12 additions & 0 deletions src/ray/core_worker/task_submission/actor_task_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,18 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface {
timeout_error_info_(std::move(timeout_error_info)) {}
};

/// Handle a task that was cancelled before it could execute.
/// This method determines whether the cancellation was due to:
/// 1. Actor shutdown (worker exiting): If so, raise RayActorError.
/// 2. Explicit user cancellation: If so, raise TaskCancelledError.
///
/// \param status The RPC status from PushTask.
/// \param reply The PushTaskReply message containing cancellation details.
/// \param task_spec The specification of the task that was cancelled.
void HandleTaskCancelledBeforeExecution(const Status &status,
const rpc::PushTaskReply &reply,
const TaskSpecification &task_spec);

struct ClientQueue {
ClientQueue(bool allow_out_of_order_execution,
int32_t max_pending_calls,
Expand Down