Skip to content

Commit 128c75b

Browse files
committed
Clean up for Spanner before merging to master
- Add TransportChannelProvider and GrpcInterceptorProvider in SpannerOperations GapicSpannerRpc can be configured through this - Exposes SpannerInterceptorProvider for testing - Make SpannerInterceptorProvider configurable - Remove GrpcSpannerRpc and RpcChannelFactory - Make streaming calls honor preferedChunks through StreamController and ResponseObserver
1 parent 0de6267 commit 128c75b

File tree

13 files changed

+225
-916
lines changed

13 files changed

+225
-916
lines changed

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
6969
super(
7070
checkNotNull(session),
7171
checkNotNull(bound),
72-
checkNotNull(spanner).getOptions().getGapicSpannerRpc(),
72+
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
7373
spanner.getOptions().getPrefetchChunks());
7474
this.sessionName = session.getName();
7575
this.options = session.getOptions();
@@ -82,7 +82,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
8282
checkNotNull(session),
8383
checkNotNull(batchTransactionId).getTransactionId(),
8484
batchTransactionId.getTimestamp(),
85-
checkNotNull(spanner).getOptions().getGapicSpannerRpc(),
85+
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
8686
spanner.getOptions().getPrefetchChunks());
8787
this.sessionName = session.getName();
8888
this.options = session.getOptions();

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,6 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
139139
}
140140

141141
private final Random random = new Random();
142-
private final SpannerRpc rawGrpcRpc;
143142
private final SpannerRpc gapicRpc;
144143
private final int defaultPrefetchChunks;
145144

@@ -153,12 +152,10 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
153152
private boolean spannerIsClosed = false;
154153

155154
SpannerImpl(
156-
SpannerRpc rawGrpcRpc,
157155
SpannerRpc gapicRpc,
158156
int defaultPrefetchChunks,
159157
SpannerOptions options) {
160158
super(options);
161-
this.rawGrpcRpc = rawGrpcRpc;
162159
this.gapicRpc = gapicRpc;
163160
this.defaultPrefetchChunks = defaultPrefetchChunks;
164161
this.dbAdminClient = new DatabaseAdminClientImpl(options.getProjectId(), gapicRpc);
@@ -169,7 +166,6 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
169166
SpannerImpl(SpannerOptions options) {
170167
this(
171168
options.getSpannerRpcV1(),
172-
options.getGapicSpannerRpc(),
173169
options.getPrefetchChunks(),
174170
options);
175171
}
@@ -336,12 +332,10 @@ public void close() {
336332
} catch (InterruptedException | ExecutionException e) {
337333
throw SpannerExceptionFactory.newSpannerException(e);
338334
}
339-
for (ManagedChannel channel : getOptions().getRpcChannels()) {
340-
try {
341-
channel.shutdown();
342-
} catch (RuntimeException e) {
343-
logger.log(Level.WARNING, "Failed to close channel", e);
344-
}
335+
try {
336+
gapicRpc.shutdown();
337+
} catch (RuntimeException e) {
338+
logger.log(Level.WARNING, "Failed to close channel", e);
345339
}
346340
}
347341

@@ -1064,21 +1058,24 @@ ResultSet executeQueryInternalWithOptions(
10641058
final int prefetchChunks =
10651059
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
10661060
ResumableStreamIterator stream =
1067-
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, QUERY) {
1061+
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ) {
10681062
@Override
10691063
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
1070-
return new CloseableServerStreamIterator<PartialResultSet>(
1064+
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
1065+
SpannerRpc.StreamingCall call =
10711066
rpc.executeQuery(
10721067
resumeToken == null
10731068
? request
10741069
: request.toBuilder().setResumeToken(resumeToken).build(),
1075-
null,
1076-
session.options));
1077-
1078-
// TODO(hzyi): make resume work
1079-
// Let resume fail for now. Gapic has its own resume, but in order not
1080-
// to introduce too much change at a time, we decide to plumb up
1081-
// ServerStream first and then figure out how to make resume work
1070+
stream.consumer(),
1071+
session.options);
1072+
// StreamController does not auto-request 1 message. Kick it off mannually
1073+
call.request(1);
1074+
if (prefetchChunks > 1) {
1075+
call.request(prefetchChunks - 1);
1076+
}
1077+
stream.setCall(call);
1078+
return stream;
10821079
}
10831080
};
10841081
return new GrpcResultSet(stream, this, queryMode);
@@ -1178,18 +1175,21 @@ ResultSet readInternalWithOptions(
11781175
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ) {
11791176
@Override
11801177
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
1181-
return new CloseableServerStreamIterator<PartialResultSet>(
1178+
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
1179+
SpannerRpc.StreamingCall call =
11821180
rpc.read(
11831181
resumeToken == null
11841182
? request
11851183
: request.toBuilder().setResumeToken(resumeToken).build(),
1186-
null,
1187-
session.options));
1188-
1189-
// TODO(hzyi): make resume work
1190-
// Let resume fail for now. Gapic has its own resume, but in order not
1191-
// to introduce too much change at a time, we decide to plumb up
1192-
// ServerStream first and then figure out how to make resume work
1184+
stream.consumer(),
1185+
session.options);
1186+
// StreamController does not auto-request 1 message. Kick it off mannually
1187+
call.request(1);
1188+
if (prefetchChunks > 1) {
1189+
call.request(prefetchChunks - 1);
1190+
}
1191+
stream.setCall(call);
1192+
return stream;
11931193
}
11941194
};
11951195
GrpcResultSet resultSet =

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java

