Skip to content

Commit b60edad

Browse files
committed
HBASE-22267 Implement client push back for async client
1 parent: f30d6c9 · commit: b60edad

File tree

11 files changed: 472 additions (+472) and 247 deletions (-247).

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java

Lines changed: 80 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
import org.apache.hadoop.hbase.TableName;
5555
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
5656
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
57+
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
58+
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
5759
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
5860
import org.apache.hadoop.hbase.util.Bytes;
5961
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -134,6 +136,10 @@ public void addAction(HRegionLocation loc, Action action) {
134136
() -> new RegionRequest(loc)).actions.add(action);
135137
}
136138

139+
public void setRegionRequest(byte[] regionName, RegionRequest regionReq) {
140+
actionsByRegion.put(regionName, regionReq);
141+
}
142+
137143
public int getPriority() {
138144
return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream())
139145
.mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET);
@@ -298,6 +304,8 @@ private void onComplete(Action action, RegionRequest regionReq, int tries, Serve
298304

299305
private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
300306
ServerName serverName, MultiResponse resp) {
307+
ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
308+
serverName, resp);
301309
List<Action> failedActions = new ArrayList<>();
302310
MutableBoolean retryImmediately = new MutableBoolean(false);
303311
actionsByRegion.forEach((rn, regionReq) -> {
@@ -333,55 +341,88 @@ private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
333341
}
334342
}
335343

