Skip to content

Commit c1cad7f

Browse files
committed
HBASE-27360 The trace related assertions are flaky for async client tests (#4767)
Signed-off-by: GeorryHuang <huangzhuoyue@apache.org> (cherry picked from commit f3f88ff)
1 parent 5454c80 commit c1cad7f

File tree

6 files changed

+89
-112
lines changed

6 files changed

+89
-112
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,9 @@ private void scan0(Scan scan, ScanResultConsumer consumer) {
236236
try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
237237
span = scanner.getSpan();
238238
try (Scope ignored = span.makeCurrent()) {
239-
consumer.onScanMetricsCreated(scanner.getScanMetrics());
239+
if (scan.isScanMetricsEnabled()) {
240+
consumer.onScanMetricsCreated(scanner.getScanMetrics());
241+
}
240242
for (Result result; (result = scanner.next()) != null;) {
241243
if (!consumer.onNext(result)) {
242244
break;

hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,14 @@
3737
import java.io.UncheckedIOException;
3838
import java.util.Arrays;
3939
import java.util.List;
40+
import java.util.Objects;
4041
import java.util.concurrent.ExecutionException;
4142
import java.util.concurrent.ForkJoinPool;
4243
import java.util.concurrent.TimeUnit;
4344
import java.util.function.Supplier;
4445
import java.util.stream.Collectors;
4546
import java.util.stream.IntStream;
47+
import java.util.stream.Stream;
4648
import org.apache.hadoop.conf.Configuration;
4749
import org.apache.hadoop.hbase.ConnectionRule;
4850
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -70,18 +72,18 @@
7072

7173
public abstract class AbstractTestAsyncTableScan {
7274

73-
protected static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create();
74-
protected static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder()
75+
protected static final OpenTelemetryClassRule OTEL_CLASS_RULE = OpenTelemetryClassRule.create();
76+
protected static final MiniClusterRule MINI_CLUSTER_RULE = MiniClusterRule.newBuilder()
7577
.setMiniClusterOption(StartMiniClusterOption.builder().numWorkers(3).build()).build();
7678

77-
protected static final ConnectionRule connectionRule =
78-
ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
79+
protected static final ConnectionRule CONN_RULE =
80+
ConnectionRule.createAsyncConnectionRule(MINI_CLUSTER_RULE::createAsyncConnection);
7981

8082
private static final class Setup extends ExternalResource {
8183
@Override
8284
protected void before() throws Throwable {
83-
final HBaseTestingUtility testingUtil = miniClusterRule.getTestingUtility();
84-
final AsyncConnection conn = connectionRule.getAsyncConnection();
85+
final HBaseTestingUtility testingUtil = MINI_CLUSTER_RULE.getTestingUtility();
86+
final AsyncConnection conn = CONN_RULE.getAsyncConnection();
8587

8688
byte[][] splitKeys = new byte[8][];
8789
for (int i = 111; i < 999; i += 111) {
@@ -99,11 +101,11 @@ protected void before() throws Throwable {
99101
}
100102

101103
@ClassRule
102-
public static final TestRule classRule = RuleChain.outerRule(otelClassRule)
103-
.around(miniClusterRule).around(connectionRule).around(new Setup());
104+
public static final TestRule classRule = RuleChain.outerRule(OTEL_CLASS_RULE)
105+
.around(MINI_CLUSTER_RULE).around(CONN_RULE).around(new Setup());
104106

105107
@Rule
106-
public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule);
108+
public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(OTEL_CLASS_RULE);
107109

108110
@Rule
109111
public final TestName testName = new TestName();
@@ -136,11 +138,11 @@ private static Scan createBatchSmallResultSizeScan() {
136138
}
137139

138140
private static AsyncTable<?> getRawTable() {
139-
return connectionRule.getAsyncConnection().getTable(TABLE_NAME);
141+
return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME);
140142
}
141143

142144
private static AsyncTable<?> getTable() {
143-
return connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
145+
return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
144146
}
145147

146148
private static List<Pair<String, Supplier<Scan>>> getScanCreator() {
@@ -198,16 +200,20 @@ protected final List<Result> convertFromBatchResult(List<Result> results) {
198200
}
199201

200202
protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) {
201-
final Configuration conf = miniClusterRule.getTestingUtility().getConfiguration();
203+
final Configuration conf = MINI_CLUSTER_RULE.getTestingUtility().getConfiguration();
202204
Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
203-
"Span for test failed to complete.", otelClassRule::getSpans, hasItem(parentSpanMatcher)));
205+
"Span for test failed to complete.", OTEL_CLASS_RULE::getSpans, hasItem(parentSpanMatcher)));
206+
}
207+
208+
protected static Stream<SpanData> spanStream() {
209+
return OTEL_CLASS_RULE.getSpans().stream().filter(Objects::nonNull);
204210
}
205211

206212
@Test
207213
public void testScanAll() throws Exception {
208214
List<Result> results = doScan(createScan(), -1);
209215
// make sure all scanners are closed at RS side
210-
miniClusterRule.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream()
216+
MINI_CLUSTER_RULE.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream()
211217
.map(JVMClusterUtil.RegionServerThread::getRegionServer).forEach(
212218
rs -> assertEquals(
213219
"The scanner count of " + rs.getServerName() + " is "

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java

Lines changed: 21 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,11 @@
2424
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
2525
import static org.hamcrest.MatcherAssert.assertThat;
2626
import static org.hamcrest.Matchers.allOf;
27-
import static org.hamcrest.Matchers.hasItem;
2827
import static org.hamcrest.Matchers.startsWith;
2928

3029
import io.opentelemetry.api.trace.StatusCode;
3130
import io.opentelemetry.sdk.trace.data.SpanData;
3231
import java.util.List;
33-
import java.util.Objects;
3432
import java.util.concurrent.ForkJoinPool;
3533
import java.util.function.Supplier;
3634
import java.util.stream.Collectors;
@@ -76,7 +74,7 @@ protected Scan createScan() {
7674
@Override
7775
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
7876
AsyncTable<ScanResultConsumer> table =
79-
connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
77+
CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
8078
List<Result> results;
8179
if (closeAfter > 0) {
8280
// these tests batch settings with the sample data result in each result being
@@ -108,38 +106,31 @@ protected void assertTraceContinuity() {
108106
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
109107
waitForSpan(parentSpanMatcher);
110108

111-
final List<SpanData> spans =
112-
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
113109
if (logger.isDebugEnabled()) {
114-
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
110+
StringTraceRenderer stringTraceRenderer =
111+
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
115112
stringTraceRenderer.render(logger::debug);
116113
}
117114

118-
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
119-
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
115+
final String parentSpanId =
116+
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
120117

121118
final Matcher<SpanData> scanOperationSpanMatcher =
122119
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
123120
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
124-
assertThat(spans, hasItem(scanOperationSpanMatcher));
125-
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
126-
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
127-
128-
final Matcher<SpanData> onScanMetricsCreatedMatcher =
129-
hasName("TracedScanResultConsumer#onScanMetricsCreated");
130-
assertThat(spans, hasItem(onScanMetricsCreatedMatcher));
131-
spans.stream().filter(onScanMetricsCreatedMatcher::matches).forEach(span -> assertThat(span,
132-
allOf(onScanMetricsCreatedMatcher, hasParentSpanId(scanOperationSpanId), hasEnded())));
121+
waitForSpan(scanOperationSpanMatcher);
122+
final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
123+
.map(SpanData::getSpanId).findAny().get();
133124

134125
final Matcher<SpanData> onNextMatcher = hasName("TracedScanResultConsumer#onNext");
135-
assertThat(spans, hasItem(onNextMatcher));
136-
spans.stream().filter(onNextMatcher::matches)
126+
waitForSpan(onNextMatcher);
127+
spanStream().filter(onNextMatcher::matches)
137128
.forEach(span -> assertThat(span, allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId),
138129
hasStatusWithCode(StatusCode.OK), hasEnded())));
139130

140131
final Matcher<SpanData> onCompleteMatcher = hasName("TracedScanResultConsumer#onComplete");
141-
assertThat(spans, hasItem(onCompleteMatcher));
142-
spans.stream().filter(onCompleteMatcher::matches)
132+
waitForSpan(onCompleteMatcher);
133+
spanStream().filter(onCompleteMatcher::matches)
143134
.forEach(span -> assertThat(span, allOf(onCompleteMatcher,
144135
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
145136
}
@@ -151,27 +142,26 @@ protected void assertTraceContinuity() {
151142
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
152143
waitForSpan(parentSpanMatcher);
153144

154-
final List<SpanData> spans =
155-
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
156145
if (logger.isDebugEnabled()) {
157-
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
146+
StringTraceRenderer stringTraceRenderer =
147+
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
158148
stringTraceRenderer.render(logger::debug);
159149
}
160150

161-
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
162-
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
151+
final String parentSpanId =
152+
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
163153

164154
final Matcher<SpanData> scanOperationSpanMatcher =
165155
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
166156
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
167157
hasException(exceptionMatcher), hasEnded());
168-
assertThat(spans, hasItem(scanOperationSpanMatcher));
169-
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
170-
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
158+
waitForSpan(scanOperationSpanMatcher);
159+
final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
160+
.map(SpanData::getSpanId).findAny().get();
171161

172162
final Matcher<SpanData> onErrorMatcher = hasName("TracedScanResultConsumer#onError");
173-
assertThat(spans, hasItem(onErrorMatcher));
174-
spans.stream().filter(onErrorMatcher::matches)
163+
waitForSpan(onErrorMatcher);
164+
spanStream().filter(onErrorMatcher::matches)
175165
.forEach(span -> assertThat(span, allOf(onErrorMatcher, hasParentSpanId(scanOperationSpanId),
176166
hasStatusWithCode(StatusCode.OK), hasEnded())));
177167
}

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,12 @@
2222
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
2323
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
2424
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
25-
import static org.hamcrest.MatcherAssert.assertThat;
2625
import static org.hamcrest.Matchers.allOf;
27-
import static org.hamcrest.Matchers.hasItem;
2826
import static org.hamcrest.Matchers.startsWith;
2927

3028
import io.opentelemetry.api.trace.StatusCode;
3129
import io.opentelemetry.sdk.trace.data.SpanData;
3230
import java.util.List;
33-
import java.util.Objects;
3431
import java.util.function.Supplier;
3532
import java.util.stream.Collectors;
3633
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -99,20 +96,19 @@ protected void assertTraceContinuity() {
9996
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
10097
waitForSpan(parentSpanMatcher);
10198

102-
final List<SpanData> spans =
103-
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
10499
if (logger.isDebugEnabled()) {
105-
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
100+
StringTraceRenderer stringTraceRenderer =
101+
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
106102
stringTraceRenderer.render(logger::debug);
107103
}
108104

109-
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
110-
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
105+
final String parentSpanId =
106+
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
111107

112108
final Matcher<SpanData> scanOperationSpanMatcher =
113109
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
114110
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
115-
assertThat(spans, hasItem(scanOperationSpanMatcher));
111+
waitForSpan(scanOperationSpanMatcher);
116112
}
117113

118114
@Override
@@ -122,20 +118,19 @@ protected void assertTraceContinuity() {
122118
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
123119
waitForSpan(parentSpanMatcher);
124120

125-
final List<SpanData> spans =
126-
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
127121
if (logger.isDebugEnabled()) {
128-
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
122+
StringTraceRenderer stringTraceRenderer =
123+
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
129124
stringTraceRenderer.render(logger::debug);
130125
}
131126

132-
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
133-
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
127+
final String parentSpanId =
128+
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
134129

135130
final Matcher<SpanData> scanOperationSpanMatcher =
136131
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
137132
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
138133
hasException(exceptionMatcher), hasEnded());
139-
assertThat(spans, hasItem(scanOperationSpanMatcher));
134+
waitForSpan(scanOperationSpanMatcher);
140135
}
141136
}

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,13 @@
2222
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
2323
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
2424
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
25-
import static org.hamcrest.MatcherAssert.assertThat;
2625
import static org.hamcrest.Matchers.allOf;
27-
import static org.hamcrest.Matchers.hasItem;
2826
import static org.hamcrest.Matchers.startsWith;
2927

3028
import io.opentelemetry.api.trace.StatusCode;
3129
import io.opentelemetry.sdk.trace.data.SpanData;
3230
import java.util.ArrayList;
3331
import java.util.List;
34-
import java.util.Objects;
3532
import java.util.concurrent.ForkJoinPool;
3633
import java.util.function.Supplier;
3734
import java.util.stream.Collectors;
@@ -83,7 +80,7 @@ protected Scan createScan() {
8380
@Override
8481
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
8582
AsyncTable<?> table =
86-
connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
83+
CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
8784
List<Result> results = new ArrayList<>();
8885
// these tests batch settings with the sample data result in each result being
8986
// split in two. so we must allow twice the expected results in order to reach
@@ -112,19 +109,17 @@ protected void assertTraceContinuity() {
112109
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
113110
waitForSpan(parentSpanMatcher);
114111

115-
final List<SpanData> spans =
116-
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
117112
if (logger.isDebugEnabled()) {
118-
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
113+
StringTraceRenderer stringTraceRenderer =
114+
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
119115
stringTraceRenderer.render(logger::debug);
120116
}
121117

122-
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
123-
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
118+
final String parentSpanId =
119+
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
124120

125-
assertThat(spans,
126-
hasItem(allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
127-
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
121+
waitForSpan(allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
122+
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()));
128123
}
129124

130125
@Override
@@ -134,20 +129,19 @@ protected void assertTraceContinuity() {
134129
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
135130
waitForSpan(parentSpanMatcher);
136131

137-
final List<SpanData> spans =
138-
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
139132
if (logger.isDebugEnabled()) {
140-
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
133+
StringTraceRenderer stringTraceRenderer =
134+
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
141135
stringTraceRenderer.render(logger::debug);
142136
}
143137

144-
final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
145-
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
138+
final String parentSpanId =
139+
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
146140

147141
final Matcher<SpanData> scanOperationSpanMatcher =
148142
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
149143
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
150144
hasException(exceptionMatcher), hasEnded());
151-
assertThat(spans, hasItem(scanOperationSpanMatcher));
145+
waitForSpan(scanOperationSpanMatcher);
152146
}
153147
}

0 commit comments

Comments
 (0)