3030import com .google .common .util .concurrent .FutureCallback ;
3131import com .google .common .util .concurrent .Futures ;
3232import com .google .common .util .concurrent .SettableFuture ;
33+ import com .google .protobuf .Empty ;
34+ import com .google .pubsub .v1 .AcknowledgeRequest ;
35+ import com .google .pubsub .v1 .ModifyAckDeadlineRequest ;
3336import com .google .pubsub .v1 .StreamingPullRequest ;
3437import com .google .pubsub .v1 .StreamingPullResponse ;
3538import com .google .pubsub .v1 .SubscriberGrpc .SubscriberStub ;
3639import io .grpc .Status ;
3740import io .grpc .stub .ClientCallStreamObserver ;
3841import io .grpc .stub .ClientResponseObserver ;
42+ import io .grpc .stub .StreamObserver ;
3943import java .util .ArrayList ;
4044import java .util .Deque ;
4145import java .util .List ;
@@ -56,9 +60,10 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
5660
5761 private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration .ofMillis (100 );
5862 private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration .ofSeconds (10 );
59- private static final int MAX_PER_REQUEST_CHANGES = 10000 ;
63+ private static final int MAX_PER_REQUEST_CHANGES = 1000 ;
64+ private static final Duration UNARY_TIMEOUT = Duration .ofSeconds (60 );
6065
61- private final SubscriberStub asyncStub ;
66+ private final SubscriberStub stub ;
6267 private final String subscription ;
6368 private final ScheduledExecutorService systemExecutor ;
6469 private final MessageDispatcher messageDispatcher ;
@@ -75,15 +80,15 @@ public StreamingSubscriberConnection(
7580 Duration ackExpirationPadding ,
7681 Duration maxAckExtensionPeriod ,
7782 Distribution ackLatencyDistribution ,
78- SubscriberStub asyncStub ,
83+ SubscriberStub stub ,
7984 FlowController flowController ,
8085 Deque <MessageDispatcher .OutstandingMessageBatch > outstandingMessageBatches ,
8186 ScheduledExecutorService executor ,
8287 ScheduledExecutorService systemExecutor ,
8388 ApiClock clock ) {
8489 this .subscription = subscription ;
8590 this .systemExecutor = systemExecutor ;
86- this .asyncStub = asyncStub ;
91+ this .stub = stub ;
8792 this .messageDispatcher =
8893 new MessageDispatcher (
8994 receiver ,
@@ -185,8 +190,7 @@ private void initialize() {
185190 final ClientResponseObserver <StreamingPullRequest , StreamingPullResponse > responseObserver =
186191 new StreamingPullResponseObserver (errorFuture );
187192 final ClientCallStreamObserver <StreamingPullRequest > requestObserver =
188- (ClientCallStreamObserver <StreamingPullRequest >)
189- (asyncStub .streamingPull (responseObserver ));
193+ (ClientCallStreamObserver <StreamingPullRequest >) (stub .streamingPull (responseObserver ));
190194 logger .log (
191195 Level .FINER ,
192196 "Initializing stream to subscription {0}" ,subscription );
@@ -260,24 +264,52 @@ public void run() {
260264 }
261265
262266 private boolean isAlive () {
263- return state () == State .RUNNING || state () == State .STARTING ;
267+ State state = state (); // Read the state only once.
268+ return state == State .RUNNING || state == State .STARTING ;
264269 }
265270
266271 @ Override
267272 public void sendAckOperations (
268273 List <String > acksToSend , List <PendingModifyAckDeadline > ackDeadlineExtensions ) {
269- List < StreamingPullRequest > requests =
270- partitionAckOperations ( acksToSend , ackDeadlineExtensions , MAX_PER_REQUEST_CHANGES );
271- lock . lock ();
272- try {
273- for ( StreamingPullRequest request : requests ) {
274- requestObserver . onNext ( request );
274+ SubscriberStub timeoutStub =
275+ stub . withDeadlineAfter ( UNARY_TIMEOUT . toMillis (), TimeUnit . MILLISECONDS );
276+ StreamObserver < Empty > loggingObserver = new StreamObserver < Empty >() {
277+ @ Override
278+ public void onCompleted ( ) {
279+ // noop
275280 }
276- } catch (Exception e ) {
277- Level level = isAlive () ? Level .WARNING : Level .FINER ;
278- logger .log (level , "failed to send ack operations" , e );
279- } finally {
280- lock .unlock ();
281+
282+ @ Override
283+ public void onNext (Empty e ) {
284+ // noop
285+ }
286+
287+ @ Override
288+ public void onError (Throwable t ) {
289+ Level level = isAlive () ? Level .WARNING : Level .FINER ;
290+ logger .log (level , "failed to send operations" , t );
291+ }
292+ };
293+
294+ for (PendingModifyAckDeadline modack : ackDeadlineExtensions ) {
295+ for (List <String > idChunk : Lists .partition (modack .ackIds , MAX_PER_REQUEST_CHANGES )) {
296+ timeoutStub .modifyAckDeadline (
297+ ModifyAckDeadlineRequest .newBuilder ()
298+ .setSubscription (subscription )
299+ .addAllAckIds (idChunk )
300+ .setAckDeadlineSeconds (modack .deadlineExtensionSeconds )
301+ .build (),
302+ loggingObserver );
303+ }
304+ }
305+
306+ for (List <String > idChunk : Lists .partition (acksToSend , MAX_PER_REQUEST_CHANGES )) {
307+ timeoutStub .acknowledge (
308+ AcknowledgeRequest .newBuilder ()
309+ .setSubscription (subscription )
310+ .addAllAckIds (idChunk )
311+ .build (),
312+ loggingObserver );
281313 }
282314 }
283315
0 commit comments