336-
private void send(Map<ServerName, ServerRequest> actionsByServer, int tries) {
344+
private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
337345
long remainingNs;
338346
if (operationTimeoutNs > 0) {
339347
remainingNs = remainingTimeNs();
340348
if (remainingNs <= 0) {
341-
failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream())
342-
.flatMap(r -> r.actions.stream()), tries);
349+
failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
350+
tries);
343351
return;
344352
}
345353
} else {
346354
remainingNs = Long.MAX_VALUE;
347355
}
348-
actionsByServer.forEach((sn, serverReq) -> {
349-
ClientService.Interface stub;
350-
try {
351-
stub = conn.getRegionServerStub(sn);
352-
} catch (IOException e) {
353-
onError(serverReq.actionsByRegion, tries, e, sn);
354-
return;
355-
}
356-
ClientProtos.MultiRequest req;
357-
List<CellScannable> cells = new ArrayList<>();
358-
// Map from a created RegionAction to the original index for a RowMutations within
359-
// the original list of actions. This will be used to process the results when there
360-
// is RowMutations in the action list.
361-
Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
362-
try {
363-
req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
364-
} catch (IOException e) {
365-
onError(serverReq.actionsByRegion, tries, e, sn);
366-
return;
367-
}
368-
HBaseRpcController controller = conn.rpcControllerFactory.newController();
369-
resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
370-
calcPriority(serverReq.getPriority(), tableName));
371-
if (!cells.isEmpty()) {
372-
controller.setCellScanner(createCellScanner(cells));
356+
ClientService.Interface stub;
357+
try {
358+
stub = conn.getRegionServerStub(serverName);
359+
} catch (IOException e) {
360+
onError(serverReq.actionsByRegion, tries, e, serverName);
361+
return;
362+
}
363+
ClientProtos.MultiRequest req;
364+
List<CellScannable> cells = new ArrayList<>();
365+
// Map from a created RegionAction to the original index for a RowMutations within
366+
// the original list of actions. This will be used to process the results when there
367+
// is RowMutations in the action list.
368+
Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
369+
try {
370+
req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
371+
} catch (IOException e) {
372+
onError(serverReq.actionsByRegion, tries, e, serverName);
373+
return;
374+
}
375+
HBaseRpcController controller = conn.rpcControllerFactory.newController();
376+
resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
377+
calcPriority(serverReq.getPriority(), tableName));
378+
if (!cells.isEmpty()) {
379+
controller.setCellScanner(createCellScanner(cells));
380+
}
381+
stub.multi(controller, req, resp -> {
382+
if (controller.failed()) {
383+
onError(serverReq.actionsByRegion, tries, controller.getFailed(), serverName);
384+
} else {
385+
try {
386+
onComplete(serverReq.actionsByRegion, tries, serverName, ResponseConverter.getResults(req,
387+
rowMutationsIndexMap, resp, controller.cellScanner()));
388+
} catch (Exception e) {
389+
onError(serverReq.actionsByRegion, tries, e, serverName);
390+
return;
391+
}
373392
}
374-
stub.multi(controller, req, resp -> {
375-
if (controller.failed()) {
376-
onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn);
393+
});
394+
}
395+
396+
// We will make use of the ServerStatisticTracker to determine whether we need to delay a bit,
397+
// based on the load of the region server and the region.
398+
private void sendOrDelay(Map<ServerName, ServerRequest> actionsByServer, int tries) {
399+
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
400+
Optional<ServerStatisticTracker> optStats = conn.getStatisticsTracker();
401+
if (!optStats.isPresent()) {
402+
actionsByServer.forEach((serverName, serverReq) -> {
403+
metrics.ifPresent(MetricsConnection::incrNormalRunners);
404+
sendToServer(serverName, serverReq, tries);
405+
});
406+
return;
407+
}
408+
ServerStatisticTracker stats = optStats.get();
409+
ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
410+
actionsByServer.forEach((serverName, serverReq) -> {
411+
ServerStatistics serverStats = stats.getStats(serverName);
412+
Map<Long, ServerRequest> groupByBackoff = new HashMap<>();
413+
serverReq.actionsByRegion.forEach((regionName, regionReq) -> {
414+
long backoff = backoffPolicy.getBackoffTime(serverName, regionName, serverStats);
415+
groupByBackoff.computeIfAbsent(backoff, k -> new ServerRequest())
416+
.setRegionRequest(regionName, regionReq);
417+
});
418+
groupByBackoff.forEach((backoff, sr) -> {
419+
if (backoff > 0) {
420+
metrics.ifPresent(m -> m.incrDelayRunnersAndUpdateDelayInterval(backoff));
421+
retryTimer.newTimeout(timer -> sendToServer(serverName, sr, tries), backoff,
422+
TimeUnit.MILLISECONDS);
377423
} else {
378-
try {
379-
onComplete(serverReq.actionsByRegion, tries, sn, ResponseConverter.getResults(req,
380-
rowMutationsIndexMap, resp, controller.cellScanner()));
381-
} catch (Exception e) {
382-
onError(serverReq.actionsByRegion, tries, e, sn);
383-
return;
384-
}
424+
metrics.ifPresent(MetricsConnection::incrNormalRunners);
425+
sendToServer(serverName, sr, tries);
385426
}
386427
});
387428
});
@@ -454,7 +495,7 @@ private void groupAndSend(Stream<Action> actions, int tries) {
454495
}))
455496
.toArray(CompletableFuture[]::new)), (v, r) -> {
456497
if (!actionsByServer.isEmpty()) {
457-
send(actionsByServer, tries);
498+
sendOrDelay(actionsByServer, tries);
458499
}
459500
if (!locateFailed.isEmpty()) {
460501
tryResubmit(locateFailed.stream(), tries, false);

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.apache.hadoop.hbase.MasterNotRunningException;
3939
import org.apache.hadoop.hbase.ServerName;
4040
import org.apache.hadoop.hbase.TableName;
41+
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
42+
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
4143
import org.apache.hadoop.hbase.ipc.RpcClient;
4244
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
4345
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -101,6 +103,9 @@ class AsyncConnectionImpl implements AsyncConnection {
101103
private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
102104
new AtomicReference<>();
103105

106+
private final Optional<ServerStatisticTracker> stats;
107+
private final ClientBackoffPolicy backoffPolicy;
108+
104109
private ChoreService authService;
105110

106111
private volatile boolean closed = false;
@@ -133,6 +138,8 @@ public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String cl
133138
} else {
134139
nonceGenerator = NO_NONCE_GENERATOR;
135140
}
141+
this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
142+
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
136143
}
137144

138145
private void spawnRenewalChore(final UserGroupInformation user) {
@@ -233,6 +240,14 @@ void clearMasterStubCache(MasterService.Interface stub) {
233240
masterStub.compareAndSet(stub, null);
234241
}
235242

243+
Optional<ServerStatisticTracker> getStatisticsTracker() {
244+
return stats;
245+
}
246+
247+
ClientBackoffPolicy getBackoffPolicy() {
248+
return backoffPolicy;
249+
}
250+
236251
@Override
237252
public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
238253
return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.HashMap;
2929
import java.util.List;
3030
import java.util.Map;
31+
import java.util.Optional;
3132
import java.util.Set;
3233
import java.util.concurrent.ConcurrentHashMap;
3334
import java.util.concurrent.ExecutorService;
@@ -55,9 +56,6 @@
5556

5657
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
5758

58-
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
59-
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
60-
6159
/**
6260
* The context, and return value, for a single submit/submitAll call.
6361
* Note on how this class (one AP submit) works. Initially, all requests are split into groups
@@ -614,8 +612,8 @@ private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName serv
614612
traceText = "AsyncProcess.clientBackoff.sendMultiAction";
615613
runnable = runner;
616614
if (asyncProcess.connection.getConnectionMetrics() != null) {
617-
asyncProcess.connection.getConnectionMetrics().incrDelayRunners();
618-
asyncProcess.connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
615+
asyncProcess.connection.getConnectionMetrics()
616+
.incrDelayRunnersAndUpdateDelayInterval(runner.getSleepTime());
619617
}
620618
} else {
621619
if (asyncProcess.connection.getConnectionMetrics() != null) {
@@ -802,19 +800,16 @@ private void logNoResubmit(ServerName oldServer, int numAttempt,
802800
* @param responses - the response, if any
803801
* @param numAttempt - the attempt
804802
*/
805-
private void receiveMultiAction(MultiAction multiAction,
806-
ServerName server, MultiResponse responses, int numAttempt) {
803+
private void receiveMultiAction(MultiAction multiAction, ServerName server,
804+
MultiResponse responses, int numAttempt) {
807805
assert responses != null;
808-
809-
Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
810-
updateStats(server, results);
811-
806+
updateStats(server, responses);
812807
// Success or partial success
813808
// Analyze detailed results. We can still have individual failures to be redo.
814809
// two specific throwables are managed:
815810
// - DoNotRetryIOException: we continue to retry for other actions
816811
// - RegionMovedException: we update the cache with the new region location
817-
812+
Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
818813
List<Action> toReplay = new ArrayList<>();
819814
Throwable lastException = null;
820815
int failureCount = 0;
@@ -926,26 +921,9 @@ private void cleanServerCache(ServerName server, Throwable regionException) {
926921
}
927922

928923
@VisibleForTesting
929-
protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
930-
boolean metrics = asyncProcess.connection.getConnectionMetrics() != null;
931-
boolean stats = asyncProcess.connection.getStatisticsTracker() != null;
932-
if (!stats && !metrics) {
933-
return;
934-
}
935-
for (Map.Entry<byte[], MultiResponse.RegionResult> regionStats : results.entrySet()) {
936-
byte[] regionName = regionStats.getKey();
937-
ClientProtos.RegionLoadStats stat = regionStats.getValue().getStat();
938-
if (stat == null) {
939-
LOG.error("No ClientProtos.RegionLoadStats found for server=" + server
940-
+ ", region=" + Bytes.toStringBinary(regionName));
941-
continue;
942-
}
943-
RegionLoadStats regionLoadstats = ProtobufUtil.createRegionLoadStats(stat);
944-
ResultStatsUtil.updateStats(asyncProcess.connection.getStatisticsTracker(), server,
945-
regionName, regionLoadstats);
946-
ResultStatsUtil.updateStats(asyncProcess.connection.getConnectionMetrics(),
947-
server, regionName, regionLoadstats);
948-
}
924+
protected void updateStats(ServerName server, MultiResponse resp) {
925+
ConnectionUtils.updateStats(Optional.ofNullable(asyncProcess.connection.getStatisticsTracker()),
926+
Optional.ofNullable(asyncProcess.connection.getConnectionMetrics()), server, resp);
949927
}
950928

951929

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,10 @@
6262
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
6363
import org.apache.hbase.thirdparty.io.netty.util.Timer;
6464

65+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
6566
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
6667
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
68+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
6769
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
6870
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
6971
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
@@ -672,4 +674,25 @@ static <T> CompletableFuture<T> getOrFetch(AtomicReference<T> cacheRef,
672674
}
673675
}
674676
}
677+
678+
static void updateStats(Optional<ServerStatisticTracker> optStats,
679+
Optional<MetricsConnection> optMetrics, ServerName serverName, MultiResponse resp) {
680+
if (!optStats.isPresent() && !optMetrics.isPresent()) {
681+
// ServerStatisticTracker and MetricsConnection are both not present, just return
682+
return;
683+
}
684+
resp.getResults().forEach((regionName, regionResult) -> {
685+
ClientProtos.RegionLoadStats stat = regionResult.getStat();
686+
if (stat == null) {
687+
LOG.error("No ClientProtos.RegionLoadStats found for server={}, region={}", serverName,
688+
Bytes.toStringBinary(regionName));
689+
return;
690+
}
691+
RegionLoadStats regionLoadStats = ProtobufUtil.createRegionLoadStats(stat);
692+
optStats.ifPresent(
693+
stats -> ResultStatsUtil.updateStats(stats, serverName, regionName, regionLoadStats));
694+
optMetrics.ifPresent(
695+
metrics -> ResultStatsUtil.updateStats(metrics, serverName, regionName, regionLoadStats));
696+
});
697+
}
675698
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -421,13 +421,9 @@ public void incrNormalRunners() {
421421
this.runnerStats.incrNormalRunners();
422422
}
423423

424-
/** Increment the number of delay runner counts. */
425-
public void incrDelayRunners() {
424+
/** Increment the number of delay runner counts and update delay interval of delay runner. */
425+
public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
426426
this.runnerStats.incrDelayRunners();
427-
}
428-
429-
/** Update delay interval of delay runner. */
430-
public void updateDelayInterval(long interval) {
431427
this.runnerStats.updateDelayInterval(interval);
432428
}
433429

hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -357,8 +357,8 @@ public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
357357
preCheck();
358358
return RawAsyncTableImpl.this
359359
.<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
360-
.action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
361-
stub, mutation,
360+
.action((controller, loc, stub) -> RawAsyncTableImpl.this.<Boolean> mutateRow(controller,
361+
loc, stub, mutation,
362362
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
363363
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
364364
resp -> resp.getExists()))
@@ -373,7 +373,7 @@ public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
373373

374374
// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
375375
// so here I write a new method as I do not want to change the abstraction of call method.
376-
private static <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
376+
private <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
377377
HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
378378
Converter<MultiRequest, byte[], RowMutations> reqConvert,
379379
Function<Result, RESP> respConverter) {
@@ -391,6 +391,8 @@ public void run(MultiResponse resp) {
391391
try {
392392
org.apache.hadoop.hbase.client.MultiResponse multiResp =
393393
ResponseConverter.getResults(req, resp, controller.cellScanner());
394+
ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
395+
loc.getServerName(), multiResp);
394396
Throwable ex = multiResp.getException(regionName);
395397
if (ex != null) {
396398
future.completeExceptionally(ex instanceof IOException ? ex
@@ -415,8 +417,8 @@ public void run(MultiResponse resp) {
415417
@Override
416418
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
417419
return this.<Void> newCaller(mutation.getRow(), mutation.getMaxPriority(), writeRpcTimeoutNs)
418-
.action((controller, loc, stub) -> RawAsyncTableImpl.<Void> mutateRow(controller, loc, stub,
419-
mutation, (rn, rm) -> {
420+
.action((controller, loc, stub) -> this.<Void> mutateRow(controller, loc, stub, mutation,
421+
(rn, rm) -> {
420422
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
421423
regionMutationBuilder.setAtomic(true);
422424
return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();

0 commit comments

Comments
 (0)