Skip to content

Commit 41ba62c

Browse files
Apache9 authored and Jenkins committed
HBASE-23181 Blocked WAL archive: "LogRoller: Failed to schedule flush of XXXX, because it is not online on us" (apache#753)
Signed-off-by: Lijin Bin <binlijin@apache.org>
Signed-off-by: stack <stack@apache.org>
(cherry picked from commit 4771fe5)
Change-Id: I1dfba87698366b6b0643222caa83e84218980999
1 parent 9c020f2 commit 41ba62c

33 files changed

+398
-221
lines changed

hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -48,7 +48,7 @@ public static ImmutableByteArray wrap(byte[] b) {
4848
return new ImmutableByteArray(b);
4949
}
5050

51-
public String toStringUtf8() {
52-
return Bytes.toString(b);
51+
public String toString() {
52+
return Bytes.toStringBinary(b);
5353
}
5454
}

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java

Lines changed: 7 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -134,10 +134,10 @@ public void testPartialRead() throws Exception {
134134
long ts = System.currentTimeMillis();
135135
WALEdit edit = new WALEdit();
136136
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
137-
log.append(info, getWalKeyImpl(ts, scopes), edit, true);
137+
log.appendData(info, getWalKeyImpl(ts, scopes), edit);
138138
edit = new WALEdit();
139139
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
140-
log.append(info, getWalKeyImpl(ts+1, scopes), edit, true);
140+
log.appendData(info, getWalKeyImpl(ts+1, scopes), edit);
141141
log.sync();
142142
LOG.info("Before 1st WAL roll " + log.toString());
143143
log.rollWriter();
@@ -148,10 +148,10 @@ public void testPartialRead() throws Exception {
148148

149149
edit = new WALEdit();
150150
edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
151-
log.append(info, getWalKeyImpl(ts1+1, scopes), edit, true);
151+
log.appendData(info, getWalKeyImpl(ts1+1, scopes), edit);
152152
edit = new WALEdit();
153153
edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
154-
log.append(info, getWalKeyImpl(ts1+2, scopes), edit, true);
154+
log.appendData(info, getWalKeyImpl(ts1+2, scopes), edit);
155155
log.sync();
156156
log.shutdown();
157157
walfactory.shutdown();
@@ -192,17 +192,16 @@ public void testWALRecordReader() throws Exception {
192192
WALEdit edit = new WALEdit();
193193
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
194194
System.currentTimeMillis(), value));
195-
long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
195+
long txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
196196
log.sync(txid);
197197

198198
Thread.sleep(1); // make sure 2nd log gets a later timestamp
199199
long secondTs = System.currentTimeMillis();
200200
log.rollWriter();
201201

202202
edit = new WALEdit();
203-
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
204-
System.currentTimeMillis(), value));
205-
txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
203+
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value));
204+
txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
206205
log.sync(txid);
207206
log.shutdown();
208207
walfactory.shutdown();

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7936,7 +7936,7 @@ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID
79367936
}
79377937
WriteEntry writeEntry = null;
79387938
try {
7939-
long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
7939+
long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit);
79407940
// Call sync on our edit.
79417941
if (txid != 0) {
79427942
sync(txid, durability);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 43 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -907,7 +907,7 @@ protected void atHeadOfRingBufferEventHandlerAppend() {
907907
// Noop
908908
}
909909

910-
protected final boolean append(W writer, FSWALEntry entry) throws IOException {
910+
protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
911911
// TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
912912
atHeadOfRingBufferEventHandlerAppend();
913913
long start = EnvironmentEdgeManager.currentTime();
@@ -931,8 +931,13 @@ protected final boolean append(W writer, FSWALEntry entry) throws IOException {
931931
doAppend(writer, entry);
932932
assert highestUnsyncedTxid < entry.getTxid();
933933
highestUnsyncedTxid = entry.getTxid();
934-
sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
935-
entry.isInMemStore());
934+
if (entry.isCloseRegion()) {
935+
// let's clean all the records of this region
936+
sequenceIdAccounting.onRegionClose(encodedRegionName);
937+
} else {
938+
sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
939+
entry.isInMemStore());
940+
}
936941
coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
937942
// Update metrics.
938943
postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
@@ -968,11 +973,11 @@ protected final void postSync(final long timeInNanos, final int handlerSyncs) {
968973
}
969974

970975
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
971-
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
972-
throws IOException {
976+
WALEdit edits, boolean inMemstore, boolean closeRegion, RingBuffer<RingBufferTruck> ringBuffer)
977+
throws IOException {
973978
if (this.closed) {
974979
throw new IOException(
975-
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
980+
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
976981
}
977982
MutableLong txidHolder = new MutableLong();
978983
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
@@ -982,7 +987,7 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
982987
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
983988
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
984989
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
985-
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
990+
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, closeRegion, rpcCall);
986991
entry.stampRegionSequenceId(we);
987992
ringBuffer.get(txid).load(entry);
988993
} finally {
@@ -1018,7 +1023,24 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
10181023
}
10191024
}
10201025

