Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;

/**
* Client for reading from and writing to existing Bigtable tables.
Expand Down Expand Up @@ -128,6 +129,19 @@ public static BigtableDataClient create(BigtableDataSettings settings) throws IO
this.stub = stub;
}


/** @deprecated Use {@link #readSingleRowAsync(String, String)} */
@Deprecated
public ApiFuture<Row> readRowAsync(String tableId, String rowKey) {
return readRowAsync(tableId, rowKey);
}

/** @deprecated Use {@link #readSingleRowAsync(String, ByteString)} */
@Deprecated
public ApiFuture<Row> readRowAsync(String tableId, ByteString rowKey) {
return readRowAsync(tableId, rowKey);
}

/**
* Convenience method for asynchronously reading a single row. If the row does not exist, the
* future's value will be null.
Expand All @@ -143,8 +157,8 @@ public static BigtableDataClient create(BigtableDataSettings settings) throws IO
* }
* }</pre>
*/
public ApiFuture<Row> readRowAsync(String tableId, String rowKey) {
return readRowAsync(tableId, ByteString.copyFromUtf8(rowKey));
public ApiFuture<Row> readSingleRowAsync(String tableId, String rowKey) {
return readSingleRowAsync(tableId, ByteString.copyFromUtf8(rowKey));
}

/**
Expand All @@ -162,8 +176,33 @@ public ApiFuture<Row> readRowAsync(String tableId, String rowKey) {
* }
* }</pre>
*/
public ApiFuture<Row> readRowAsync(String tableId, ByteString rowKey) {
return readRowsCallable().first().futureCall(Query.create(tableId).rowKey(rowKey));
public ApiFuture<Row> readSingleRowAsync(String tableId, ByteString rowKey) {
return stub.readSingleRowCallable().futureCall(Query.create(tableId).rowKey(rowKey));
}

/**
* Read the first result from the query.
*
* <p>Sample code:
*
* <pre>{@code
* InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
* try (BigtableClient bigtableClient = BigtableClient.create(instanceName)) {
* String tableId = "[TABLE]";
*
* Query query = Query.create(tableId)
* .rowKey("[ROW KEY]")
*
* Row row = bigtableClient.readSingleRowsCallable().call(query);
* }
* }</pre>
*
* @see UnaryCallable For call styles.
* @see Query For query options.
* @see com.google.cloud.bigtable.data.v2.models.Filters For the filter building DSL.
*/
public UnaryCallable<Query, Row> readSingleRowCallable() {
return stub.readSingleRowCallable();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final RequestContext requestContext;

private final ServerStreamingCallable<Query, Row> readRowsCallable;
private final UnaryCallable<Query, Row> readSingleRowCallable;
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
private final UnaryCallable<RowMutation, Void> mutateRowCallable;
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
Expand Down Expand Up @@ -150,7 +151,14 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
this.requestContext =
RequestContext.create(settings.getInstanceName(), settings.getAppProfileId());

readRowsCallable = createReadRowsCallable(new DefaultRowAdapter());
readRowsCallable = createReadRowsCallable(settings.readRowsSettings(), new DefaultRowAdapter());
readSingleRowCallable = createReadRowsCallable(
ServerStreamingCallSettings.<Query,Row>newBuilder()
.setRetryableCodes(settings.readSingleRowSettings().getRetryableCodes())
.setRetrySettings(settings.readSingleRowSettings().getRetrySettings())
.setIdleTimeout(settings.readSingleRowSettings().getRetrySettings().getTotalTimeout())
.build(),
new DefaultRowAdapter()).first();
sampleRowKeysCallable = createSampleRowKeysCallable();
mutateRowCallable = createMutateRowCallable();
bulkMutateRowsCallable = createBulkMutateRowsCallable();
Expand All @@ -176,6 +184,15 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
*/
public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
RowAdapter<RowT> rowAdapter) {
return createReadRowsCallable(settings.readRowsSettings(), rowAdapter);
}

/**
* Helper to create the callable chain for both point reads and streaming reads.
* @see #createReadRowsCallable(RowAdapter)
*/
private <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
ServerStreamingCallSettings<Query, Row> settings, RowAdapter<RowT> rowAdapter) {

ServerStreamingCallable<ReadRowsRequest, RowT> merging =
new RowMergingCallable<>(stub.readRowsCallable(), rowAdapter);
Expand All @@ -185,9 +202,9 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
ServerStreamingCallSettings<ReadRowsRequest, RowT> innerSettings =
ServerStreamingCallSettings.<ReadRowsRequest, RowT>newBuilder()
.setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter))
.setRetryableCodes(settings.readRowsSettings().getRetryableCodes())
.setRetrySettings(settings.readRowsSettings().getRetrySettings())
.setIdleTimeout(settings.readRowsSettings().getIdleTimeout())
.setRetryableCodes(settings.getRetryableCodes())
.setRetrySettings(settings.getRetrySettings())
.setIdleTimeout(settings.getIdleTimeout())
.build();

// Retry logic is split into 2 parts to workaround a rare edge case described in
Expand Down Expand Up @@ -360,6 +377,10 @@ public ServerStreamingCallable<Query, Row> readRowsCallable() {
return readRowsCallable;
}

