Skip to content

Commit

Permalink
[Core] Remove Status::GrpcUnavailable and Status::GrpcUnknown since w…
Browse files Browse the repository at this point in the history
…e have Status::RpcError (ray-project#46167)

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
  • Loading branch information
jjyao authored Jun 21, 2024
1 parent 50cbe6a commit e5d651d
Show file tree
Hide file tree
Showing 12 changed files with 20 additions and 49 deletions.
6 changes: 3 additions & 3 deletions python/ray/tests/test_unavailable_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def body():
# Break the grpc connection from this process to the actor process. The
# next `ray.get` call should fail with ActorUnavailableError.
close_common_connections(pid)
with pytest.raises(ActorUnavailableError, match="Grpc"):
with pytest.raises(ActorUnavailableError, match="RpcError"):
ray.get(task)
# Since the remote() call happens *before* the break, the actor did receive the
# request, so the side effects are observable, and the actor recovered.
Expand All @@ -93,7 +93,7 @@ def body():
# itself won't raise an error.
close_common_connections(pid)
task2 = a.slow_increment.remote(5, 0.1)
with pytest.raises(ActorUnavailableError, match="Grpc"):
with pytest.raises(ActorUnavailableError, match="RpcError"):
ray.get(task2)
assert ray.get(a.read.remote()) == 9

Expand Down Expand Up @@ -206,7 +206,7 @@ def test_unavailable_then_actor_error(ray_start_regular):
# calls get ActorUnavailableError.
sigkill_actor(a)

with pytest.raises(ActorUnavailableError, match="Grpc"):
with pytest.raises(ActorUnavailableError, match="RpcError"):
print(ray.get(a.ping.remote("unavailable")))
# When the actor is restarting, any method call raises ActorUnavailableError.
with pytest.raises(ActorUnavailableError, match="The actor is restarting"):
Expand Down
8 changes: 2 additions & 6 deletions src/ray/common/grpc_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,9 @@ inline Status GrpcStatusToRayStatus(const grpc::Status &grpc_status) {
// See RayStatusToGrpcStatus for details.
return Status(Status::StringToCode(grpc_status.error_message()),
grpc_status.error_details());
} else if (grpc_status.error_code() == grpc::StatusCode::UNAVAILABLE) {
return Status::GrpcUnavailable(GrpcStatusToRayStatusMessage(grpc_status));
} else {
// TODO(jjyao) Use GrpcUnknown as the catch-all status for all
// the unhandled grpc status.
// If needed, we can define a ray status for each grpc status in the future.
return Status::GrpcUnknown(GrpcStatusToRayStatusMessage(grpc_status));
return Status::RpcError(GrpcStatusToRayStatusMessage(grpc_status),
grpc_status.error_code());
}
}

