Skip to content

Commit 2beb34e

Browse files
committed
add Publisher.Settings
This class is used to configure Publisher. User-visible options are made public. Options that should be automatically populated, like user credentials, are left package-private for now.
1 parent 5593253 commit 2beb34e

File tree

5 files changed

+226
-387
lines changed

5 files changed

+226
-387
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ public static ListOption pageToken(String pageToken) {
221221
*/
222222
Future<AsyncPage<Topic>> listTopicsAsync(ListOption... options);
223223

224-
Publisher getPublisher(TopicInfo topic) throws IOException;
224+
Publisher newPublisher(String topic, Publisher.Settings settings) throws PubSubException;
225225

226226
/**
227227
* Creates a new subscription.

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -286,13 +286,13 @@ public Future<AsyncPage<Topic>> listTopicsAsync(ListOption... options) {
286286
}
287287

288288
@Override
289-
public Publisher getPublisher(TopicInfo topic) throws IOException {
290-
// TODO(pongad): Provide a way to pass in the rest of the options.
291-
String topicName =
292-
PublisherClient.formatTopicName(getOptions().getProjectId(), topic.getName());
293-
return Publisher.Builder.newBuilder(topicName)
294-
.setCredentials(getOptions().getCredentials())
295-
.build();
289+
public Publisher newPublisher(String topic, Publisher.Settings settings) throws PubSubException {
290+
String topicName = PublisherClient.formatTopicName(getOptions().getProjectId(), topic);
291+
try {
292+
return new PublisherImpl(topicName, settings);
293+
} catch (IOException e) {
294+
throw new PubSubException(e, false);
295+
}
296296
}
297297

298298
@Override

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java

Lines changed: 91 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
import com.google.api.gax.grpc.BundlingSettings;
2121
import com.google.auth.Credentials;
2222
import com.google.auth.oauth2.GoogleCredentials;
23+
import com.google.auto.value.AutoValue;
2324
import com.google.common.base.Optional;
2425
import com.google.common.base.Preconditions;
2526
import com.google.common.util.concurrent.ListenableFuture;
2627
import com.google.pubsub.v1.PubsubMessage;
2728
import io.grpc.ManagedChannelBuilder;
28-
import java.io.IOException;
2929
import java.util.concurrent.ScheduledExecutorService;
3030
import org.joda.time.Duration;
3131

@@ -79,28 +79,6 @@
7979
* </pre>
8080
*/
8181
public interface Publisher {
82-
String PUBSUB_API_ADDRESS = "pubsub.googleapis.com";
83-
String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub";
84-
85-
// API limits.
86-
int MAX_BUNDLE_MESSAGES = 1000;
87-
int MAX_BUNDLE_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
88-
89-
// Meaningful defaults.
90-
long DEFAULT_MAX_BUNDLE_MESSAGES = 100L;
91-
long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB
92-
Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms
93-
Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
94-
Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
95-
Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds
96-
97-
BundlingSettings DEFAULT_BUNDLING_SETTINGS =
98-
BundlingSettings.newBuilder()
99-
.setDelayThreshold(DEFAULT_MAX_BUNDLE_DURATION)
100-
.setRequestByteThreshold(DEFAULT_MAX_BUNDLE_BYTES)
101-
.setElementCountThreshold(DEFAULT_MAX_BUNDLE_MESSAGES)
102-
.build();
103-
10482
/** Topic to which the publisher publishes to. */
10583
String getTopic();
10684

@@ -161,130 +139,115 @@ public interface Publisher {
161139
*/
162140
void shutdown();
163141

164-
/** A builder of {@link Publisher}s. */
165-
final class Builder {
166-
String topic;
142+
@AutoValue
143+
public abstract class Settings {
144+
static final String PUBSUB_API_ADDRESS = "pubsub.googleapis.com";
145+
static final String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub";
167146

168-
// Bundling options
169-
BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS;
147+
// API limits.
148+
static final int MAX_BUNDLE_MESSAGES = 1000;
149+
static final int MAX_BUNDLE_BYTES =
150+
10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
170151

171-
// Client-side flow control options
172-
FlowController.Settings flowControlSettings = FlowController.Settings.DEFAULT;
173-
boolean failOnFlowControlLimits = false;
152+
// Meaningful defaults.
153+
static final long DEFAULT_MAX_BUNDLE_MESSAGES = 100L;
154+
static final long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB
155+
static final Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms
156+
static final Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
157+
static final Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
158+
static final Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds
174159

175-
// Send bundle deadline
176-
Duration sendBundleDeadline = MIN_SEND_BUNDLE_DURATION;
160+
static final BundlingSettings DEFAULT_BUNDLING_SETTINGS =
161+
BundlingSettings.newBuilder()
162+
.setDelayThreshold(DEFAULT_MAX_BUNDLE_DURATION)
163+
.setRequestByteThreshold(DEFAULT_MAX_BUNDLE_BYTES)
164+
.setElementCountThreshold(DEFAULT_MAX_BUNDLE_MESSAGES)
165+
.build();
177166

178-
// RPC options
179-
Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT;
167+
public static Settings DEFAULT = newBuilder().build();
180168

181-
// Channels and credentials
182-
Optional<Credentials> userCredentials = Optional.absent();
183-
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder =
184-
Optional.absent();
169+
public abstract BundlingSettings getBundlingSettings();
185170

186-
Optional<ScheduledExecutorService> executor = Optional.absent();
171+
public abstract FlowController.Settings getFlowControlSettings();
187172

188-
/** Constructs a new {@link Builder} using the given topic. */
189-
public static Builder newBuilder(String topic) {
190-
return new Builder(topic);
191-
}
173+
public abstract boolean getFailOnFlowControlLimits();
192174

193-
Builder(String topic) {
194-
this.topic = Preconditions.checkNotNull(topic);
195-
}
175+
abstract Duration getSendBundleDeadline();
196176

197-
/**
198-
* Credentials to authenticate with.
199-
*
200-
* <p>Must be properly scoped for accessing Cloud Pub/Sub APIs.
201-
*/
202-
public Builder setCredentials(Credentials userCredentials) {
203-
this.userCredentials = Optional.of(Preconditions.checkNotNull(userCredentials));
204-
return this;
205-
}
177+
abstract Duration getRequestTimeout();
206178

207-
/**
208-
* ManagedChannelBuilder to use to create Channels.
209-
*
210-
* <p>Must point at Cloud Pub/Sub endpoint.
211-
*/
212-
public Builder setChannelBuilder(
213-
ManagedChannelBuilder<? extends ManagedChannelBuilder<?>> channelBuilder) {
214-
this.channelBuilder =
215-
Optional.<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>>of(
216-
Preconditions.checkNotNull(channelBuilder));
217-
return this;
218-
}
179+
abstract Optional<Credentials> getUserCredentials();
219180

220-
// Bundling options
221-
public Builder setBundlingSettings(BundlingSettings bundlingSettings) {
222-
Preconditions.checkNotNull(bundlingSettings);
223-
Preconditions.checkNotNull(bundlingSettings.getElementCountThreshold());
224-
Preconditions.checkArgument(bundlingSettings.getElementCountThreshold() > 0);
225-
Preconditions.checkNotNull(bundlingSettings.getRequestByteThreshold());
226-
Preconditions.checkArgument(bundlingSettings.getRequestByteThreshold() > 0);
227-
Preconditions.checkNotNull(bundlingSettings.getDelayThreshold());
228-
Preconditions.checkArgument(bundlingSettings.getDelayThreshold().getMillis() > 0);
229-
230-
Preconditions.checkArgument(
231-
bundlingSettings.getElementCountLimit() == null,
232-
"elementCountLimit option not honored by current implementation");
233-
Preconditions.checkArgument(
234-
bundlingSettings.getRequestByteLimit() == null,
235-
"requestByteLimit option not honored by current implementation");
236-
Preconditions.checkArgument(
237-
bundlingSettings.getBlockingCallCountThreshold() == null,
238-
"blockingCallCountThreshold option not honored by current implementation");
239-
240-
this.bundlingSettings = bundlingSettings;
241-
return this;
242-
}
181+
abstract Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>>
182+
getChannelBuilder();
243183

244-
// Flow control options
184+
abstract Optional<ScheduledExecutorService> getExecutor();
245185

246-
/** Sets the flow control settings. */
247-
public Builder setFlowControlSettings(FlowController.Settings flowControlSettings) {
248-
this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings);
249-
return this;
186+
public static Builder newBuilder() {
187+
return new AutoValue_Publisher_Settings.Builder()
188+
.setFlowControlSettings(FlowController.Settings.DEFAULT)
189+
.setFailOnFlowControlLimits(false)
190+
.setSendBundleDeadline(MIN_SEND_BUNDLE_DURATION)
191+
.setRequestTimeout(DEFAULT_REQUEST_TIMEOUT)
192+
.setUserCredentials(Optional.<Credentials>absent())
193+
.setChannelBuilder(
194+
Optional.<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>>absent())
195+
.setExecutor(Optional.<ScheduledExecutorService>absent())
196+
.setBundlingSettings(DEFAULT_BUNDLING_SETTINGS);
250197
}
251198

252-
/**
253-
* Whether to fail publish when reaching any of the flow control limits, with either a {@link
254-
* RequestByteMaxOutstandingReachedException} or {@link
255-
* ElementCountMaxOutstandingReachedException} as appropriate.
256-
*
257-
* <p>If set to false, then publish operations will block the current thread until the
258-
* outstanding requests go under the limits.
259-
*/
260-
public Builder setFailOnFlowControlLimits(boolean fail) {
261-
failOnFlowControlLimits = fail;
262-
return this;
263-
}
199+
@AutoValue.Builder
200+
abstract static class Builder {
201+
public abstract Builder setBundlingSettings(BundlingSettings value);
264202

265-
/** Maximum time to attempt sending (and retrying) a bundle of messages before giving up. */
266-
public Builder setSendBundleDeadline(Duration deadline) {
267-
Preconditions.checkArgument(deadline.compareTo(MIN_SEND_BUNDLE_DURATION) >= 0);
268-
sendBundleDeadline = deadline;
269-
return this;
270-
}
203+
public abstract Builder setFlowControlSettings(FlowController.Settings value);
271204

272-
// Runtime options
273-
/** Time to wait for a publish call to return from the server. */
274-
public Builder setRequestTimeout(Duration timeout) {
275-
Preconditions.checkArgument(timeout.compareTo(MIN_REQUEST_TIMEOUT) >= 0);
276-
requestTimeout = timeout;
277-
return this;
278-
}
205+
public abstract Builder setFailOnFlowControlLimits(boolean value);
279206

280-
/** Gives the ability to set a custom executor to be used by the library. */
281-
public Builder setExecutor(ScheduledExecutorService executor) {
282-
this.executor = Optional.of(Preconditions.checkNotNull(executor));
283-
return this;
284-
}
207+
abstract Builder setSendBundleDeadline(Duration value);
208+
209+
abstract Builder setRequestTimeout(Duration value);
210+
211+
abstract Builder setUserCredentials(Optional<Credentials> value);
212+
213+
Builder setUserCredentials(Credentials value) {
214+
return setUserCredentials(Optional.of(value));
215+
}
216+
217+
abstract Builder setChannelBuilder(
218+
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> value);
219+
220+
Builder setChannelBuilder(ManagedChannelBuilder<? extends ManagedChannelBuilder<?>> value) {
221+
return setChannelBuilder(
222+
Optional.<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>>of(value));
223+
}
224+
225+
abstract Builder setExecutor(Optional<ScheduledExecutorService> value);
226+
227+
Builder setExecutor(ScheduledExecutorService value) {
228+
return setExecutor(Optional.of(value));
229+
}
230+
231+
abstract Settings autoBuild();
232+
233+
public Settings build() {
234+
Settings settings = autoBuild();
235+
Preconditions.checkArgument(
236+
settings.getBundlingSettings().getElementCountLimit() == null,
237+
"elementCountLimit option not honored by current implementation");
238+
Preconditions.checkArgument(
239+
settings.getBundlingSettings().getRequestByteLimit() == null,
240+
"requestByteLimit option not honored by current implementation");
241+
Preconditions.checkArgument(
242+
settings.getBundlingSettings().getBlockingCallCountThreshold() == null,
243+
"blockingCallCountThreshold option not honored by current implementation");
285244

286-
public Publisher build() throws IOException {
287-
return new PublisherImpl(this);
245+
Preconditions.checkArgument(
246+
settings.getRequestTimeout().compareTo(MIN_REQUEST_TIMEOUT) >= 0);
247+
Preconditions.checkArgument(
248+
settings.getSendBundleDeadline().compareTo(MIN_SEND_BUNDLE_DURATION) >= 0);
249+
return settings;
250+
}
288251
}
289252
}
290253
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -90,29 +90,28 @@ final class PublisherImpl implements Publisher {
9090
private final Duration sendBundleDeadline;
9191
private ScheduledFuture<?> currentAlarmFuture;
9292

93-
PublisherImpl(Builder builder) throws IOException {
94-
topic = builder.topic;
95-
96-
maxBundleMessages = builder.bundlingSettings.getElementCountThreshold();
97-
maxBundleBytes = builder.bundlingSettings.getRequestByteThreshold();
98-
maxBundleDuration = builder.bundlingSettings.getDelayThreshold();
93+
PublisherImpl(String topic, Settings settings) throws IOException {
94+
this.topic = topic;
95+
maxBundleMessages = settings.getBundlingSettings().getElementCountThreshold();
96+
maxBundleBytes = settings.getBundlingSettings().getRequestByteThreshold();
97+
maxBundleDuration = settings.getBundlingSettings().getDelayThreshold();
9998
hasBundlingBytes = maxBundleBytes > 0;
10099

101-
flowControlSettings = builder.flowControlSettings;
102-
failOnFlowControlLimits = builder.failOnFlowControlLimits;
100+
flowControlSettings = settings.getFlowControlSettings();
101+
failOnFlowControlLimits = settings.getFailOnFlowControlLimits();
103102
this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits);
104103

105-
sendBundleDeadline = builder.sendBundleDeadline;
104+
sendBundleDeadline = settings.getSendBundleDeadline();
106105

107-
requestTimeout = builder.requestTimeout;
106+
requestTimeout = settings.getRequestTimeout();
108107

109108
messagesBundle = new LinkedList<>();
110109
messagesBundleLock = new ReentrantLock();
111110
activeAlarm = new AtomicBoolean(false);
112111
int numCores = Math.max(1, Runtime.getRuntime().availableProcessors());
113112
executor =
114-
builder.executor.isPresent()
115-
? builder.executor.get()
113+
settings.getExecutor().isPresent()
114+
? settings.getExecutor().get()
116115
: Executors.newScheduledThreadPool(
117116
numCores * DEFAULT_MIN_THREAD_POOL_SIZE,
118117
new ThreadFactoryBuilder()
@@ -123,20 +122,20 @@ final class PublisherImpl implements Publisher {
123122
channelIndex = new AtomicLong(0);
124123
for (int i = 0; i < numCores; i++) {
125124
channels[i] =
126-
builder.channelBuilder.isPresent()
127-
? builder.channelBuilder.get().build()
128-
: NettyChannelBuilder.forAddress(PUBSUB_API_ADDRESS, 443)
125+
settings.getChannelBuilder().isPresent()
126+
? settings.getChannelBuilder().get().build()
127+
: NettyChannelBuilder.forAddress(Publisher.Settings.PUBSUB_API_ADDRESS, 443)
129128
.negotiationType(NegotiationType.TLS)
130129
.sslContext(GrpcSslContexts.forClient().ciphers(null).build())
131130
.executor(executor)
132131
.build();
133132
}
134133
credentials =
135134
MoreCallCredentials.from(
136-
builder.userCredentials.isPresent()
137-
? builder.userCredentials.get()
135+
settings.getUserCredentials().isPresent()
136+
? settings.getUserCredentials().get()
138137
: GoogleCredentials.getApplicationDefault()
139-
.createScoped(Collections.singletonList(PUBSUB_API_SCOPE)));
138+
.createScoped(Collections.singletonList(Publisher.Settings.PUBSUB_API_SCOPE)));
140139
shutdown = new AtomicBoolean(false);
141140
messagesWaiter = new MessagesWaiter();
142141
}

0 commit comments

Comments
 (0)