Skip to content

Commit

Permalink
[Core] Fixing gRPC server handlers to properly respect max inflight R…
Browse files Browse the repository at this point in the history
…PCs settings (ray-project#47664)

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
  • Loading branch information
alexeykudinkin authored Oct 1, 2024
1 parent 289374a commit 073d143
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 13 deletions.
8 changes: 7 additions & 1 deletion src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,13 @@ RAY_CONFIG(bool, isolate_workers_across_resource_types, true)
RAY_CONFIG(bool, isolate_workers_across_task_types, true)

/// ServerCall instance number of each RPC service handler
RAY_CONFIG(int64_t, gcs_max_active_rpcs_per_handler, 100)
///
/// NOTE: Default value is temporarily pegged at `gcs_server_rpc_server_thread_num * 100`
/// to keep it at the level it has been prior to
/// https://github.com/ray-project/ray/pull/47664
RAY_CONFIG(int64_t,
gcs_max_active_rpcs_per_handler,
gcs_server_rpc_server_thread_num() * 100)

/// grpc keepalive sent interval for server.
/// This is only configured in GCS server now.
Expand Down
32 changes: 20 additions & 12 deletions src/ray/rpc/grpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,27 @@ void GrpcServer::Run() {
RAY_CHECK(port_ > 0);
RAY_LOG(INFO) << name_ << " server started, listening on port " << port_ << ".";

// Create calls for all the server call factories.
// Create calls for all the server call factories
//
// NOTE: That ServerCallFactory is created for every thread processing respective
// CompletionQueue
for (auto &entry : server_call_factories_) {
for (int i = 0; i < num_threads_; i++) {
// Create a buffer of 100 calls for each RPC handler.
// TODO(edoakes): a small buffer should be fine and seems to have better
// performance, but we don't currently handle backpressure on the client.
int buffer_size = 100;
if (entry->GetMaxActiveRPCs() != -1) {
buffer_size = entry->GetMaxActiveRPCs();
}
for (int j = 0; j < std::max(1, buffer_size / num_threads_); j++) {
entry->CreateCall();
}
// Derive target max inflight RPCs buffer based on `gcs_max_active_rpcs_per_handler`
//
// NOTE: For these handlers that have set it to -1, we set default (per
// thread) buffer at 32, though it doesn't have any impact on concurrency
// (since we're recreating new instance of `ServerCall` as soon as one
// gets occupied therefore not serving as back-pressure mechanism)
size_t buffer_size;
if (entry->GetMaxActiveRPCs() != -1) {
buffer_size = std::max(1, int(entry->GetMaxActiveRPCs() / num_threads_));
} else {
buffer_size = 32;
}

for (size_t j = 0; j < buffer_size; j++) {
// Create pending `ServerCall` ready to accept incoming requests
entry->CreateCall();
}
}
// Start threads that polls incoming requests.
Expand Down

0 comments on commit 073d143

Please sign in to comment.