Expand Down
4 changes: 0 additions & 4 deletions src/ray/common/status.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ namespace ray {
#define STATUS_CODE_OBJECT_ALREADY_SEALED "ObjectAlreadySealed"
#define STATUS_CODE_OBJECT_STORE_FULL "ObjectStoreFull"
#define STATUS_CODE_TRANSIENT_OBJECT_STORE_FULL "TransientObjectStoreFull"
#define STATUS_CODE_GRPC_UNAVAILABLE "GrpcUnavailable"
#define STATUS_CODE_GRPC_UNKNOWN "GrpcUnknown"
#define STATUS_CODE_OUT_OF_DISK "OutOfDisk"
#define STATUS_CODE_OBJECT_UNKNOWN_OWNER "ObjectUnknownOwner"
#define STATUS_CODE_RPC_ERROR "RpcError"
Expand Down Expand Up @@ -98,8 +96,6 @@ const absl::flat_hash_map<StatusCode, std::string> kCodeToStr = {
{StatusCode::ObjectAlreadySealed, STATUS_CODE_OBJECT_ALREADY_SEALED},
{StatusCode::ObjectStoreFull, STATUS_CODE_OBJECT_STORE_FULL},
{StatusCode::TransientObjectStoreFull, STATUS_CODE_TRANSIENT_OBJECT_STORE_FULL},
{StatusCode::GrpcUnavailable, STATUS_CODE_GRPC_UNAVAILABLE},
{StatusCode::GrpcUnknown, STATUS_CODE_GRPC_UNKNOWN},
{StatusCode::OutOfDisk, STATUS_CODE_OUT_OF_DISK},
{StatusCode::ObjectUnknownOwner, STATUS_CODE_OBJECT_UNKNOWN_OWNER},
{StatusCode::RpcError, STATUS_CODE_RPC_ERROR},
Expand Down
19 changes: 0 additions & 19 deletions src/ray/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,6 @@ enum class StatusCode : char {
ObjectAlreadySealed = 23,
ObjectStoreFull = 24,
TransientObjectStoreFull = 25,
// grpc status
// This represents UNAVAILABLE status code
// returned by grpc.
GrpcUnavailable = 26,
// This represents all other status codes
// returned by grpc that are not defined above.
GrpcUnknown = 27,
// Object store is both out of memory and
// out of disk.
OutOfDisk = 28,
Expand Down Expand Up @@ -243,14 +236,6 @@ class RAY_EXPORT Status {
return Status(StatusCode::OutOfDisk, msg);
}

static Status GrpcUnavailable(const std::string &msg) {
return Status(StatusCode::GrpcUnavailable, msg);
}

static Status GrpcUnknown(const std::string &msg) {
return Status(StatusCode::GrpcUnknown, msg);
}

static Status RpcError(const std::string &msg, int rpc_code) {
return Status(StatusCode::RpcError, msg, rpc_code);
}
Expand Down Expand Up @@ -306,10 +291,6 @@ class RAY_EXPORT Status {
bool IsTransientObjectStoreFull() const {
return code() == StatusCode::TransientObjectStoreFull;
}
bool IsGrpcUnavailable() const { return code() == StatusCode::GrpcUnavailable; }
bool IsGrpcUnknown() const { return code() == StatusCode::GrpcUnknown; }

bool IsGrpcError() const { return IsGrpcUnknown() || IsGrpcUnavailable(); }

bool IsRpcError() const { return code() == StatusCode::RpcError; }

Expand Down
6 changes: 4 additions & 2 deletions src/ray/common/test/status_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ TEST_F(StatusTest, GrpcStatusToRayStatus) {

grpc_status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "foo", "bar");
ray_status = GrpcStatusToRayStatus(grpc_status);
ASSERT_TRUE(ray_status.IsGrpcUnavailable());
ASSERT_TRUE(ray_status.IsRpcError());
ASSERT_EQ(ray_status.rpc_code(), grpc::StatusCode::UNAVAILABLE);

grpc_status = grpc::Status(grpc::StatusCode::UNKNOWN, "foo", "bar");
ray_status = GrpcStatusToRayStatus(grpc_status);
ASSERT_TRUE(ray_status.IsGrpcUnknown());
ASSERT_TRUE(ray_status.IsRpcError());
ASSERT_EQ(ray_status.rpc_code(), grpc::StatusCode::UNKNOWN);

grpc_status = grpc::Status(grpc::StatusCode::ABORTED, "foo", "bar");
ray_status = GrpcStatusToRayStatus(grpc_status);
Expand Down
3 changes: 2 additions & 1 deletion src/ray/core_worker/core_worker_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ void CoreWorkerProcessImpl::InitializeSystemConfig() {
}

// If there's no more attempt to try.
if (status.IsGrpcUnavailable()) {
if (status.IsRpcError() &&
status.rpc_code() == grpc::StatusCode::UNAVAILABLE) {
std::ostringstream ss;
ss << "Failed to get the system config from raylet because "
<< "it is dead. Worker will terminate. Status: " << status
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/test/direct_task_transport_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class MockRayletClient : public WorkerLeaseInterface {
return false;
} else {
auto callback = callbacks.front();
callback(Status::GrpcUnavailable("unavailable"), reply);
callback(Status::RpcError("unavailable", grpc::StatusCode::UNAVAILABLE), reply);
callbacks.pop_front();
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/test/task_event_buffer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ TEST_F(TaskEventBufferTest, TestFailedFlush) {
.Times(2)
.WillOnce([&](std::unique_ptr<rpc::TaskEventData> actual_data,
ray::gcs::StatusCallback callback) {
callback(Status::GrpcUnknown("grpc error"));
callback(Status::RpcError("grpc error", grpc::StatusCode::UNKNOWN));
return Status::OK();
})
.WillOnce([&](std::unique_ptr<rpc::TaskEventData> actual_data,
Expand Down
3 changes: 2 additions & 1 deletion src/ray/core_worker/transport/direct_task_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,8 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
RequestNewWorkerIfNeeded(scheduling_key);

} else {
if (status.IsGrpcUnavailable()) {
if (status.IsRpcError() &&
status.rpc_code() == grpc::StatusCode::UNAVAILABLE) {
RAY_LOG(WARNING)
<< "The worker failed to receive a response from the local "
<< "raylet because the raylet is unavailable (crashed). "
Expand Down
12 changes: 3 additions & 9 deletions src/ray/gcs/gcs_client/gcs_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ Status PythonGcsClient::Connect(const ClusterID &cluster_id,
RAY_LOG(DEBUG) << "Received cluster ID from GCS server: " << cluster_id_;
RAY_CHECK(!cluster_id_.IsNil());
break;
} else if (!connect_status.IsGrpcError()) {
} else if (!connect_status.IsRpcError()) {
return HandleGcsError(reply.status());
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
Expand Down Expand Up @@ -415,15 +415,9 @@ Status PythonGcsClient::PinRuntimeEnvUri(const std::string &uri,
if (status.ok()) {
if (reply.status().code() == static_cast<int>(StatusCode::OK)) {
return Status::OK();
} else if (reply.status().code() == static_cast<int>(StatusCode::GrpcUnavailable)) {
std::string msg =
"Failed to pin URI reference for " + uri + " due to the GCS being " +
"unavailable, most likely it has crashed: " + reply.status().message() + ".";
return Status::GrpcUnavailable(msg);
} else {
return Status(StatusCode(reply.status().code()), reply.status().message());
}
std::string msg = "Failed to pin URI reference for " + uri +
" due to unexpected error " + reply.status().message() + ".";
return Status::GrpcUnknown(msg);
}
return Status::RpcError(status.error_message(), status.error_code());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestNodeErrorDuringCommittingResources) {
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
// node1 is experiencing transient connection failure.
ASSERT_TRUE(raylet_clients_[1]->GrantCommitBundleResources(
ray::Status::GrpcUnavailable("unavailable")));
ray::Status::RpcError("unavailable", grpc::StatusCode::UNAVAILABLE)));
WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE);
}

Expand Down
2 changes: 1 addition & 1 deletion src/ray/rpc/gcs_server/gcs_rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class GcsRpcClient {
callback(status, reply);
}
delete executor;
} else if (!status.IsGrpcError()) {
} else if (!status.IsRpcError()) {
callback(status, reply);
delete executor;
} else {
Expand Down

0 comments on commit e5d651d

Please sign in to comment.