Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -97,6 +99,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private final String subscription;
private final SubscriptionName subscriptionNameObject;
private final ScheduledExecutorService systemExecutor;
private final ExecutorService eodAckCallbackExecutor;
private final MessageDispatcher messageDispatcher;

private final FlowControlSettings flowControlSettings;
Expand Down Expand Up @@ -128,6 +131,7 @@ private StreamingSubscriberConnection(Builder builder) {
subscription = builder.subscription;
subscriptionNameObject = SubscriptionName.parse(builder.subscription);
systemExecutor = builder.systemExecutor;
eodAckCallbackExecutor = builder.eodAckCallbackExecutor;

// We need to set the default stream ack deadline on the initial request, this will be
// updated by modack requests in the message dispatcher
Expand Down Expand Up @@ -455,7 +459,7 @@ private void sendAckOperations(
.setSubscription(subscription)
.addAllAckIds(ackIdsInRequest)
.build());
ApiFutures.addCallback(ackFuture, callback, directExecutor());
ApiFutures.addCallback(ackFuture, callback, getCallbackExecutor());
pendingOperations++;
}
ackOperationsWaiter.incrementPendingCount(pendingOperations);
Expand Down Expand Up @@ -504,7 +508,7 @@ private void sendModackOperations(
.addAllAckIds(ackIdsInRequest)
.setAckDeadlineSeconds(modackRequestData.getDeadlineExtensionSeconds())
.build());
ApiFutures.addCallback(modackFuture, callback, directExecutor());
ApiFutures.addCallback(modackFuture, callback, getCallbackExecutor());
pendingOperations++;
}
}
Expand Down Expand Up @@ -716,6 +720,13 @@ public void run() {
};
}

private Executor getCallbackExecutor() {
if (!getExactlyOnceDeliveryEnabled()) {
return directExecutor();
}
return eodAckCallbackExecutor;
}

/** Builder of {@link StreamingSubscriberConnection StreamingSubscriberConnections}. */
public static final class Builder {
private MessageReceiver receiver;
Expand All @@ -736,6 +747,7 @@ public static final class Builder {
private boolean useLegacyFlowControl;
private ScheduledExecutorService executor;
private ScheduledExecutorService systemExecutor;
private ExecutorService eodAckCallbackExecutor;
private ApiClock clock;

private boolean enableOpenTelemetryTracing;
Expand Down Expand Up @@ -826,6 +838,11 @@ public Builder setSystemExecutor(ScheduledExecutorService systemExecutor) {
return this;
}

public Builder setEodAckCallbackExecutor(ExecutorService eodAckCallbackExecutor) {
this.eodAckCallbackExecutor = eodAckCallbackExecutor;
return this;
}

public Builder setClock(ApiClock clock) {
this.clock = clock;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -150,6 +151,9 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
// An instantiation of the SystemExecutorProvider used for processing acks
// and other system actions.
@Nullable private final ScheduledExecutorService alarmsExecutor;
// An executor used for handling ack and modack callbacks when exactly-once delivery is enabled.
private final ExecutorService eodAckCallbackExecutor;

private final Distribution ackLatencyDistribution =
new Distribution(Math.toIntExact(MAX_STREAM_ACK_DEADLINE.getSeconds()) + 1);

Expand Down Expand Up @@ -200,6 +204,15 @@ private Subscriber(Builder builder) {
backgroundResources.add(new ExecutorAsBackgroundResource((alarmsExecutor)));
}

// A cached thread pool will create new threads as needed but can reuse previously constructed
// threads when available, which helps to improve performance.
ThreadFactory eodAckCallbackThreadFactory =
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Subscriber-EOD-CallbackExecutor-%d")
.build();
eodAckCallbackExecutor = Executors.newCachedThreadPool(eodAckCallbackThreadFactory);

TransportChannelProvider channelProvider = builder.channelProvider;
if (channelProvider.acceptsPoolSize()) {
channelProvider = channelProvider.withPoolSize(numPullers);
Expand Down Expand Up @@ -416,6 +429,7 @@ private void startStreamingConnections() {
.setUseLegacyFlowControl(useLegacyFlowControl)
.setExecutor(executor)
.setSystemExecutor(alarmsExecutor)
.setEodAckCallbackExecutor(eodAckCallbackExecutor)
.setClock(clock)
.setEnableOpenTelemetryTracing(enableOpenTelemetryTracing)
.setTracer(tracer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ private StreamingSubscriberConnection getStreamingSubscriberConnectionFromBuilde
.setFlowController(mock(FlowController.class))
.setExecutor(executor)
.setSystemExecutor(systemExecutor)
.setEodAckCallbackExecutor(systemExecutor)
.setClock(clock)
.setMinDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION)
.setMinDurationPerAckExtensionDefaultUsed(true)
Expand Down
Loading