@@ -104,85 +104,182 @@ public static interface BidiStreamingMethod<ReqT, RespT>
104104 extends StreamingRequestMethod <ReqT , RespT > {
105105 }
106106
107+ private static final class UnaryServerCallHandler <ReqT , RespT >
108+ implements ServerCallHandler <ReqT , RespT > {
109+
110+ private final UnaryRequestMethod <ReqT , RespT > method ;
111+
112+ // Non private to avoid synthetic class
113+ UnaryServerCallHandler (UnaryRequestMethod <ReqT , RespT > method ) {
114+ this .method = method ;
115+ }
116+
117+ @ Override
118+ public ServerCall .Listener <ReqT > startCall (ServerCall <ReqT , RespT > call , Metadata headers ) {
119+ Preconditions .checkArgument (
120+ call .getMethodDescriptor ().getType ().clientSendsOneMessage (),
121+ "asyncUnaryRequestCall is only for clientSendsOneMessage methods" );
122+ ServerCallStreamObserverImpl <ReqT , RespT > responseObserver =
123+ new ServerCallStreamObserverImpl <ReqT , RespT >(call );
124+ // We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
125+ // sends more than 1 requests, ServerCall will catch it. Note that disabling auto
126+ // inbound flow control has no effect on unary calls.
127+ call .request (2 );
128+ return new UnaryServerCallListener (responseObserver , call );
129+ }
130+
131+ private final class UnaryServerCallListener extends ServerCall .Listener <ReqT > {
132+ private final ServerCall <ReqT , RespT > call ;
133+ private final ServerCallStreamObserverImpl <ReqT , RespT > responseObserver ;
134+ private boolean canInvoke = true ;
135+ private ReqT request ;
136+
137+ // Non private to avoid synthetic class
138+ UnaryServerCallListener (
139+ ServerCallStreamObserverImpl <ReqT , RespT > responseObserver ,
140+ ServerCall <ReqT , RespT > call ) {
141+ this .call = call ;
142+ this .responseObserver = responseObserver ;
143+ }
144+
145+ @ Override
146+ public void onMessage (ReqT request ) {
147+ if (this .request != null ) {
148+ // Safe to close the call, because the application has not yet been invoked
149+ call .close (
150+ Status .INTERNAL .withDescription (TOO_MANY_REQUESTS ),
151+ new Metadata ());
152+ canInvoke = false ;
153+ return ;
154+ }
155+
156+ // We delay calling method.invoke() until onHalfClose() to make sure the client
157+ // half-closes.
158+ this .request = request ;
159+ }
160+
161+ @ Override
162+ public void onHalfClose () {
163+ if (!canInvoke ) {
164+ return ;
165+ }
166+ if (request == null ) {
167+ // Safe to close the call, because the application has not yet been invoked
168+ call .close (
169+ Status .INTERNAL .withDescription (MISSING_REQUEST ),
170+ new Metadata ());
171+ return ;
172+ }
173+
174+ method .invoke (request , responseObserver );
175+ responseObserver .freeze ();
176+ if (call .isReady ()) {
177+ // Since we are calling invoke in halfClose we have missed the onReady
178+ // event from the transport so recover it here.
179+ onReady ();
180+ }
181+ }
182+
183+ @ Override
184+ public void onCancel () {
185+ responseObserver .cancelled = true ;
186+ if (responseObserver .onCancelHandler != null ) {
187+ responseObserver .onCancelHandler .run ();
188+ }
189+ }
190+
191+ @ Override
192+ public void onReady () {
193+ if (responseObserver .onReadyHandler != null ) {
194+ responseObserver .onReadyHandler .run ();
195+ }
196+ }
197+ }
198+ }
199+
107200 /**
108201 * Creates a {@code ServerCallHandler} for a unary request call method of the service.
109202 *
110203 * @param method an adaptor to the actual method on the service implementation.
111204 */
112205 private static <ReqT , RespT > ServerCallHandler <ReqT , RespT > asyncUnaryRequestCall (
113- final UnaryRequestMethod <ReqT , RespT > method ) {
114- return new ServerCallHandler <ReqT , RespT >() {
206+ UnaryRequestMethod <ReqT , RespT > method ) {
207+ return new UnaryServerCallHandler <ReqT , RespT >(method );
208+ }
209+
210+ private static final class StreamingServerCallHandler <ReqT , RespT >
211+ implements ServerCallHandler <ReqT , RespT > {
212+
213+ private final StreamingRequestMethod <ReqT , RespT > method ;
214+
215+ // Non private to avoid synthetic class
216+ StreamingServerCallHandler (StreamingRequestMethod <ReqT , RespT > method ) {
217+ this .method = method ;
218+ }
219+
220+ @ Override
221+ public ServerCall .Listener <ReqT > startCall (ServerCall <ReqT , RespT > call , Metadata headers ) {
222+ ServerCallStreamObserverImpl <ReqT , RespT > responseObserver =
223+ new ServerCallStreamObserverImpl <ReqT , RespT >(call );
224+ StreamObserver <ReqT > requestObserver = method .invoke (responseObserver );
225+ responseObserver .freeze ();
226+ if (responseObserver .autoFlowControlEnabled ) {
227+ call .request (1 );
228+ }
229+ return new StreamingServerCallListener (requestObserver , responseObserver , call );
230+ }
231+
232+ private final class StreamingServerCallListener extends ServerCall .Listener <ReqT > {
233+
234+ private final StreamObserver <ReqT > requestObserver ;
235+ private final ServerCallStreamObserverImpl <ReqT , RespT > responseObserver ;
236+ private final ServerCall <ReqT , RespT > call ;
237+ private boolean halfClosed = false ;
238+
239+ // Non private to avoid synthetic class
240+ StreamingServerCallListener (
241+ StreamObserver <ReqT > requestObserver ,
242+ ServerCallStreamObserverImpl <ReqT , RespT > responseObserver ,
243+ ServerCall <ReqT , RespT > call ) {
244+ this .requestObserver = requestObserver ;
245+ this .responseObserver = responseObserver ;
246+ this .call = call ;
247+ }
248+
249+ @ Override
250+ public void onMessage (ReqT request ) {
251+ requestObserver .onNext (request );
252+
253+ // Request delivery of the next inbound message.
254+ if (responseObserver .autoFlowControlEnabled ) {
255+ call .request (1 );
256+ }
257+ }
258+
259+ @ Override
260+ public void onHalfClose () {
261+ halfClosed = true ;
262+ requestObserver .onCompleted ();
263+ }
264+
265+ @ Override
266+ public void onCancel () {
267+ responseObserver .cancelled = true ;
268+ if (responseObserver .onCancelHandler != null ) {
269+ responseObserver .onCancelHandler .run ();
270+ }
271+ if (!halfClosed ) {
272+ requestObserver .onError (Status .CANCELLED .asException ());
273+ }
274+ }
275+
115276 @ Override
116- public ServerCall .Listener <ReqT > startCall (
117- final ServerCall <ReqT , RespT > call ,
118- Metadata headers ) {
119- Preconditions .checkArgument (
120- call .getMethodDescriptor ().getType ().clientSendsOneMessage (),
121- "asyncUnaryRequestCall is only for clientSendsOneMessage methods" );
122- final ServerCallStreamObserverImpl <ReqT , RespT > responseObserver =
123- new ServerCallStreamObserverImpl <ReqT , RespT >(call );
124- // We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
125- // sends more than 1 requests, ServerCall will catch it. Note that disabling auto
126- // inbound flow control has no effect on unary calls.
127- call .request (2 );
128- return new EmptyServerCallListener <ReqT >() {
129- boolean canInvoke = true ;
130- ReqT request ;
131- @ Override
132- public void onMessage (ReqT request ) {
133- if (this .request != null ) {
134- // Safe to close the call, because the application has not yet been invoked
135- call .close (
136- Status .INTERNAL .withDescription (TOO_MANY_REQUESTS ),
137- new Metadata ());
138- canInvoke = false ;
139- return ;
140- }
141-
142- // We delay calling method.invoke() until onHalfClose() to make sure the client
143- // half-closes.
144- this .request = request ;
145- }
146-
147- @ Override
148- public void onHalfClose () {
149- if (!canInvoke ) {
150- return ;
151- }
152- if (request == null ) {
153- // Safe to close the call, because the application has not yet been invoked
154- call .close (
155- Status .INTERNAL .withDescription (MISSING_REQUEST ),
156- new Metadata ());
157- return ;
158- }
159-
160- method .invoke (request , responseObserver );
161- responseObserver .freeze ();
162- if (call .isReady ()) {
163- // Since we are calling invoke in halfClose we have missed the onReady
164- // event from the transport so recover it here.
165- onReady ();
166- }
167- }
168-
169- @ Override
170- public void onCancel () {
171- responseObserver .cancelled = true ;
172- if (responseObserver .onCancelHandler != null ) {
173- responseObserver .onCancelHandler .run ();
174- }
175- }
176-
177- @ Override
178- public void onReady () {
179- if (responseObserver .onReadyHandler != null ) {
180- responseObserver .onReadyHandler .run ();
181- }
182- }
183- };
277+ public void onReady () {
278+ if (responseObserver .onReadyHandler != null ) {
279+ responseObserver .onReadyHandler .run ();
280+ }
184281 }
185- };
282+ }
186283 }
187284
188285 /**
@@ -191,58 +288,8 @@ public void onReady() {
191288 * @param method an adaptor to the actual method on the service implementation.
192289 */
193290 private static <ReqT , RespT > ServerCallHandler <ReqT , RespT > asyncStreamingRequestCall (
194- final StreamingRequestMethod <ReqT , RespT > method ) {
195- return new ServerCallHandler <ReqT , RespT >() {
196- @ Override
197- public ServerCall .Listener <ReqT > startCall (
198- final ServerCall <ReqT , RespT > call ,
199- Metadata headers ) {
200- final ServerCallStreamObserverImpl <ReqT , RespT > responseObserver =
201- new ServerCallStreamObserverImpl <ReqT , RespT >(call );
202- final StreamObserver <ReqT > requestObserver = method .invoke (responseObserver );
203- responseObserver .freeze ();
204- if (responseObserver .autoFlowControlEnabled ) {
205- call .request (1 );
206- }
207- return new EmptyServerCallListener <ReqT >() {
208- boolean halfClosed = false ;
209-
210- @ Override
211- public void onMessage (ReqT request ) {
212- requestObserver .onNext (request );
213-
214- // Request delivery of the next inbound message.
215- if (responseObserver .autoFlowControlEnabled ) {
216- call .request (1 );
217- }
218- }
219-
220- @ Override
221- public void onHalfClose () {
222- halfClosed = true ;
223- requestObserver .onCompleted ();
224- }
225-
226- @ Override
227- public void onCancel () {
228- responseObserver .cancelled = true ;
229- if (responseObserver .onCancelHandler != null ) {
230- responseObserver .onCancelHandler .run ();
231- }
232- if (!halfClosed ) {
233- requestObserver .onError (Status .CANCELLED .asException ());
234- }
235- }
236-
237- @ Override
238- public void onReady () {
239- if (responseObserver .onReadyHandler != null ) {
240- responseObserver .onReadyHandler .run ();
241- }
242- }
243- };
244- }
245- };
291+ StreamingRequestMethod <ReqT , RespT > method ) {
292+ return new StreamingServerCallHandler <ReqT , RespT >(method );
246293 }
247294
248295 private static interface UnaryRequestMethod <ReqT , RespT > {
@@ -263,6 +310,7 @@ private static final class ServerCallStreamObserverImpl<ReqT, RespT>
263310 private Runnable onReadyHandler ;
264311 private Runnable onCancelHandler ;
265312
313+ // Non private to avoid synthetic class
266314 ServerCallStreamObserverImpl (ServerCall <ReqT , RespT > call ) {
267315 this .call = call ;
268316 }
@@ -352,24 +400,6 @@ public void request(int count) {
352400 }
353401 }
354402
355- private static class EmptyServerCallListener <ReqT > extends ServerCall .Listener <ReqT > {
356- @ Override
357- public void onMessage (ReqT request ) {
358- }
359-
360- @ Override
361- public void onHalfClose () {
362- }
363-
364- @ Override
365- public void onCancel () {
366- }
367-
368- @ Override
369- public void onComplete () {
370- }
371- }
372-
373403 /**
374404 * Sets unimplemented status for method on given response stream for unary call.
375405 *
0 commit comments