Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.gax.paging.Page;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.pathtemplate.PathTemplate;
import com.google.cloud.BaseService;
import com.google.cloud.ByteArray;
Expand All @@ -36,6 +37,7 @@
import com.google.cloud.spanner.Options.ListOption;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Paginated;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -81,7 +83,6 @@
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;

import java.io.IOException;
import java.io.Serializable;
import java.util.AbstractList;
Expand Down Expand Up @@ -134,7 +135,8 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
}

private final Random random = new Random();
private final SpannerRpc rpc;
private final SpannerRpc rawGrpcRpc;
private final SpannerRpc gapicRpc;
private final int defaultPrefetchChunks;

@GuardedBy("this")
Expand All @@ -146,16 +148,26 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
@GuardedBy("this")
private boolean spannerIsClosed = false;

SpannerImpl(SpannerRpc rpc, int defaultPrefetchChunks, SpannerOptions options) {
SpannerImpl(
SpannerRpc rawGrpcRpc,
SpannerRpc gapicRpc,
int defaultPrefetchChunks,
SpannerOptions options) {
super(options);
this.rpc = rpc;
this.rawGrpcRpc = rawGrpcRpc;
this.gapicRpc = gapicRpc;
this.defaultPrefetchChunks = defaultPrefetchChunks;
this.dbAdminClient = new DatabaseAdminClientImpl(options.getProjectId(), rpc);
this.instanceClient = new InstanceAdminClientImpl(options.getProjectId(), rpc, dbAdminClient);
this.dbAdminClient = new DatabaseAdminClientImpl(options.getProjectId(), gapicRpc);
this.instanceClient =
new InstanceAdminClientImpl(options.getProjectId(), gapicRpc, dbAdminClient);
}

SpannerImpl(SpannerOptions options) {
this(options.getSpannerRpcV1(), options.getPrefetchChunks(), options);
this(
options.getSpannerRpcV1(),
GapicSpannerRpc.create(options),
options.getPrefetchChunks(),
options);
}

private static ExponentialBackOff newBackOff() {
Expand Down Expand Up @@ -255,7 +267,8 @@ Session createSession(final DatabaseId db) throws SpannerException {
new Callable<com.google.spanner.v1.Session>() {
@Override
public com.google.spanner.v1.Session call() throws Exception {
return rpc.createSession(db.getName(), getOptions().getSessionLabels(), options);
return rawGrpcRpc.createSession(
db.getName(), getOptions().getSessionLabels(), options);
}
});
span.end();
Expand Down Expand Up @@ -794,7 +807,7 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
new Callable<CommitResponse>() {
@Override
public CommitResponse call() throws Exception {
return rpc.commit(request, options);
return rawGrpcRpc.commit(request, options);
}
});
Timestamp t = Timestamp.fromProto(response.getCommitTimestamp());
Expand All @@ -816,7 +829,7 @@ public ReadContext singleUse() {

@Override
public ReadContext singleUse(TimestampBound bound) {
return setActive(new SingleReadContext(this, bound, rpc, defaultPrefetchChunks));
return setActive(new SingleReadContext(this, bound, gapicRpc, defaultPrefetchChunks));
}

@Override
Expand All @@ -826,7 +839,8 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {

@Override
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
return setActive(new SingleUseReadOnlyTransaction(this, bound, rpc, defaultPrefetchChunks));
return setActive(
new SingleUseReadOnlyTransaction(this, bound, gapicRpc, defaultPrefetchChunks));
}

@Override
Expand All @@ -836,12 +850,13 @@ public ReadOnlyTransaction readOnlyTransaction() {

@Override
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
return setActive(new MultiUseReadOnlyTransaction(this, bound, rpc, defaultPrefetchChunks));
return setActive(
new MultiUseReadOnlyTransaction(this, bound, gapicRpc, defaultPrefetchChunks));
}

@Override
public TransactionRunner readWriteTransaction() {
return setActive(new TransactionRunnerImpl(this, rpc, defaultPrefetchChunks));
return setActive(new TransactionRunnerImpl(this, gapicRpc, defaultPrefetchChunks));
}

@Override
Expand All @@ -858,7 +873,7 @@ public void close() {
new Callable<Void>() {
@Override
public Void call() throws Exception {
rpc.deleteSession(name, options);
rawGrpcRpc.deleteSession(name, options);
return null;
}
});
Expand All @@ -884,7 +899,7 @@ ByteString beginTransaction() {
new Callable<Transaction>() {
@Override
public Transaction call() throws Exception {
return rpc.beginTransaction(request, options);
return rawGrpcRpc.beginTransaction(request, options);
}
});
if (txn.getId().isEmpty()) {
Expand Down Expand Up @@ -1041,20 +1056,14 @@ ResultSet executeQueryInternalWithOptions(
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, QUERY) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
SpannerRpc.StreamingCall call =
rpc.executeQuery(
resumeToken == null
? request
: request.toBuilder().setResumeToken(resumeToken).build(),
stream.consumer(),
session.options);
// We get one message for free.
if (prefetchChunks > 1) {
call.request(prefetchChunks - 1);
}
stream.setCall(call);
return stream;
return new CloseableServerStreamIterator<PartialResultSet>(rpc.executeQuery(
resumeToken == null
? request
: request.toBuilder().setResumeToken(resumeToken).build(),
null,
session.options));

// let resume fail for now
}
};
return new GrpcResultSet(stream, this, queryMode);
Expand Down Expand Up @@ -1154,20 +1163,14 @@ ResultSet readInternalWithOptions(
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
SpannerRpc.StreamingCall call =
rpc.read(
resumeToken == null
? request
: request.toBuilder().setResumeToken(resumeToken).build(),
stream.consumer(),
session.options);
// We get one message for free.
if (prefetchChunks > 1) {
call.request(prefetchChunks - 1);
}
stream.setCall(call);
return stream;
return new CloseableServerStreamIterator<PartialResultSet>(rpc.read(
resumeToken == null
? request
: request.toBuilder().setResumeToken(resumeToken).build(),
null,
session.options));

// let resume fail for now
}
};
GrpcResultSet resultSet =
Expand Down Expand Up @@ -2273,6 +2276,32 @@ interface CloseableIterator<T> extends Iterator<T> {
void close(@Nullable String message);
}

private static final class CloseableServerStreamIterator<T> implements CloseableIterator<T> {

private final ServerStream<T> stream;
private final Iterator<T> iterator;

public CloseableServerStreamIterator(ServerStream<T> stream) {
this.stream = stream;
this.iterator = stream.iterator();
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public T next() {
return iterator.next();
}

@Override
public void close(@Nullable String message) {
stream.cancel();
}
}

/** Adapts a streaming read/query call into an iterator over partial result sets. */
@VisibleForTesting
static class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
Expand Down
Loading