Skip to content

Commit

Permalink
fix: Add timeout to bulkMutation's close() (#4140)
Browse files Browse the repository at this point in the history
- [x] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable-hbase/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)

Fixes #4139 ☕️
  • Loading branch information
kongweihan authored Aug 4, 2023
1 parent 3e1de74 commit 79eb7c7
Show file tree
Hide file tree
Showing 14 changed files with 188 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,15 @@ public BigtableBufferedMutatorHelper(
this.adapter = adapter;
this.settings = settings;
this.dataClient = bigtableApi.getDataClient();
this.bulkMutation = dataClient.createBulkMutation(this.adapter.getTableId());
this.bulkMutation =
dataClient.createBulkMutation(
this.adapter.getTableId(), settings.getBulkMutationCloseTimeoutMilliseconds());
this.operationAccountant = new OperationAccountant();
}

public void close() throws IOException {
closedWriteLock.lock();
try {
flush();
bulkMutation.close();
closed = true;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import com.google.cloud.bigtable.hbase.util.Logger;
import com.google.cloud.bigtable.hbase.wrappers.veneer.BigtableHBaseVeneerSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Map;
Expand All @@ -39,6 +40,15 @@ public abstract class BigtableHBaseSettings {
private final String instanceId;
private final int ttlSecondsForBackup;

@VisibleForTesting
// This is used to override the default closing timeout of 10 minutes in the test so the test can
// run faster
public static final String BULK_MUTATION_CLOSE_TIMEOUT_MILLISECONDS =
"bulk.mutation.close.timeout.milliseconds";

// Must be non-negative. Set to 0 to disable timeout.
private final long bulkMutationCloseTimeoutMilliseconds;

public static BigtableHBaseSettings create(Configuration configuration) throws IOException {
return BigtableHBaseVeneerSettings.create(configuration);
}
Expand All @@ -51,6 +61,10 @@ public BigtableHBaseSettings(Configuration configuration) {
configuration.getInt(
BigtableOptionsFactory.BIGTABLE_SNAPSHOT_DEFAULT_TTL_SECS_KEY,
BigtableOptionsFactory.BIGTABLE_SNAPSHOT_DEFAULT_TTL_SECS_VALUE);

this.bulkMutationCloseTimeoutMilliseconds =
configuration.getLong(BigtableHBaseSettings.BULK_MUTATION_CLOSE_TIMEOUT_MILLISECONDS, 0);
Preconditions.checkArgument(this.bulkMutationCloseTimeoutMilliseconds >= 0);
}

public Configuration getConfiguration() {
Expand Down Expand Up @@ -82,6 +96,10 @@ public int getTtlSecondsForBackup() {
// This is equivalent to allow server-side timestamp.
public abstract boolean isRetriesWithoutTimestampAllowed();

public long getBulkMutationCloseTimeoutMilliseconds() {
return bulkMutationCloseTimeoutMilliseconds;
}

protected String getRequiredValue(String key, String displayName) {
String value = configuration.get(key);
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public interface DataClientWrapper extends AutoCloseable {
/** Creates instance of bulkMutation with specified table ID. */
BulkMutationWrapper createBulkMutation(String tableId);

BulkMutationWrapper createBulkMutation(String tableId, long closeTimeoutMilliseconds);

/**
* Creates {@link BulkReadWrapper} with specified table ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ public class BigtableHBaseVeneerSettings extends BigtableHBaseSettings {
private final long batchingMaxMemory;
private final boolean isChannelPoolCachingEnabled;
private final boolean allowRetriesWithoutTimestamp;

private final ClientOperationTimeouts clientTimeouts;

public static BigtableHBaseVeneerSettings create(Configuration configuration) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import com.google.cloud.bigtable.metrics.Meter;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** For internal use only - public for technical reasons. */
@InternalApi("For internal usage only")
Expand All @@ -34,8 +37,14 @@ public class BulkMutationVeneerApi implements BulkMutationWrapper {
BigtableClientMetrics.meter(MetricLevel.Info, "bulk-mutator.mutations.added");
private final Batcher<RowMutationEntry, Void> bulkMutateBatcher;

BulkMutationVeneerApi(Batcher<RowMutationEntry, Void> bulkMutateBatcher) {
// If set to 0, timeout is disabled. Negative value is not accepted.
private final long closeTimeoutMilliseconds;

BulkMutationVeneerApi(
Batcher<RowMutationEntry, Void> bulkMutateBatcher, long closeTimeoutMilliseconds) {
this.bulkMutateBatcher = bulkMutateBatcher;
Preconditions.checkArgument(closeTimeoutMilliseconds >= 0);
this.closeTimeoutMilliseconds = closeTimeoutMilliseconds;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -65,9 +74,16 @@ public synchronized void flush() {
@Override
public synchronized void close() throws IOException {
try {
bulkMutateBatcher.close();
} catch (InterruptedException e) {
ApiFuture future = bulkMutateBatcher.closeAsync();
if (closeTimeoutMilliseconds > 0) {
future.get(closeTimeoutMilliseconds, TimeUnit.MILLISECONDS);
} else {
future.get();
}
} catch (InterruptedException | ExecutionException e) {
throw new IOException("Could not close the bulk mutation Batcher", e);
} catch (TimeoutException e) {
throw new IOException("Cloud not close the bulk mutation Batcher, timed out in close()", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,13 @@ public class DataClientVeneerApi implements DataClientWrapper {

@Override
public BulkMutationWrapper createBulkMutation(String tableId) {
return new BulkMutationVeneerApi(delegate.newBulkMutationBatcher(tableId));
return new BulkMutationVeneerApi(delegate.newBulkMutationBatcher(tableId), 0);
}

@Override
public BulkMutationWrapper createBulkMutation(String tableId, long closeTimeoutMilliseconds) {
return new BulkMutationVeneerApi(
delegate.newBulkMutationBatcher(tableId), closeTimeoutMilliseconds);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public BulkMutationWrapper createBulkMutation(String tableId) {
return delegate.createBulkMutation(tableId);
}

@Override
public BulkMutationWrapper createBulkMutation(String tableId, long closeTimeoutMilliseconds) {
return delegate.createBulkMutation(tableId, closeTimeoutMilliseconds);
}

@Override
public BulkReadWrapper createBulkRead(String tableId) {
return delegate.createBulkRead(tableId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ public void setup() throws IOException {
when(mockBigtableApi.getDataClient()).thenReturn(mockDataClientWrapper);
when(mockDataClientWrapper.createBulkRead("table")).thenReturn(mockBulkRead);
when(mockDataClientWrapper.createBulkMutation(any(String.class))).thenReturn(mockBulkMutation);
when(mockDataClientWrapper.createBulkMutation(any(String.class), any(Long.class)))
.thenReturn(mockBulkMutation);

when(mockBulkMutation.add(any(RowMutationEntry.class))).thenReturn(mockFuture);
when(mockDataClientWrapper.readModifyWriteRowAsync(any(ReadModifyWriteRow.class)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class TestBigtableBufferedMutator {
public void setUp() {
when(mockBigtableApi.getDataClient()).thenReturn(mockDataClient);
when(mockDataClient.createBulkMutation(Mockito.anyString())).thenReturn(mockBulkMutation);
when(mockDataClient.createBulkMutation(Mockito.anyString(), Mockito.anyLong()))
.thenReturn(mockBulkMutation);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@
import com.google.cloud.bigtable.hbase.wrappers.BulkMutationWrapper;
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnit;
Expand All @@ -60,10 +60,15 @@ public class TestBulkMutationVeneerApi {

@Mock private Batcher<RowMutationEntry, Void> batcher;

@InjectMocks private BulkMutationVeneerApi bulkMutationWrapper;
private BulkMutationVeneerApi bulkMutationWrapper;

private RowMutationEntry rowMutation = RowMutationEntry.create("fake-key");

@Before
public void setup() {
bulkMutationWrapper = new BulkMutationVeneerApi(batcher, 0);
}

@Test
public void testAdd() {
SettableApiFuture<Void> future = SettableApiFuture.create();
Expand Down Expand Up @@ -144,7 +149,7 @@ public void testWhenBatcherIsClosed() throws IOException {
new Object(),
batchingSettings,
mock(ScheduledExecutorService.class));
BulkMutationWrapper underTest = new BulkMutationVeneerApi(actualBatcher);
BulkMutationWrapper underTest = new BulkMutationVeneerApi(actualBatcher, 0);
underTest.close();

Exception actualEx = null;
Expand Down
6 changes: 6 additions & 0 deletions bigtable-dataflow-parent/bigtable-hbase-beam/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ limitations under the License.
</dependency>

<!-- Test Group -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.bigtable.repackaged.com.google.common.collect.ImmutableMap;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import com.google.cloud.bigtable.hbase.wrappers.BigtableHBaseSettings;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -217,7 +218,8 @@ public String getAppProfileId() {
}

/**
* Converts the {@link CloudBigtableConfiguration} to an HBase {@link Configuration}.
* Converts the {@link CloudBigtableConfiguration} to an HBase {@link Configuration}. This should
* be called only in runtime so that we can call get() from ValueProvider.
*
* @return The {@link Configuration}.
*/
Expand Down Expand Up @@ -247,6 +249,16 @@ public Configuration toHBaseConfig() {
}
}
setUserAgent(config);

ValueProvider<String> timeout =
configuration.get(BigtableHBaseSettings.BULK_MUTATION_CLOSE_TIMEOUT_MILLISECONDS);

if (timeout == null) {
config.set(BigtableHBaseSettings.BULK_MUTATION_CLOSE_TIMEOUT_MILLISECONDS, "600000");
} else {
config.set(BigtableHBaseSettings.BULK_MUTATION_CLOSE_TIMEOUT_MILLISECONDS, timeout.get());
}

return config;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2017 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.beam;

import com.google.bigtable.repackaged.com.google.bigtable.v2.BigtableGrpc.BigtableImplBase;
import com.google.bigtable.repackaged.com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.repackaged.com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.repackaged.io.grpc.ServerBuilder;
import com.google.bigtable.repackaged.io.grpc.stub.StreamObserver;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import com.google.cloud.bigtable.hbase.wrappers.BigtableHBaseSettings;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

public class BulkMutationCloseTimeoutTest {

@Rule public TestPipeline writePipeline = TestPipeline.create();

@Rule public final TemporaryFolder workDir = new TemporaryFolder();

@Rule public ExpectedException thrown = ExpectedException.none();

@Test(timeout = 20000)
public void testBulkMutationCloseTimeout() throws Throwable {
thrown.expect(PipelineExecutionException.class);
thrown.expectMessage("Cloud not close the bulk mutation Batcher, timed out in close()");

int port = startFakeBigtableService();

List<Mutation> data = new ArrayList<>();
for (int i = 0; i < 1; i++) {
Put row = new Put(Bytes.toBytes("key-123"));
row.addColumn(
Bytes.toBytes("column-family"), Bytes.toBytes("column"), Bytes.toBytes("value"));
data.add(row);
}

CloudBigtableTableConfiguration config =
new CloudBigtableTableConfiguration.Builder()
.withProjectId("test-project")
.withInstanceId("test-instance")
.withTableId("test-table")
.withConfiguration(
BigtableOptionsFactory.BIGTABLE_EMULATOR_HOST_KEY, "localhost:" + port)
.withConfiguration(
BigtableHBaseSettings.BULK_MUTATION_CLOSE_TIMEOUT_MILLISECONDS, "10000")
.build();

CloudBigtableIO.writeToTable(config);
writePipeline.apply(Create.of(data)).apply(CloudBigtableIO.writeToTable(config));
writePipeline.run().waitUntilFinish();
}

private int startFakeBigtableService() throws IOException {
int port;
try (ServerSocket ss = new ServerSocket(0)) {
port = ss.getLocalPort();
}
ServerBuilder builder = ServerBuilder.forPort(port);
builder.addService(new FakeBigtableService());
builder.build().start();

System.out.println("Starting FakeBigtableService on port: " + port);
return port;
}

public static class FakeBigtableService extends BigtableImplBase {

@Override
public void mutateRows(MutateRowsRequest request, StreamObserver<MutateRowsResponse> observer) {
// This test intentionally skips calling observer.onCompleted() to this operation hang.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.hbase2_x;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -40,6 +41,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
Expand Down Expand Up @@ -67,7 +69,8 @@ public class TestBigtableAsyncBufferedMutator {
public void setUp() {
when(mockBigtableApi.getDataClient()).thenReturn(mockDataClient);
when(mockRequestAdapter.getTableId()).thenReturn(TABLE_ID);
when(mockDataClient.createBulkMutation(TABLE_ID)).thenReturn(mockBulkMutation);
when(mockDataClient.createBulkMutation(eq(TABLE_ID), ArgumentMatchers.anyLong()))
.thenReturn(mockBulkMutation);
asyncBufferedMutator =
new BigtableAsyncBufferedMutator(mockBigtableApi, mockBigtableSettings, mockRequestAdapter);
}
Expand All @@ -86,7 +89,6 @@ public void testClose() throws Exception {
asyncBufferedMutator.close();

verify(mockBulkMutation).sendUnsent();
verify(mockBulkMutation).flush();
verify(mockBulkMutation).close();
}

Expand Down

0 comments on commit 79eb7c7

Please sign in to comment.