[core] Preserve err type in case of task cancellation due to actor death #57538
Conversation
Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
Code Review
This pull request differentiates task cancellations caused by actor shutdown from explicit user cancellations, ensuring `RayActorError` is raised for shutdowns. The changes span both the C++ core and the Python tests, and they appear to implement the intended logic correctly. I've found one critical issue and a couple of opportunities for code improvement.
```cpp
if (queue_pair != client_queues_.end()) {
  is_actor_dead = queue_pair->second.state_ == rpc::ActorTableData::DEAD;
  if (is_actor_dead) {
    const auto &death_cause = queue_pair->second.death_cause_;
    error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause);
  }
}
```
There's a potential issue here. If `client_queues_.find(actor_id)` returns `end()`, `is_actor_dead` will remain false. The code then proceeds to the `else if` block at line 807, which contains a `RAY_CHECK(queue_pair != client_queues_.end())` at line 820. This `RAY_CHECK` will fail, causing a crash.

Given that `SubmitTask` already performs a `RAY_CHECK` to ensure the queue exists, it seems the queue is expected to be present here as well. To maintain consistency and prevent a deferred crash, it would be better to use `RAY_CHECK` here instead of an `if` statement. This ensures that if the queue is unexpectedly missing, it fails fast.
Suggested change:

```cpp
RAY_CHECK(queue_pair != client_queues_.end());
is_actor_dead = queue_pair->second.state_ == rpc::ActorTableData::DEAD;
if (is_actor_dead) {
  const auto &death_cause = queue_pair->second.death_cause_;
  error_info = gcs::GetErrorInfoFromActorDeathCause(death_cause);
}
```

```cpp
bool is_worker_exiting = false;
{
  absl::MutexLock lock(&stop_mu_);
  is_worker_exiting = stopping_;
  if (stopping_) {
    reply->set_worker_exiting(true);
    reply->set_was_cancelled_before_running(true);
  }
}
```
This block can be simplified. The `if (stopping_)` check is redundant since `is_worker_exiting` is assigned the value of `stopping_` just before. Also, the initialization of `is_worker_exiting` to false is unnecessary, as it's always reassigned within the lock.
Suggested change:

```cpp
bool is_worker_exiting;
{
  absl::MutexLock lock(&stop_mu_);
  is_worker_exiting = stopping_;
  if (is_worker_exiting) {
    reply->set_worker_exiting(true);
    reply->set_was_cancelled_before_running(true);
  }
}
```

```cpp
if (is_actor_dead) {
  CancelDependencyResolution(task_id);
  RAY_LOG(DEBUG) << "Task " << task_id << " cancelled due to actor " << actor_id
                 << " death";
  task_manager_.FailPendingTask(task_spec.TaskId(),
                                error_info.error_type(),
                                /*status*/ nullptr,
                                &error_info);
} else if (RayConfig::instance().timeout_ms_task_wait_for_death_info() != 0) {
  CancelDependencyResolution(task_id);

  int64_t death_info_grace_period_ms =
      current_time_ms() + RayConfig::instance().timeout_ms_task_wait_for_death_info();

  error_info.set_error_type(rpc::ErrorType::ACTOR_DIED);
  error_info.set_error_message(
      "The actor is dead because its worker process has died.");

  {
    absl::MutexLock lock(&mu_);
    auto queue_pair = client_queues_.find(actor_id);
    RAY_CHECK(queue_pair != client_queues_.end());
    auto &queue = queue_pair->second;
    queue.wait_for_death_info_tasks_.push_back(
        std::make_shared<PendingTaskWaitingForDeathInfo>(
            death_info_grace_period_ms, task_spec, status, error_info));
    RAY_LOG(INFO).WithField(task_spec.TaskId())
        << "Task cancelled during actor shutdown, waiting for death info from GCS"
        << ", wait_queue_size=" << queue.wait_for_death_info_tasks_.size();
  }
} else {
  CancelDependencyResolution(task_id);
  error_info.set_error_type(rpc::ErrorType::ACTOR_DIED);
  error_info.set_error_message(
      "The actor is dead because its worker process has died.");
  task_manager_.FailPendingTask(task_spec.TaskId(),
                                rpc::ErrorType::ACTOR_DIED,
                                /*status*/ nullptr,
                                &error_info);
}
```
There's some code duplication in this if-else if-else block that can be refactored for better readability and maintainability.

Specifically:
- `CancelDependencyResolution(task_id)` is called in all three branches.
- The logic to set `error_info` is duplicated in the `else if` and `else` branches.

Moving the common code out of the branches would make the logic clearer.
Suggested change:

```cpp
CancelDependencyResolution(task_id);
if (is_actor_dead) {
  RAY_LOG(DEBUG) << "Task " << task_id << " cancelled due to actor " << actor_id
                 << " death";
  task_manager_.FailPendingTask(task_spec.TaskId(),
                                error_info.error_type(),
                                /*status*/ nullptr,
                                &error_info);
} else {
  // The actor is not yet marked as DEAD, but we know the worker is exiting.
  // We will either wait for the death info from GCS, or fail the task immediately.
  error_info.set_error_type(rpc::ErrorType::ACTOR_DIED);
  error_info.set_error_message(
      "The actor is dead because its worker process has died.");

  if (RayConfig::instance().timeout_ms_task_wait_for_death_info() != 0) {
    int64_t death_info_grace_period_ms =
        current_time_ms() +
        RayConfig::instance().timeout_ms_task_wait_for_death_info();

    {
      absl::MutexLock lock(&mu_);
      auto queue_pair = client_queues_.find(actor_id);
      RAY_CHECK(queue_pair != client_queues_.end());
      auto &queue = queue_pair->second;
      queue.wait_for_death_info_tasks_.push_back(
          std::make_shared<PendingTaskWaitingForDeathInfo>(
              death_info_grace_period_ms, task_spec, status, error_info));
      RAY_LOG(INFO).WithField(task_spec.TaskId())
          << "Task cancelled during actor shutdown, waiting for death info from GCS"
          << ", wait_queue_size=" << queue.wait_for_death_info_tasks_.size();
    }
  } else {
    task_manager_.FailPendingTask(task_spec.TaskId(),
                                  rpc::ErrorType::ACTOR_DIED,
                                  /*status*/ nullptr,
                                  &error_info);
  }
}
```

Original PR #57538 by aslonnie. Original: ray-project/ray#57538
Merged from original PR #57538.
Cherry-pick of #57095.
Check whether a task cancellation is due to actor shutdown or an explicit user cancellation. Actor shutdown should raise RayActorError, not TaskCancelledError.
Closes #57092
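For context, a minimal sketch of the user-facing distinction this PR preserves. This is illustrative only, not a test from this PR: the `Sleeper` actor is hypothetical, and the exact cancellation semantics for in-flight actor tasks vary by Ray version.

```python
import time

import ray
from ray.exceptions import RayActorError, TaskCancelledError

ray.init()


@ray.remote
class Sleeper:  # hypothetical actor used only for illustration
    def work(self):
        time.sleep(60)
        return "done"


# Case 1: the actor dies while a task is in flight. The caller should
# see RayActorError (the behavior this PR preserves), not TaskCancelledError.
actor = Sleeper.remote()
ref = actor.work.remote()
ray.kill(actor)
try:
    ray.get(ref)
except RayActorError:
    print("actor death surfaces as RayActorError")

# Case 2: explicit user cancellation should still surface as TaskCancelledError.
actor = Sleeper.remote()
ref = actor.work.remote()
ray.cancel(ref)
try:
    ray.get(ref)
except TaskCancelledError:
    print("user cancellation surfaces as TaskCancelledError")
```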