Skip to content

Commit c5999db

Browse files
ctillercopybara-github
authored andcommitted
[call-v3] Fix leak with cq-based server (#37972)
Three problems: 1. We have an owning waker, but on the `Expire` path we never wake it, leading to calls being stranded until the pending timer runs out - instead we now call Finish and have it always wake things up (slightly more expensive in shutdown case, but not on the fast path) 2. Avoid a race condition whereby two threads could wake the same waker 3. Don't add new requests to the pending queue after we've removed all requests Closes #37972 COPYBARA_INTEGRATE_REVIEW=#37972 from ctiller:flake-fightas-21 2bbd1cf PiperOrigin-RevId: 688310530
1 parent eacb2f7 commit c5999db

File tree

1 file changed

+14
-7
lines changed

1 file changed

+14
-7
lines changed

src/core/server/server.cc

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
311311
pending_promises_.front()->Finish(absl::InternalError("Server closed"));
312312
pending_promises_.pop();
313313
}
314+
zombified_ = true;
314315
}
315316

316317
void KillRequests(grpc_error_handle error) override {
@@ -468,6 +469,9 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
468469
return Immediate(absl::ResourceExhaustedError(
469470
"Too many pending requests for this server"));
470471
}
472+
if (zombified_) {
473+
return Immediate(absl::InternalError("Server closed"));
474+
}
471475
auto w = std::make_shared<ActivityWaiter>(
472476
GetContext<Activity>()->MakeOwningWaker());
473477
pending_promises_.push(w);
@@ -478,7 +482,7 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
478482
if (r == nullptr) return Pending{};
479483
return std::move(*r);
480484
},
481-
[w]() { w->Expire(); });
485+
[w]() { w->Finish(absl::CancelledError()); });
482486
}
483487
}
484488
return Immediate(MatchResult(server(), cq_idx, rc));
@@ -498,8 +502,14 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
498502
explicit ActivityWaiter(Waker waker) : waker(std::move(waker)) {}
499503
~ActivityWaiter() { delete result.load(std::memory_order_acquire); }
500504
void Finish(absl::Status status) {
501-
delete result.exchange(new ResultType(std::move(status)),
502-
std::memory_order_acq_rel);
505+
ResultType* expected = nullptr;
506+
ResultType* new_value = new ResultType(std::move(status));
507+
if (!result.compare_exchange_strong(expected, new_value,
508+
std::memory_order_acq_rel,
509+
std::memory_order_acquire)) {
510+
delete new_value;
511+
return;
512+
}
503513
waker.WakeupAsync();
504514
}
505515
// Returns true if requested_call consumed, false otherwise.
@@ -518,10 +528,6 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
518528
waker.WakeupAsync();
519529
return true;
520530
}
521-
void Expire() {
522-
delete result.exchange(new ResultType(absl::CancelledError()),
523-
std::memory_order_acq_rel);
524-
}
525531
Duration Age() { return Timestamp::Now() - created; }
526532
Waker waker;
527533
std::atomic<ResultType*> result{nullptr};
@@ -531,6 +537,7 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
531537
std::queue<PendingCallFilterStack> pending_filter_stack_;
532538
std::queue<PendingCallPromises> pending_promises_;
533539
std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
540+
bool zombified_ = false;
534541
};
535542

536543
// AllocatingRequestMatchers don't allow the application to request an RPC in

0 commit comments

Comments
 (0)