diff --git a/python/ray/tests/test_unavailable_actors.py b/python/ray/tests/test_unavailable_actors.py index 27775e4b8f3e8..19f3224a941fd 100644 --- a/python/ray/tests/test_unavailable_actors.py +++ b/python/ray/tests/test_unavailable_actors.py @@ -80,7 +80,7 @@ def body(): # Break the grpc connection from this process to the actor process. The # next `ray.get` call should fail with ActorUnavailableError. close_common_connections(pid) - with pytest.raises(ActorUnavailableError, match="Grpc"): + with pytest.raises(ActorUnavailableError, match="RpcError"): ray.get(task) # Since the remote() call happens *before* the break, the actor did receive the # request, so the side effects are observable, and the actor recovered. @@ -93,7 +93,7 @@ def body(): # itself won't raise an error. close_common_connections(pid) task2 = a.slow_increment.remote(5, 0.1) - with pytest.raises(ActorUnavailableError, match="Grpc"): + with pytest.raises(ActorUnavailableError, match="RpcError"): ray.get(task2) assert ray.get(a.read.remote()) == 9 @@ -206,7 +206,7 @@ def test_unavailable_then_actor_error(ray_start_regular): # calls get ActorUnavailableError. sigkill_actor(a) - with pytest.raises(ActorUnavailableError, match="Grpc"): + with pytest.raises(ActorUnavailableError, match="RpcError"): print(ray.get(a.ping.remote("unavailable"))) # When the actor is restarting, any method call raises ActorUnavailableError. with pytest.raises(ActorUnavailableError, match="The actor is restarting"): diff --git a/src/ray/common/grpc_util.h b/src/ray/common/grpc_util.h index 4ac4ac235d547..a3df2ffe33c4e 100644 --- a/src/ray/common/grpc_util.h +++ b/src/ray/common/grpc_util.h @@ -108,13 +108,9 @@ inline Status GrpcStatusToRayStatus(const grpc::Status &grpc_status) { // See RayStatusToGrpcStatus for details. return Status(Status::StringToCode(grpc_status.error_message()), grpc_status.error_details()); - } else if (grpc_status.error_code() == grpc::StatusCode::UNAVAILABLE) { - return Status::GrpcUnavailable(GrpcStatusToRayStatusMessage(grpc_status)); } else { - // TODO(jjyao) Use GrpcUnknown as the catch-all status for all - // the unhandled grpc status. - // If needed, we can define a ray status for each grpc status in the future. - return Status::GrpcUnknown(GrpcStatusToRayStatusMessage(grpc_status)); + return Status::RpcError(GrpcStatusToRayStatusMessage(grpc_status), + grpc_status.error_code()); } } diff --git a/src/ray/common/status.cc b/src/ray/common/status.cc index 4be843f7952c8..8275c78c7538b 100644 --- a/src/ray/common/status.cc +++ b/src/ray/common/status.cc @@ -58,8 +58,6 @@ namespace ray { #define STATUS_CODE_OBJECT_ALREADY_SEALED "ObjectAlreadySealed" #define STATUS_CODE_OBJECT_STORE_FULL "ObjectStoreFull" #define STATUS_CODE_TRANSIENT_OBJECT_STORE_FULL "TransientObjectStoreFull" -#define STATUS_CODE_GRPC_UNAVAILABLE "GrpcUnavailable" -#define STATUS_CODE_GRPC_UNKNOWN "GrpcUnknown" #define STATUS_CODE_OUT_OF_DISK "OutOfDisk" #define STATUS_CODE_OBJECT_UNKNOWN_OWNER "ObjectUnknownOwner" #define STATUS_CODE_RPC_ERROR "RpcError" @@ -98,8 +96,6 @@ const absl::flat_hash_map kCodeToStr = { {StatusCode::ObjectAlreadySealed, STATUS_CODE_OBJECT_ALREADY_SEALED}, {StatusCode::ObjectStoreFull, STATUS_CODE_OBJECT_STORE_FULL}, {StatusCode::TransientObjectStoreFull, STATUS_CODE_TRANSIENT_OBJECT_STORE_FULL}, - {StatusCode::GrpcUnavailable, STATUS_CODE_GRPC_UNAVAILABLE}, - {StatusCode::GrpcUnknown, STATUS_CODE_GRPC_UNKNOWN}, {StatusCode::OutOfDisk, STATUS_CODE_OUT_OF_DISK}, {StatusCode::ObjectUnknownOwner, STATUS_CODE_OBJECT_UNKNOWN_OWNER}, {StatusCode::RpcError, STATUS_CODE_RPC_ERROR}, diff --git a/src/ray/common/status.h b/src/ray/common/status.h index 307acff34b416..a93c19db0621b 100644 --- a/src/ray/common/status.h +++ b/src/ray/common/status.h @@ -103,13 +103,6 @@ enum class StatusCode : char { ObjectAlreadySealed = 23, ObjectStoreFull = 24, TransientObjectStoreFull = 25, - // grpc status - // This represents UNAVAILABLE status code - // returned by grpc. - GrpcUnavailable = 26, - // This represents all other status codes - // returned by grpc that are not defined above. - GrpcUnknown = 27, // Object store is both out of memory and // out of disk. OutOfDisk = 28, @@ -243,14 +236,6 @@ class RAY_EXPORT Status { return Status(StatusCode::OutOfDisk, msg); } - static Status GrpcUnavailable(const std::string &msg) { - return Status(StatusCode::GrpcUnavailable, msg); - } - - static Status GrpcUnknown(const std::string &msg) { - return Status(StatusCode::GrpcUnknown, msg); - } - static Status RpcError(const std::string &msg, int rpc_code) { return Status(StatusCode::RpcError, msg, rpc_code); } @@ -306,10 +291,6 @@ class RAY_EXPORT Status { bool IsTransientObjectStoreFull() const { return code() == StatusCode::TransientObjectStoreFull; } - bool IsGrpcUnavailable() const { return code() == StatusCode::GrpcUnavailable; } - bool IsGrpcUnknown() const { return code() == StatusCode::GrpcUnknown; } - - bool IsGrpcError() const { return IsGrpcUnknown() || IsGrpcUnavailable(); } bool IsRpcError() const { return code() == StatusCode::RpcError; } diff --git a/src/ray/common/test/status_test.cc b/src/ray/common/test/status_test.cc index 83c9c07211d33..46cc063c17f40 100644 --- a/src/ray/common/test/status_test.cc +++ b/src/ray/common/test/status_test.cc @@ -49,11 +49,13 @@ TEST_F(StatusTest, GrpcStatusToRayStatus) { grpc_status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "foo", "bar"); ray_status = GrpcStatusToRayStatus(grpc_status); - ASSERT_TRUE(ray_status.IsGrpcUnavailable()); + ASSERT_TRUE(ray_status.IsRpcError()); + ASSERT_EQ(ray_status.rpc_code(), grpc::StatusCode::UNAVAILABLE); grpc_status = grpc::Status(grpc::StatusCode::UNKNOWN, "foo", "bar"); ray_status = GrpcStatusToRayStatus(grpc_status); - ASSERT_TRUE(ray_status.IsGrpcUnknown()); + ASSERT_TRUE(ray_status.IsRpcError()); + ASSERT_EQ(ray_status.rpc_code(), grpc::StatusCode::UNKNOWN); grpc_status = grpc::Status(grpc::StatusCode::ABORTED, "foo", "bar"); ray_status = GrpcStatusToRayStatus(grpc_status); diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index 95ab400f1689a..98ff0c18db49c 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -206,7 +206,8 @@ void CoreWorkerProcessImpl::InitializeSystemConfig() { } // If there's no more attempt to try. - if (status.IsGrpcUnavailable()) { + if (status.IsRpcError() && + status.rpc_code() == grpc::StatusCode::UNAVAILABLE) { std::ostringstream ss; ss << "Failed to get the system config from raylet because " << "it is dead. Worker will terminate. Status: " << status diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 8f11897ad1ca7..9b48e51c7cfb5 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -287,7 +287,7 @@ class MockRayletClient : public WorkerLeaseInterface { return false; } else { auto callback = callbacks.front(); - callback(Status::GrpcUnavailable("unavailable"), reply); + callback(Status::RpcError("unavailable", grpc::StatusCode::UNAVAILABLE), reply); callbacks.pop_front(); return true; } diff --git a/src/ray/core_worker/test/task_event_buffer_test.cc b/src/ray/core_worker/test/task_event_buffer_test.cc index cf12907e6c4cc..63e24485ec098 100644 --- a/src/ray/core_worker/test/task_event_buffer_test.cc +++ b/src/ray/core_worker/test/task_event_buffer_test.cc @@ -271,7 +271,7 @@ TEST_F(TaskEventBufferTest, TestFailedFlush) { .Times(2) .WillOnce([&](std::unique_ptr actual_data, ray::gcs::StatusCallback callback) { - callback(Status::GrpcUnknown("grpc error")); + callback(Status::RpcError("grpc error", grpc::StatusCode::UNKNOWN)); return Status::OK(); }) .WillOnce([&](std::unique_ptr actual_data, diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc index b419fae2ffea2..15e7c386c7c37 100644 --- a/src/ray/core_worker/transport/direct_task_transport.cc +++ b/src/ray/core_worker/transport/direct_task_transport.cc @@ -528,7 +528,8 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded( RequestNewWorkerIfNeeded(scheduling_key); } else { - if (status.IsGrpcUnavailable()) { + if (status.IsRpcError() && + status.rpc_code() == grpc::StatusCode::UNAVAILABLE) { RAY_LOG(WARNING) << "The worker failed to receive a response from the local " << "raylet because the raylet is unavailable (crashed). " diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index 21bd76b1e5f45..54a23ed97cec4 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -184,7 +184,7 @@ Status PythonGcsClient::Connect(const ClusterID &cluster_id, RAY_LOG(DEBUG) << "Received cluster ID from GCS server: " << cluster_id_; RAY_CHECK(!cluster_id_.IsNil()); break; - } else if (!connect_status.IsGrpcError()) { + } else if (!connect_status.IsRpcError()) { return HandleGcsError(reply.status()); } std::this_thread::sleep_for(std::chrono::milliseconds(1000)); @@ -415,15 +415,9 @@ Status PythonGcsClient::PinRuntimeEnvUri(const std::string &uri, if (status.ok()) { if (reply.status().code() == static_cast(StatusCode::OK)) { return Status::OK(); - } else if (reply.status().code() == static_cast(StatusCode::GrpcUnavailable)) { - std::string msg = - "Failed to pin URI reference for " + uri + " due to the GCS being " + - "unavailable, most likely it has crashed: " + reply.status().message() + "."; - return Status::GrpcUnavailable(msg); + } else { + return Status(StatusCode(reply.status().code()), reply.status().message()); } - std::string msg = "Failed to pin URI reference for " + uri + - " due to unexpected error " + reply.status().message() + "."; - return Status::GrpcUnknown(msg); } return Status::RpcError(status.error_message(), status.error_code()); } diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index f94d3ac2a8b2a..cbbf4b7f89939 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -994,7 +994,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestNodeErrorDuringCommittingResources) { ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); // node1 is experiencing transient connection failure. ASSERT_TRUE(raylet_clients_[1]->GrantCommitBundleResources( - ray::Status::GrpcUnavailable("unavailable"))); + ray::Status::RpcError("unavailable", grpc::StatusCode::UNAVAILABLE))); WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE); } diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index 2fb74996b2900..fe28d1632a8f6 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -238,7 +238,7 @@ class GcsRpcClient { callback(status, reply); } delete executor; - } else if (!status.IsGrpcError()) { + } else if (!status.IsRpcError()) { callback(status, reply); delete executor; } else {