1026+
@Override
1027+
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
1028+
return append(info, key, edits, true, false);
1029+
}
1030+
1031+
@Override
1032+
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion)
1033+
throws IOException {
1034+
return append(info, key, edits, false, closeRegion);
1035+
}
1036+
10211037
/**
1038+
* Append a set of edits to the WAL.
1039+
* <p/>
1040+
* The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
1041+
* have its region edit/sequence id assigned else it messes up our unification of mvcc and
1042+
* sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
1043+
* <p/>
10221044
* NOTE: This append, at a time that is usually after this call returns, starts an mvcc
10231045
* transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment
10241046
* time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
@@ -1029,10 +1051,21 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
10291051
* passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not
10301052
* immediately available on return from this method. It WILL be available subsequent to a sync of
10311053
* this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
1054+
* @param info the regioninfo associated with append
1055+
* @param key Modified by this call; we add to it this edits region edit/sequence id.
1056+
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
1057+
* sequence id that is after all currently appended edits.
1058+
* @param inMemstore Always true except for case where we are writing a region event marker, for
1059+
* example, a compaction completion record into the WAL; in this case the entry is just
1060+
* so we can finish an unfinished compaction -- it is not an edit for memstore.
1061+
* @param closeRegion Whether this is a region close marker, i.e., the last WAL edit for this
1062+
* region on this region server. The WAL implementation should remove all the related
1063+
* stuff, for example, the sequence id accounting.
1064+
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
1065+
* in it.
10321066
*/
1033-
@Override
1034-
public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
1035-
throws IOException;
1067+
protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
1068+
boolean closeRegion) throws IOException;
10361069

10371070
protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
10381071

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -428,7 +428,7 @@ private void appendAndSync() {
428428
FSWALEntry entry = iter.next();
429429
boolean appended;
430430
try {
431-
appended = append(writer, entry);
431+
appended = appendEntry(writer, entry);
432432
} catch (IOException e) {
433433
throw new AssertionError("should not happen", e);
434434
}
@@ -558,10 +558,10 @@ private boolean shouldScheduleConsumer() {
558558
}
559559

560560
@Override
561-
public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
562-
throws IOException {
563-
long txid =
564-
stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
561+
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore,
562+
boolean closeRegion) throws IOException {
563+
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
564+
waitingConsumePayloads);
565565
if (shouldScheduleConsumer()) {
566566
consumeExecutor.execute(consumer);
567567
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java

Lines changed: 4 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -429,12 +429,10 @@ protected void doShutdown() throws IOException {
429429
}
430430
}
431431

432-
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION",
433-
justification = "Will never be null")
434432
@Override
435-
public long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
436-
final boolean inMemstore) throws IOException {
437-
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
433+
protected long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
434+
final boolean inMemstore, boolean closeRegion) throws IOException {
435+
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
438436
disruptor.getRingBuffer());
439437
}
440438

