Skip to content

Commit 7227e77

Browse files
Integrate the MutateRowsRetryingCallable
1 parent 3e3707b commit 7227e77

File tree

6 files changed

+163
-313
lines changed

6 files changed

+163
-313
lines changed

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,18 @@
1515
*/
1616
package com.google.cloud.bigtable.data.v2.stub;
1717

18-
import com.google.api.core.ApiFuture;
1918
import com.google.api.core.InternalApi;
20-
import com.google.api.gax.retrying.RetrySettings;
21-
import com.google.api.gax.rpc.ApiCallContext;
19+
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
20+
import com.google.api.gax.retrying.RetryAlgorithm;
21+
import com.google.api.gax.retrying.RetryingExecutor;
22+
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
2223
import com.google.api.gax.rpc.BatchingCallSettings;
2324
import com.google.api.gax.rpc.Callables;
2425
import com.google.api.gax.rpc.ClientContext;
2526
import com.google.api.gax.rpc.ServerStreamingCallSettings;
2627
import com.google.api.gax.rpc.ServerStreamingCallable;
2728
import com.google.api.gax.rpc.UnaryCallable;
2829
import com.google.bigtable.v2.MutateRowsRequest;
29-
import com.google.bigtable.v2.MutateRowsResponse;
3030
import com.google.bigtable.v2.ReadRowsRequest;
3131
import com.google.bigtable.v2.SampleRowKeysRequest;
3232
import com.google.bigtable.v2.SampleRowKeysResponse;
@@ -40,13 +40,14 @@
4040
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
4141
import com.google.cloud.bigtable.data.v2.models.RowMutation;
4242
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
43-
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsSpoolingCallable;
43+
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
4444
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsUserFacingCallable;
4545
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
4646
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
4747
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable;
4848
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
4949
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
50+
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
5051
import java.io.IOException;
5152
import java.util.List;
5253
import org.threeten.bp.Duration;
@@ -251,28 +252,39 @@ private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
251252
* <li>Convert a {@link RowMutation} into a {@link MutateRowsRequest} with a single entry.
252253
* <li>Using gax's {@link com.google.api.gax.rpc.BatchingCallable} to spool the requests and
253254
* aggregate the {@link MutateRowsRequest.Entry}s.
254-
* <li>Spool the streamed responses.
255+
* <li>Process the response and schedule retries until all entries have been successfully
256+
* applied, fail permanently or there are no more retry attempts left.
257+
* <li>Wrap batch failures in a {@link
258+
* com.google.cloud.bigtable.data.v2.models.MutateRowsException}.
255259
* <li>Split the responses using {@link MutateRowsBatchingDescriptor}.
256-
* <li>Apply retries to individual mutations
257260
* </ul>
258261
*/
259262
private UnaryCallable<RowMutation, Void> createMutateRowsCallable() {
260-
MutateRowsSpoolingCallable spooling = new MutateRowsSpoolingCallable(stub.mutateRowsCallable());
263+
RetryAlgorithm<Void> retryAlgorithm =
264+
new RetryAlgorithm<>(
265+
new ApiResultRetryAlgorithm<Void>(),
266+
new ExponentialRetryAlgorithm(
267+
settings.mutateRowsSettings().getRetrySettings(), clientContext.getClock()));
268+
RetryingExecutor<Void> retryingExecutor =
269+
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
270+
271+
UnaryCallable<MutateRowsRequest, Void> retrying =
272+
new MutateRowsRetryingCallable(
273+
clientContext.getDefaultCallContext(),
274+
stub.mutateRowsCallable(),
275+
retryingExecutor,
276+
settings.mutateRowsSettings().getRetryableCodes());
261277

262278
// recreate BatchingCallSettings with the correct descriptor
263-
BatchingCallSettings.Builder<MutateRowsRequest, MutateRowsResponse> batchingCallSettings =
264-
BatchingCallSettings.newBuilder(
265-
new MutateRowsBatchingDescriptor(settings.mutateRowsSettings().getRetryableCodes()))
279+
BatchingCallSettings.Builder<MutateRowsRequest, Void> batchingCallSettings =
280+
BatchingCallSettings.newBuilder(new MutateRowsBatchingDescriptor())
266281
.setBatchingSettings(settings.mutateRowsSettings().getBatchingSettings());
267282

268-
UnaryCallable<MutateRowsRequest, MutateRowsResponse> batching =
269-
Callables.batching(spooling, batchingCallSettings.build(), clientContext);
270-
271-
UnaryCallable<MutateRowsRequest, MutateRowsResponse> retrying =
272-
Callables.retrying(batching, settings.mutateRowsSettings(), clientContext);
283+
UnaryCallable<MutateRowsRequest, Void> batching =
284+
Callables.batching(retrying, batchingCallSettings.build(), clientContext);
273285

274286
MutateRowsUserFacingCallable userFacing =
275-
new MutateRowsUserFacingCallable(retrying, requestContext);
287+
new MutateRowsUserFacingCallable(batching, requestContext);
276288

277289
return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
278290
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptor.java

Lines changed: 43 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -18,52 +18,28 @@
1818
import com.google.api.core.InternalApi;
1919
import com.google.api.gax.batching.PartitionKey;
2020
import com.google.api.gax.batching.RequestBuilder;
21-
import com.google.api.gax.grpc.GrpcStatusCode;
2221
import com.google.api.gax.rpc.ApiException;
23-
import com.google.api.gax.rpc.ApiExceptionFactory;
2422
import com.google.api.gax.rpc.BatchedRequestIssuer;
2523
import com.google.api.gax.rpc.BatchingDescriptor;
26-
import com.google.api.gax.rpc.StatusCode;
2724
import com.google.bigtable.v2.MutateRowsRequest;
28-
import com.google.bigtable.v2.MutateRowsResponse;
29-
import com.google.common.base.Preconditions;
30-
import com.google.common.collect.ImmutableSet;
31-
import com.google.common.primitives.Ints;
32-
import com.google.rpc.Code;
33-
import com.google.rpc.Status;
34-
import io.grpc.StatusException;
35-
import io.grpc.StatusRuntimeException;
25+
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
26+
import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation;
27+
import com.google.common.base.Function;
28+
import com.google.common.collect.Maps;
3629
import java.util.Collection;
37-
import java.util.Set;
30+
import java.util.List;
31+
import java.util.Map;
32+
import javax.annotation.Nullable;
3833

3934
/**
40-
* A custom implementation of a {@link BatchingDescriptor} to split individual results of a bulk
41-
* MutateRowsResponse. Each individual result will be matched with its issuer. Since the embedded
42-
* results bypass gax's result processing chains, this class is responsible for wrapping errors in
43-
* {@link ApiException}s and marking each error as retryable.
35+
* A custom implementation of a {@link BatchingDescriptor} to split individual results in a {@link
36+
* MutateRowsException}. Each individual result will be matched with its issuer.
4437
*
4538
* <p>This class is considered an internal implementation detail and not meant to be used by
4639
* applications directly.
4740
*/
4841
@InternalApi
49-
public class MutateRowsBatchingDescriptor
50-
implements BatchingDescriptor<MutateRowsRequest, MutateRowsResponse> {
51-
52-
// Shared response to notify individual issuers of a successful mutation.
53-
private static final MutateRowsResponse OK_RESPONSE =
54-
MutateRowsResponse.newBuilder()
55-
.addEntries(
56-
MutateRowsResponse.Entry.newBuilder()
57-
.setIndex(0)
58-
.setStatus(Status.newBuilder().setCode(Code.OK_VALUE)))
59-
.build();
60-
61-
private final ImmutableSet<StatusCode.Code> retryableCodes;
62-
63-
public MutateRowsBatchingDescriptor(Set<StatusCode.Code> retryableCodes) {
64-
this.retryableCodes = ImmutableSet.copyOf(retryableCodes);
65-
}
66-
42+
public class MutateRowsBatchingDescriptor implements BatchingDescriptor<MutateRowsRequest, Void> {
6743
/** Return the target table name. This will be used to combine batcheable requests */
6844
@Override
6945
public PartitionKey getBatchPartitionKey(MutateRowsRequest request) {
@@ -76,44 +52,49 @@ public RequestBuilder<MutateRowsRequest> getRequestBuilder() {
7652
return new MyRequestBuilder();
7753
}
7854

79-
/** {@inheritDoc} */
8055
@Override
8156
public void splitResponse(
82-
MutateRowsResponse batchResponse,
83-
Collection<? extends BatchedRequestIssuer<MutateRowsResponse>> batch) {
57+
Void batchResponse, Collection<? extends BatchedRequestIssuer<Void>> batch) {
8458

85-
// Sort the result entries by index.
86-
Status[] sortedEntries = new Status[batchResponse.getEntriesCount()];
87-
88-
for (MutateRowsResponse.Entry entry : batchResponse.getEntriesList()) {
89-
int index = Ints.checkedCast(entry.getIndex());
90-
Preconditions.checkState(
91-
sortedEntries[index] == null, "Got multiple results for the same sub-mutation");
92-
sortedEntries[index] = entry.getStatus();
59+
for (BatchedRequestIssuer<Void> issuer : batch) {
60+
issuer.setResponse(null);
9361
}
62+
}
9463

95-
// Notify all of issuers of the corresponding result.
96-
int i = 0;
97-
for (BatchedRequestIssuer<MutateRowsResponse> issuer : batch) {
98-
Status entry = sortedEntries[i++];
99-
Preconditions.checkState(entry != null, "Missing result for entry");
64+
@Override
65+
public void splitException(
66+
Throwable throwable, Collection<? extends BatchedRequestIssuer<Void>> batch) {
10067

101-
if (entry.getCode() == Code.OK_VALUE) {
102-
issuer.setResponse(OK_RESPONSE);
103-
} else {
104-
issuer.setException(createElementException(entry));
68+
if (!(throwable instanceof MutateRowsException)) {
69+
for (BatchedRequestIssuer<Void> issuer : batch) {
70+
issuer.setException(throwable);
10571
}
72+
return;
10673
}
107-
}
10874

109-
/** {@inheritDoc} */
110-
@Override
111-
public void splitException(
112-
Throwable throwable, Collection<? extends BatchedRequestIssuer<MutateRowsResponse>> batch) {
113-
throwable = createElementException(throwable);
75+
List<FailedMutation> failedMutations = ((MutateRowsException) throwable).getFailedMutations();
76+
77+
Map<Integer, FailedMutation> errorsByIndex =
78+
Maps.uniqueIndex(
79+
failedMutations,
80+
new Function<FailedMutation, Integer>() {
81+
@Nullable
82+
@Override
83+
public Integer apply(@Nullable FailedMutation input) {
84+
return input.getIndex();
85+
}
86+
});
11487

115-
for (BatchedRequestIssuer<MutateRowsResponse> responder : batch) {
116-
responder.setException(throwable);
88+
int i = 0;
89+
for (BatchedRequestIssuer<Void> issuer : batch) {
90+
for (int j = 0; j < issuer.getMessageCount(); j++) {
91+
FailedMutation failure = errorsByIndex.get(i++);
92+
if (failure == null) {
93+
issuer.setResponse(null);
94+
} else {
95+
issuer.setException(failure.getError());
96+
}
97+
}
11798
}
11899
}
119100

@@ -129,38 +110,6 @@ public long countBytes(MutateRowsRequest request) {
129110
return request.getSerializedSize();
130111
}
131112

132-
/** Convert an element error Status into an ApiException */
133-
private ApiException createElementException(Status protoStatus) {
134-
Preconditions.checkArgument(protoStatus.getCode() != Code.OK_VALUE, "OK is not an error");
135-
136-
StatusRuntimeException throwable =
137-
io.grpc.Status.fromCodeValue(protoStatus.getCode())
138-
.withDescription(protoStatus.getMessage())
139-
.asRuntimeException();
140-
141-
return createElementException(throwable);
142-
}
143-
144-
/** Convert a Throwable into an ApiException, marking it as retryable when appropriate. */
145-
private ApiException createElementException(Throwable throwable) {
146-
final io.grpc.Status.Code code;
147-
148-
if (throwable instanceof ApiException) {
149-
return (ApiException) throwable;
150-
} else if (throwable instanceof StatusRuntimeException) {
151-
code = ((StatusRuntimeException) throwable).getStatus().getCode();
152-
} else if (throwable instanceof StatusException) {
153-
code = ((StatusException) throwable).getStatus().getCode();
154-
} else {
155-
code = io.grpc.Status.Code.UNKNOWN;
156-
}
157-
158-
GrpcStatusCode gaxStatusCode = GrpcStatusCode.of(code);
159-
boolean isRetryable = retryableCodes.contains(gaxStatusCode.getCode());
160-
161-
return ApiExceptionFactory.createException(throwable, gaxStatusCode, isRetryable);
162-
}
163-
164113
/** A {@link com.google.api.gax.batching.RequestBuilder} that can aggregate MutateRowsRequest */
165114
static class MyRequestBuilder implements RequestBuilder<MutateRowsRequest> {
166115
private MutateRowsRequest.Builder builder;

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsSpoolingCallable.java

Lines changed: 0 additions & 73 deletions
This file was deleted.

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsUserFacingCallable.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,11 @@
1515
*/
1616
package com.google.cloud.bigtable.data.v2.stub.mutaterows;
1717

18-
import com.google.api.core.ApiFunction;
1918
import com.google.api.core.ApiFuture;
20-
import com.google.api.core.ApiFutures;
2119
import com.google.api.core.InternalApi;
2220
import com.google.api.gax.rpc.ApiCallContext;
2321
import com.google.api.gax.rpc.UnaryCallable;
2422
import com.google.bigtable.v2.MutateRowsRequest;
25-
import com.google.bigtable.v2.MutateRowsResponse;
2623
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
2724
import com.google.cloud.bigtable.data.v2.models.RowMutation;
2825

@@ -34,28 +31,18 @@
3431
*/
3532
@InternalApi
3633
public class MutateRowsUserFacingCallable extends UnaryCallable<RowMutation, Void> {
37-
private final UnaryCallable<MutateRowsRequest, MutateRowsResponse> inner;
34+
private final UnaryCallable<MutateRowsRequest, Void> inner;
3835
private final RequestContext requestContext;
3936

4037
public MutateRowsUserFacingCallable(
41-
UnaryCallable<MutateRowsRequest, MutateRowsResponse> inner, RequestContext requestContext) {
38+
UnaryCallable<MutateRowsRequest, Void> inner, RequestContext requestContext) {
4239

4340
this.inner = inner;
4441
this.requestContext = requestContext;
4542
}
4643

4744
@Override
4845
public ApiFuture<Void> futureCall(RowMutation request, ApiCallContext context) {
49-
ApiFuture<MutateRowsResponse> rawResponse =
50-
inner.futureCall(request.toBulkProto(requestContext), context);
51-
52-
return ApiFutures.transform(
53-
rawResponse,
54-
new ApiFunction<MutateRowsResponse, Void>() {
55-
@Override
56-
public Void apply(MutateRowsResponse mutateRowsResponse) {
57-
return null;
58-
}
59-
});
46+
return inner.futureCall(request.toBulkProto(requestContext), context);
6047
}
6148
}

0 commit comments

Comments
 (0)