Skip to content

Commit

Permalink
HubSpot Backport: HBASE-28352 HTable batch does not honor RpcThrottli…
Browse files Browse the repository at this point in the history
…ngException waitInterval (apache#5671)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
bbeaudreault committed Feb 11, 2024
1 parent daab7c8 commit 4a7a175
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -416,21 +417,25 @@ SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, i
* timeout against the appropriate tracker, or returns false if no tracker.
*/
private boolean isOperationTimeoutExceeded() {
// return value of 1 is special to mean exceeded, to differentiate from 0
// which is no timeout. see implementation of RetryingTimeTracker.getRemainingTime
return getRemainingTime() == 1;
}

private long getRemainingTime() {
RetryingTimeTracker currentTracker;
if (tracker != null) {
currentTracker = tracker;
} else if (currentCallable != null && currentCallable.getTracker() != null) {
currentTracker = currentCallable.getTracker();
} else {
return false;
return 0;
}

// no-op if already started, this is just to ensure it was initialized (usually true)
currentTracker.start();

// return value of 1 is special to mean exceeded, to differentiate from 0
// which is no timeout. see implementation of getRemainingTime
return currentTracker.getRemainingTime(operationTimeout) == 1;
return currentTracker.getRemainingTime(operationTimeout);
}

/**
Expand Down Expand Up @@ -826,6 +831,8 @@ private void resubmit(ServerName oldServer, List<Action> toReplay, int numAttemp
long backOffTime;
if (retryImmediately) {
backOffTime = 0;
} else if (throwable instanceof RpcThrottlingException) {
backOffTime = ((RpcThrottlingException) throwable).getWaitInterval();
} else if (HBaseServerException.isServerOverloaded(throwable)) {
// Give a special check when encountering an exception indicating the server is overloaded.
// see #HBASE-17114 and HBASE-26807
Expand All @@ -848,6 +855,19 @@ private void resubmit(ServerName oldServer, List<Action> toReplay, int numAttemp
backOffTime, true, null, -1, -1));
}

long remainingTime = getRemainingTime();
// 1 is a special value meaning exceeded and 0 means no timeout.
// throw if timeout already exceeded, or if backoff is larger than non-zero remaining
if (remainingTime == 1 || (remainingTime > 0 && backOffTime > remainingTime)) {
OperationTimeoutExceededException ex = new OperationTimeoutExceededException(
"Backoff time of " + backOffTime + "ms would exceed operation timeout", throwable);
for (Action actionToFail : toReplay) {
manageError(actionToFail.getOriginalIndex(), actionToFail.getAction(),
Retry.NO_NOT_RETRIABLE, ex, null);
}
return;
}

try {
if (backOffTime > 0) {
Thread.sleep(backOffTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
Expand All @@ -66,6 +67,7 @@
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -1738,16 +1740,30 @@ public void testRetryPauseWhenServerOverloadedDueToCDE() throws Exception {
testRetryPauseWhenServerIsOverloaded(new CallDroppedException());
}

@Test
public void testRetryPauseForRpcThrottling() throws IOException {
long waitInterval = 500L;
testRetryPause(new Configuration(CONF), waitInterval, new RpcThrottlingException(
RpcThrottlingException.Type.NumReadRequestsExceeded, waitInterval, "For test"));
}

private void testRetryPauseWhenServerIsOverloaded(HBaseServerException exception)
throws IOException {
Configuration conf = new Configuration(CONF);
final long specialPause = 500L;
Configuration testConf = new Configuration(CONF);
long specialPause = 500L;
testConf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
specialPause);
testRetryPause(testConf, specialPause, exception);
}

private void testRetryPause(Configuration testConf, long expectedPause,
HBaseIOException exception) throws IOException {

final int retries = 1;
conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, specialPause);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);

ClusterConnection conn = new MyConnectionImpl(conf);
AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, exception);
ClusterConnection conn = new MyConnectionImpl(testConf);
AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, testConf, exception);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);

Expand All @@ -1767,18 +1783,18 @@ private void testRetryPauseWhenServerIsOverloaded(HBaseServerException exception
long actualSleep = EnvironmentEdgeManager.currentTime() - startTime;
long expectedSleep = 0L;
for (int i = 0; i < retries; i++) {
expectedSleep += ConnectionUtils.getPauseTime(specialPause, i);
expectedSleep += ConnectionUtils.getPauseTime(expectedPause, i);
// Prevent jitter in ConcurrentMapUtils#getPauseTime to affect result
actualSleep += (long) (specialPause * 0.01f);
actualSleep += (long) (expectedPause * 0.01f);
}
LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually " + actualSleep + "ms",
actualSleep >= expectedSleep);

// check and confirm normal IOE will use the normal pause
final long normalPause =
conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
ap = new AsyncProcessWithFailure(conn, conf, new IOException());
testConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
ap = new AsyncProcessWithFailure(conn, testConf, new IOException());
bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
Expand All @@ -1802,6 +1818,38 @@ private void testRetryPauseWhenServerIsOverloaded(HBaseServerException exception
Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep);
}

@Test
public void testFastFailIfBackoffGreaterThanRemaining() throws IOException {
Configuration testConf = new Configuration(CONF);
testConf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 100);
long waitInterval = 500L;
HBaseIOException exception = new RpcThrottlingException(
RpcThrottlingException.Type.NumReadRequestsExceeded, waitInterval, "For test");

final int retries = 1;
testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);

ClusterConnection conn = new MyConnectionImpl(testConf);
AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, testConf, exception);
BufferedMutatorParams bufferParam =
createBufferedMutatorParams(ap, DUMMY_TABLE).operationTimeout(100);
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);

Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());

Put p = createPut(1, true);
mutator.mutate(p);

try {
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
assertEquals(1, expected.getNumExceptions());
assertTrue(expected.getCause(0) instanceof OperationTimeoutExceededException);
assertTrue(expected.getCause(0).getMessage().startsWith("Backoff"));
}
}

/**
* Tests that we properly recover from exceptions that DO NOT go through receiveGlobalFailure, due
* to updating the meta cache for the region which failed. Successful multigets can include region
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.function.ThrowingRunnable;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
Expand Down Expand Up @@ -161,8 +162,9 @@ public void testPutTimeout() {
}

/**
* Tests that a batch mutate and batch get on a table throws {@link SocketTimeoutException} when
* the operation takes longer than 'hbase.client.operation.timeout'.
* Tests that a batch mutate and batch get on a table throws {@link SocketTimeoutException} or
* {@link OperationTimeoutExceededException} when the operation takes longer than
* 'hbase.client.operation.timeout'.
*/
@Test
public void testMultiTimeout() {
Expand All @@ -174,12 +176,7 @@ public void testMultiTimeout() {
List<Put> puts = new ArrayList<>();
puts.add(put1);
puts.add(put2);
try {
TABLE.batch(puts, new Object[2]);
Assert.fail("should not reach here");
} catch (Exception e) {
Assert.assertTrue(e instanceof SocketTimeoutException);
}
assertMultiException(() -> TABLE.batch(puts, new Object[2]));

Get get1 = new Get(ROW);
get1.addColumn(FAMILY, QUALIFIER);
Expand All @@ -189,11 +186,27 @@ public void testMultiTimeout() {
List<Get> gets = new ArrayList<>();
gets.add(get1);
gets.add(get2);
try {
TABLE.batch(gets, new Object[2]);
Assert.fail("should not reach here");
} catch (Exception e) {
Assert.assertTrue(e instanceof SocketTimeoutException);
assertMultiException(() -> TABLE.batch(gets, new Object[2]));
}

/**
* AsyncProcess has an overall waitUntilDone with a timeout, and if all callables dont finish by
* then it throws a SocketTimeoutException. The callables themselves also try to honor the
* operation timeout and result in OperationTimeoutExceededException (wrapped in
* RetriesExhausted). The latter is the more user-friendly exception because it contains details
* about which server has issues, etc. For now we need to account for both because it's sort of a
* race to see which timeout exceeds first. Maybe we can replace the waitUntilDone behavior with
* an interrupt in the future so we can further unify.
*/
private void assertMultiException(ThrowingRunnable runnable) {
IOException e = Assert.assertThrows(IOException.class, runnable);
if (e instanceof SocketTimeoutException) {
return;
}
Assert.assertTrue("Expected SocketTimeoutException or RetriesExhaustedWithDetailsException"
+ " but was " + e.getClass(), e instanceof RetriesExhaustedWithDetailsException);
for (Throwable cause : ((RetriesExhaustedWithDetailsException) e).getCauses()) {
Assert.assertEquals(OperationTimeoutExceededException.class, cause.getClass());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ private void testThrottle(ThrowingFunction<Table, ?> request) throws Exception {
request.run(table);
return false;
} catch (RetriesExhaustedWithDetailsException e) {
success = e.getCauses().stream().allMatch(t -> t instanceof RpcThrottlingException);
success = e.getCauses().stream().allMatch(t -> t instanceof RpcThrottlingException
|| t.getCause() instanceof RpcThrottlingException);
ex = e;
} catch (Exception e) {
success = e.getCause() instanceof RpcThrottlingException;
Expand Down

0 comments on commit 4a7a175

Please sign in to comment.