Skip to content

Commit 36fbd97

Browse files
Bigtable: add a separate callable for point reads (#4264)
1 parent 31971fb commit 36fbd97

File tree

7 files changed

+355
-178
lines changed

7 files changed

+355
-178
lines changed

google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,88 @@ public ApiFuture<Row> readRowAsync(String tableId, ByteString rowKey, @Nullable
422422
if (filter != null) {
423423
query = query.filter(filter);
424424
}
425-
return readRowsCallable().first().futureCall(query);
425+
return readRowCallable().futureCall(query);
426+
}
427+
428+
/**
429+
* Reads a single row. The returned callable object allows for customization of api invocation.
430+
*
431+
* <p>Sample code:
432+
*
433+
* <pre>{@code
434+
* InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
435+
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
436+
* String tableId = "[TABLE]";
437+
*
438+
* Query query = Query.create(tableId)
439+
* .rowKey("[KEY]")
440+
* .filter(FILTERS.qualifier().regex("[COLUMN PREFIX].*"));
441+
*
442+
* // Synchronous invocation
443+
* try {
444+
* Row row = bigtableDataClient.readRowCallable().call(query);
445+
* if (row == null) {
446+
* System.out.println("Row not found");
447+
* }
448+
* } catch (RuntimeException e) {
449+
* e.printStackTrace();
450+
* }
451+
*
452+
* // Asynchronous invocation
453+
* ApiFuture<Row> rowFuture = bigtableDataClient.readRowCallable().futureCall(query);
454+
*
455+
* ApiFutures.addCallback(rowFuture, new ApiFutureCallback<Row>() {
456+
* public void onFailure(Throwable t) {
457+
* if (t instanceof NotFoundException) {
458+
* System.out.println("Tried to read a non-existent table");
459+
* } else {
460+
* t.printStackTrace();
461+
* }
462+
* }
463+
* public void onSuccess(Row row) {
464+
* if (row == null) {
465+
* System.out.println("Row not found");
466+
* }
467+
* }
468+
* }, MoreExecutors.directExecutor());
469+
* }
470+
* }</pre>
471+
*
472+
* @see UnaryCallable For call styles.
473+
* @see Query For query options.
474+
* @see com.google.cloud.bigtable.data.v2.models.Filters For the filter building DSL.
475+
*/
476+
public UnaryCallable<Query, Row> readRowCallable() {
477+
return stub.readRowCallable();
478+
}
479+
480+
/**
481+
* Reads a single row. This callable allows for customization of the logical representation of a
482+
* row. It's meant for advanced use cases.
483+
*
484+
* <p>Sample code:
485+
*
486+
* <pre>{@code
487+
* InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
488+
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
489+
* String tableId = "[TABLE]";
490+
*
491+
* Query query = Query.create(tableId)
492+
* .rowKey("[KEY]")
493+
* .filter(FILTERS.qualifier().regex("[COLUMN PREFIX].*"));
494+
*
495+
* // Synchronous invocation
496+
* CustomRow row = bigtableDataClient.readRowCallable(new CustomRowAdapter()).call(query));
497+
* // Do something with row
498+
* }
499+
* }</pre>
500+
*
501+
* @see ServerStreamingCallable For call styles.
502+
* @see Query For query options.
503+
* @see com.google.cloud.bigtable.data.v2.models.Filters For the filter building DSL.
504+
*/
505+
public <RowT> UnaryCallable<Query, RowT> readRowCallable(RowAdapter<RowT> rowAdapter) {
506+
return stub.createReadRowCallable(rowAdapter);
426507
}
427508

428509
/**

google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
2525
import com.google.cloud.bigtable.data.v2.internal.RowSetUtil;
2626
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
27+
import com.google.common.base.MoreObjects;
28+
import com.google.common.base.Objects;
2729
import com.google.common.base.Preconditions;
2830
import com.google.common.collect.ImmutableSortedSet;
2931
import com.google.common.collect.Lists;
@@ -264,4 +266,34 @@ private static ByteString wrapKey(String key) {
264266
}
265267
return ByteString.copyFromUtf8(key);
266268
}
269+
270+
@Override
271+
public boolean equals(Object o) {
272+
if (this == o) {
273+
return true;
274+
}
275+
if (o == null || getClass() != o.getClass()) {
276+
return false;
277+
}
278+
Query query = (Query) o;
279+
return Objects.equal(tableId, query.tableId)
280+
&& Objects.equal(builder.build(), query.builder.build());
281+
}
282+
283+
@Override
284+
public int hashCode() {
285+
return Objects.hashCode(tableId, builder.build());
286+
}
287+
288+
@Override
289+
public String toString() {
290+
ReadRowsRequest request = builder.build();
291+
292+
return MoreObjects.toStringHelper(this)
293+
.add("tableId", tableId)
294+
.add("keys", request.getRows().getRowKeysList())
295+
.add("ranges", request.getRows().getRowRangesList())
296+
.add("filter", request.getFilter())
297+
.toString();
298+
}
267299
}

google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public class EnhancedBigtableStub implements AutoCloseable {
7474
private final RequestContext requestContext;
7575

7676
private final ServerStreamingCallable<Query, Row> readRowsCallable;
77+
private final UnaryCallable<Query, Row> readRowCallable;
7778
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
7879
private final UnaryCallable<RowMutation, Void> mutateRowCallable;
7980
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
@@ -151,6 +152,7 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
151152
RequestContext.create(settings.getInstanceName(), settings.getAppProfileId());
152153

153154
readRowsCallable = createReadRowsCallable(new DefaultRowAdapter());
155+
readRowCallable = createReadRowCallable(new DefaultRowAdapter());
154156
sampleRowKeysCallable = createSampleRowKeysCallable();
155157
mutateRowCallable = createMutateRowCallable();
156158
bulkMutateRowsCallable = createBulkMutateRowsCallable();
@@ -162,7 +164,7 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
162164
// <editor-fold desc="Callable creators">
163165

164166
/**
165-
* Creates a callable chain to handle ReadRows RPCs. The chain will:
167+
* Creates a callable chain to handle streaming ReadRows RPCs. The chain will:
166168
*
167169
* <ul>
168170
* <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest} and
@@ -176,6 +178,48 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
176178
*/
177179
public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
178180
RowAdapter<RowT> rowAdapter) {
181+
return createReadRowsCallable(settings.readRowsSettings(), rowAdapter);
182+
}
183+
184+
/**
185+
* Creates a callable chain to handle point ReadRows RPCs. The chain will:
186+
*
187+
* <ul>
188+
* <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest} and
189+
* dispatch the RPC.
190+
* <li>Upon receiving the response stream, it will merge the {@link
191+
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
192+
* implementation can be configured in by the {@code rowAdapter} parameter.
193+
* <li>Retry/resume on failure.
194+
* <li>Filter out marker rows.
195+
* </ul>
196+
*/
197+
public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT> rowAdapter) {
198+
return createReadRowsCallable(
199+
ServerStreamingCallSettings.<Query, Row>newBuilder()
200+
.setRetryableCodes(settings.readRowSettings().getRetryableCodes())
201+
.setRetrySettings(settings.readRowSettings().getRetrySettings())
202+
.setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout())
203+
.build(),
204+
rowAdapter)
205+
.first();
206+
}
207+
208+
/**
209+
* Creates a callable chain to handle ReadRows RPCs. The chain will:
210+
*
211+
* <ul>
212+
* <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest} and
213+
* dispatch the RPC.
214+
* <li>Upon receiving the response stream, it will merge the {@link
215+
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
216+
* implementation can be configured in by the {@code rowAdapter} parameter.
217+
* <li>Retry/resume on failure.
218+
* <li>Filter out marker rows.
219+
* </ul>
220+
*/
221+
private <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
222+
ServerStreamingCallSettings<Query, Row> readRowsSettings, RowAdapter<RowT> rowAdapter) {
179223

180224
ServerStreamingCallable<ReadRowsRequest, RowT> merging =
181225
new RowMergingCallable<>(stub.readRowsCallable(), rowAdapter);
@@ -185,9 +229,9 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
185229
ServerStreamingCallSettings<ReadRowsRequest, RowT> innerSettings =
186230
ServerStreamingCallSettings.<ReadRowsRequest, RowT>newBuilder()
187231
.setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter))
188-
.setRetryableCodes(settings.readRowsSettings().getRetryableCodes())
189-
.setRetrySettings(settings.readRowsSettings().getRetrySettings())
190-
.setIdleTimeout(settings.readRowsSettings().getIdleTimeout())
232+
.setRetryableCodes(readRowsSettings.getRetryableCodes())
233+
.setRetrySettings(readRowsSettings.getRetrySettings())
234+
.setIdleTimeout(readRowsSettings.getIdleTimeout())
191235
.build();
192236