public UnaryCallable<Query, Row> readSingleRowCallable() {
return readSingleRowCallable;
}

public UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable() {
return sampleRowKeysCallable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.StubSettings;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.bigtable.data.v2.internal.DummyBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.InstanceName;
Expand All @@ -38,6 +39,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.ByteString;
import java.util.List;
import java.util.Set;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -111,6 +113,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final String appProfileId;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readSingleRowSettings;
private final UnaryCallSettings<String, List<KeyOffset>> sampleRowKeysSettings;
private final UnaryCallSettings<RowMutation, Void> mutateRowSettings;
private final BatchingCallSettings<RowMutation, Void> bulkMutateRowsSettings;
Expand All @@ -119,11 +122,20 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS

private EnhancedBigtableStubSettings(Builder builder) {
super(builder);

// Since point reads & streaming reads share the same base callable that converts grpc errors
// into ApiExceptions, they must have the same retry codes.
Preconditions.checkState(
builder.readRowsSettings.getRetryableCodes().equals(builder.readRowsSettings.getRetryableCodes()),
"SingleReadRow retry codes must match ReadRows retry codes"
);

instanceName = builder.instanceName;
appProfileId = builder.appProfileId;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
readSingleRowSettings = builder.readSingleRowSettings.build();
sampleRowKeysSettings = builder.sampleRowKeysSettings.build();
mutateRowSettings = builder.mutateRowSettings.build();
bulkMutateRowsSettings = builder.bulkMutateRowsSettings.build();
Expand Down Expand Up @@ -157,6 +169,11 @@ public ServerStreamingCallSettings<Query, Row> readRowsSettings() {
return readRowsSettings;
}

/** Returns the object with the settings used for point reads via ReadRows. */
public UnaryCallSettings<Query, Row> readSingleRowSettings() {
return readSingleRowSettings;
}

/** Returns the object with the settings used for calls to SampleRowKeys. */
public UnaryCallSettings<String, List<KeyOffset>> sampleRowKeysSettings() {
return sampleRowKeysSettings;
Expand Down Expand Up @@ -199,6 +216,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private String appProfileId;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readSingleRowSettings;
private final UnaryCallSettings.Builder<String, List<KeyOffset>> sampleRowKeysSettings;
private final UnaryCallSettings.Builder<RowMutation, Void> mutateRowSettings;
private final BatchingCallSettings.Builder<RowMutation, Void> bulkMutateRowsSettings;
Expand Down Expand Up @@ -233,18 +251,25 @@ private Builder() {

// Per-method settings using baseSettings for defaults.
readRowsSettings = ServerStreamingCallSettings.newBuilder();
/* TODO: copy timeouts, retryCodes & retrySettings from baseSettings.readRows once it exists in GAPIC */
readRowsSettings
.setRetryableCodes(DEFAULT_RETRY_CODES)
.setRetrySettings(
DEFAULT_RETRY_SETTINGS.toBuilder().setTotalTimeout(Duration.ofHours(1)).build())
.setRetryableCodes(baseDefaults.readRowsSettings().getRetryableCodes())
.setRetrySettings(baseDefaults.readRowsSettings().getRetrySettings())
.setIdleTimeout(Duration.ofMinutes(5));

// Point reads should use same defaults as streaming reads, but with a shorter timeout
readSingleRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
readSingleRowSettings
.setRetryableCodes(baseDefaults.readRowsSettings().getRetryableCodes())
.setRetrySettings(
baseDefaults.readRowsSettings().getRetrySettings().toBuilder()
.setTotalTimeout(DEFAULT_RETRY_SETTINGS.getTotalTimeout())
.build()
);

sampleRowKeysSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
/* TODO: copy retryCodes & retrySettings from baseSettings.sampleRowKeysSettings once it exists in GAPIC */
sampleRowKeysSettings
.setRetryableCodes(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE, Code.ABORTED)
.setRetrySettings(DEFAULT_RETRY_SETTINGS);
.setRetryableCodes(baseDefaults.sampleRowKeysSettings().getRetryableCodes())
.setRetrySettings(baseDefaults.sampleRowKeysSettings().getRetrySettings());

mutateRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
copyRetrySettings(baseDefaults.mutateRowSettings(), mutateRowSettings);
Expand Down Expand Up @@ -281,6 +306,7 @@ private Builder(EnhancedBigtableStubSettings settings) {

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
readSingleRowSettings = settings.readSingleRowSettings.toBuilder();
sampleRowKeysSettings = settings.sampleRowKeysSettings.toBuilder();
mutateRowSettings = settings.mutateRowSettings.toBuilder();
bulkMutateRowsSettings = settings.bulkMutateRowsSettings.toBuilder();
Expand Down Expand Up @@ -338,6 +364,11 @@ public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
}

/** Returns the builder for the settings used for calls to readSingleRows. */
public UnaryCallSettings.Builder<Query, Row> readSingleRowSettings() {
return readSingleRowSettings;
}

/** Returns the builder for the settings used for calls to SampleRowKeysSettings. */
public UnaryCallSettings.Builder<String, List<KeyOffset>> sampleRowKeysSettings() {
return sampleRowKeysSettings;
Expand Down