Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Remove Status::GrpcUnavailable and Status::GrpcUnknown since we have Status::RpcError #46167

Merged
merged 2 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions python/ray/tests/test_unavailable_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def body():
# Break the grpc connection from this process to the actor process. The
# next `ray.get` call should fail with ActorUnavailableError.
close_common_connections(pid)
with pytest.raises(ActorUnavailableError, match="Grpc"):
with pytest.raises(ActorUnavailableError, match="RpcError"):
ray.get(task)
# Since the remote() call happens *before* the break, the actor did receive the
# request, so the side effects are observable, and the actor recovered.
Expand All @@ -93,7 +93,7 @@ def body():
# itself won't raise an error.
close_common_connections(pid)
task2 = a.slow_increment.remote(5, 0.1)
with pytest.raises(ActorUnavailableError, match="Grpc"):
with pytest.raises(ActorUnavailableError, match="RpcError"):
ray.get(task2)
assert ray.get(a.read.remote()) == 9

Expand Down Expand Up @@ -206,7 +206,7 @@ def test_unavailable_then_actor_error(ray_start_regular):
# calls get ActorUnavailableError.
sigkill_actor(a)

with pytest.raises(ActorUnavailableError, match="Grpc"):
with pytest.raises(ActorUnavailableError, match="RpcError"):
print(ray.get(a.ping.remote("unavailable")))
# When the actor is restarting, any method call raises ActorUnavailableError.
with pytest.raises(ActorUnavailableError, match="The actor is restarting"):
Expand Down
8 changes: 2 additions & 6 deletions src/ray/common/grpc_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,9 @@ inline Status GrpcStatusToRayStatus(const grpc::Status &grpc_status) {
// See RayStatusToGrpcStatus for details.
return Status(Status::StringToCode(grpc_status.error_message()),
grpc_status.error_details());
} else if (grpc_status.error_code() == grpc::StatusCode::UNAVAILABLE) {
return Status::GrpcUnavailable(GrpcStatusToRayStatusMessage(grpc_status));
} else {
// TODO(jjyao) Use GrpcUnknown as the catch-all status for all
// the unhandled grpc status.
// If needed, we can define a ray status for each grpc status in the future.
return Status::GrpcUnknown(GrpcStatusToRayStatusMessage(grpc_status));
return Status::RpcError(GrpcStatusToRayStatusMessage(grpc_status),
grpc_status.error_code());
}
}

