Skip to content

Commit 350f90e

Browse files
authored
services: Avoid cancellation exceptions when notifying watchers that already have their connections cancelled (#11934)
Some clients watching health status can cancel their watch and `HealthService` when trying to notify these watchers were getting CANCELLED exception because there was no cancellation handler set on the `StreamObserver`. This change sets the cancellation handler that removes the watcher from the set of watcher clients to be notified of the health status.
1 parent 3961a92 commit 350f90e

File tree

2 files changed

+38
-10
lines changed

2 files changed

+38
-10
lines changed

services/src/main/java/io/grpc/protobuf/services/HealthServiceImpl.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.grpc.health.v1.HealthCheckResponse;
2828
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
2929
import io.grpc.health.v1.HealthGrpc;
30+
import io.grpc.stub.ServerCallStreamObserver;
3031
import io.grpc.stub.StreamObserver;
3132
import java.util.HashMap;
3233
import java.util.IdentityHashMap;
@@ -83,6 +84,11 @@ public void watch(HealthCheckRequest request,
8384
final StreamObserver<HealthCheckResponse> responseObserver) {
8485
final String service = request.getService();
8586
synchronized (watchLock) {
87+
if (responseObserver instanceof ServerCallStreamObserver) {
88+
((ServerCallStreamObserver) responseObserver).setOnCancelHandler(() -> {
89+
removeWatcher(service, responseObserver);
90+
});
91+
}
8692
ServingStatus status = statusMap.get(service);
8793
responseObserver.onNext(getResponseForWatch(status));
8894
IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
@@ -98,21 +104,25 @@ public void watch(HealthCheckRequest request,
98104
@Override
99105
// Called when the client has closed the stream
100106
public void cancelled(Context context) {
101-
synchronized (watchLock) {
102-
IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
103-
watchers.get(service);
104-
if (serviceWatchers != null) {
105-
serviceWatchers.remove(responseObserver);
106-
if (serviceWatchers.isEmpty()) {
107-
watchers.remove(service);
108-
}
109-
}
110-
}
107+
removeWatcher(service, responseObserver);
111108
}
112109
},
113110
MoreExecutors.directExecutor());
114111
}
115112

113+
void removeWatcher(String service, StreamObserver<HealthCheckResponse> responseObserver) {
114+
synchronized (watchLock) {
115+
IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
116+
watchers.get(service);
117+
if (serviceWatchers != null) {
118+
serviceWatchers.remove(responseObserver);
119+
if (serviceWatchers.isEmpty()) {
120+
watchers.remove(service);
121+
}
122+
}
123+
}
124+
}
125+
116126
void setStatus(String service, ServingStatus status) {
117127
synchronized (watchLock) {
118128
if (terminal) {

services/src/test/java/io/grpc/protobuf/services/HealthStatusManagerTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818

1919
import static com.google.common.truth.Truth.assertThat;
2020
import static org.junit.Assert.fail;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.ArgumentMatchers.eq;
23+
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.times;
25+
import static org.mockito.Mockito.verify;
2126

2227
import io.grpc.BindableService;
2328
import io.grpc.Context;
@@ -28,6 +33,7 @@
2833
import io.grpc.health.v1.HealthCheckResponse;
2934
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
3035
import io.grpc.health.v1.HealthGrpc;
36+
import io.grpc.stub.ServerCallStreamObserver;
3137
import io.grpc.stub.StreamObserver;
3238
import io.grpc.testing.GrpcServerRule;
3339
import java.util.ArrayDeque;
@@ -109,6 +115,18 @@ public void enterTerminalState_watch() throws Exception {
109115
assertThat(obs.responses).isEmpty();
110116
}
111117

118+
@Test
119+
@SuppressWarnings("unchecked")
120+
public void serverCallStreamObserver_watch() throws Exception {
121+
manager.setStatus(SERVICE1, ServingStatus.SERVING);
122+
ServerCallStreamObserver<HealthCheckResponse> observer = mock(ServerCallStreamObserver.class);
123+
service.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), observer);
124+
125+
verify(observer, times(1))
126+
.onNext(eq(HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build()));
127+
verify(observer, times(1)).setOnCancelHandler(any(Runnable.class));
128+
}
129+
112130
@Test
113131
public void enterTerminalState_ignoreClear() throws Exception {
114132
manager.setStatus(SERVICE1, ServingStatus.SERVING);

0 commit comments

Comments
 (0)