Skip to content

Commit cc6ef72

Browse files
michalvavrikgsmet
authored andcommitted
fix(grpc): avoid repeated call and closing when interceptor closes call
(cherry picked from commit 8d93d7f)
1 parent 54bcb79 commit cc6ef72

File tree

2 files changed

+131
-17
lines changed

2 files changed

+131
-17
lines changed
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package io.quarkus.grpc.server.interceptors;
2+
3+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4+
5+
import java.time.Duration;
6+
import java.util.logging.LogRecord;
7+
8+
import jakarta.enterprise.context.ApplicationScoped;
9+
10+
import org.assertj.core.api.Condition;
11+
import org.jboss.shrinkwrap.api.ShrinkWrap;
12+
import org.jboss.shrinkwrap.api.asset.StringAsset;
13+
import org.jboss.shrinkwrap.api.spec.JavaArchive;
14+
import org.junit.jupiter.api.Assertions;
15+
import org.junit.jupiter.api.Test;
16+
import org.junit.jupiter.api.extension.RegisterExtension;
17+
18+
import io.grpc.Metadata;
19+
import io.grpc.ServerCall;
20+
import io.grpc.ServerCallHandler;
21+
import io.grpc.ServerInterceptor;
22+
import io.grpc.Status;
23+
import io.grpc.StatusRuntimeException;
24+
import io.grpc.examples.helloworld.Greeter;
25+
import io.grpc.examples.helloworld.GreeterBean;
26+
import io.grpc.examples.helloworld.GreeterGrpc;
27+
import io.grpc.examples.helloworld.HelloReply;
28+
import io.grpc.examples.helloworld.HelloRequest;
29+
import io.quarkus.grpc.GlobalInterceptor;
30+
import io.quarkus.grpc.GrpcClient;
31+
import io.quarkus.grpc.server.services.HelloService;
32+
import io.quarkus.test.QuarkusUnitTest;
33+
import io.smallrye.mutiny.Uni;
34+
35+
public class ClosingCallInInterceptorTest {
36+
37+
private static final Metadata.Key<String> CLOSE_REASON_KEY = Metadata.Key.of("CUSTOM_CLOSE_REASON",
38+
Metadata.ASCII_STRING_MARSHALLER);
39+
private static final String STATED_REASON_TO_CLOSE = "Because I want to close it.";
40+
41+
@RegisterExtension
42+
static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer(
43+
() -> ShrinkWrap.create(JavaArchive.class)
44+
.addPackage(GreeterGrpc.class.getPackage())
45+
.addClasses(MyClosingCallInterceptor.class, GreeterBean.class, HelloRequest.class, HelloService.class)
46+
.addAsResource(new StringAsset("quarkus.grpc.server.use-separate-server=false" + System.lineSeparator()),
47+
"application.properties"))
48+
.setLogRecordPredicate(
49+
record -> record.getMessage() != null && record.getMessage().contains("Closing gRPC call due to an error"))
50+
.assertLogRecords(logRecords -> {
51+
if (!logRecords.isEmpty()) {
52+
for (LogRecord logRecord : logRecords) {
53+
if (logRecord.getThrown() instanceof IllegalStateException ise
54+
&& ise.getMessage().contains("Already closed")) {
55+
Assertions.fail("Log contains message with 'java.lang.IllegalStateException: Already closed'");
56+
}
57+
}
58+
}
59+
});
60+
61+
@GrpcClient
62+
Greeter greeter;
63+
64+
@Test
65+
void test() {
66+
Uni<HelloReply> result = greeter.sayHello(HelloRequest.newBuilder().setName("ServiceA").build());
67+
assertThatThrownBy(() -> result.await().atMost(Duration.ofSeconds(4)))
68+
.isInstanceOf(StatusRuntimeException.class)
69+
.has(new Condition<Throwable>(t -> {
70+
if (t instanceof StatusRuntimeException statusRuntimeException) {
71+
var trailers = statusRuntimeException.getTrailers();
72+
if (trailers != null) {
73+
return STATED_REASON_TO_CLOSE.equals(trailers.get(CLOSE_REASON_KEY));
74+
}
75+
}
76+
return false;
77+
}, "Checking close reason returned in metadata"))
78+
.hasMessageContaining("UNAUTHENTICATED");
79+
}
80+
81+
@ApplicationScoped
82+
@GlobalInterceptor
83+
public static class MyClosingCallInterceptor implements ServerInterceptor {
84+
85+
@Override
86+
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
87+
ServerCallHandler<ReqT, RespT> next) {
88+
var metadata = new Metadata();
89+
metadata.put(CLOSE_REASON_KEY, STATED_REASON_TO_CLOSE);
90+
call.close(Status.UNAUTHENTICATED, metadata);
91+
return new ServerCall.Listener<>() {
92+
};
93+
}
94+
}
95+
96+
}

extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcDuplicatedContextGrpcInterceptor.java

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,20 @@
55
import java.util.Optional;
66
import java.util.concurrent.atomic.AtomicBoolean;
77
import java.util.function.Consumer;
8-
import java.util.function.Supplier;
8+
import java.util.function.Function;
99

1010
import jakarta.enterprise.context.ApplicationScoped;
1111
import jakarta.enterprise.inject.spi.Prioritized;
1212
import jakarta.inject.Inject;
1313

1414
import org.jboss.logging.Logger;
1515

16+
import io.grpc.ForwardingServerCall;
1617
import io.grpc.Metadata;
1718
import io.grpc.ServerCall;
1819
import io.grpc.ServerCallHandler;
1920
import io.grpc.ServerInterceptor;
21+
import io.grpc.Status;
2022
import io.grpc.StatusException;
2123
import io.quarkus.grpc.ExceptionHandlerProvider;
2224
import io.quarkus.grpc.GlobalInterceptor;
@@ -54,15 +56,27 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
5456
}
5557
}
5658

57-
private <ReqT, RespT> Supplier<ServerCall.Listener<ReqT>> nextCall(ServerCall<ReqT, RespT> call,
59+
private <ReqT, RespT> Function<Runnable, ServerCall.Listener<ReqT>> nextCall(ServerCall<ReqT, RespT> call,
5860
Metadata headers,
5961
ServerCallHandler<ReqT, RespT> next) {
6062
// Must be sure to call next.startCall on the right context
6163
io.grpc.Context current = io.grpc.Context.current();
62-
return () -> {
64+
return onClose -> {
6365
io.grpc.Context previous = current.attach();
6466
try {
65-
return next.startCall(call, headers);
67+
var forwardingCall = new ForwardingServerCall<ReqT, RespT>() {
68+
@Override
69+
protected ServerCall<ReqT, RespT> delegate() {
70+
return call;
71+
}
72+
73+
@Override
74+
public void close(Status status, Metadata trailers) {
75+
onClose.run();
76+
super.close(status, trailers);
77+
}
78+
};
79+
return next.startCall(forwardingCall, headers);
6680
} finally {
6781
current.detach(previous);
6882
}
@@ -77,31 +91,35 @@ public int getPriority() {
7791
static class ListenedOnDuplicatedContext<ReqT, RespT> extends ServerCall.Listener<ReqT> {
7892

7993
private final Context context;
80-
private final Supplier<ServerCall.Listener<ReqT>> supplier;
94+
private final Function<Runnable, ServerCall.Listener<ReqT>> listenerCreator;
8195
private final ExceptionHandlerProvider ehp;
8296
private final ServerCall<ReqT, RespT> call;
83-
private ServerCall.Listener<ReqT> delegate;
97+
private volatile ServerCall.Listener<ReqT> delegate;
8498

8599
private final AtomicBoolean closed = new AtomicBoolean();
86100

87101
public ListenedOnDuplicatedContext(
88102
ExceptionHandlerProvider ehp,
89-
ServerCall<ReqT, RespT> call, Supplier<ServerCall.Listener<ReqT>> supplier, Context context) {
103+
ServerCall<ReqT, RespT> call, Function<Runnable, ServerCall.Listener<ReqT>> listenerCreator, Context context) {
90104
this.ehp = ehp;
91105
this.context = context;
92-
this.supplier = supplier;
106+
this.listenerCreator = listenerCreator;
93107
this.call = call;
94108
}
95109

96-
private synchronized ServerCall.Listener<ReqT> getDelegate() {
97-
if (delegate == null) {
98-
try {
99-
delegate = supplier.get();
100-
} catch (Throwable t) {
101-
// If the interceptor supplier throws an exception, catch it, and close the call.
102-
log.warn("Unable to retrieve gRPC Server call listener, see the cause below.");
103-
close(t);
104-
return null;
110+
private ServerCall.Listener<ReqT> getDelegate() {
111+
if (delegate == null && !closed.get()) {
112+
synchronized (this) {
113+
if (delegate == null && !closed.get()) {
114+
try {
115+
delegate = listenerCreator.apply(() -> closed.set(true));
116+
} catch (Throwable t) {
117+
// If the interceptor supplier throws an exception, catch it, and close the call.
118+
log.warn("Unable to retrieve gRPC Server call listener, see the cause below.");
119+
close(t);
120+
return null;
121+
}
122+
}
105123
}
106124
}
107125
return delegate;

0 commit comments

Comments
 (0)