Skip to content

Commit 2be2c63

Browse files
committed
HBASE-25484 Add trace support for WAL sync (#2892)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
1 parent 03e12bf commit 2be2c63

File tree

8 files changed

+109
-149
lines changed

8 files changed

+109
-149
lines changed

hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public final class TraceUtil {
6262
public static final AttributeKey<Boolean> ROW_LOCK_READ_LOCK_KEY =
6363
AttributeKey.booleanKey("db.hbase.rowlock.readlock");
6464

65+
public static final AttributeKey<String> WAL_IMPL = AttributeKey.stringKey("db.hbase.wal.impl");
66+
6567
private TraceUtil() {
6668
}
6769

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 52 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727
import com.lmax.disruptor.RingBuffer;
2828
import io.opentelemetry.api.trace.Span;
29-
import io.opentelemetry.context.Scope;
3029
import java.io.FileNotFoundException;
3130
import java.io.IOException;
3231
import java.io.InterruptedIOException;
@@ -571,6 +570,35 @@ public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IO
571570
return rollWriter(false);
572571
}
573572

573+
@Override
574+
public final void sync() throws IOException {
575+
sync(useHsync);
576+
}
577+
578+
@Override
579+
public final void sync(long txid) throws IOException {
580+
sync(txid, useHsync);
581+
}
582+
583+
@Override
584+
public final void sync(boolean forceSync) throws IOException {
585+
TraceUtil.trace(() -> {
586+
doSync(forceSync);
587+
return null;
588+
}, () -> createSpan("WAL.sync"));
589+
}
590+
591+
@Override
592+
public final void sync(long txid, boolean forceSync) throws IOException {
593+
TraceUtil.trace(() -> {
594+
doSync(txid, forceSync);
595+
return null;
596+
}, () -> createSpan("WAL.sync"));
597+
}
598+
599+
protected abstract void doSync(boolean forceSync) throws IOException;
600+
601+
protected abstract void doSync(long txid, boolean forceSync) throws IOException;
574602
/**
575603
* This is a convenience method that computes a new filename with a given file-number.
576604
* @param filenum to use
@@ -672,7 +700,7 @@ Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
672700
this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
673701
}
674702
if (regions != null) {
675-
List<String> listForPrint = new ArrayList();
703+
List<String> listForPrint = new ArrayList<>();
676704
for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
677705
StringBuilder families = new StringBuilder();
678706
for (int i = 0; i < r.getValue().size(); i++) {
@@ -815,6 +843,10 @@ protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long ol
815843
}
816844
}
817845

846+
private Span createSpan(String name) {
847+
return TraceUtil.createSpan(name).setAttribute(TraceUtil.WAL_IMPL, implClassName);
848+
}
849+
818850
/**
819851
* Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}.
820852
* <p/>
@@ -832,13 +864,10 @@ protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long ol
832864
* @throws IOException if there is a problem flushing or closing the underlying FS
833865
*/
834866
Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
835-
Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHFile.replaceWriter").startSpan();
836-
try (Scope scope = span.makeCurrent()) {
867+
return TraceUtil.trace(() -> {
837868
doReplaceWriter(oldPath, newPath, nextWriter);
838869
return newPath;
839-
} finally {
840-
span.end();
841-
}
870+
}, () -> createSpan("WAL.replaceWriter"));
842871
}
843872

844873
protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
@@ -876,8 +905,7 @@ private IOException convertInterruptedExceptionToIOException(final InterruptedEx
876905
return ioe;
877906
}
878907

879-
@Override
880-
public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
908+
private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException {
881909
rollWriterLock.lock();
882910
try {
883911
if (this.closed) {
@@ -888,8 +916,7 @@ public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
888916
return null;
889917
}
890918
Map<byte[], List<byte[]>> regionsToFlush = null;
891-
Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHLog.rollWriter").startSpan();
892-
try (Scope scope = span.makeCurrent()) {
919+
try {
893920
Path oldPath = getOldPath();
894921
Path newPath = getNewPath();
895922
// Any exception from here on is catastrophic, non-recoverable so we currently abort.
@@ -914,17 +941,20 @@ public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
914941
// If the underlying FileSystem can't do what we ask, treat as IO failure so
915942
// we'll abort.
916943
throw new IOException(
917-
"Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
918-
exception);
919-
} finally {
920-
span.end();
944+
"Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
945+
exception);
921946
}
922947
return regionsToFlush;
923948
} finally {
924949
rollWriterLock.unlock();
925950
}
926951
}
927952

