Skip to content

Commit f36e153

Browse files
committed
HBASE-25778 The tracinig implementation for AsyncConnectionImpl.getHbck is incorrect (#3165)
Signed-off-by: meiyi <myimeiyi@gmail.com>
1 parent be4503d commit f36e153

File tree

3 files changed

+145
-18
lines changed

3 files changed

+145
-18
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
2929

3030
import io.opentelemetry.api.trace.Span;
31-
import io.opentelemetry.context.Scope;
3231
import java.io.IOException;
3332
import java.net.SocketAddress;
3433
import java.util.Optional;
@@ -40,6 +39,7 @@
4039
import java.util.concurrent.TimeUnit;
4140
import java.util.concurrent.atomic.AtomicBoolean;
4241
import java.util.concurrent.atomic.AtomicReference;
42+
import java.util.function.Supplier;
4343
import org.apache.commons.io.IOUtils;
4444
import org.apache.hadoop.conf.Configuration;
4545
import org.apache.hadoop.hbase.AuthUtil;
@@ -408,6 +408,15 @@ public Connection toConnection() {
408408
return c;
409409
}
410410

411+
private Hbck getHbckInternal(ServerName masterServer) {
412+
Span.current().setAttribute(TraceUtil.SERVER_NAME_KEY, masterServer.getServerName());
413+
// we will not create a new connection when creating a new protobuf stub, and for hbck there
414+
// will be no performance consideration, so for simplification we will create a new stub every
415+
// time instead of caching the stub here.
416+
return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
417+
rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
418+
}
419+
411420
@Override
412421
public CompletableFuture<Hbck> getHbck() {
413422
return TraceUtil.tracedFuture(() -> {
@@ -416,30 +425,22 @@ public CompletableFuture<Hbck> getHbck() {
416425
if (error != null) {
417426
future.completeExceptionally(error);
418427
} else {
419-
try {
420-
future.complete(getHbck(sn));
421-
} catch (IOException e) {
422-
future.completeExceptionally(e);
423-
}
428+
future.complete(getHbckInternal(sn));
424429
}
425430
});
426431
return future;
427432
}, "AsyncConnection.getHbck");
428433
}
429434

430435
@Override
431-
public Hbck getHbck(ServerName masterServer) throws IOException {
432-
Span span = TraceUtil.createSpan("AsyncConnection.getHbck")
433-
.setAttribute(TraceUtil.SERVER_NAME_KEY, masterServer.getServerName());
434-
try (Scope scope = span.makeCurrent()) {
435-
// we will not create a new connection when creating a new protobuf stub, and for hbck there
436-
// will be no performance consideration, so for simplification we will create a new stub every
437-
// time instead of caching the stub here.
438-
return new HBaseHbck(
439-
MasterProtos.HbckService
440-
.newBlockingStub(rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)),
441-
rpcControllerFactory);
442-
}
436+
public Hbck getHbck(ServerName masterServer) {
437+
return TraceUtil.trace(new Supplier<Hbck>() {
438+
439+
@Override
440+
public Hbck get() {
441+
return getHbckInternal(masterServer);
442+
}
443+
}, "AsyncConnection.getHbck");
443444
}
444445

445446
Optional<MetricsConnection> getConnectionMetrics() {
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import static org.junit.Assert.assertEquals;
21+
22+
import io.opentelemetry.api.trace.SpanKind;
23+
import io.opentelemetry.api.trace.StatusCode;
24+
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
25+
import io.opentelemetry.sdk.trace.data.SpanData;
26+
import java.io.IOException;
27+
import java.util.concurrent.CompletableFuture;
28+
import org.apache.hadoop.conf.Configuration;
29+
import org.apache.hadoop.hbase.HBaseClassTestRule;
30+
import org.apache.hadoop.hbase.HBaseConfiguration;
31+
import org.apache.hadoop.hbase.ServerName;
32+
import org.apache.hadoop.hbase.Waiter;
33+
import org.apache.hadoop.hbase.security.UserProvider;
34+
import org.apache.hadoop.hbase.testclassification.ClientTests;
35+
import org.apache.hadoop.hbase.testclassification.MediumTests;
36+
import org.apache.hadoop.hbase.trace.TraceUtil;
37+
import org.junit.After;
38+
import org.junit.Before;
39+
import org.junit.ClassRule;
40+
import org.junit.Rule;
41+
import org.junit.Test;
42+
import org.junit.experimental.categories.Category;
43+
44+
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
45+
46+
@Category({ ClientTests.class, MediumTests.class })
47+
public class TestAsyncConnectionTracing {
48+
49+
@ClassRule
50+
public static final HBaseClassTestRule CLASS_RULE =
51+
HBaseClassTestRule.forClass(TestAsyncConnectionTracing.class);
52+
53+
private static Configuration CONF = HBaseConfiguration.create();
54+
55+
private ServerName masterServer =
56+
ServerName.valueOf("localhost", 12345, System.currentTimeMillis());
57+
58+
private AsyncConnection conn;
59+
60+
@Rule
61+
public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
62+
63+
@Before
64+
public void setUp() throws IOException {
65+
ConnectionRegistry registry = new DoNothingConnectionRegistry(CONF) {
66+
67+
@Override
68+
public CompletableFuture<ServerName> getActiveMaster() {
69+
return CompletableFuture.completedFuture(masterServer);
70+
}
71+
};
72+
conn = new AsyncConnectionImpl(CONF, registry, "test", null,
73+
UserProvider.instantiate(CONF).getCurrent());
74+
}
75+
76+
@After
77+
public void tearDown() throws IOException {
78+
Closeables.close(conn, true);
79+
}
80+
81+
private void assertTrace(String methodName, ServerName serverName) {
82+
Waiter.waitFor(CONF, 1000,
83+
() -> traceRule.getSpans().stream()
84+
.anyMatch(span -> span.getName().equals("AsyncConnection." + methodName) &&
85+
span.getKind() == SpanKind.INTERNAL && span.hasEnded()));
86+
SpanData data = traceRule.getSpans().stream()
87+
.filter(s -> s.getName().equals("AsyncConnection." + methodName)).findFirst().get();
88+
assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
89+
if (serverName != null) {
90+
assertEquals(serverName.getServerName(), data.getAttributes().get(TraceUtil.SERVER_NAME_KEY));
91+
}
92+
}
93+
94+
@Test
95+
public void testHbck() {
96+
conn.getHbck().join();
97+
assertTrace("getHbck", masterServer);
98+
}
99+
100+
@Test
101+
public void testHbckWithServerName() throws IOException {
102+
ServerName serverName = ServerName.valueOf("localhost", 23456, System.currentTimeMillis());
103+
conn.getHbck(serverName);
104+
assertTrace("getHbck", serverName);
105+
}
106+
107+
@Test
108+
public void testClose() throws IOException {
109+
conn.close();
110+
assertTrace("close", null);
111+
}
112+
}

hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,20 @@ public static void trace(Runnable action, Supplier<Span> creator) {
189189
}
190190
}
191191

192+
public static <T> T trace(Supplier<T> action, String spanName) {
193+
Span span = createSpan(spanName);
194+
try (Scope scope = span.makeCurrent()) {
195+
T ret = action.get();
196+
span.setStatus(StatusCode.OK);
197+
return ret;
198+
} catch (Throwable e) {
199+
setError(span, e);
200+
throw e;
201+
} finally {
202+
span.end();
203+
}
204+
}
205+
192206
@FunctionalInterface
193207
public interface IOExceptionCallable<V> {
194208
V call() throws IOException;

0 commit comments

Comments
 (0)