-
Notifications
You must be signed in to change notification settings - Fork 3.4k
HBASE-26545 Implement tracing of scan #4106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| /** | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
|
|
@@ -27,19 +27,21 @@ | |
| import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote; | ||
| import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; | ||
| import static org.apache.hadoop.hbase.util.FutureUtils.addListener; | ||
|
|
||
| import io.opentelemetry.api.trace.Span; | ||
| import io.opentelemetry.api.trace.StatusCode; | ||
| import io.opentelemetry.context.Scope; | ||
| import java.io.IOException; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import org.apache.hadoop.hbase.HRegionLocation; | ||
| import org.apache.hadoop.hbase.TableName; | ||
| import org.apache.hadoop.hbase.client.metrics.ScanMetrics; | ||
| import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; | ||
| import org.apache.hadoop.hbase.ipc.HBaseRpcController; | ||
| import org.apache.hadoop.hbase.trace.TraceUtil; | ||
| import org.apache.yetus.audience.InterfaceAudience; | ||
|
|
||
| import org.apache.hbase.thirdparty.io.netty.util.Timer; | ||
|
|
||
| import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; | ||
| import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; | ||
| import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; | ||
|
|
@@ -85,6 +87,8 @@ class AsyncClientScanner { | |
|
|
||
| private final ScanResultCache resultCache; | ||
|
|
||
| private final Span span; | ||
|
|
||
| public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName, | ||
| AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs, | ||
| int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { | ||
|
|
@@ -112,6 +116,21 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN | |
| } else { | ||
| this.scanMetrics = null; | ||
| } | ||
|
|
||
| /* | ||
| * Assumes that the `start()` method is called immediately after construction. If this is no | ||
| * longer the case, for tracing correctness, we should move the start of the span into the | ||
| * `start()` method. The cost of doing so would be making access to the `span` safe for | ||
| * concurrent threads. | ||
| */ | ||
| span = new TableOperationSpanBuilder(conn) | ||
| .setTableName(tableName) | ||
| .setOperation(scan) | ||
| .build(); | ||
| if (consumer instanceof AsyncTableResultScanner) { | ||
| AsyncTableResultScanner scanner = (AsyncTableResultScanner) consumer; | ||
| scanner.setSpan(span); | ||
| } | ||
| } | ||
|
|
||
| private static final class OpenScannerResponse { | ||
|
|
@@ -140,26 +159,35 @@ public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, In | |
|
|
||
| private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller, | ||
| HRegionLocation loc, ClientService.Interface stub) { | ||
| boolean isRegionServerRemote = isRemote(loc.getHostname()); | ||
| incRPCCallsMetrics(scanMetrics, isRegionServerRemote); | ||
| if (openScannerTries.getAndIncrement() > 1) { | ||
| incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); | ||
| try (Scope ignored = span.makeCurrent()) { | ||
| boolean isRegionServerRemote = isRemote(loc.getHostname()); | ||
| incRPCCallsMetrics(scanMetrics, isRegionServerRemote); | ||
| if (openScannerTries.getAndIncrement() > 1) { | ||
| incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); | ||
| } | ||
| CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>(); | ||
| try { | ||
| ScanRequest request = RequestConverter.buildScanRequest( | ||
| loc.getRegion().getRegionName(), scan, scan.getCaching(), false); | ||
| stub.scan(controller, request, resp -> { | ||
| try (Scope ignored1 = span.makeCurrent()) { | ||
| if (controller.failed()) { | ||
| final IOException e = controller.getFailed(); | ||
| future.completeExceptionally(e); | ||
| TraceUtil.setError(span, e); | ||
| span.end(); | ||
| return; | ||
| } | ||
| future.complete(new OpenScannerResponse( | ||
| loc, isRegionServerRemote, stub, controller, resp)); | ||
| } | ||
| }); | ||
| } catch (IOException e) { | ||
| // span is closed by listener attached to the Future in `openScanner()` | ||
| future.completeExceptionally(e); | ||
ndimiduk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| return future; | ||
| } | ||
| CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>(); | ||
| try { | ||
| ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan, | ||
| scan.getCaching(), false); | ||
| stub.scan(controller, request, resp -> { | ||
| if (controller.failed()) { | ||
| future.completeExceptionally(controller.getFailed()); | ||
| return; | ||
| } | ||
| future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp)); | ||
| }); | ||
| } catch (IOException e) { | ||
| future.completeExceptionally(e); | ||
| } | ||
| return future; | ||
| } | ||
|
|
||
| private void startScan(OpenScannerResponse resp) { | ||
|
|
@@ -173,26 +201,40 @@ private void startScan(OpenScannerResponse resp) { | |
| .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) | ||
| .startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp), | ||
| (hasMore, error) -> { | ||
| if (error != null) { | ||
| consumer.onError(error); | ||
| return; | ||
| } | ||
| if (hasMore) { | ||
| openScanner(); | ||
| } else { | ||
| consumer.onComplete(); | ||
| try (Scope ignored = span.makeCurrent()) { | ||
| if (error != null) { | ||
| try { | ||
| consumer.onError(error); | ||
| return; | ||
| } finally { | ||
| TraceUtil.setError(span, error); | ||
| span.end(); | ||
| } | ||
| } | ||
| if (hasMore) { | ||
| openScanner(); | ||
| } else { | ||
| try { | ||
| consumer.onComplete(); | ||
| } finally { | ||
| span.setStatus(StatusCode.OK); | ||
| span.end(); | ||
| } | ||
| } | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) { | ||
| return conn.callerFactory.<OpenScannerResponse> single().table(tableName) | ||
| .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan)) | ||
| .priority(scan.getPriority()) | ||
| .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) | ||
| .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) | ||
| .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) | ||
| .startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call(); | ||
| try (Scope ignored = span.makeCurrent()) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you want to also maintain your volatile usage discipline here and copy 'span' to a 'localSpan' first, as you have done at other call sites?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't do so here because the member field is accessed only once in the method lifetime, while the other call sites made multiple references to the field. I can use the same pattern here, sure, it won't hurt anything. |
||
| return conn.callerFactory.<OpenScannerResponse> single().table(tableName) | ||
| .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan)) | ||
| .priority(scan.getPriority()) | ||
| .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) | ||
| .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) | ||
| .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) | ||
| .startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call(); | ||
| } | ||
| } | ||
|
|
||
| private long getPrimaryTimeoutNs() { | ||
|
|
@@ -206,15 +248,24 @@ private void openScanner() { | |
| addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(), | ||
| getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer, | ||
| conn.getConnectionMetrics()), (resp, error) -> { | ||
| if (error != null) { | ||
| consumer.onError(error); | ||
| return; | ||
| try (Scope ignored = span.makeCurrent()) { | ||
| if (error != null) { | ||
| try { | ||
| consumer.onError(error); | ||
| return; | ||
| } finally { | ||
| TraceUtil.setError(span, error); | ||
| span.end(); | ||
| } | ||
| } | ||
| startScan(resp); | ||
| } | ||
| startScan(resp); | ||
| }); | ||
| } | ||
|
|
||
| public void start() { | ||
| openScanner(); | ||
| try (Scope ignored = span.makeCurrent()) { | ||
| openScanner(); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,7 +18,9 @@ | |
| package org.apache.hadoop.hbase.client; | ||
|
|
||
| import static java.util.stream.Collectors.toList; | ||
| import io.opentelemetry.api.trace.Span; | ||
| import io.opentelemetry.context.Context; | ||
| import io.opentelemetry.context.Scope; | ||
| import java.io.IOException; | ||
| import java.util.List; | ||
| import java.util.concurrent.CompletableFuture; | ||
|
|
@@ -231,22 +233,29 @@ public ResultScanner getScanner(Scan scan) { | |
| } | ||
|
|
||
| private void scan0(Scan scan, ScanResultConsumer consumer) { | ||
| try (ResultScanner scanner = getScanner(scan)) { | ||
| consumer.onScanMetricsCreated(scanner.getScanMetrics()); | ||
| for (Result result; (result = scanner.next()) != null;) { | ||
| if (!consumer.onNext(result)) { | ||
| break; | ||
| Span span = null; | ||
| try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) { | ||
| span = scanner.getSpan(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this be null? I guess not as long as someone is aware of getter/setter for span in
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In an earlier version of this patch, I hade null-protection sprinkled throughout this method. IntelliJ static analysis annotated it all as unnecessary. If you're also wondering... I'll trust two devs over the tool. Let me bring it back.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't want to throw because a disruption in tracing should not hinder application execution.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, I'm simplifying this according to the hints provided by the IDE. I cannot make the field |
||
| try (Scope ignored = span.makeCurrent()) { | ||
| consumer.onScanMetricsCreated(scanner.getScanMetrics()); | ||
| for (Result result; (result = scanner.next()) != null; ) { | ||
| if (!consumer.onNext(result)) { | ||
| break; | ||
| } | ||
| } | ||
| consumer.onComplete(); | ||
| } | ||
| consumer.onComplete(); | ||
| } catch (IOException e) { | ||
| consumer.onError(e); | ||
| try (Scope ignored = span.makeCurrent()) { | ||
ndimiduk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| consumer.onError(e); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void scan(Scan scan, ScanResultConsumer consumer) { | ||
| pool.execute(() -> scan0(scan, consumer)); | ||
| final Context context = Context.current(); | ||
| pool.execute(context.wrap(() -> scan0(scan, consumer))); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.