Skip to content

Commit ca09643

Browse files
authored
HBASE-26125 Backport HBASE-25401 "Add trace support for async call in rpc client" to branch-2 (#3543)
2/17 commits of HBASE-22120

Co-authored-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Peter Somogyi <psomogyi@apache.org>
1 parent da9bcb6 commit ca09643

File tree

14 files changed

+299
-174
lines changed

14 files changed

+299
-174
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java

Lines changed: 47 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
2222
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
2323

24+
import io.opentelemetry.api.trace.Span;
25+
import io.opentelemetry.api.trace.StatusCode;
26+
import io.opentelemetry.context.Scope;
2427
import java.io.IOException;
2528
import java.net.SocketAddress;
2629
import java.util.Collection;
@@ -38,6 +41,7 @@
3841
import org.apache.hadoop.hbase.net.Address;
3942
import org.apache.hadoop.hbase.security.User;
4043
import org.apache.hadoop.hbase.security.UserProvider;
44+
import org.apache.hadoop.hbase.trace.TraceUtil;
4145
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
4246
import org.apache.hadoop.hbase.util.PoolMap;
4347
import org.apache.hadoop.hbase.util.Threads;
@@ -365,7 +369,7 @@ private T getConnection(ConnectionId remoteId) throws IOException {
365369
protected abstract T createConnection(ConnectionId remoteId) throws IOException;
366370

367371
private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
368-
RpcCallback<Message> callback) {
372+
RpcCallback<Message> callback) {
369373
call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
370374
if (metrics != null) {
371375
metrics.updateRpc(call.md, call.param, call.callStats);
@@ -388,44 +392,59 @@ private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
388392
}
389393
}
390394

391-
Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
395+
private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
392396
final Message param, Message returnType, final User ticket,
393397
final Address addr, final RpcCallback<Message> callback) {
394-
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
395-
cs.setStartTime(EnvironmentEdgeManager.currentTime());
396-
397-
if (param instanceof ClientProtos.MultiRequest) {
398-
ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
399-
int numActions = 0;
400-
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
401-
numActions += regionAction.getActionCount();
402-
}
398+
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClient.callMethod." + md.getFullName())
399+
.startSpan();
400+
try (Scope scope = span.makeCurrent()) {
401+
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
402+
cs.setStartTime(EnvironmentEdgeManager.currentTime());
403+
404+
if (param instanceof ClientProtos.MultiRequest) {
405+
ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
406+
int numActions = 0;
407+
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
408+
numActions += regionAction.getActionCount();
409+
}
403410

404-
cs.setNumActionsPerServer(numActions);
405-
}
411+
cs.setNumActionsPerServer(numActions);
412+
}
406413

407-
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
408-
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
414+
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
415+
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
409416
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
410417
@Override
411418
public void run(Call call) {
412-
counter.decrementAndGet();
413-
onCallFinished(call, hrc, addr, callback);
419+
try (Scope scope = call.span.makeCurrent()) {
420+
counter.decrementAndGet();
421+
onCallFinished(call, hrc, addr, callback);
422+
} finally {
423+
if (hrc.failed()) {
424+
span.setStatus(StatusCode.ERROR);
425+
span.recordException(hrc.getFailed());
426+
} else {
427+
span.setStatus(StatusCode.OK);
428+
}
429+
span.end();
430+
}
414431
}
415432
}, cs);
416-
ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
417-
int count = counter.incrementAndGet();
418-
try {
419-
if (count > maxConcurrentCallsPerServer) {
420-
throw new ServerTooBusyException(addr, count);
433+
ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
434+
int count = counter.incrementAndGet();
435+
try {
436+
if (count > maxConcurrentCallsPerServer) {
437+
throw new ServerTooBusyException(addr, count);
438+
}
439+
cs.setConcurrentCallsPerServer(count);
440+
T connection = getConnection(remoteId);
441+
connection.sendRequest(call, hrc);
442+
} catch (Exception e) {
443+
call.setException(toIOE(e));
444+
span.end();
421445
}
422-
cs.setConcurrentCallsPerServer(count);
423-
T connection = getConnection(remoteId);
424-
connection.sendRequest(call, hrc);
425-
} catch (Exception e) {
426-
call.setException(toIOE(e));
446+
return call;
427447
}
428-
return call;
429448
}
430449

