Skip to content

Commit

Permalink
Cap concurrent requests (ray-project#5341)
Browse files Browse the repository at this point in the history
  • Loading branch information
raulchen authored and pcmoritz committed Aug 6, 2019
1 parent e3c9f7e commit 3ad2fe7
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 13 deletions.
4 changes: 1 addition & 3 deletions src/ray/rpc/grpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ void GrpcServer::PollEventsFromCompletionQueue() {
switch (server_call->GetState()) {
case ServerCallState::PENDING:
// We've received a new incoming request. Now this call object is used to
// track this request. So we need to create another call to handle next
// incoming request.
server_call->GetFactory().CreateCall();
// track this request.
server_call->SetState(ServerCallState::PROCESSING);
server_call->HandleRequest();
break;
Expand Down
8 changes: 3 additions & 5 deletions src/ray/rpc/grpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class GrpcServer {
/// The `grpc::Service` objects which should be registered to `ServerBuilder`.
std::vector<std::reference_wrapper<grpc::Service>> services_;
/// The `ServerCallFactory` objects, and the maximum number of concurrent requests that
/// gRPC server can accept.
/// this gRPC server can handle.
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
server_call_factories_and_concurrencies_;
/// The `ServerCompletionQueue` object used for polling events.
Expand Down Expand Up @@ -121,13 +121,11 @@ class GrpcService {

/// Subclasses should implement this method to initialize the `ServerCallFactory`
/// instances, as well as specify maximum number of concurrent requests that gRPC
/// server can "accept" (not "handle"). Each factory will be used to create
/// `accept_concurrency` `ServerCall` objects, each of which will be used to accept and
/// handle an incoming request.
/// server can handle.
///
/// \param[in] cq The grpc completion queue.
/// \param[out] server_call_factories_and_concurrencies The `ServerCallFactory` objects,
/// and the maximum number of concurrent requests that gRPC server can accept.
/// and the maximum number of concurrent requests that this gRPC server can handle.
virtual void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
Expand Down
11 changes: 6 additions & 5 deletions src/ray/rpc/server_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ class ServerCall {
/// `GrpcServer` when the request is received.
virtual void HandleRequest() = 0;

/// Get the factory that created this `ServerCall`.
virtual const ServerCallFactory &GetFactory() const = 0;

/// Invoked when sending reply successes.
virtual void OnReplySent() = 0;

Expand Down Expand Up @@ -141,6 +138,9 @@ class ServerCallImpl : public ServerCall {

void HandleRequestImpl() {
state_ = ServerCallState::PROCESSING;
// NOTE(hchen): This `factory` local variable is needed. Because `SendReply` runs in
// a different thread, and will cause `this` to be deleted.
const auto &factory = factory_;
(service_handler_.*handle_request_function_)(
request_, &reply_,
[this](Status status, std::function<void()> success,
Expand All @@ -155,10 +155,11 @@ class ServerCallImpl : public ServerCall {
// this server call might be deleted
SendReply(status);
});
// We've finished handling this request,
// create a new `ServerCall` to accept the next incoming request.
factory.CreateCall();
}

const ServerCallFactory &GetFactory() const override { return factory_; }

void OnReplySent() override {
if (send_reply_success_callback_ && !io_service_.stopped()) {
auto callback = std::move(send_reply_success_callback_);
Expand Down

0 comments on commit 3ad2fe7

Please sign in to comment.