Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -27,19 +27,21 @@
import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
Expand Down Expand Up @@ -85,6 +87,8 @@ class AsyncClientScanner {

private final ScanResultCache resultCache;

private final Span span;

public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
Expand Down Expand Up @@ -112,6 +116,21 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
} else {
this.scanMetrics = null;
}

/*
* Assumes that the `start()` method is called immediately after construction. If this is no
* longer the case, for tracing correctness, we should move the start of the span into the
* `start()` method. The cost of doing so would be making access to the `span` safe for
* concurrent threads.
*/
span = new TableOperationSpanBuilder(conn)
.setTableName(tableName)
.setOperation(scan)
.build();
if (consumer instanceof AsyncTableResultScanner) {
AsyncTableResultScanner scanner = (AsyncTableResultScanner) consumer;
scanner.setSpan(span);
}
}

private static final class OpenScannerResponse {
Expand Down Expand Up @@ -140,26 +159,35 @@ public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, In

private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub) {
boolean isRegionServerRemote = isRemote(loc.getHostname());
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
if (openScannerTries.getAndIncrement() > 1) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
try (Scope ignored = span.makeCurrent()) {
boolean isRegionServerRemote = isRemote(loc.getHostname());
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
if (openScannerTries.getAndIncrement() > 1) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
}
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
try {
ScanRequest request = RequestConverter.buildScanRequest(
loc.getRegion().getRegionName(), scan, scan.getCaching(), false);
stub.scan(controller, request, resp -> {
try (Scope ignored1 = span.makeCurrent()) {
if (controller.failed()) {
final IOException e = controller.getFailed();
future.completeExceptionally(e);
TraceUtil.setError(span, e);
span.end();
return;
}
future.complete(new OpenScannerResponse(
loc, isRegionServerRemote, stub, controller, resp));
}
});
} catch (IOException e) {
// span is closed by listener attached to the Future in `openScanner()`
future.completeExceptionally(e);
}
return future;
}
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
try {
ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan,
scan.getCaching(), false);
stub.scan(controller, request, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
return;
}
future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp));
});
} catch (IOException e) {
future.completeExceptionally(e);
}
return future;
}

private void startScan(OpenScannerResponse resp) {
Expand All @@ -173,26 +201,40 @@ private void startScan(OpenScannerResponse resp) {
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
(hasMore, error) -> {
if (error != null) {
consumer.onError(error);
return;
}
if (hasMore) {
openScanner();
} else {
consumer.onComplete();
try (Scope ignored = span.makeCurrent()) {
if (error != null) {
try {
consumer.onError(error);
return;
} finally {
TraceUtil.setError(span, error);
span.end();
}
}
if (hasMore) {
openScanner();
} else {
try {
consumer.onComplete();
} finally {
span.setStatus(StatusCode.OK);
span.end();
}
}
}
});
}

private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
.priority(scan.getPriority())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
try (Scope ignored = span.makeCurrent()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to also maintain your volatile usage discipline here and copy 'span' to a 'localSpan' first, as you have done at other call sites?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't do so here because the member field is accessed only once in the method lifetime, while the other call sites made multiple references to the field. I can use the same pattern here, sure, it won't hurt anything.

return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
.priority(scan.getPriority())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
}
}

private long getPrimaryTimeoutNs() {
Expand All @@ -206,15 +248,24 @@ private void openScanner() {
addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer,
conn.getConnectionMetrics()), (resp, error) -> {
if (error != null) {
consumer.onError(error);
return;
try (Scope ignored = span.makeCurrent()) {
if (error != null) {
try {
consumer.onError(error);
return;
} finally {
TraceUtil.setError(span, error);
span.end();
}
}
startScan(resp);
}
startScan(resp);
});
}