Lines changed: 43 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,16 @@
1616

1717
package com.google.cloud.spanner;
1818

19+
import com.google.api.gax.grpc.GrpcInterceptorProvider;
20+
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
21+
import com.google.api.gax.rpc.TransportChannelProvider;
1922
import com.google.cloud.ServiceDefaults;
2023
import com.google.cloud.ServiceOptions;
2124
import com.google.cloud.ServiceRpc;
2225
import com.google.cloud.TransportOptions;
2326
import com.google.cloud.grpc.GrpcTransportOptions;
2427
import com.google.cloud.spanner.spi.SpannerRpcFactory;
2528
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
26-
import com.google.cloud.spanner.spi.v1.GrpcSpannerRpc;
2729
import com.google.cloud.spanner.spi.v1.SpannerRpc;
2830
import com.google.common.base.MoreObjects;
2931
import com.google.common.base.Preconditions;
@@ -53,7 +55,8 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
5355
"https://www.googleapis.com/auth/spanner.admin",
5456
"https://www.googleapis.com/auth/spanner.data");
5557
private static final int MAX_CHANNELS = 256;
56-
private static final RpcChannelFactory DEFAULT_RPC_CHANNEL_FACTORY = new NettyRpcChannelFactory();
58+
private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
59+
private static final int MAX_HEADER_LIST_SIZE = 32 * 1024; //bytes
5760