Expand Down
4 changes: 0 additions & 4 deletions src/ray/common/status.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ namespace ray {
#define STATUS_CODE_OBJECT_ALREADY_SEALED "ObjectAlreadySealed"
#define STATUS_CODE_OBJECT_STORE_FULL "ObjectStoreFull"
#define STATUS_CODE_TRANSIENT_OBJECT_STORE_FULL "TransientObjectStoreFull"
#define STATUS_CODE_GRPC_UNAVAILABLE "GrpcUnavailable"
#define STATUS_CODE_GRPC_UNKNOWN "GrpcUnknown"
#define STATUS_CODE_OUT_OF_DISK "OutOfDisk"
#define STATUS_CODE_OBJECT_UNKNOWN_OWNER "ObjectUnknownOwner"
#define STATUS_CODE_RPC_ERROR "RpcError"
Expand Down Expand Up @@ -98,8 +96,6 @@ const absl::flat_hash_map<StatusCode, std::string> kCodeToStr = {
{StatusCode::ObjectAlreadySealed, STATUS_CODE_OBJECT_ALREADY_SEALED},
{StatusCode::ObjectStoreFull, STATUS_CODE_OBJECT_STORE_FULL},
{StatusCode::TransientObjectStoreFull, STATUS_CODE_TRANSIENT_OBJECT_STORE_FULL},
{StatusCode::GrpcUnavailable, STATUS_CODE_GRPC_UNAVAILABLE},
{StatusCode::GrpcUnknown, STATUS_CODE_GRPC_UNKNOWN},
{StatusCode::OutOfDisk, STATUS_CODE_OUT_OF_DISK},
{StatusCode::ObjectUnknownOwner, STATUS_CODE_OBJECT_UNKNOWN_OWNER},
{StatusCode::RpcError, STATUS_CODE_RPC_ERROR},
Expand Down
19 changes: 0 additions & 19 deletions src/ray/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,6 @@ enum class StatusCode : char {
ObjectAlreadySealed = 23,
ObjectStoreFull = 24,
TransientObjectStoreFull = 25,
// grpc status
// This represents UNAVAILABLE status code
// returned by grpc.
GrpcUnavailable = 26,
// This represents all other status codes
// returned by grpc that are not defined above.
GrpcUnknown = 27,
// Object store is both out of memory and
// out of disk.
OutOfDisk = 28,
Expand Down Expand Up @@ -243,14 +236,6 @@ class RAY_EXPORT Status {
return Status(StatusCode::OutOfDisk, msg);
}

static Status GrpcUnavailable(const std::string &msg) {
return Status(StatusCode::GrpcUnavailable, msg);
}

static Status GrpcUnknown(const std::string &msg) {
return Status(StatusCode::GrpcUnknown, msg);
}

static Status RpcError(const std::string &msg, int rpc_code) {
return Status(StatusCode::RpcError, msg, rpc_code);
}
Expand Down Expand Up @@ -306,10 +291,6 @@ class RAY_EXPORT Status {
bool IsTransientObjectStoreFull() const {
return code() == StatusCode::TransientObjectStoreFull;
}
bool IsGrpcUnavailable() const { return code() == StatusCode::GrpcUnavailable; }
bool IsGrpcUnknown() const { return code() == StatusCode::GrpcUnknown; }

bool IsGrpcError() const { return IsGrpcUnknown() || IsGrpcUnavailable(); }

bool IsRpcError() const { return code() == StatusCode::RpcError; }

Expand Down
6 changes: 4 additions & 2 deletions src/ray/common/test/status_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ TEST_F(StatusTest, GrpcStatusToRayStatus) {

grpc_status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "foo", "bar");
ray_status = GrpcStatusToRayStatus(grpc_status);
ASSERT_TRUE(ray_status.IsGrpcUnavailable());
ASSERT_TRUE(ray_status.IsRpcError());
ASSERT_EQ(ray_status.rpc_code(), grpc::StatusCode::UNAVAILABLE);

grpc_status = grpc::Status(grpc::StatusCode::UNKNOWN, "foo", "bar");
ray_status = GrpcStatusToRayStatus(grpc_status);
ASSERT_TRUE(ray_status.IsGrpcUnknown());
ASSERT_TRUE(ray_status.IsRpcError());
ASSERT_EQ(ray_status.rpc_code(), grpc::StatusCode::UNKNOWN);

grpc_status = grpc::Status(grpc::StatusCode::ABORTED, "foo", "bar");
ray_status = GrpcStatusToRayStatus(grpc_status);
Expand Down
3 changes: 2 additions & 1 deletion src/ray/core_worker/core_worker_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ void CoreWorkerProcessImpl::InitializeSystemConfig() {
}

// If there's no more attempt to try.
if (status.IsGrpcUnavailable()) {
if (status.IsRpcError() &&
status.rpc_code() == grpc::StatusCode::UNAVAILABLE) {
std::ostringstream ss;
ss << "Failed to get the system config from raylet because "
<< "it is dead. Worker will terminate. Status: " << status
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/test/direct_task_transport_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class MockRayletClient : public WorkerLeaseInterface {
return false;
} else {
auto callback = callbacks.front();
callback(Status::GrpcUnavailable("unavailable"), reply);
callback(Status::RpcError("unavailable", grpc::StatusCode::UNAVAILABLE), reply);
callbacks.pop_front();
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/test/task_event_buffer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ TEST_F(TaskEventBufferTest, TestFailedFlush) {
.Times(2)
.WillOnce([&](std::unique_ptr<rpc::TaskEventData> actual_data,
ray::gcs::StatusCallback callback) {
callback(Status::GrpcUnknown("grpc error"));
callback(Status::RpcError("grpc error", grpc::StatusCode::UNKNOWN));
return Status::OK();
})
.WillOnce([&](std::unique_ptr<rpc::TaskEventData> actual_data,
Expand Down
3 changes: 2 additions & 1 deletion src/ray/core_worker/transport/direct_task_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,8 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
RequestNewWorkerIfNeeded(scheduling_key);

} else {
if (status.IsGrpcUnavailable()) {
if (status.IsRpcError() &&
status.rpc_code() == grpc::StatusCode::UNAVAILABLE) {
RAY_LOG(WARNING)
<< "The worker failed to receive a response from the local "
<< "raylet because the raylet is unavailable (crashed). "
Expand Down
12 changes: 3 additions & 9 deletions src/ray/gcs/gcs_client/gcs_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ Status PythonGcsClient::Connect(const ClusterID &cluster_id,
RAY_LOG(DEBUG) << "Received cluster ID from GCS server: " << cluster_id_;
RAY_CHECK(!cluster_id_.IsNil());
break;
} else if (!connect_status.IsGrpcError()) {
} else if (!connect_status.IsRpcError()) {
return HandleGcsError(reply.status());
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
Expand Down Expand Up @@ -415,15 +415,9 @@ Status PythonGcsClient::PinRuntimeEnvUri(const std::string &uri,
if (status.ok()) {
if (reply.status().code() == static_cast<int>(StatusCode::OK)) {
return Status::OK();
} else if (reply.status().code() == static_cast<int>(StatusCode::GrpcUnavailable)) {
std::string msg =
"Failed to pin URI reference for " + uri + " due to the GCS being " +
"unavailable, most likely it has crashed: " + reply.status().message() + ".";
return Status::GrpcUnavailable(msg);
} else {
return Status(StatusCode(reply.status().code()), reply.status().message());
}
std::string msg = "Failed to pin URI reference for " + uri +
" due to unexpected error " + reply.status().message() + ".";
return Status::GrpcUnknown(msg);
}
return Status::RpcError(status.error_message(), status.error_code());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestNodeErrorDuringCommittingResources) {
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
// node1 is experiencing transient connection failure.
ASSERT_TRUE(raylet_clients_[1]->GrantCommitBundleResources(
ray::Status::GrpcUnavailable("unavailable")));
ray::Status::RpcError("unavailable", grpc::StatusCode::UNAVAILABLE)));
WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE);
}

Expand Down
2 changes: 1 addition & 1 deletion src/ray/rpc/gcs_server/gcs_rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class GcsRpcClient {
callback(status, reply);
}
delete executor;
} else if (!status.IsGrpcError()) {
} else if (!status.IsRpcError()) {
callback(status, reply);
delete executor;
} else {
Expand Down
Loading