public void start() {
openScanner();
try (Scope ignored = span.makeCurrent()) {
openScanner();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -50,11 +51,9 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
Expand Down Expand Up @@ -573,7 +572,12 @@ private void call() {
resetController(controller, callTimeoutNs, priority);
ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
stub.scan(controller, req, resp -> onComplete(controller, resp));
final Context context = Context.current();
stub.scan(controller, req, resp -> {
try (Scope ignored = context.makeCurrent()) {
onComplete(controller, resp);
}
});
}

private void next() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.client;

import static java.util.stream.Collectors.toList;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -231,22 +233,29 @@ public ResultScanner getScanner(Scan scan) {
}

private void scan0(Scan scan, ScanResultConsumer consumer) {
try (ResultScanner scanner = getScanner(scan)) {
consumer.onScanMetricsCreated(scanner.getScanMetrics());
for (Result result; (result = scanner.next()) != null;) {
if (!consumer.onNext(result)) {
break;
Span span = null;
try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
span = scanner.getSpan();
Copy link
Contributor

@apurtell apurtell Mar 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be null? I guess not as long as someone is aware of getter/setter for span in AsyncTableResultScanner and the expected convention. I suppose the resulting NPE would clear enough if not.
And ditto other call sites. I might have thrown something with an explicit message about failure to maintain the code discipline but it seems fine.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In an earlier version of this patch, I hade null-protection sprinkled throughout this method. IntelliJ static analysis annotated it all as unnecessary. If you're also wondering... I'll trust two devs over the tool. Let me bring it back.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't want to throw because a disruption in tracing should not hinder application execution.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I'm simplifying this according to the hints provided by the IDE. I cannot make the field final because the compiler errors in the catch block that the field hasn't been initialized, but static analysis agrees with @Apache9 that the field can never be null.

try (Scope ignored = span.makeCurrent()) {
consumer.onScanMetricsCreated(scanner.getScanMetrics());
for (Result result; (result = scanner.next()) != null; ) {
if (!consumer.onNext(result)) {
break;
}
}
consumer.onComplete();
}
consumer.onComplete();
} catch (IOException e) {
consumer.onError(e);
try (Scope ignored = span.makeCurrent()) {
consumer.onError(e);
}
}
}

@Override
public void scan(Scan scan, ScanResultConsumer consumer) {
pool.execute(() -> scan0(scan, consumer));
final Context context = Context.current();
pool.execute(context.wrap(() -> scan0(scan, consumer)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;

import io.opentelemetry.api.trace.Span;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
Expand Down Expand Up @@ -58,6 +58,9 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum

private ScanResumer resumer;

// Used to pass the span instance to the `AsyncTableImpl` from its underlying `rawAsyncTable`.
private Span span = null;

public AsyncTableResultScanner(TableName tableName, Scan scan, long maxCacheSize) {
this.tableName = tableName;
this.maxCacheSize = maxCacheSize;
Expand All @@ -79,6 +82,14 @@ private void stopPrefetch(ScanController controller) {
resumer = controller.suspend();
}

Span getSpan() {
return span;
}

void setSpan(final Span span) {
this.span = span;
}

@Override
public synchronized void onNext(Result[] results, ScanController controller) {
assert results.length > 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,30 +640,26 @@ public AsyncTableResultScanner getScanner(Scan scan) {

@Override
public CompletableFuture<List<Result>> scanAll(Scan scan) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(scan);
return tracedFuture(() -> {
CompletableFuture<List<Result>> future = new CompletableFuture<>();
List<Result> scanResults = new ArrayList<>();
scan(scan, new AdvancedScanResultConsumer() {
CompletableFuture<List<Result>> future = new CompletableFuture<>();
List<Result> scanResults = new ArrayList<>();
scan(scan, new AdvancedScanResultConsumer() {

@Override
public void onNext(Result[] results, ScanController controller) {
scanResults.addAll(Arrays.asList(results));
}
@Override
public void onNext(Result[] results, ScanController controller) {
scanResults.addAll(Arrays.asList(results));
}

@Override
public void onError(Throwable error) {
future.completeExceptionally(error);
}
@Override
public void onError(Throwable error) {
future.completeExceptionally(error);
}

@Override
public void onComplete() {
future.complete(scanResults);
}
});
return future;
}, supplier);
@Override
public void onComplete() {
future.complete(scanResults);
}
});
return future;
}

@Override
Expand Down
Loading