5861
/** Default implementation of {@code SpannerFactory}. */
5962
private static class DefaultSpannerFactory implements SpannerFactory {
@@ -71,29 +74,28 @@ private static class DefaultSpannerRpcFactory implements SpannerRpcFactory {
7174

7275
@Override
7376
public ServiceRpc create(SpannerOptions options) {
74-
return new GrpcSpannerRpc(options);
77+
return new GapicSpannerRpc(options);
7578
}
7679
}
7780

78-
private final List<ManagedChannel> rpcChannels;
81+
private final TransportChannelProvider channelProvider;
82+
private final GrpcInterceptorProvider interceptorProvider;
7983
private final SessionPoolOptions sessionPoolOptions;
8084
private final int prefetchChunks;
8185
private final int numChannels;
8286
private final ImmutableMap<String, String> sessionLabels;
8387

8488
private SpannerOptions(Builder builder) {
8589
super(SpannerFactory.class, SpannerRpcFactory.class, builder, new SpannerDefaults());
86-
numChannels = builder.numChannels;
87-
String userAgent = getUserAgent();
88-
RpcChannelFactory defaultRpcChannelFactory =
89-
userAgent == null
90-
? DEFAULT_RPC_CHANNEL_FACTORY
91-
: new NettyRpcChannelFactory(userAgent);
92-
rpcChannels =
93-
createChannels(
94-
getHost(),
95-
MoreObjects.firstNonNull(builder.rpcChannelFactory, defaultRpcChannelFactory),
96-
numChannels);
90+
numChannels = builder.numChannels;
91+
Preconditions.checkArgument(
92+
numChannels >= 1 && numChannels <= MAX_CHANNELS,
93+
"Number of channels must fall in the range [1, %s], found: %s",
94+
MAX_CHANNELS,
95+
numChannels);
96+
97+
channelProvider = builder.channelProvider;
98+
interceptorProvider = builder.interceptorProvider;
9799
sessionPoolOptions =
98100
builder.sessionPoolOptions != null
99101
? builder.sessionPoolOptions
@@ -107,10 +109,11 @@ public static class Builder
107109
extends ServiceOptions.Builder<
108110
Spanner, SpannerOptions, SpannerOptions.Builder> {
109111
private static final int DEFAULT_PREFETCH_CHUNKS = 4;
110-
private RpcChannelFactory rpcChannelFactory;
112+
private TransportChannelProvider channelProvider;
113+
private GrpcInterceptorProvider interceptorProvider;
114+
111115
/** By default, we create 4 channels per {@link SpannerOptions} */
112116
private int numChannels = 4;
113-
114117
private int prefetchChunks = DEFAULT_PREFETCH_CHUNKS;
115118
private SessionPoolOptions sessionPoolOptions;
116119
private ImmutableMap<String, String> sessionLabels;
@@ -123,6 +126,8 @@ private Builder() {}
123126
this.sessionPoolOptions = options.sessionPoolOptions;
124127
this.prefetchChunks = options.prefetchChunks;
125128
this.sessionLabels = options.sessionLabels;
129+
this.channelProvider = options.channelProvider;
130+
this.interceptorProvider = options.interceptorProvider;
126131
}
127132

128133
@Override
@@ -134,9 +139,21 @@ public Builder setTransportOptions(TransportOptions transportOptions) {
134139
return super.setTransportOptions(transportOptions);
135140
}
136141

137-
/** Sets the factory for creating gRPC channels. If not set, a default will be used. */
138-
public Builder setRpcChannelFactory(RpcChannelFactory factory) {
139-
this.rpcChannelFactory = factory;
142+
/**
143+
* Sets the {@code ChannelProvider}. {@link GapicSpannerRpc} would create a default
144+
* one if none is provided.
145+
*/
146+
public Builder setChannelProvider(TransportChannelProvider channelProvider) {
147+
this.channelProvider = channelProvider;
148+
return this;
149+
}
150+
151+
/**
152+
* Sets the {@code GrpcInterceptorProvider}. {@link GapicSpannerRpc} would create
153+
* a default one if none is provided.
154+
*/
155+
public Builder setInterceptorProvider(GrpcInterceptorProvider interceptorProvider) {
156+
this.interceptorProvider = interceptorProvider;
140157
return this;
141158
}
142159

@@ -197,14 +214,6 @@ public SpannerOptions build() {
197214
}
198215
}
199216

200-
/**
201-
* Interface for gRPC channel creation. Most users won't need to use this, as the default covers
202-
* typical deployment scenarios.
203-
*/
204-
public interface RpcChannelFactory {
205-
ManagedChannel newChannel(String host, int port);
206-
}
207-
208217
/** Returns default instance of {@code SpannerOptions}. */
209218
public static SpannerOptions getDefaultInstance() {
210219
return newBuilder().build();
@@ -214,8 +223,12 @@ public static Builder newBuilder() {
214223
return new Builder();
215224
}
216225

217-
public List<ManagedChannel> getRpcChannels() {
218-
return rpcChannels;
226+
public TransportChannelProvider getChannelProvider() {
227+
return channelProvider;
228+
}
229+
230+
public GrpcInterceptorProvider getInterceptorProvider() {
231+
return interceptorProvider;
219232
}
220233

221234
public int getNumChannels() {
@@ -238,88 +251,11 @@ public static GrpcTransportOptions getDefaultGrpcTransportOptions() {
238251
return GrpcTransportOptions.newBuilder().build();
239252
}
240253

241-
/**
242-
* Returns the default RPC channel factory used when none is specified. This may be useful for
243-
* callers that wish to add interceptors to gRPC channels used by the Cloud Spanner client
244-
* library.
245-
*/
246-
public static RpcChannelFactory getDefaultRpcChannelFactory() {
247-
return DEFAULT_RPC_CHANNEL_FACTORY;
248-
}
249-
250254
@Override
251255
protected String getDefaultHost() {
252256
return DEFAULT_HOST;
253257
}
254258

255-
private static List<ManagedChannel> createChannels(
256-
String rootUrl, RpcChannelFactory factory, int numChannels) {
257-
Preconditions.checkArgument(
258-
numChannels >= 1 && numChannels <= MAX_CHANNELS,
259-
"Number of channels must fall in the range [1, %s], found: %s",
260-
MAX_CHANNELS,
261-
numChannels);
262-
ImmutableList.Builder<ManagedChannel> builder = ImmutableList.builder();
263-
for (int i = 0; i < numChannels; i++) {
264-
builder.add(createChannel(rootUrl, factory));
265-
}
266-
return builder.build();
267-
}
268-
269-
private static ManagedChannel createChannel(String rootUrl, RpcChannelFactory factory) {
270-
URL url;
271-
try {
272-
url = new URL(rootUrl);
273-
} catch (MalformedURLException e) {
274-
throw new IllegalArgumentException("Invalid host: " + rootUrl, e);
275-
}
276-
ManagedChannel channel =
277-
factory.newChannel(url.getHost(), url.getPort() > 0 ? url.getPort() : url.getDefaultPort());
278-
return channel;
279-
}
280-
281-
static class NettyRpcChannelFactory implements RpcChannelFactory {
282-
private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
283-
private static final int MAX_HEADER_LIST_SIZE = 32 * 1024; //bytes
284-
private final String userAgent;
285-
private final List<ClientInterceptor> interceptors;
286-
287-
NettyRpcChannelFactory() {
288-
this(null);
289-
}
290-
291-
NettyRpcChannelFactory(String userAgent) {
292-
this(userAgent, ImmutableList.<ClientInterceptor>of());
293-
}
294-
295-
NettyRpcChannelFactory(String userAgent, List<ClientInterceptor> interceptors) {
296-
this.userAgent = userAgent;
297-
this.interceptors = interceptors;
298-
}
299-
300-
@Override
301-
public ManagedChannel newChannel(String host, int port) {
302-
NettyChannelBuilder builder =
303-
NettyChannelBuilder.forAddress(host, port)
304-
.sslContext(newSslContext())
305-
.intercept(interceptors)
306-
.maxHeaderListSize(MAX_HEADER_LIST_SIZE)
307-
.maxMessageSize(MAX_MESSAGE_SIZE);
308-
if (userAgent != null) {
309-
builder.userAgent(userAgent);
310-
}
311-
return builder.build();
312-
}
313-
314-
private static SslContext newSslContext() {
315-
try {
316-
return GrpcSslContexts.forClient().ciphers(null).build();
317-
} catch (SSLException e) {
318-
throw new RuntimeException("SSL configuration failed: " + e.getMessage(), e);
319-
}
320-
}
321-
}
322-
323259
private static class SpannerDefaults implements
324260
ServiceDefaults<Spanner, SpannerOptions> {
325261

@@ -348,10 +284,6 @@ protected SpannerRpc getSpannerRpcV1() {
348284
return (SpannerRpc) getRpc();
349285
}
350286

351-
protected SpannerRpc getGapicSpannerRpc() {
352-
return GapicSpannerRpc.create(this);
353-
}
354-
355287
@SuppressWarnings("unchecked")
356288
@Override
357289
public Builder toBuilder() {

0 commit comments

Comments
 (0)