Skip to content

Commit 9413660

Browse files
committed
HubSpot Backport: HBASE-28085 Configurably use scanner timeout as rpc timeout for scanner next calls (apache#5402)
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org> Signed-off-by: Duo Zhang <zhangduo@apache.org>
1 parent 533fa3a commit 9413660

File tree

10 files changed

+247
-62
lines changed

10 files changed

+247
-62
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,11 @@ public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableN
6666
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
6767
RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
6868
int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
69-
Map<String, byte[]> requestAttributes) throws IOException {
69+
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
70+
throws IOException {
7071
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
71-
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, requestAttributes);
72+
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
73+
connectionConfiguration, requestAttributes);
7274
exceptionsQueue = new ConcurrentLinkedQueue<>();
7375
final Context context = Context.current();
7476
final Runnable runnable = context.wrap(new PrefetchRunnable());

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.apache.commons.lang3.mutable.MutableBoolean;
3434
import org.apache.hadoop.conf.Configuration;
3535
import org.apache.hadoop.hbase.DoNotRetryIOException;
36-
import org.apache.hadoop.hbase.HConstants;
3736
import org.apache.hadoop.hbase.HRegionInfo;
3837
import org.apache.hadoop.hbase.NotServingRegionException;
3938
import org.apache.hadoop.hbase.TableName;
@@ -78,6 +77,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
7877
protected final TableName tableName;
7978
protected final int readRpcTimeout;
8079
protected final int scannerTimeout;
80+
private final boolean useScannerTimeoutForNextCalls;
8181
protected boolean scanMetricsPublished = false;
8282
protected RpcRetryingCaller<Result[]> caller;
8383
protected RpcControllerFactory rpcControllerFactory;
@@ -104,7 +104,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
104104
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
105105
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
106106
RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
107-
int scannerTimeout, int primaryOperationTimeout, Map<String, byte[]> requestAttributes)
107+
int scannerTimeout, int primaryOperationTimeout,
108+
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
108109
throws IOException {
109110
if (LOG.isTraceEnabled()) {
110111
LOG.trace(
@@ -116,16 +117,15 @@ public ClientScanner(final Configuration conf, final Scan scan, final TableName
116117
this.connection = connection;
117118
this.pool = pool;
118119
this.primaryOperationTimeout = primaryOperationTimeout;
119-
this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
120-
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
120+
this.retries = connectionConfiguration.getRetriesNumber();
121121
if (scan.getMaxResultSize() > 0) {
122122
this.maxScannerResultSize = scan.getMaxResultSize();
123123
} else {
124-
this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
125-
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
124+
this.maxScannerResultSize = connectionConfiguration.getScannerMaxResultSize();
126125
}
127126
this.readRpcTimeout = scanReadRpcTimeout;
128127
this.scannerTimeout = scannerTimeout;
128+
this.useScannerTimeoutForNextCalls = connectionConfiguration.isUseScannerTimeoutForNextCalls();
129129
this.requestAttributes = requestAttributes;
130130

131131
// check if application wants to collect scan metrics
@@ -135,8 +135,7 @@ public ClientScanner(final Configuration conf, final Scan scan, final TableName
135135
if (this.scan.getCaching() > 0) {
136136
this.caching = this.scan.getCaching();
137137
} else {
138-
this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
139-
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
138+
this.caching = connectionConfiguration.getScannerCaching();
140139
}
141140

142141
this.caller = rpcFactory.<Result[]> newCaller();
@@ -255,7 +254,7 @@ protected boolean moveToNextRegion() {
255254
this.currentRegion = null;
256255
this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(),
257256
createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout,
258-
scannerTimeout, caching, conf, caller);
257+
scannerTimeout, useScannerTimeoutForNextCalls, caching, conf, caller);
259258
this.callable.setCaching(this.caching);
260259
incRegionCountMetrics(scanMetrics);
261260
return true;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,11 @@ public ClientSimpleScanner(Configuration configuration, Scan scan, TableName nam
3939
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
4040
RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
4141
int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
42-
Map<String, byte[]> requestAttributes) throws IOException {
42+
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
43+
throws IOException {
4344
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
44-
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, requestAttributes);
45+
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
46+
connectionConfiguration, requestAttributes);
4547
}
4648

4749
@Override

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ public class ConnectionConfiguration {
7676
public static final String HBASE_CLIENT_META_SCANNER_TIMEOUT =
7777
"hbase.client.meta.scanner.timeout.period";
7878

79+
public static final String HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS =
80+
"hbase.client.use.scanner.timeout.period.for.next.calls";
81+
82+
public static final boolean HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT =
83+
false;
84+
7985
private final long writeBufferSize;
8086
private final long writeBufferPeriodicFlushTimeoutMs;
8187
private final long writeBufferPeriodicFlushTimerTickMs;
@@ -99,6 +105,7 @@ public class ConnectionConfiguration {
99105
private final boolean clientScannerAsyncPrefetch;
100106
private final long pauseMs;
101107
private final long pauseMsForServerOverloaded;
108+
private final boolean useScannerTimeoutForNextCalls;
102109

103110
/**
104111
* Constructor
@@ -158,6 +165,9 @@ public class ConnectionConfiguration {
158165
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
159166

160167
this.metaScanTimeout = conf.getInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, scanTimeout);
168+
this.useScannerTimeoutForNextCalls =
169+
conf.getBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS,
170+
HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT);
161171

162172
long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
163173
long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
@@ -201,6 +211,8 @@ protected ConnectionConfiguration() {
201211
this.metaScanTimeout = scanTimeout;
202212
this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE;
203213
this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE;
214+
this.useScannerTimeoutForNextCalls =
215+
HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT;
204216
}
205217

206218
public int getReadRpcTimeout() {
@@ -275,6 +287,10 @@ public int getScanTimeout() {
275287
return scanTimeout;
276288
}
277289

290+
public boolean isUseScannerTimeoutForNextCalls() {
291+
return useScannerTimeoutForNextCalls;
292+
}
293+
278294
public int getMetaScanTimeout() {
279295
return metaScanTimeout;
280296
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -986,7 +986,7 @@ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, bool
986986
ReversedClientScanner rcs = new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME,
987987
this, rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(),
988988
connectionConfig.getMetaReadRpcTimeout(), connectionConfig.getMetaScanTimeout(),
989-
metaReplicaCallTimeoutScanInMicroSecond, Collections.emptyMap())) {
989+
metaReplicaCallTimeoutScanInMicroSecond, connectionConfig, Collections.emptyMap())) {
990990
boolean tableNotFound = true;
991991
for (;;) {
992992
Result regionInfoRow = rcs.next();

hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -325,16 +325,16 @@ public ResultScanner getScanner(Scan scan) throws IOException {
325325
if (scan.isReversed()) {
326326
return new ReversedClientScanner(getConfiguration(), scan, getName(), connection,
327327
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
328-
replicaTimeout, requestAttributes);
328+
replicaTimeout, connConfiguration, requestAttributes);
329329
} else {
330330
if (async) {
331331
return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection,
332332
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
333-
replicaTimeout, requestAttributes);
333+
replicaTimeout, connConfiguration, requestAttributes);
334334
} else {
335335
return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection,
336336
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
337-
replicaTimeout, requestAttributes);
337+
replicaTimeout, connConfiguration, requestAttributes);
338338
}
339339
}
340340
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,12 @@ public class ReversedClientScanner extends ClientScanner {
4040
public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName,
4141
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
4242
RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
43-
int scannerTimeout, int primaryOperationTimeout, Map<String, byte[]> requestAttributes)
43+
int scannerTimeout, int primaryOperationTimeout,
44+
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
4445
throws IOException {
4546
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
46-
scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, requestAttributes);
47+
scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, connectionConfiguration,
48+
requestAttributes);
4749
}
4850

4951
@Override

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
5757
AtomicBoolean replicaSwitched = new AtomicBoolean(false);
5858
final ClusterConnection cConnection;
5959
protected final ExecutorService pool;
60+
private final boolean useScannerTimeoutForNextCalls;
6061
protected final int timeBeforeReplicas;
6162
private final Scan scan;
6263
private final int retries;
@@ -72,11 +73,12 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
7273

7374
public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
7475
ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
75-
int retries, int readRpcTimeout, int scannerTimeout, int caching, Configuration conf,
76-
RpcRetryingCaller<Result[]> caller) {
76+
int retries, int readRpcTimeout, int scannerTimeout, boolean useScannerTimeoutForNextCalls,
77+
int caching, Configuration conf, RpcRetryingCaller<Result[]> caller) {
7778
this.currentScannerCallable = baseCallable;
7879
this.cConnection = cConnection;
7980
this.pool = pool;
81+
this.useScannerTimeoutForNextCalls = useScannerTimeoutForNextCalls;
8082
if (timeBeforeReplicas < 0) {
8183
throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
8284
}
@@ -184,9 +186,12 @@ public Result[] call(int timeout) throws IOException {
184186
pool, regionReplication * 5);
185187

186188
AtomicBoolean done = new AtomicBoolean(false);
189+
// make sure we use the same rpcTimeout for current and other replicas
190+
int rpcTimeoutForCall = getRpcTimeout();
191+
187192
replicaSwitched.set(false);
188193
// submit call for the primary replica or user specified replica
189-
addCallsForCurrentReplica(cs);
194+
addCallsForCurrentReplica(cs, rpcTimeoutForCall);
190195
int startIndex = 0;
191196

192197
try {
@@ -231,7 +236,7 @@ public Result[] call(int timeout) throws IOException {
231236
endIndex = 1;
232237
} else {
233238
// TODO: this may be an overkill for large region replication
234-
addCallsForOtherReplicas(cs, 0, regionReplication - 1);
239+
addCallsForOtherReplicas(cs, 0, regionReplication - 1, rpcTimeoutForCall);
235240
}
236241

237242
try {
@@ -323,15 +328,41 @@ public Cursor getCursor() {
323328
return currentScannerCallable != null ? currentScannerCallable.getCursor() : null;
324329
}
325330

326-
private void
327-
addCallsForCurrentReplica(ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
331+
private void addCallsForCurrentReplica(
332+
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int rpcTimeout) {
328333
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
329334
outstandingCallables.add(currentScannerCallable);
330-
cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, currentScannerCallable.id);
335+
cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, currentScannerCallable.id);
336+
}
337+
338+
/**
339+
* As we have a call sequence for scan, it is useless to have a different rpc timeout which is
340+
less than the scan timeout. If the server does not respond in time (usually this will not happen
341+
* as we have heartbeat now), we will get an OutOfOrderScannerNextException when resending the
342+
* next request and the only way to fix this is to close the scanner and open a new one.
343+
* <p>
344+
* The legacy behavior of ScannerCallable has been to use readRpcTimeout despite the above. If
345+
* using legacy behavior, we always use that.
346+
* <p>
347+
* If new behavior is enabled, we determine the rpc timeout to use based on whether the scanner is
348+
open. If scanner is open, use scannerTimeout; otherwise, use readRpcTimeout.
349+
*/
350+
private int getRpcTimeout() {
351+
if (useScannerTimeoutForNextCalls) {
352+
return isNextCall() ? scannerTimeout : readRpcTimeout;
353+
} else {
354+
return readRpcTimeout;
355+
}
356+
}
357+
358+
private boolean isNextCall() {
359+
return currentScannerCallable != null && currentScannerCallable.scannerId != -1
360+
&& !currentScannerCallable.renew && !currentScannerCallable.closed;
331361
}
332362

333363
private void addCallsForOtherReplicas(
334-
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max) {
364+
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max,
365+
int rpcTimeout) {
335366

336367
for (int id = min; id <= max; id++) {
337368
if (currentScannerCallable.id == id) {
@@ -341,7 +372,7 @@ private void addCallsForOtherReplicas(
341372
setStartRowForReplicaCallable(s);
342373
outstandingCallables.add(s);
343374
RetryingRPC retryingOnReplica = new RetryingRPC(s);
344-
cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, id);
375+
cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, id);
345376
}
346377
}
347378

hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,12 @@ private static class MockClientScanner extends ClientSimpleScanner {
105105

106106
public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
107107
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
108-
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
109-
throws IOException {
108+
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
109+
ConnectionConfiguration connectionConfig) throws IOException {
110110
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
111111
HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
112112
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout,
113-
Collections.emptyMap());
113+
connectionConfig, Collections.emptyMap());
114114
}
115115

116116
@Override
@@ -175,7 +175,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {
175175

176176
try (MockClientScanner scanner =
177177
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
178-
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
178+
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
179179

180180
scanner.setRpcFinished(true);
181181

@@ -239,7 +239,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {
239239

240240
try (MockClientScanner scanner =
241241
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
242-
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
242+
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
243243
InOrder inOrder = Mockito.inOrder(caller);
244244

245245
scanner.loadCache();
@@ -302,7 +302,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {
302302

303303
try (MockClientScanner scanner =
304304
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
305-
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
305+
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
306306
InOrder inOrder = Mockito.inOrder(caller);
307307

308308
scanner.loadCache();
@@ -373,7 +373,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {
373373

374374
try (MockClientScanner scanner =
375375
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
376-
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
376+
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
377377
scanner.setRpcFinished(true);
378378

379379
InOrder inOrder = Mockito.inOrder(caller);
@@ -440,7 +440,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {
440440

441441
try (MockClientScanner scanner =
442442
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
443-
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
443+
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
444444
InOrder inOrder = Mockito.inOrder(caller);
445445
scanner.setRpcFinished(true);
446446

@@ -485,7 +485,7 @@ public void testExceptionsFromReplicasArePropagated() throws IOException {
485485

486486
try (MockClientScanner scanner =
487487
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
488-
rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) {
488+
rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE, connectionConfig)) {
489489
Iterator<Result> iter = scanner.iterator();
490490
while (iter.hasNext()) {
491491
iter.next();

0 commit comments

Comments
 (0)