431450
private static Address createAddr(ServerName sn) {

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
2525
import static org.apache.hadoop.hbase.ipc.IPCUtil.write;
2626

27-
import io.opentelemetry.api.trace.Span;
28-
import io.opentelemetry.context.Context;
2927
import io.opentelemetry.context.Scope;
3028
import java.io.BufferedInputStream;
3129
import java.io.BufferedOutputStream;
@@ -57,7 +55,6 @@
5755
import org.apache.hadoop.hbase.security.SaslUtil;
5856
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
5957
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
60-
import org.apache.hadoop.hbase.trace.TraceUtil;
6158
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
6259
import org.apache.hadoop.hbase.util.ExceptionUtil;
6360
import org.apache.hadoop.io.IOUtils;
@@ -192,8 +189,8 @@ public void run() {
192189
if (call.isDone()) {
193190
continue;
194191
}
195-
try {
196-
tracedWriteRequest(call);
192+
try (Scope scope = call.span.makeCurrent()) {
193+
writeRequest(call);
197194
} catch (IOException e) {
198195
// exception here means the call has not been added to the pendingCalls yet, so we need
199196
// to fail it ourselves.
@@ -594,16 +591,6 @@ private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta)
594591
this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
595592
}
596593

597-
private void tracedWriteRequest(Call call) throws IOException {
598-
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClientImpl.tracedWriteRequest")
599-
.setParent(Context.current().with(call.span)).startSpan();
600-
try (Scope scope = span.makeCurrent()) {
601-
writeRequest(call);
602-
} finally {
603-
span.end();
604-
}
605-
}
606-
607594
/**
608595
* Initiates a call by sending the parameter to the remote server. Note: this is not called from
609596
* the Connection thread, but by other threads.
@@ -811,7 +798,9 @@ public void run(boolean cancelled) throws IOException {
811798
if (callSender != null) {
812799
callSender.sendCall(call);
813800
} else {
814-
tracedWriteRequest(call);
801+
// this runs in the same thread as the caller, so we do not need to attach the trace context
802+
// again.
803+
writeRequest(call);
815804
}
816805
}
817806
});

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class Call {
6161
final Span span;
6262
Timeout timeoutTask;
6363

64-
protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
64+
Call(int id, final Descriptors.MethodDescriptor md, Message param,
6565
final CellScanner cells, final Message responseDefaultType, int timeout, int priority,
6666
RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
6767
this.param = param;

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.hadoop.hbase.ipc;
1919

20+
import io.opentelemetry.api.GlobalOpenTelemetry;
21+
import io.opentelemetry.context.Context;
2022
import java.io.IOException;
2123
import java.io.OutputStream;
2224
import java.lang.reflect.InvocationTargetException;
@@ -49,6 +51,7 @@
4951
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
5052
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
5153
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
54+
import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;
5255

5356
/**
5457
* Utility to help ipc'ing.
@@ -112,11 +115,10 @@ public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
112115
static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
113116
RequestHeader.Builder builder = RequestHeader.newBuilder();
114117
builder.setCallId(call.id);
115-
//TODO handle htrace API change, see HBASE-18895
116-
/*if (call.span != null) {
117-
builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId())
118-
.setTraceId(call.span.getTracerId()));
119-
}*/
118+
RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder();
119+
GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(),
120+
traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value));
121+
builder.setTraceInfo(traceBuilder.build());
120122
builder.setMethodName(call.md.getName());
121123
builder.setRequestParam(call.param != null);
122124
if (cellBlockMeta != null) {

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.ipc;
1919

20+
import io.opentelemetry.context.Scope;
2021
import java.io.IOException;
2122
import java.util.HashMap;
2223
import java.util.Map;
@@ -114,9 +115,12 @@ private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise p
114115

115116
@Override
116117
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
117-
throws Exception {
118+
throws Exception {
118119
if (msg instanceof Call) {
119-
writeRequest(ctx, (Call) msg, promise);
120+
Call call = (Call) msg;
121+
try (Scope scope = call.span.makeCurrent()) {
122+
writeRequest(ctx, call, promise);
123+
}
120124
} else {
121125
ctx.write(msg, promise);
122126
}

hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.trace;
1919

20-
import io.opentelemetry.api.OpenTelemetry;
20+
import io.opentelemetry.api.GlobalOpenTelemetry;
2121
import io.opentelemetry.api.trace.Tracer;
2222
import org.apache.yetus.audience.InterfaceAudience;
2323

@@ -30,6 +30,6 @@ private TraceUtil() {
3030
}
3131

3232
public static Tracer getGlobalTracer() {
33-
return OpenTelemetry.getGlobalTracer(INSTRUMENTATION_NAME);
33+
return GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME);
3434
}
3535
}

hbase-protocol-shaded/src/main/protobuf/Tracing.proto

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ option java_outer_classname = "TracingProtos";
2323
option java_generate_equals_and_hash = true;
2424
option optimize_for = SPEED;
2525

26-
//Used to pass through the information necessary to continue
27-
//a trace after an RPC is made. All we need is the traceid
28-
//(so we know the overarching trace this message is a part of), and
29-
//the id of the current span when this message was sent, so we know
30-
//what span caused the new span we will create when this message is received.
26+
// OpenTelemetry propagates trace context through https://www.w3.org/TR/trace-context/, which
27+
// is a text-based approach that passes properties with http headers. Here we will also use this
28+
// approach, so we just need a map to store the key-value pairs.
29+
3130
message RPCTInfo {
32-
optional int64 trace_id = 1;
33-
optional int64 parent_id = 2;
31+
optional int64 trace_id = 1 [deprecated = true];
32+
optional int64 parent_id = 2 [deprecated = true];
33+
map<string, string> headers = 3;
3434
}

hbase-server/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,16 @@
440440
<artifactId>hamcrest-core</artifactId>
441441
<scope>test</scope>
442442
</dependency>
443+
<dependency>
444+
<groupId>io.opentelemetry</groupId>
445+
<artifactId>opentelemetry-sdk</artifactId>
446+
<scope>test</scope>
447+
</dependency>
448+
<dependency>
449+
<groupId>io.opentelemetry</groupId>
450+
<artifactId>opentelemetry-sdk-testing</artifactId>
451+
<scope>test</scope>
452+
</dependency>
443453
<dependency>
444454
<groupId>org.hamcrest</groupId>
445455
<artifactId>hamcrest-library</artifactId>

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.hadoop.hbase.ipc;
1919

2020
import io.opentelemetry.api.trace.Span;
21+
import io.opentelemetry.api.trace.StatusCode;
22+
import io.opentelemetry.context.Context;
2123
import io.opentelemetry.context.Scope;
2224
import java.net.InetSocketAddress;
2325
import java.nio.channels.ClosedChannelException;
@@ -73,15 +75,6 @@ public RpcCall getRpcCall() {
7375
return call;
7476
}
7577

76-
/**
77-
* Keep for backward compatibility.
78-
* @deprecated As of release 2.0, this will be removed in HBase 3.0
79-
*/
80-
@Deprecated
81-
public ServerCall<?> getCall() {
82-
return (ServerCall<?>) call;
83-
}
84-
8578
public void setStatus(MonitoredRPCHandler status) {
8679
this.status = status;
8780
}
@@ -130,7 +123,8 @@ public void run() {
130123
String serviceName = getServiceName();
131124
String methodName = getMethodName();
132125
String traceString = serviceName + "." + methodName;
133-
Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString).startSpan();
126+
Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString)
127+
.setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan();
134128
try (Scope traceScope = span.makeCurrent()) {
135129
if (!this.rpcServer.isStarted()) {
136130
InetSocketAddress address = rpcServer.getListenerAddress();
@@ -141,8 +135,12 @@ public void run() {
141135
resultPair = this.rpcServer.call(call, this.status);
142136
} catch (TimeoutIOException e){
143137
RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
138+
span.recordException(e);
139+
span.setStatus(StatusCode.ERROR);
144140
return;
145141
} catch (Throwable e) {
142+
span.recordException(e);
143+
span.setStatus(StatusCode.ERROR);
146144
if (e instanceof ServerNotRunningYetException) {
147145
// If ServerNotRunningYetException, don't spew stack trace.
148146
if (RpcServer.LOG.isTraceEnabled()) {
@@ -161,6 +159,7 @@ public void run() {
161159
RpcServer.CurCall.set(null);
162160
if (resultPair != null) {
163161
this.rpcServer.addCallSize(call.getSize() * -1);
162+
span.setStatus(StatusCode.OK);
164163
sucessful = true;
165164
}
166165
span.end();

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.hadoop.hbase.ipc;
1919

20+
import io.opentelemetry.api.trace.Span;
21+
import io.opentelemetry.api.trace.StatusCode;
2022
import java.io.IOException;
2123
import java.net.InetAddress;
2224
import java.nio.ByteBuffer;
@@ -102,6 +104,8 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
102104
// from WAL side on release
103105
private final AtomicInteger reference = new AtomicInteger(0x80000000);
104106

107+
private final Span span;
108+
105109
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
106110
justification = "Can't figure why this complaint is happening... see below")
107111
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
@@ -132,6 +136,7 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
132136
this.bbAllocator = byteBuffAllocator;
133137
this.cellBlockBuilder = cellBlockBuilder;
134138
this.reqCleanup = reqCleanup;
139+
this.span = Span.current();
135140
}
136141

137142
/**
@@ -150,6 +155,7 @@ public void done() {
150155
// If the call was run successfuly, we might have already returned the BB
151156
// back to pool. No worries..Then inputCellBlock will be null
152157
cleanup();
158+
span.end();
153159
}
154160

155161
@Override
@@ -226,6 +232,10 @@ public synchronized void setResponse(Message m, final CellScanner cells, Throwab
226232
}
227233
if (t != null) {
228234
this.isError = true;
235+
span.recordException(t);
236+
span.setStatus(StatusCode.ERROR);
237+
} else {
238+
span.setStatus(StatusCode.OK);
229239
}
230240
BufferChain bc = null;
231241
try {
@@ -560,4 +570,8 @@ public synchronized BufferChain getResponse() {
560570
return response;
561571
}
562572
}
573+
574+
public Span getSpan() {
575+
return span;
576+
}
563577
}

0 commit comments

Comments
 (0)