@@ -1078,7 +1076,7 @@ private void attainSafePoint(final long currentSequence) {
10781076
*/
10791077
void append(final FSWALEntry entry) throws Exception {
10801078
try {
1081-
FSHLog.this.append(writer, entry);
1079+
FSHLog.this.appendEntry(writer, entry);
10821080
} catch (Exception e) {
10831081
String msg = "Append sequenceId=" + entry.getKey().getSequenceId()
10841082
+ ", requesting roll of WAL";

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -51,14 +51,16 @@ class FSWALEntry extends Entry {
5151
// they are only in memory and held here while passing over the ring buffer.
5252
private final transient long txid;
5353
private final transient boolean inMemstore;
54+
private final transient boolean closeRegion;
5455
private final transient RegionInfo regionInfo;
5556
private final transient Set<byte[]> familyNames;
5657
private final transient ServerCall<?> rpcCall;
5758

5859
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
59-
final boolean inMemstore, ServerCall<?> rpcCall) {
60+
final boolean inMemstore, boolean closeRegion, ServerCall<?> rpcCall) {
6061
super(key, edit);
6162
this.inMemstore = inMemstore;
63+
this.closeRegion = closeRegion;
6264
this.regionInfo = regionInfo;
6365
this.txid = txid;
6466
if (inMemstore) {
@@ -98,6 +100,10 @@ boolean isInMemStore() {
98100
return this.inMemstore;
99101
}
100102

103+
boolean isCloseRegion() {
104+
return closeRegion;
105+
}
106+
101107
RegionInfo getRegionInfo() {
102108
return this.regionInfo;
103109
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java

Lines changed: 26 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@
2727
import java.util.Set;
2828
import java.util.concurrent.ConcurrentHashMap;
2929
import java.util.concurrent.ConcurrentMap;
30+
import java.util.stream.Collectors;
3031
import org.apache.hadoop.hbase.HConstants;
3132
import org.apache.hadoop.hbase.util.Bytes;
3233
import org.apache.hadoop.hbase.util.ImmutableByteArray;
@@ -183,6 +184,30 @@ void update(byte[] encodedRegionName, Set<byte[]> families, long sequenceid,
183184
}
184185
}
185186

187+
/**
188+
* Clear all the records of the given region as it is going to be closed.
189+
* <p/>
190+
* We will call this once we get the region close marker. We need this because, if we use
191+
* Durability.ASYNC_WAL, after calling startCacheFlush, we may still get some ongoing wal entries
192+
* that have not been processed yet; this will lead to orphan records in the
193+
* lowestUnflushedSequenceIds and then cause too many WAL files.
194+
* <p/>
195+
* See HBASE-23157 for more details.
196+
*/
197+
void onRegionClose(byte[] encodedRegionName) {
198+
synchronized (tieLock) {
199+
this.lowestUnflushedSequenceIds.remove(encodedRegionName);
200+
Map<ImmutableByteArray, Long> flushing = this.flushingSequenceIds.remove(encodedRegionName);
201+
if (flushing != null) {
202+
LOG.warn("Still have flushing records when closing {}, {}",
203+
Bytes.toString(encodedRegionName),
204+
flushing.entrySet().stream().map(e -> e.getKey().toString() + "->" + e.getValue())
205+
.collect(Collectors.joining(",", "{", "}")));
206+
}
207+
}
208+
this.highestSequenceIds.remove(encodedRegionName);
209+
}
210+
186211
/**
187212
* Update the store sequence id, e.g., upon executing in-memory compaction
188213
*/
@@ -363,7 +388,7 @@ void abortCacheFlush(final byte[] encodedRegionName) {
363388
Long currentId = tmpMap.get(e.getKey());
364389
if (currentId != null && currentId.longValue() < e.getValue().longValue()) {
365390
String errorStr = Bytes.toString(encodedRegionName) + " family "
366-
+ e.getKey().toStringUtf8() + " acquired edits out of order current memstore seq="
391+
+ e.getKey().toString() + " acquired edits out of order current memstore seq="
367392
+ currentId + ", previous oldest unflushed id=" + e.getValue();
368393
LOG.error(errorStr);
369394
Runtime.getRuntime().halt(1);

0 commit comments

Comments
 (0)