953+
@Override
954+
public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
955+
return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter"));
956+
}
957+
928958
// public only until class moves to o.a.h.h.wal
929959
/** @return the size of log files in use */
930960
public long getLogFileSize() {
@@ -1099,7 +1129,6 @@ protected final void postSync(long timeInNanos, int handlerSyncs) {
10991129
.append(TimeUnit.NANOSECONDS.toMillis(timeInNanos))
11001130
.append(" ms, current pipeline: ")
11011131
.append(Arrays.toString(getPipeline())).toString();
1102-
Span.current().addEvent(msg);
11031132
LOG.info(msg);
11041133
if (timeInNanos > this.rollOnSyncNs) {
11051134
// A single sync took too long.
@@ -1122,8 +1151,7 @@ protected final void postSync(long timeInNanos, int handlerSyncs) {
11221151
}
11231152

11241153
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
1125-
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
1126-
throws IOException {
1154+
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException {
11271155
if (this.closed) {
11281156
throw new IOException(
11291157
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
@@ -1135,14 +1163,12 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
11351163
long txid = txidHolder.longValue();
11361164
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
11371165
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
1138-
Span span = TraceUtil.getGlobalTracer().spanBuilder(implClassName + ".append").startSpan();
1139-
try (Scope scope = span.makeCurrent()) {
1166+
try {
11401167
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
11411168
entry.stampRegionSequenceId(we);
11421169
ringBuffer.get(txid).load(entry);
11431170
} finally {
11441171
ringBuffer.publish(txid);
1145-
span.end();
11461172
}
11471173
return txid;
11481174
}
@@ -1176,13 +1202,14 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
11761202

11771203
@Override
11781204
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
1179-
return append(info, key, edits, true);
1205+
return TraceUtil.trace(() -> append(info, key, edits, true),
1206+
() -> createSpan("WAL.appendData"));
11801207
}
11811208

11821209
@Override
1183-
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits)
1184-
throws IOException {
1185-
return append(info, key, edits, false);
1210+
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
1211+
return TraceUtil.trace(() -> append(info, key, edits, false),
1212+
() -> createSpan("WAL.appendMarker"));
11861213
}
11871214

11881215
/**

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java

Lines changed: 38 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424
import com.lmax.disruptor.RingBuffer;
2525
import com.lmax.disruptor.Sequence;
2626
import com.lmax.disruptor.Sequencer;
27-
import io.opentelemetry.api.trace.Span;
28-
import io.opentelemetry.context.Scope;
2927
import java.io.IOException;
3028
import java.lang.reflect.Field;
3129
import java.util.ArrayDeque;
@@ -53,7 +51,6 @@
5351
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
5452
import org.apache.hadoop.hbase.client.RegionInfo;
5553
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
56-
import org.apache.hadoop.hbase.trace.TraceUtil;
5754
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
5855
import org.apache.hadoop.hbase.wal.WALEdit;
5956
import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -345,7 +342,7 @@ private void syncCompleted(AsyncWriter writer, long processedTxid, long startTim
345342
break;
346343
}
347344
}
348-
postSync(System.nanoTime() - startTimeNs, finishSync(true));
345+
postSync(System.nanoTime() - startTimeNs, finishSync());
349346
if (trySetReadyForRolling()) {
350347
// we have just finished a roll, then do not need to check for log rolling, the writer will be
351348
// closed soon.
@@ -394,23 +391,14 @@ private void sync(AsyncWriter writer) {
394391
}, consumeExecutor);
395392
}
396393

397-
private void addTimeAnnotation(SyncFuture future, String annotation) {
398-
Span.current().addEvent(annotation);
399-
// TODO handle htrace API change, see HBASE-18895
400-
// future.setSpan(scope.getSpan());
401-
}
402-
403-
private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
394+
private int finishSyncLowerThanTxid(long txid) {
404395
int finished = 0;
405396
for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
406397
SyncFuture sync = iter.next();
407398
if (sync.getTxid() <= txid) {
408399
sync.done(txid, null);
409400
iter.remove();
410401
finished++;
411-
if (addSyncTrace) {
412-
addTimeAnnotation(sync, "writer synced");
413-
}
414402
} else {
415403
break;
416404
}
@@ -419,7 +407,7 @@ private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
419407
}
420408

421409
// try advancing the highestSyncedTxid as much as possible
422-
private int finishSync(boolean addSyncTrace) {
410+
private int finishSync() {
423411
if (unackedAppends.isEmpty()) {
424412
// All outstanding appends have been acked.
425413
if (toWriteAppends.isEmpty()) {
@@ -428,9 +416,6 @@ private int finishSync(boolean addSyncTrace) {
428416
for (SyncFuture sync : syncFutures) {
429417
maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
430418
sync.done(maxSyncTxid, null);
431-
if (addSyncTrace) {
432-
addTimeAnnotation(sync, "writer synced");
433-
}
434419
}
435420
highestSyncedTxid.set(maxSyncTxid);
436421
int finished = syncFutures.size();
@@ -444,23 +429,23 @@ private int finishSync(boolean addSyncTrace) {
444429
assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
445430
long doneTxid = lowestUnprocessedAppendTxid - 1;
446431
highestSyncedTxid.set(doneTxid);
447-
return finishSyncLowerThanTxid(doneTxid, addSyncTrace);
432+
return finishSyncLowerThanTxid(doneTxid);
448433
}
449434
} else {
450435
// There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
451436
// first unacked append minus 1.
452437
long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
453438
long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
454439
highestSyncedTxid.set(doneTxid);
455-
return finishSyncLowerThanTxid(doneTxid, addSyncTrace);
440+
return finishSyncLowerThanTxid(doneTxid);
456441
}
457442
}
458443

459444
private void appendAndSync() {
460445
final AsyncWriter writer = this.writer;
461446
// maybe a sync request is not queued when we issue a sync, so check here to see if we could
462447
// finish some.
463-
finishSync(false);
448+
finishSync();
464449
long newHighestProcessedAppendTxid = -1L;
465450
for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
466451
FSWALEntry entry = iter.next();
@@ -501,7 +486,7 @@ private void appendAndSync() {
501486
// stamped some region sequence id.
502487
if (unackedAppends.isEmpty()) {
503488
highestSyncedTxid.set(highestProcessedAppendTxid);
504-
finishSync(false);
489+
finishSync();
505490
trySetReadyForRolling();
506491
}
507492
return;
@@ -648,74 +633,54 @@ protected boolean markerEditOnly() {
648633

649634
@Override
650635
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
651-
throws IOException {
652-
if (markerEditOnly() && !edits.isMetaEdit()) {
653-
throw new IOException("WAL is closing, only marker edit is allowed");
654-
}
655-
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
656-
waitingConsumePayloads);
636+
throws IOException {
637+
if (markerEditOnly() && !edits.isMetaEdit()) {
638+
throw new IOException("WAL is closing, only marker edit is allowed");
639+
}
640+
long txid =
641+
stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
657642
if (shouldScheduleConsumer()) {
658643
consumeExecutor.execute(consumer);
659644
}
660645
return txid;
661646
}
662647

663648
@Override
664-
public void sync() throws IOException {
665-
sync(useHsync);
666-
}
667-
668-
@Override
669-
public void sync(long txid) throws IOException {
670-
sync(txid, useHsync);
671-
}
672-
673-
@Override
674-
public void sync(boolean forceSync) throws IOException {
675-
Span span = TraceUtil.getGlobalTracer().spanBuilder("AsyncFSWAL.sync").startSpan();
676-
try (Scope scope = span.makeCurrent()) {
677-
long txid = waitingConsumePayloads.next();
678-
SyncFuture future;
679-
try {
680-
future = getSyncFuture(txid, forceSync);
681-
RingBufferTruck truck = waitingConsumePayloads.get(txid);
682-
truck.load(future);
683-
} finally {
684-
waitingConsumePayloads.publish(txid);
685-
}
686-
if (shouldScheduleConsumer()) {
687-
consumeExecutor.execute(consumer);
688-
}
689-
blockOnSync(future);
649+
protected void doSync(boolean forceSync) throws IOException {
650+
long txid = waitingConsumePayloads.next();
651+
SyncFuture future;
652+
try {
653+
future = getSyncFuture(txid, forceSync);
654+
RingBufferTruck truck = waitingConsumePayloads.get(txid);
655+
truck.load(future);
690656
} finally {
691-
span.end();
657+
waitingConsumePayloads.publish(txid);
692658
}
659+
if (shouldScheduleConsumer()) {
660+
consumeExecutor.execute(consumer);
661+
}
662+
blockOnSync(future);
693663
}
694664

695665
@Override
696-
public void sync(long txid, boolean forceSync) throws IOException {
666+
protected void doSync(long txid, boolean forceSync) throws IOException {
697667
if (highestSyncedTxid.get() >= txid) {
698668
return;
699669
}
700-
Span span = TraceUtil.getGlobalTracer().spanBuilder("AsyncFSWAL.sync").startSpan();
701-
try (Scope scope = span.makeCurrent()) {
702-
// here we do not use ring buffer sequence as txid
703-
long sequence = waitingConsumePayloads.next();
704-
SyncFuture future;
705-
try {
706-
future = getSyncFuture(txid, forceSync);
707-
RingBufferTruck truck = waitingConsumePayloads.get(sequence);
708-
truck.load(future);
709-
} finally {
710-
waitingConsumePayloads.publish(sequence);
711-
}
712-
if (shouldScheduleConsumer()) {
713-
consumeExecutor.execute(consumer);
714-
}
715-
blockOnSync(future);
670+
// here we do not use ring buffer sequence as txid
671+
long sequence = waitingConsumePayloads.next();
672+
SyncFuture future;
673+
try {
674+
future = getSyncFuture(txid, forceSync);
675+
RingBufferTruck truck = waitingConsumePayloads.get(sequence);
676+
truck.load(future);
716677
} finally {
717-
span.end();
678+
waitingConsumePayloads.publish(sequence);
679+
}
680+
if (shouldScheduleConsumer()) {
681+
consumeExecutor.execute(consumer);
718682
}
683+
blockOnSync(future);
719684
}
720685

721686
protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException {

0 commit comments

Comments
 (0)