Skip to content

Commit a35c96f

Browse files
authored
HBASE-27657 Connection and Request Attributes (#5332)
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
1 parent 0f2d5c1 commit a35c96f

File tree

86 files changed

+1033
-299
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

86 files changed

+1033
-299
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
2121

2222
import java.io.IOException;
23+
import java.util.Collections;
2324
import java.util.concurrent.CompletableFuture;
2425
import org.apache.hadoop.hbase.ServerName;
2526
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@@ -44,7 +45,7 @@ public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl con
4445
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
4546
long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
4647
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
47-
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
48+
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, Collections.emptyMap());
4849
this.serverName = serverName;
4950
this.callable = callable;
5051
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ class AsyncBatchRpcRetryingCaller<T> {
114114

115115
private final HBaseServerExceptionPauseManager pauseManager;
116116

117+
private final Map<String, byte[]> requestAttributes;
118+
117119
// we can not use HRegionLocation as the map key because the hashCode and equals method of
118120
// HRegionLocation only consider serverName.
119121
private static final class RegionRequest {
@@ -149,7 +151,8 @@ public int getPriority() {
149151

150152
public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
151153
TableName tableName, List<? extends Row> actions, long pauseNs, long pauseNsForServerOverloaded,
152-
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
154+
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt,
155+
Map<String, byte[]> requestAttributes) {
153156
this.retryTimer = retryTimer;
154157
this.conn = conn;
155158
this.tableName = tableName;
@@ -180,6 +183,7 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
180183
}
181184
this.action2Errors = new IdentityHashMap<>();
182185
this.startNs = System.nanoTime();
186+
this.requestAttributes = requestAttributes;
183187
}
184188

185189
private static boolean hasIncrementOrAppend(Row action) {
@@ -392,6 +396,7 @@ private void sendToServer(ServerName serverName, ServerRequest serverReq, int tr
392396
HBaseRpcController controller = conn.rpcControllerFactory.newController();
393397
resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
394398
calcPriority(serverReq.getPriority(), tableName));
399+
controller.setRequestAttributes(requestAttributes);
395400
if (!cells.isEmpty()) {
396401
controller.setCellScanner(createCellScanner(cells));
397402
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.opentelemetry.api.trace.StatusCode;
3333
import io.opentelemetry.context.Scope;
3434
import java.io.IOException;
35+
import java.util.Map;
3536
import java.util.concurrent.CompletableFuture;
3637
import java.util.concurrent.TimeUnit;
3738
import java.util.concurrent.atomic.AtomicInteger;
@@ -92,9 +93,12 @@ class AsyncClientScanner {
9293

9394
private final Span span;
9495

96+
private final Map<String, byte[]> requestAttributes;
97+
9598
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
9699
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded,
97-
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
100+
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt,
101+
Map<String, byte[]> requestAttributes) {
98102
if (scan.getStartRow() == null) {
99103
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
100104
}
@@ -113,6 +117,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
113117
this.rpcTimeoutNs = rpcTimeoutNs;
114118
this.startLogErrorsCnt = startLogErrorsCnt;
115119
this.resultCache = createScanResultCache(scan);
120+
this.requestAttributes = requestAttributes;
116121
if (scan.isScanMetricsEnabled()) {
117122
this.scanMetrics = new ScanMetrics();
118123
consumer.onScanMetricsCreated(scanMetrics);
@@ -191,15 +196,17 @@ private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcControlle
191196
}
192197

193198
private void startScan(OpenScannerResponse resp) {
194-
addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId())
195-
.location(resp.loc).remote(resp.isRegionServerRemote)
196-
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
197-
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
198-
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
199-
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
200-
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
201-
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
202-
.start(resp.controller, resp.resp), (hasMore, error) -> {
199+
addListener(
200+
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
201+
.remote(resp.isRegionServerRemote)
202+
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
203+
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
204+
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
205+
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
206+
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
207+
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
208+
.setRequestAttributes(requestAttributes).start(resp.controller, resp.resp),
209+
(hasMore, error) -> {
203210
try (Scope ignored = span.makeCurrent()) {
204211
if (error != null) {
205212
try {
@@ -231,8 +238,8 @@ private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
231238
.priority(scan.getPriority()).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
232239
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
233240
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
234-
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
235-
.call();
241+
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
242+
.setRequestAttributes(requestAttributes).action(this::callOpenScanner).call();
236243
}
237244
}
238245

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030

3131
import io.opentelemetry.api.trace.Span;
3232
import java.io.IOException;
33+
import java.util.Collections;
34+
import java.util.Map;
3335
import java.util.Optional;
3436
import java.util.concurrent.CompletableFuture;
3537
import java.util.concurrent.ConcurrentHashMap;
@@ -122,6 +124,11 @@ public class AsyncConnectionImpl implements AsyncConnection {
122124

123125
public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
124126
User user) {
127+
this(conf, registry, clusterId, user, Collections.emptyMap());
128+
}
129+
130+
public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
131+
User user, Map<String, byte[]> connectionAttributes) {
125132
this.conf = conf;
126133
this.user = user;
127134
this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);
@@ -137,7 +144,8 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri
137144
} else {
138145
this.metrics = Optional.empty();
139146
}
140-
this.rpcClient = RpcClientFactory.createClient(conf, clusterId, metrics.orElse(null));
147+
this.rpcClient =
148+
RpcClientFactory.createClient(conf, clusterId, metrics.orElse(null), connectionAttributes);
141149
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
142150
this.rpcTimeout =
143151
(int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
2121

22+
import java.util.Collections;
2223
import java.util.concurrent.CompletableFuture;
2324
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
2425
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@@ -47,7 +48,7 @@ public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl
4748
Callable<T> callable, int priority, long pauseNs, long pauseNsForServerOverloaded,
4849
int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
4950
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxRetries,
50-
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
51+
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, Collections.emptyMap());
5152
this.callable = callable;
5253
}
5354

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20+
import java.util.Collections;
2021
import java.util.Iterator;
2122
import java.util.List;
23+
import java.util.Map;
2224
import java.util.concurrent.ExecutorService;
2325
import org.apache.hadoop.hbase.TableName;
2426
import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -63,6 +65,7 @@ public static class Builder<T> {
6365
private int operationTimeout;
6466
private CancellableRegionServerCallable callable;
6567
private Object[] results;
68+
private Map<String, byte[]> requestAttributes = Collections.emptyMap();
6669

6770
private Builder() {
6871
}
@@ -124,9 +127,14 @@ Builder<T> setCallable(CancellableRegionServerCallable callable) {
124127
return this;
125128
}
126129

130+
Builder<T> setRequestAttributes(Map<String, byte[]> requestAttributes) {
131+
this.requestAttributes = requestAttributes;
132+
return this;
133+
}
134+
127135
public AsyncProcessTask<T> build() {
128136
return new AsyncProcessTask<>(pool, tableName, rows, submittedRows, callback, callable,
129-
needResults, rpcTimeout, operationTimeout, results);
137+
needResults, rpcTimeout, operationTimeout, results, requestAttributes);
130138
}
131139
}
132140

@@ -140,16 +148,18 @@ public AsyncProcessTask<T> build() {
140148
private final int rpcTimeout;
141149
private final int operationTimeout;
142150
private final Object[] results;
151+
private final Map<String, byte[]> requestAttributes;
143152

144153
AsyncProcessTask(AsyncProcessTask<T> task) {
145154
this(task.getPool(), task.getTableName(), task.getRowAccess(), task.getSubmittedRows(),
146155
task.getCallback(), task.getCallable(), task.getNeedResults(), task.getRpcTimeout(),
147-
task.getOperationTimeout(), task.getResults());
156+
task.getOperationTimeout(), task.getResults(), task.getRequestAttributes());
148157
}
149158

150159
AsyncProcessTask(ExecutorService pool, TableName tableName, RowAccess<? extends Row> rows,
151160
SubmittedRows size, Batch.Callback<T> callback, CancellableRegionServerCallable callable,
152-
boolean needResults, int rpcTimeout, int operationTimeout, Object[] results) {
161+
boolean needResults, int rpcTimeout, int operationTimeout, Object[] results,
162+
Map<String, byte[]> requestAttributes) {
153163
this.pool = pool;
154164
this.tableName = tableName;
155165
this.rows = rows;
@@ -160,6 +170,7 @@ public AsyncProcessTask<T> build() {
160170
this.rpcTimeout = rpcTimeout;
161171
this.operationTimeout = operationTimeout;
162172
this.results = results;
173+
this.requestAttributes = requestAttributes;
163174
}
164175

165176
public int getOperationTimeout() {
@@ -190,6 +201,10 @@ CancellableRegionServerCallable getCallable() {
190201
return callable;
191202
}
192203

204+
public Map<String, byte[]> getRequestAttributes() {
205+
return requestAttributes;
206+
}
207+
193208
Object[] getResults() {
194209
return results;
195210
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ public void run() {
284284
private final int operationTimeout;
285285
private final int rpcTimeout;
286286
private final AsyncProcess asyncProcess;
287+
private final Map<String, byte[]> requestAttributes;
287288

288289
/**
289290
* For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only
@@ -398,6 +399,7 @@ public AsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, long
398399
if (task.getCallable() == null) {
399400
tracker = new RetryingTimeTracker().start();
400401
}
402+
this.requestAttributes = task.getRequestAttributes();
401403
}
402404

403405
protected Set<CancellableRegionServerCallable> getCallsInProgress() {
@@ -1316,7 +1318,8 @@ private ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
13161318
private MultiServerCallable createCallable(final ServerName server, TableName tableName,
13171319
final MultiAction multi) {
13181320
return new MultiServerCallable(asyncProcess.connection, tableName, server, multi,
1319-
asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority());
1321+
asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority(),
1322+
requestAttributes);
13201323
}
13211324

13221325
private void updateResult(int index, Object result) {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.util.ArrayList;
2424
import java.util.List;
25+
import java.util.Map;
2526
import java.util.Optional;
2627
import java.util.OptionalLong;
2728
import java.util.concurrent.CompletableFuture;
@@ -78,7 +79,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
7879

7980
public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
8081
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
81-
long rpcTimeoutNs, int startLogErrorsCnt) {
82+
long rpcTimeoutNs, int startLogErrorsCnt, Map<String, byte[]> requestAttributes) {
8283
this.retryTimer = retryTimer;
8384
this.conn = conn;
8485
this.priority = priority;
@@ -89,6 +90,7 @@ public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int pr
8990
this.future = new CompletableFuture<>();
9091
this.controller = conn.rpcControllerFactory.newController();
9192
this.controller.setPriority(priority);
93+
this.controller.setRequestAttributes(requestAttributes);
9294
this.exceptions = new ArrayList<>();
9395
this.startNs = System.nanoTime();
9496
this.pauseManager =

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
2424
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
2525

26+
import java.util.Collections;
2627
import java.util.List;
28+
import java.util.Map;
2729
import java.util.concurrent.CompletableFuture;
2830
import java.util.concurrent.TimeUnit;
2931
import org.apache.hadoop.hbase.HRegionLocation;
@@ -83,6 +85,8 @@ public class SingleRequestCallerBuilder<T> extends BuilderBase {
8385

8486
private int priority = PRIORITY_UNSET;
8587

88+
private Map<String, byte[]> requestAttributes = Collections.emptyMap();
89+
8690
public SingleRequestCallerBuilder<T> table(TableName tableName) {
8791
this.tableName = tableName;
8892
return this;
@@ -144,6 +148,12 @@ public SingleRequestCallerBuilder<T> priority(int priority) {
144148
return this;
145149
}
146150

151+
public SingleRequestCallerBuilder<T>
152+
setRequestAttributes(Map<String, byte[]> requestAttributes) {
153+
this.requestAttributes = requestAttributes;
154+
return this;
155+
}
156+
147157
private void preCheck() {
148158
checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
149159
checkNotNull(tableName, "tableName is null");
@@ -157,7 +167,7 @@ public AsyncSingleRequestRpcRetryingCaller<T> build() {
157167
preCheck();
158168
return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId,
159169
locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
160-
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
170+
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes);
161171
}
162172

163173
/**
@@ -201,6 +211,8 @@ public class ScanSingleRegionCallerBuilder extends BuilderBase {
201211

202212
private int priority = PRIORITY_UNSET;
203213

214+
private Map<String, byte[]> requestAttributes = Collections.emptyMap();
215+
204216
public ScanSingleRegionCallerBuilder id(long scannerId) {
205217
this.scannerId = scannerId;
206218
return this;
@@ -278,6 +290,12 @@ public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
278290
return this;
279291
}
280292

293+
public ScanSingleRegionCallerBuilder
294+
setRequestAttributes(Map<String, byte[]> requestAttributes) {
295+
this.requestAttributes = requestAttributes;
296+
return this;
297+
}
298+
281299
private void preCheck() {
282300
checkArgument(scannerId != null, "invalid scannerId %d", scannerId);
283301
checkNotNull(scan, "scan is null");
@@ -293,7 +311,7 @@ public AsyncScanSingleRegionRpcRetryingCaller build() {
293311
return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics,
294312
scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority,
295313
scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts,
296-
scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
314+
scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes);
297315
}
298316

299317
/**
@@ -322,6 +340,8 @@ public class BatchCallerBuilder extends BuilderBase {
322340

323341
private long rpcTimeoutNs = -1L;
324342

343+
private Map<String, byte[]> requestAttributes = Collections.emptyMap();
344+
325345
public BatchCallerBuilder table(TableName tableName) {
326346
this.tableName = tableName;
327347
return this;
@@ -362,10 +382,15 @@ public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
362382
return this;
363383
}
364384

385+
public BatchCallerBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) {
386+
this.requestAttributes = requestAttributes;
387+
return this;
388+
}
389+
365390
public <T> AsyncBatchRpcRetryingCaller<T> build() {
366391
return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs,
367392
pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
368-
startLogErrorsCnt);
393+
startLogErrorsCnt, requestAttributes);
369394
}
370395

371396
public <T> List<CompletableFuture<T>> call() {

0 commit comments

Comments
 (0)