chore: feature parity for veneer attempt timeouts #3027

Merged · 4 commits · Jun 25, 2021
Changes from 1 commit
chore: feature parity for veneer attempt timeouts
* add attempt timeouts for reads. java-bigtable doesn't directly support per-attempt timeouts for ReadRows. This PR works around that by setting a timeout in the ApiCallContext for every call, and it ensures that the overall timeout is still respected after the attempt timeout has been customized.
* re-add the ability to "disable" timeouts. java-bigtable-hbase has a USE_TIMEOUTS config, which effectively ignores user-configured deadlines and forces all RPCs except scans to use a 6-minute timeout. This config is deprecated and will be removed, but it should be kept during the transition to veneer. (A configuration sketch follows the commit info below.)
igorbernstein2 committed Jun 23, 2021
commit 564a3697a2cdcb29fffbd5b6c53169e1e5d480ce
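
For context, here is a minimal sketch of how these timeouts could be configured through the HBase Configuration keys exercised in this PR's tests. The key names come from the tests below; the project/instance values are hypothetical, and the commented-out USE_TIMEOUTS key name is an assumption, not part of this change:

import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import com.google.cloud.bigtable.hbase.wrappers.veneer.BigtableHBaseVeneerSettings;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;

public class AttemptTimeoutConfigSketch {
  public static void main(String[] args) throws IOException {
    Configuration configuration = new Configuration(false);
    // Hypothetical project/instance ids, required to build settings.
    configuration.set(BigtableOptionsFactory.PROJECT_ID_KEY, "my-project");
    configuration.set(BigtableOptionsFactory.INSTANCE_ID_KEY, "my-instance");
    // Overall (operation) deadline for scans, in milliseconds.
    configuration.setInt(BigtableOptionsFactory.BIGTABLE_READ_RPC_TIMEOUT_MS_KEY, 360_000);
    // New in this PR: per-attempt deadline for scans.
    configuration.setInt(BigtableOptionsFactory.BIGTABLE_READ_RPC_ATTEMPT_TIMEOUT_MS_KEY, 30_000);
    // Deprecated escape hatch described above (key name assumed):
    // configuration.setBoolean(BigtableOptionsFactory.BIGTABLE_USE_TIMEOUTS_KEY, false);

    BigtableHBaseVeneerSettings settings = BigtableHBaseVeneerSettings.create(configuration);
    // The parsed timeouts are then consumed by the veneer adapter, as the diff below shows.
    System.out.println(settings.getClientTimeouts().getScanTimeouts().getAttemptTimeout());
  }
}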
@@ -42,7 +42,19 @@ public interface DataClientWrapper extends AutoCloseable {
/** Creates instance of bulkMutation with specified table ID. */
BulkMutationWrapper createBulkMutation(String tableId);

/** Creates {@link BulkReadWrapper} with specified table ID. */
/**
* Creates a {@link BulkReadWrapper} with the specified table ID.
*
* <p>The BulkRead instance will be scoped to a single user-visible operation. The operation
* timeout starts from the time createBulkRead is invoked.
*
* <pre>{@code
* try (BulkReadWrapper batch = wrapper.createBulkRead(tableId)) {
* batch.add(key1, filter1);
* batch.add(key2, filter2);
* }
* }</pre>
*/
BulkReadWrapper createBulkRead(String tableId);

/** Mutate a row atomically. */

Large diffs are not rendered by default.
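
The large diff elided here is presumably BigtableHBaseVeneerSettings.java, which the imports in this PR reference for the new ClientOperationTimeouts, OperationTimeouts, and NoTimeoutsInterceptor types. A rough sketch of the two timeout holders, inferred only from their call sites in this PR and not from the actual implementation:

import com.google.common.base.Optional;
import org.threeten.bp.Duration;

// Per-RPC-type timeout bundle; each deadline is optional.
class OperationTimeouts {
  private final Optional<Duration> responseTimeout;   // per-response (scan row) deadline
  private final Optional<Duration> attemptTimeout;    // per-RPC-attempt deadline
  private final Optional<Duration> operationTimeout;  // overall logical-operation deadline

  OperationTimeouts(
      Optional<Duration> responseTimeout,
      Optional<Duration> attemptTimeout,
      Optional<Duration> operationTimeout) {
    this.responseTimeout = responseTimeout;
    this.attemptTimeout = attemptTimeout;
    this.operationTimeout = operationTimeout;
  }

  Optional<Duration> getResponseTimeout() { return responseTimeout; }
  Optional<Duration> getAttemptTimeout() { return attemptTimeout; }
  Optional<Duration> getOperationTimeout() { return operationTimeout; }
}

// Groups the unary and scan bundles plus the legacy USE_TIMEOUTS flag.
class ClientOperationTimeouts {
  private final boolean useTimeouts;             // false => legacy "timeouts disabled" behavior
  private final OperationTimeouts unaryTimeouts; // point reads, mutations, etc.
  private final OperationTimeouts scanTimeouts;  // ReadRows streams

  ClientOperationTimeouts(
      boolean useTimeouts, OperationTimeouts unaryTimeouts, OperationTimeouts scanTimeouts) {
    this.useTimeouts = useTimeouts;
    this.unaryTimeouts = unaryTimeouts;
    this.scanTimeouts = scanTimeouts;
  }

  boolean getUseTimeouts() { return useTimeouts; }
  OperationTimeouts getUnaryTimeouts() { return unaryTimeouts; }
  OperationTimeouts getScanTimeouts() { return scanTimeouts; }
}

The elided file also presumably defines NoTimeoutsInterceptor with its SKIP_DEFAULT_ATTEMPT_TIMEOUT call option key, which createScanCallContext below uses.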

@@ -56,7 +56,8 @@ public BigtableVeneerApi(BigtableHBaseVeneerSettings settings) throws IOExceptio
channelPoolSize = 0;
} else {
dataClientWrapper =
new DataClientVeneerApi(BigtableDataClient.create(settings.getDataSettings()));
new DataClientVeneerApi(
BigtableDataClient.create(settings.getDataSettings()), settings.getClientTimeouts());
channelPoolSize = getChannelPoolSize(settings.getDataSettings().getStubSettings());
for (int i = 0; i < channelPoolSize; i++) {
BigtableClientMetrics.counter(MetricLevel.Info, "grpc.channel.active").inc();
@@ -20,6 +20,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.bigtable.v2.RowFilter;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.Filters;
@@ -68,10 +69,12 @@ public Thread newThread(Runnable r) {
// TODO: remove this once gax-java's Batcher supports asyncClose(). This will eliminate the need
// to track individual entries
private final AtomicLong cleanupBarrier;
private final GrpcCallContext callContext;

BulkReadVeneerApi(BigtableDataClient client, String tableId) {
BulkReadVeneerApi(BigtableDataClient client, String tableId, GrpcCallContext callContext) {
this.client = client;
this.tableId = tableId;
this.callContext = callContext;

this.batchers = new HashMap<>();
this.cleanupBarrier = new AtomicLong();
@@ -135,7 +138,7 @@ private Batcher<ByteString, Row> getOrCreateBatcher(@Nullable Filters.Filter fil

Batcher<ByteString, Row> batcher = batchers.get(proto);
if (batcher == null) {
batcher = client.newBulkReadRowsBatcher(tableId, filter);
batcher = client.newBulkReadRowsBatcher(tableId, filter, callContext);
batchers.put(proto, batcher);
}
return batcher;
@@ -19,6 +19,8 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StateCheckingResponseObserver;
import com.google.api.gax.rpc.StreamController;
@@ -34,15 +36,21 @@
import com.google.cloud.bigtable.hbase.wrappers.BulkMutationWrapper;
import com.google.cloud.bigtable.hbase.wrappers.BulkReadWrapper;
import com.google.cloud.bigtable.hbase.wrappers.DataClientWrapper;
import com.google.cloud.bigtable.hbase.wrappers.veneer.BigtableHBaseVeneerSettings.ClientOperationTimeouts;
import com.google.cloud.bigtable.hbase.wrappers.veneer.BigtableHBaseVeneerSettings.NoTimeoutsInterceptor;
import com.google.cloud.bigtable.hbase.wrappers.veneer.BigtableHBaseVeneerSettings.OperationTimeouts;
import com.google.cloud.bigtable.metrics.BigtableClientMetrics;
import com.google.cloud.bigtable.metrics.Meter;
import com.google.cloud.bigtable.metrics.Timer;
import com.google.cloud.bigtable.metrics.Timer.Context;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.grpc.Deadline;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.hbase.client.AbstractClientScanner;
import org.apache.hadoop.hbase.client.Result;
@@ -55,9 +63,12 @@ public class DataClientVeneerApi implements DataClientWrapper {
private static final RowResultAdapter RESULT_ADAPTER = new RowResultAdapter();

private final BigtableDataClient delegate;
private final ClientOperationTimeouts clientOperationTimeouts;

DataClientVeneerApi(BigtableDataClient delegate) {
DataClientVeneerApi(
BigtableDataClient delegate, ClientOperationTimeouts clientOperationTimeouts) {
this.delegate = delegate;
this.clientOperationTimeouts = clientOperationTimeouts;
}

@Override
@@ -67,7 +78,7 @@ public BulkMutationWrapper createBulkMutation(String tableId) {

@Override
public BulkReadWrapper createBulkRead(String tableId) {
return new BulkReadVeneerApi(delegate, tableId);
return new BulkReadVeneerApi(delegate, tableId, createScanCallContext());
}

@Override
@@ -101,8 +112,14 @@ public ApiFuture<List<KeyOffset>> sampleRowKeysAsync(String tableId) {
@Override
public ApiFuture<Result> readRowAsync(
String tableId, ByteString rowKey, @Nullable Filters.Filter filter) {

Query query = Query.create(tableId).rowKey(rowKey).limit(1);
if (filter != null) {
query.filter(filter);
}

return ApiFutures.transform(
delegate.readRowAsync(tableId, rowKey, filter),
delegate.readRowCallable().futureCall(query, createReadRowCallContext()),
new ApiFunction<Row, Result>() {
@Override
public Result apply(Row row) {
@@ -114,17 +131,84 @@ public Result apply(Row row) {

@Override
public ResultScanner readRows(Query request) {
return new RowResultScanner(delegate.readRowsCallable(RESULT_ADAPTER).call(request));
return new RowResultScanner(
delegate.readRowsCallable(RESULT_ADAPTER).call(request, createScanCallContext()));
}

@Override
public ApiFuture<List<Result>> readRowsAsync(Query request) {
return delegate.readRowsCallable(RESULT_ADAPTER).all().futureCall(request);
return delegate
.readRowsCallable(RESULT_ADAPTER)
.all()
.futureCall(request, createScanCallContext());
}

@Override
public void readRowsAsync(Query request, StreamObserver<Result> observer) {
delegate.readRowsCallable(RESULT_ADAPTER).call(request, new StreamObserverAdapter<>(observer));
delegate
.readRowsCallable(RESULT_ADAPTER)
.call(request, new StreamObserverAdapter<>(observer), createScanCallContext());
}

// Point reads are implemented using a streaming ReadRows RPC, so their timeouts need to be
// managed the same way as the scans below.
private ApiCallContext createReadRowCallContext() {
GrpcCallContext ctx = GrpcCallContext.createDefault();
OperationTimeouts callSettings = clientOperationTimeouts.getUnaryTimeouts();

if (clientOperationTimeouts.getUseTimeouts()) {
// By default, veneer doesn't have per-attempt timeouts for streaming RPCs
if (callSettings.getAttemptTimeout().isPresent()) {
ctx = ctx.withTimeout(callSettings.getAttemptTimeout().get());
}
// TODO: remove this after fixing it in veneer/gax
// If the attempt timeout was overridden, it disables overall timeout limiting
// Fix it by setting the underlying gRPC deadline
if (callSettings.getOperationTimeout().isPresent()) {
ctx =
ctx.withCallOptions(
CallOptions.DEFAULT.withDeadline(
Deadline.after(
callSettings.getOperationTimeout().get().toMillis(),
TimeUnit.MILLISECONDS)));
}
}

return ctx;
}

// Support two bigtable-hbase features not directly available in veneer:
// - disabling timeouts: when timeouts are disabled, bigtable-hbase ignores user-configured
//   timeouts and forces 6-minute per-attempt deadlines for all RPCs except scans. This is
//   implemented by an interceptor; however, the interceptor must be told when a call is a scan.
// - per-attempt deadlines: veneer doesn't implement per-attempt deadlines. To work around this,
//   the timeouts are set per call in the ApiCallContext. However, this creates a separate issue
//   of overrunning the operation deadline, so the gRPC deadline is set as well.
private GrpcCallContext createScanCallContext() {
GrpcCallContext ctx = GrpcCallContext.createDefault();
OperationTimeouts callSettings = clientOperationTimeouts.getScanTimeouts();

if (!clientOperationTimeouts.getUseTimeouts()) {
ctx =
ctx.withCallOptions(
CallOptions.DEFAULT.withOption(
NoTimeoutsInterceptor.SKIP_DEFAULT_ATTEMPT_TIMEOUT, true));
} else if (callSettings.getAttemptTimeout().isPresent()) {
ctx = ctx.withTimeout(callSettings.getAttemptTimeout().get());

// TODO: remove this after fixing it in veneer/gax
// If the attempt timeout was overridden, it disables overall timeout limiting
// Fix it by setting the underlying gRPC deadline
if (callSettings.getOperationTimeout().isPresent()) {
ctx =
ctx.withCallOptions(
CallOptions.DEFAULT.withDeadline(
Deadline.after(
callSettings.getOperationTimeout().get().toMillis(),
TimeUnit.MILLISECONDS)));
}
}
return ctx;
}

@Override
@@ -84,9 +84,12 @@ synchronized DataClientWrapper createDataClient(BigtableHBaseVeneerSettings sett
.setHeaderProvider(FixedHeaderProvider.create(sharedCtx.getHeaders()))
.setClock(sharedCtx.getClock());

BigtableDataSettings data = builder.build();
// Create a reference counted client wrapper
return new SharedDataClientWrapper(
this, key, new DataClientVeneerApi(BigtableDataClient.create(builder.build())));
this,
key,
new DataClientVeneerApi(BigtableDataClient.create(data), settings.getClientTimeouts()));
} catch (IOException | RuntimeException e) {
release(key);
throw e;
@@ -93,6 +93,7 @@ public void setUp() throws IOException {
configuration.set(BigtableOptionsFactory.BIGTABLE_NULL_CREDENTIAL_ENABLE_KEY, "true");
configuration.set(BigtableOptionsFactory.BIGTABLE_DATA_CHANNEL_COUNT_KEY, "1");
configuration.set(BigtableOptionsFactory.BIGTABLE_EMULATOR_HOST_KEY, "localhost:" + port);
configuration.setBoolean(BigtableOptionsFactory.BIGTABLE_USE_GCJ_CLIENT, false);
bigtableHBaseSettings = BigtableHBaseClassicSettings.create(configuration);
bigtableApi = BigtableApi.create(bigtableHBaseSettings);
}
@@ -62,6 +62,7 @@
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import com.google.cloud.bigtable.hbase.wrappers.BigtableHBaseSettings;
import com.google.cloud.bigtable.hbase.wrappers.veneer.metrics.MetricsApiTracerAdapterFactory;
import com.google.common.base.Optional;
import io.grpc.internal.GrpcUtil;
import java.io.IOException;
import java.net.ServerSocket;
@@ -72,6 +73,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
public class TestBigtableHBaseVeneerSettings {
@@ -215,25 +217,35 @@ public void testTimeoutBeingPassed() throws IOException {
configuration.setLong(MAX_SCAN_TIMEOUT_RETRIES, maxAttempt);
configuration.setInt(BIGTABLE_READ_RPC_TIMEOUT_MS_KEY, readRowStreamTimeout);
configuration.setInt(BIGTABLE_READ_RPC_ATTEMPT_TIMEOUT_MS_KEY, readRowStreamAttemptTimeout);
BigtableDataSettings settings =
BigtableHBaseVeneerSettings.create(configuration).getDataSettings();

BigtableHBaseVeneerSettings settings = BigtableHBaseVeneerSettings.create(configuration);
BigtableDataSettings dataSettings = settings.getDataSettings();

assertTrue(settings.getClientTimeouts().getUseTimeouts());
assertEquals(
    Optional.of(Duration.ofMillis(rpcTimeoutMs)),
    settings.getClientTimeouts().getUnaryTimeouts().getOperationTimeout());
assertEquals(
    Optional.of(Duration.ofMillis(rpcAttemptTimeoutMs)),
    settings.getClientTimeouts().getUnaryTimeouts().getAttemptTimeout());
assertEquals(
    Optional.of(Duration.ofMillis(readRowStreamTimeout)),
    settings.getClientTimeouts().getScanTimeouts().getOperationTimeout());
assertEquals(
    Optional.of(Duration.ofMillis(readRowStreamAttemptTimeout)),
    settings.getClientTimeouts().getScanTimeouts().getAttemptTimeout());
assertEquals(
    Optional.of(Duration.ofMillis(perRowTimeoutMs)),
    settings.getClientTimeouts().getScanTimeouts().getResponseTimeout());

RetrySettings readRowRetrySettings =
settings.getStubSettings().readRowSettings().getRetrySettings();
dataSettings.getStubSettings().readRowSettings().getRetrySettings();
assertEquals(initialElapsedMs, readRowRetrySettings.getInitialRetryDelay().toMillis());
assertEquals(rpcTimeoutMs, readRowRetrySettings.getTotalTimeout().toMillis());
assertEquals(rpcAttemptTimeoutMs, readRowRetrySettings.getInitialRpcTimeout().toMillis());
assertEquals(rpcAttemptTimeoutMs, readRowRetrySettings.getMaxRpcTimeout().toMillis());

RetrySettings checkAndMutateRetrySettings =
settings.getStubSettings().checkAndMutateRowSettings().getRetrySettings();
dataSettings.getStubSettings().checkAndMutateRowSettings().getRetrySettings();
assertEquals(rpcTimeoutMs, checkAndMutateRetrySettings.getTotalTimeout().toMillis());
// CheckAndMutate is non-retriable so its rpc timeout = overall timeout
assertEquals(rpcTimeoutMs, checkAndMutateRetrySettings.getInitialRpcTimeout().toMillis());
assertEquals(rpcTimeoutMs, checkAndMutateRetrySettings.getMaxRpcTimeout().toMillis());

RetrySettings readRowsRetrySettings =
settings.getStubSettings().readRowsSettings().getRetrySettings();
dataSettings.getStubSettings().readRowsSettings().getRetrySettings();
assertEquals(initialElapsedMs, readRowsRetrySettings.getInitialRetryDelay().toMillis());
assertEquals(perRowTimeoutMs, readRowsRetrySettings.getInitialRpcTimeout().toMillis());
assertEquals(perRowTimeoutMs, readRowsRetrySettings.getMaxRpcTimeout().toMillis());
@@ -85,7 +85,7 @@ public void tearDown() throws InterruptedException {
@Test
public void testAdd() throws Exception {
dataClient = BigtableDataClient.create(settingsBuilder.build());
BulkReadWrapper bulkReadWrapper = new BulkReadVeneerApi(dataClient, TABLE_ID);
BulkReadWrapper bulkReadWrapper = new BulkReadVeneerApi(dataClient, TABLE_ID, null);

ApiFuture<Result> resultFuture1_1 = bulkReadWrapper.add(ByteString.copyFromUtf8("one"), null);
ApiFuture<Result> resultFuture1_2 = bulkReadWrapper.add(ByteString.copyFromUtf8("two"), null);
@@ -123,7 +123,7 @@ public void testAddWithoutSendOutstanding() throws Exception {
.setRequestByteThreshold(10L * 1024L)
.build());
dataClient = BigtableDataClient.create(settingsBuilder.build());
BulkReadWrapper bulkReadWrapper = new BulkReadVeneerApi(dataClient, TABLE_ID);
BulkReadWrapper bulkReadWrapper = new BulkReadVeneerApi(dataClient, TABLE_ID, null);

ApiFuture<Result> row = bulkReadWrapper.add(ROW_KEY, Filters.FILTERS.key().regex("row"));
row.get();
@@ -147,7 +147,7 @@ public void testSendOutstanding() throws Exception {
.setRequestByteThreshold(10L * 1024L)
.build());
dataClient = BigtableDataClient.create(settingsBuilder.build());
BulkReadWrapper bulkReadWrapper = new BulkReadVeneerApi(dataClient, TABLE_ID);
BulkReadWrapper bulkReadWrapper = new BulkReadVeneerApi(dataClient, TABLE_ID, null);

List<ApiFuture<Result>> results = new ArrayList<>();

@@ -183,7 +183,7 @@ public void testWhenAutoFlushIsOff() throws Exception {
.setRequestByteThreshold(10L * 1024L)
.build());
dataClient = BigtableDataClient.create(settingsBuilder.build());
BulkReadWrapper bulkReadWrapper = new BulkReadVeneerApi(dataClient, TABLE_ID);
BulkReadWrapper bulkReadWrapper = new BulkReadVeneerApi(dataClient, TABLE_ID, null);

long startTime = System.currentTimeMillis();
ApiFuture<Result> resultFuture = bulkReadWrapper.add(ROW_KEY, null);