Conversation

Collaborator

@aslonnie aslonnie commented Oct 8, 2025

cherrypick #57095

Check if task cancellation is due to actor shutdown or explicit user cancellation. Actor shutdown should raise RayActorError, not TaskCancelledError.

Closes #57092
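
For context, a minimal Python sketch of the user-visible behavior this cherry-pick fixes (illustrative only, not the test from the PR; ray.kill here stands in for whatever actor-shutdown path triggers #57092, and the Worker actor is hypothetical):

import time
import ray
from ray.exceptions import RayActorError, TaskCancelledError

ray.init()

@ray.remote
class Worker:
    def work(self):
        time.sleep(60)

actor = Worker.remote()
ref = actor.work.remote()  # still queued/running when the actor goes away
ray.kill(actor)            # actor shutdown, not an explicit ray.cancel(ref)

try:
    ray.get(ref)
except RayActorError:
    pass  # expected: the actor's death surfaces as an actor error
except TaskCancelledError:
    pass  # the bug: shutdown misreported as a user-initiated cancellation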

Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
@aslonnie aslonnie requested a review from a team as a code owner October 8, 2025 04:52
@aslonnie aslonnie changed the title cherrypick #57095 [core] Preserve err type in case of task cancellation due to actor death Oct 8, 2025
@aslonnie aslonnie merged commit 720d9c1 into releases/2.50.0 Oct 8, 2025
3 of 5 checks passed
@aslonnie aslonnie deleted the lonnie-2500-cp1 branch October 8, 2025 04:53
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request aims to differentiate between task cancellations due to actor shutdown versus explicit user cancellation, ensuring RayActorError is raised for shutdowns. The changes span both the C++ core and Python tests and seem to correctly implement the intended logic. I've found a critical issue and a couple of opportunities for code improvement.

Comment on lines +790 to +796
  if (queue_pair != client_queues_.end()) {
    is_actor_dead = queue_pair->second.state_ == rpc::ActorTableData::DEAD;
    if (is_actor_dead) {
      const auto &death_cause = queue_pair->second.death_cause_;
      error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause);
    }
  }

critical

There's a potential issue here. If client_queues_.find(actor_id) returns end(), is_actor_dead will remain false. The code then proceeds to the else if block at line 807, which contains a RAY_CHECK(queue_pair != client_queues_.end()) at line 820. This RAY_CHECK will fail, causing a crash.

Given that SubmitTask already performs a RAY_CHECK to ensure the queue exists, it seems the queue is expected to be present here as well. To maintain consistency and prevent a deferred crash, it would be better to use RAY_CHECK here instead of an if statement. This ensures that if the queue is unexpectedly missing, it fails fast.

      RAY_CHECK(queue_pair != client_queues_.end());
      is_actor_dead = queue_pair->second.state_ == rpc::ActorTableData::DEAD;
      if (is_actor_dead) {
        const auto &death_cause = queue_pair->second.death_cause_;
        error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause);
      }

Comment on lines +174 to +182
  bool is_worker_exiting = false;
  {
    absl::MutexLock lock(&stop_mu_);
    is_worker_exiting = stopping_;
    if (stopping_) {
      reply->set_worker_exiting(true);
      reply->set_was_cancelled_before_running(true);
    }
  }

medium

This block can be simplified. The if (stopping_) check is redundant since is_worker_exiting is assigned the value of stopping_ just before. Also, the initialization of is_worker_exiting to false is unnecessary as it's always reassigned within the lock.

      bool is_worker_exiting;
      {
        absl::MutexLock lock(&stop_mu_);
        is_worker_exiting = stopping_;
        if (is_worker_exiting) {
          reply->set_worker_exiting(true);
          reply->set_was_cancelled_before_running(true);
        }
      }

Comment on lines +799 to +838
  if (is_actor_dead) {
    CancelDependencyResolution(task_id);
    RAY_LOG(DEBUG) << "Task " << task_id << " cancelled due to actor " << actor_id
                   << " death";
    task_manager_.FailPendingTask(task_spec.TaskId(),
                                  error_info.error_type(),
                                  /*status*/ nullptr,
                                  &error_info);
  } else if (RayConfig::instance().timeout_ms_task_wait_for_death_info() != 0) {
    CancelDependencyResolution(task_id);

    int64_t death_info_grace_period_ms =
        current_time_ms() + RayConfig::instance().timeout_ms_task_wait_for_death_info();

    error_info.set_error_type(rpc::ErrorType::ACTOR_DIED);
    error_info.set_error_message(
        "The actor is dead because its worker process has died.");

    {
      absl::MutexLock lock(&mu_);
      auto queue_pair = client_queues_.find(actor_id);
      RAY_CHECK(queue_pair != client_queues_.end());
      auto &queue = queue_pair->second;
      queue.wait_for_death_info_tasks_.push_back(
          std::make_shared<PendingTaskWaitingForDeathInfo>(
              death_info_grace_period_ms, task_spec, status, error_info));
      RAY_LOG(INFO).WithField(task_spec.TaskId())
          << "Task cancelled during actor shutdown, waiting for death info from GCS"
          << ", wait_queue_size=" << queue.wait_for_death_info_tasks_.size();
    }
  } else {
    CancelDependencyResolution(task_id);
    error_info.set_error_type(rpc::ErrorType::ACTOR_DIED);
    error_info.set_error_message(
        "The actor is dead because its worker process has died.");
    task_manager_.FailPendingTask(task_spec.TaskId(),
                                  rpc::ErrorType::ACTOR_DIED,
                                  /*status*/ nullptr,
                                  &error_info);
  }

medium

There's some code duplication in this if-else if-else block that can be refactored for better readability and maintainability.

Specifically:
- CancelDependencyResolution(task_id) is called in all three branches.
- The logic to set error_info is duplicated in the else if and else branches.

Moving the common code out of the branches would make the logic clearer.

    CancelDependencyResolution(task_id);
    if (is_actor_dead) {
      RAY_LOG(DEBUG) << "Task " << task_id << " cancelled due to actor " << actor_id
                     << " death";
      task_manager_.FailPendingTask(task_spec.TaskId(),
                                    error_info.error_type(),
                                    /*status*/ nullptr,
                                    &error_info);
    } else {
      // The actor is not yet marked as DEAD, but we know the worker is exiting.
      // We will either wait for the death info from GCS, or fail the task immediately.
      error_info.set_error_type(rpc::ErrorType::ACTOR_DIED);
      error_info.set_error_message(
          "The actor is dead because its worker process has died.");

      if (RayConfig::instance().timeout_ms_task_wait_for_death_info() != 0) {
        int64_t death_info_grace_period_ms =
            current_time_ms() +
            RayConfig::instance().timeout_ms_task_wait_for_death_info();

        {
          absl::MutexLock lock(&mu_);
          auto queue_pair = client_queues_.find(actor_id);
          RAY_CHECK(queue_pair != client_queues_.end());
          auto &queue = queue_pair->second;
          queue.wait_for_death_info_tasks_.push_back(
              std::make_shared<PendingTaskWaitingForDeathInfo>(
                  death_info_grace_period_ms, task_spec, status, error_info));
          RAY_LOG(INFO).WithField(task_spec.TaskId())
              << "Task cancelled during actor shutdown, waiting for death info from GCS"
              << ", wait_queue_size=" << queue.wait_for_death_info_tasks_.size();
        }
      } else {
        task_manager_.FailPendingTask(task_spec.TaskId(),
                                      rpc::ErrorType::ACTOR_DIED,
                                      /*status*/ nullptr,
                                      &error_info);
      }
    }