193237
// Retry logic is split into 2 parts to workaround a rare edge case described in
@@ -356,10 +400,16 @@ private UnaryCallable<ReadModifyWriteRow, Row> createReadModifyWriteRowCallable(
356400
// </editor-fold>
357401

358402
// <editor-fold desc="Callable accessors">
403+
/** Returns a streaming read rows callable */
359404
public ServerStreamingCallable<Query, Row> readRowsCallable() {
360405
return readRowsCallable;
361406
}
362407

408+
/** Return a point read callable */
409+
public UnaryCallable<Query, Row> readRowCallable() {
410+
return readRowCallable;
411+
}
412+
363413
public UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable() {
364414
return sampleRowKeysCallable;
365415
}

google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
112112
private final String appProfileId;
113113

114114
private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
115+
private final UnaryCallSettings<Query, Row> readRowSettings;
115116
private final UnaryCallSettings<String, List<KeyOffset>> sampleRowKeysSettings;
116117
private final UnaryCallSettings<RowMutation, Void> mutateRowSettings;
117118
private final BatchingCallSettings<RowMutation, Void> bulkMutateRowsSettings;
@@ -120,11 +121,22 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
120121

121122
private EnhancedBigtableStubSettings(Builder builder) {
122123
super(builder);
124+
125+
// Since point reads & streaming reads share the same base callable that converts grpc errors
126+
// into ApiExceptions, they must have the same retry codes.
127+
Preconditions.checkState(
128+
builder
129+
.readRowSettings
130+
.getRetryableCodes()
131+
.equals(builder.readRowsSettings.getRetryableCodes()),
132+
"Single ReadRow retry codes must match ReadRows retry codes");
133+
123134
instanceName = builder.instanceName;
124135
appProfileId = builder.appProfileId;
125136

126137
// Per method settings.
127138
readRowsSettings = builder.readRowsSettings.build();
139+
readRowSettings = builder.readRowSettings.build();
128140
sampleRowKeysSettings = builder.sampleRowKeysSettings.build();
129141
mutateRowSettings = builder.mutateRowSettings.build();
130142
bulkMutateRowsSettings = builder.bulkMutateRowsSettings.build();
@@ -163,6 +175,11 @@ public UnaryCallSettings<String, List<KeyOffset>> sampleRowKeysSettings() {
163175
return sampleRowKeysSettings;
164176
}
165177

178+
/** Returns the object with the settings used for point reads via ReadRows. */
179+
public UnaryCallSettings<Query, Row> readRowSettings() {
180+
return readRowSettings;
181+
}
182+
166183
/** Returns the object with the settings used for calls to MutateRow. */
167184
public UnaryCallSettings<RowMutation, Void> mutateRowSettings() {
168185
return mutateRowSettings;
@@ -200,6 +217,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
200217
private String appProfileId;
201218

202219
private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
220+
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
203221
private final UnaryCallSettings.Builder<String, List<KeyOffset>> sampleRowKeysSettings;
204222
private final UnaryCallSettings.Builder<RowMutation, Void> mutateRowSettings;
205223
private final BatchingCallSettings.Builder<RowMutation, Void> bulkMutateRowsSettings;
@@ -234,18 +252,27 @@ private Builder() {
234252

235253
// Per-method settings using baseSettings for defaults.
236254
readRowsSettings = ServerStreamingCallSettings.newBuilder();
237-
/* TODO: copy timeouts, retryCodes & retrySettings from baseSettings.readRows once it exists in GAPIC */
238255
readRowsSettings
239-
.setRetryableCodes(DEFAULT_RETRY_CODES)
240-
.setRetrySettings(
241-
DEFAULT_RETRY_SETTINGS.toBuilder().setTotalTimeout(Duration.ofHours(1)).build())
256+
.setRetryableCodes(baseDefaults.readRowsSettings().getRetryableCodes())
257+
.setRetrySettings(baseDefaults.readRowsSettings().getRetrySettings())
242258
.setIdleTimeout(Duration.ofMinutes(5));
243259

260+
// Point reads should use same defaults as streaming reads, but with a shorter timeout
261+
readRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
262+
readRowSettings
263+
.setRetryableCodes(baseDefaults.readRowsSettings().getRetryableCodes())
264+
.setRetrySettings(
265+
baseDefaults
266+
.readRowsSettings()
267+
.getRetrySettings()
268+
.toBuilder()
269+
.setTotalTimeout(DEFAULT_RETRY_SETTINGS.getTotalTimeout())
270+
.build());
271+
244272
sampleRowKeysSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
245-
/* TODO: copy retryCodes & retrySettings from baseSettings.sampleRowKeysSettings once it exists in GAPIC */
246273
sampleRowKeysSettings
247-
.setRetryableCodes(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE, Code.ABORTED)
248-
.setRetrySettings(DEFAULT_RETRY_SETTINGS);
274+
.setRetryableCodes(baseDefaults.sampleRowKeysSettings().getRetryableCodes())
275+
.setRetrySettings(baseDefaults.sampleRowKeysSettings().getRetrySettings());
249276

250277
mutateRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
251278
copyRetrySettings(baseDefaults.mutateRowSettings(), mutateRowSettings);
@@ -282,6 +309,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
282309

283310
// Per method settings.
284311
readRowsSettings = settings.readRowsSettings.toBuilder();
312+
readRowSettings = settings.readRowSettings.toBuilder();
285313
sampleRowKeysSettings = settings.sampleRowKeysSettings.toBuilder();
286314
mutateRowSettings = settings.mutateRowSettings.toBuilder();
287315
bulkMutateRowsSettings = settings.bulkMutateRowsSettings.toBuilder();
@@ -339,6 +367,11 @@ public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
339367
return readRowsSettings;
340368
}
341369

370+
/** Returns the builder for the settings used for point reads using readRow. */
371+
public UnaryCallSettings.Builder<Query, Row> readRowSettings() {
372+
return readRowSettings;
373+
}
374+
342375
/** Returns the builder for the settings used for calls to SampleRowKeysSettings. */
343376
public UnaryCallSettings.Builder<String, List<KeyOffset>> sampleRowKeysSettings() {
344377
return sampleRowKeysSettings;

0 commit comments

Comments
 (0)