Skip to content

Commit d70feec

Browse files
authored
pubsub: remove polling implementation (#3040)
There has been no way to use it since Aug 2017 and removing it will make moving the GAPIC stub easier.
1 parent ab231c9 commit d70feec

File tree

3 files changed

+2
-303
lines changed

3 files changed

+2
-303
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PollingSubscriberConnection.java

Lines changed: 0 additions & 224 deletions
This file was deleted.

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 1 addition & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,10 @@ public class Subscriber extends AbstractApiService {
122122
private final List<Channel> channels;
123123
private final MessageReceiver receiver;
124124
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
125-
private final List<PollingSubscriberConnection> pollingSubscriberConnections;
126125
private final Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches =
127126
new LinkedList<>();
128127
private final ApiClock clock;
129128
private final List<AutoCloseable> closeables = new ArrayList<>();
130-
private final boolean useStreaming;
131129
private ScheduledFuture<?> ackDeadlineUpdater;
132130

133131
private Subscriber(Builder builder) {
@@ -195,8 +193,6 @@ public void close() throws IOException {
195193
numChannels = builder.parallelPullCount;
196194
channels = new ArrayList<>(numChannels);
197195
streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(numChannels);
198-
pollingSubscriberConnections = new ArrayList<PollingSubscriberConnection>(numChannels);
199-
useStreaming = builder.useStreaming;
200196
}
201197

202198
/**
@@ -310,11 +306,7 @@ protected void doStart() {
310306
@Override
311307
public void run() {
312308
try {
313-
if (useStreaming) {
314309
startStreamingConnections();
315-
} else {
316-
startPollingConnections();
317-
}
318310
notifyStarted();
319311
} catch (Throwable t) {
320312
notifyFailed(t);
@@ -327,7 +319,6 @@ public void run() {
327319
@Override
328320
protected void doStop() {
329321
// stop connection is no-op if connections haven't been started.
330-
stopAllPollingConnections();
331322
stopAllStreamingConnections();
332323
try {
333324
for (AutoCloseable closeable : closeables) {
@@ -339,64 +330,6 @@ protected void doStop() {
339330
}
340331
}
341332

342-
private void startPollingConnections() throws IOException {
343-
synchronized (pollingSubscriberConnections) {
344-
Credentials credentials = credentialsProvider.getCredentials();
345-
CallCredentials callCredentials =
346-
credentials == null ? null : MoreCallCredentials.from(credentials);
347-
348-
SubscriberGrpc.SubscriberBlockingStub getSubStub =
349-
SubscriberGrpc.newBlockingStub(channels.get(0))
350-
.withDeadlineAfter(
351-
PollingSubscriberConnection.DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
352-
if (callCredentials != null) {
353-
getSubStub = getSubStub.withCallCredentials(callCredentials);
354-
}
355-
Subscription subscriptionInfo =
356-
getSubStub.getSubscription(
357-
GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName).build());
358-
359-
for (Channel channel : channels) {
360-
SubscriberFutureStub stub = SubscriberGrpc.newFutureStub(channel);
361-
if (callCredentials != null) {
362-
stub = stub.withCallCredentials(callCredentials);
363-
}
364-
pollingSubscriberConnections.add(
365-
new PollingSubscriberConnection(
366-
subscriptionInfo,
367-
receiver,
368-
ackExpirationPadding,
369-
maxAckExtensionPeriod,
370-
ackLatencyDistribution,
371-
stub,
372-
flowController,
373-
flowControlSettings.getMaxOutstandingElementCount(),
374-
outstandingMessageBatches,
375-
executor,
376-
alarmsExecutor,
377-
clock));
378-
}
379-
startConnections(
380-
pollingSubscriberConnections,
381-
new Listener() {
382-
@Override
383-
public void failed(State from, Throwable failure) {
384-
// If a connection failed is because of a fatal error, we should fail the
385-
// whole subscriber.
386-
stopAllPollingConnections();
387-
try {
388-
notifyFailed(failure);
389-
} catch (IllegalStateException e) {
390-
if (isRunning()) {
391-
throw e;
392-
}
393-
// It could happen that we are shutting down while some channels fail.
394-
}
395-
}
396-
});
397-
}
398-
}
399-
400333
private void startStreamingConnections() throws IOException {
401334
synchronized (streamingSubscriberConnections) {
402335
Credentials credentials = credentialsProvider.getCredentials();
@@ -443,10 +376,6 @@ public void failed(State from, Throwable failure) {
443376
}
444377
}
445378

446-
private void stopAllPollingConnections() {
447-
stopConnections(pollingSubscriberConnections);
448-
}
449-
450379
private void stopAllStreamingConnections() {
451380
stopConnections(streamingSubscriberConnections);
452381
if (ackDeadlineUpdater != null) {
@@ -525,7 +454,6 @@ public static final class Builder {
525454
CredentialsProvider credentialsProvider =
526455
SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build();
527456
Optional<ApiClock> clock = Optional.absent();
528-
boolean useStreaming = true;
529457
int parallelPullCount = Runtime.getRuntime().availableProcessors() * CHANNELS_PER_CORE;
530458

531459
Builder(String subscriptionName, MessageReceiver receiver) {
@@ -630,7 +558,7 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
630558
}
631559

632560
/**
633-
* Gives the ability to set a custom executor for polling and managing lease extensions. If none
561+
* Gives the ability to set a custom executor for managing lease extensions. If none
634562
* is provided a shared one will be used by all {@link Subscriber} instances.
635563
*/
636564
public Builder setSystemExecutorProvider(ExecutorProvider executorProvider) {
@@ -653,11 +581,6 @@ Builder setClock(ApiClock clock) {
653581
return this;
654582
}
655583

656-
Builder setUseStreaming(boolean useStreaming) {
657-
this.useStreaming = useStreaming;
658-
return this;
659-
}
660-
661584
public Subscriber build() {
662585
return new Subscriber(this);
663586
}

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public void testFailedChannel_fatalError_subscriberFails() throws Exception {
141141
}
142142

143143
private Subscriber startSubscriber(Builder testSubscriberBuilder) throws Exception {
144-
Subscriber subscriber = testSubscriberBuilder.setUseStreaming(true).build();
144+
Subscriber subscriber = testSubscriberBuilder.build();
145145
subscriber.startAsync().awaitRunning();
146146
return subscriber;
147147
}

0 commit comments

Comments
 (0)