Skip to content

Commit 5f15e07

Browse files
committed
HBASE-23181 Blocked WAL archive: "LogRoller: Failed to schedule flush of XXXX, because it is not online on us"
1 parent b14ba58 commit 5f15e07

34 files changed

+403
-230
lines changed

hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,8 @@ public static ImmutableByteArray wrap(byte[] b) {
5151
public String toStringUtf8() {
5252
return Bytes.toString(b);
5353
}
54+
55+
public String toStringBinary() {
56+
return Bytes.toStringBinary(b);
57+
}
5458
}

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,10 @@ public void testPartialRead() throws Exception {
135135
long ts = System.currentTimeMillis();
136136
WALEdit edit = new WALEdit();
137137
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
138-
log.append(info, getWalKeyImpl(ts, scopes), edit, true);
138+
log.appendData(info, getWalKeyImpl(ts, scopes), edit);
139139
edit = new WALEdit();
140140
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
141-
log.append(info, getWalKeyImpl(ts+1, scopes), edit, true);
141+
log.appendData(info, getWalKeyImpl(ts+1, scopes), edit);
142142
log.sync();
143143
LOG.info("Before 1st WAL roll " + log.toString());
144144
log.rollWriter();
@@ -149,10 +149,10 @@ public void testPartialRead() throws Exception {
149149

150150
edit = new WALEdit();
151151
edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
152-
log.append(info, getWalKeyImpl(ts1+1, scopes), edit, true);
152+
log.appendData(info, getWalKeyImpl(ts1+1, scopes), edit);
153153
edit = new WALEdit();
154154
edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
155-
log.append(info, getWalKeyImpl(ts1+2, scopes), edit, true);
155+
log.appendData(info, getWalKeyImpl(ts1+2, scopes), edit);
156156
log.sync();
157157
log.shutdown();
158158
walfactory.shutdown();
@@ -193,17 +193,16 @@ public void testWALRecordReader() throws Exception {
193193
WALEdit edit = new WALEdit();
194194
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
195195
System.currentTimeMillis(), value));
196-
long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
196+
long txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
197197
log.sync(txid);
198198

199199
Thread.sleep(1); // make sure 2nd log gets a later timestamp
200200
long secondTs = System.currentTimeMillis();
201201
log.rollWriter();
202202

203203
edit = new WALEdit();
204-
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
205-
System.currentTimeMillis(), value));
206-
txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
204+
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value));
205+
txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
207206
log.sync(txid);
208207
log.shutdown();
209208
walfactory.shutdown();
@@ -253,17 +252,15 @@ public void testWALRecordReaderActiveArchiveTolerance() throws Exception {
253252
WAL log = walfactory.getWAL(info);
254253
byte [] value = Bytes.toBytes("value");
255254
WALEdit edit = new WALEdit();
256-
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
257-
System.currentTimeMillis(), value));
258-
long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
255+
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value));
256+
long txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
259257
log.sync(txid);
260258

261259
Thread.sleep(10); // make sure 2nd edit gets a later timestamp
262260

263261
edit = new WALEdit();
264-
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
265-
System.currentTimeMillis(), value));
266-
txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
262+
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value));
263+
txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
267264
log.sync(txid);
268265
log.shutdown();
269266

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7995,7 +7995,7 @@ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID
79957995
}
79967996
WriteEntry writeEntry = null;
79977997
try {
7998-
long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
7998+
long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit);
79997999
// Call sync on our edit.
80008000
if (txid != 0) {
80018001
sync(txid, durability);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -977,7 +977,7 @@ protected void atHeadOfRingBufferEventHandlerAppend() {
977977
// Noop
978978
}
979979

980-
protected final boolean append(W writer, FSWALEntry entry) throws IOException {
980+
protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
981981
// TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
982982
atHeadOfRingBufferEventHandlerAppend();
983983
long start = EnvironmentEdgeManager.currentTime();
@@ -1001,8 +1001,13 @@ protected final boolean append(W writer, FSWALEntry entry) throws IOException {
10011001
doAppend(writer, entry);
10021002
assert highestUnsyncedTxid < entry.getTxid();
10031003
highestUnsyncedTxid = entry.getTxid();
1004-
sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
1005-
entry.isInMemStore());
1004+
if (entry.isCloseRegion()) {
1005+
// let's clean all the records of this region
1006+
sequenceIdAccounting.onRegionClose(encodedRegionName);
1007+
} else {
1008+
sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
1009+
entry.isInMemStore());
1010+
}
10061011
coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
10071012
// Update metrics.
10081013
postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
@@ -1052,11 +1057,11 @@ protected final void postSync(long timeInNanos, int handlerSyncs) {
10521057
}
10531058

10541059
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
1055-
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
1056-
throws IOException {
1060+
WALEdit edits, boolean inMemstore, boolean closeRegion, RingBuffer<RingBufferTruck> ringBuffer)
1061+
throws IOException {
10571062
if (this.closed) {
10581063
throw new IOException(
1059-
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
1064+
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
10601065
}
10611066
MutableLong txidHolder = new MutableLong();
10621067
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
@@ -1066,7 +1071,7 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
10661071
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
10671072
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
10681073
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
1069-
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
1074+
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, closeRegion, rpcCall);
10701075
entry.stampRegionSequenceId(we);
10711076
ringBuffer.get(txid).load(entry);
10721077
} finally {
@@ -1102,7 +1107,24 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
11021107
}
11031108
}
11041109

1110+
@Override
1111+
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
1112+
return append(info, key, edits, true, false);
1113+
}
1114+
1115+
@Override
1116+
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion)
1117+
throws IOException {
1118+
return append(info, key, edits, false, closeRegion);
1119+
}
1120+
11051121
/**
1122+
* Append a set of edits to the WAL.
1123+
* <p/>
1124+
* The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
1125+
* have its region edit/sequence id assigned else it messes up our unification of mvcc and
1126+
* sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
1127+
* <p/>
11061128
* NOTE: This append, at a time that is usually after this call returns, starts an mvcc
11071129
* transaction by calling 'begin', wherein we assign this update a sequenceid. At assignment
11081130
* time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
@@ -1113,10 +1135,21 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
11131135
* passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not
11141136
* immediately available on return from this method. It WILL be available subsequent to a sync of
11151137
* this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
1138+
* @param info the regioninfo associated with append
1139+
* @param key Modified by this call; we add to it this edit's region edit/sequence id.
1140+
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
1141+
* sequence id that is after all currently appended edits.
1142+
* @param inMemstore Always true except for case where we are writing a region event marker, for
1143+
* example, a compaction completion record into the WAL; in this case the entry is just
1144+
* so we can finish an unfinished compaction -- it is not an edit for memstore.
1145+
* @param closeRegion Whether this is a region close marker, i.e., the last WAL edit for this
1146+
* region on this region server. The WAL implementation should remove all the related
1147+
* stuff, for example, the sequence id accounting.
1148+
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
1149+
* in it.
11161150
*/
1117-
@Override
1118-
public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
1119-
throws IOException;
1151+
protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
1152+
boolean closeRegion) throws IOException;
11201153

11211154
protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
11221155

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ private void appendAndSync() {
434434
FSWALEntry entry = iter.next();
435435
boolean appended;
436436
try {
437-
appended = append(writer, entry);
437+
appended = appendEntry(writer, entry);
438438
} catch (IOException e) {
439439
throw new AssertionError("should not happen", e);
440440
}
@@ -615,13 +615,13 @@ protected boolean markerEditOnly() {
615615
}
616616

617617
@Override
618-
public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
619-
throws IOException {
618+
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore,
619+
boolean closeRegion) throws IOException {
620620
if (markerEditOnly() && !edits.isMetaEdit()) {
621621
throw new IOException("WAL is closing, only marker edit is allowed");
622622
}
623-
long txid =
624-
stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
623+
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
624+
waitingConsumePayloads);
625625
if (shouldScheduleConsumer()) {
626626
consumeExecutor.execute(consumer);
627627
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -433,12 +433,10 @@ protected void doShutdown() throws IOException {
433433
}
434434
}
435435

436-
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION",
437-
justification = "Will never be null")
438436
@Override
439-
public long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
440-
final boolean inMemstore) throws IOException {
441-
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
437+
protected long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
438+
final boolean inMemstore, boolean closeRegion) throws IOException {
439+
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
442440
disruptor.getRingBuffer());
443441
}
444442

@@ -1100,7 +1098,7 @@ private void attainSafePoint(final long currentSequence) {
11001098
*/
11011099
void append(final FSWALEntry entry) throws Exception {
11021100
try {
1103-
FSHLog.this.append(writer, entry);
1101+
FSHLog.this.appendEntry(writer, entry);
11041102
} catch (Exception e) {
11051103
String msg = "Append sequenceId=" + entry.getKey().getSequenceId()
11061104
+ ", requesting roll of WAL";

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,16 @@ class FSWALEntry extends Entry {
5151
// they are only in memory and held here while passing over the ring buffer.
5252
private final transient long txid;
5353
private final transient boolean inMemstore;
54+
private final transient boolean closeRegion;
5455
private final transient RegionInfo regionInfo;
5556
private final transient Set<byte[]> familyNames;
5657
private final transient ServerCall<?> rpcCall;
5758

5859
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
59-
final boolean inMemstore, ServerCall<?> rpcCall) {
60+
final boolean inMemstore, boolean closeRegion, ServerCall<?> rpcCall) {
6061
super(key, edit);
6162
this.inMemstore = inMemstore;
63+
this.closeRegion = closeRegion;
6264
this.regionInfo = regionInfo;
6365
this.txid = txid;
6466
if (inMemstore) {
@@ -98,6 +100,10 @@ boolean isInMemStore() {
98100
return this.inMemstore;
99101
}
100102

103+
boolean isCloseRegion() {
104+
return closeRegion;
105+
}
106+
101107
RegionInfo getRegionInfo() {
102108
return this.regionInfo;
103109
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Set;
2929
import java.util.concurrent.ConcurrentHashMap;
3030
import java.util.concurrent.ConcurrentMap;
31+
import java.util.stream.Collectors;
3132
import org.apache.hadoop.hbase.HConstants;
3233
import org.apache.hadoop.hbase.util.Bytes;
3334
import org.apache.hadoop.hbase.util.ImmutableByteArray;
@@ -184,6 +185,30 @@ void update(byte[] encodedRegionName, Set<byte[]> families, long sequenceid,
184185
}
185186
}
186187

188+
/**
189+
* Clear all the records of the given region as it is going to be closed.
190+
* <p/>
191+
* We will call this once we get the region close marker. We need this because, if we use
192+
* Durability.ASYNC_WAL, after calling startCacheFlush, we may still get some ongoing wal entries
193+
* that have not been processed yet; this will lead to orphan records in the
194+
* lowestUnflushedSequenceIds and then cause too many WAL files.
195+
* <p/>
196+
* See HBASE-23157 for more details.
197+
*/
198+
void onRegionClose(byte[] encodedRegionName) {
199+
synchronized (tieLock) {
200+
this.lowestUnflushedSequenceIds.remove(encodedRegionName);
201+
Map<ImmutableByteArray, Long> flushing = this.flushingSequenceIds.remove(encodedRegionName);
202+
if (flushing != null) {
203+
LOG.warn("Still have flushing records when closing {}, {}",
204+
Bytes.toString(encodedRegionName),
205+
flushing.entrySet().stream().map(e -> e.getKey().toStringBinary() + "->" + e.getValue())
206+
.collect(Collectors.joining(",", "{", "}")));
207+
}
208+
}
209+
this.highestSequenceIds.remove(encodedRegionName);
210+
}
211+
187212
/**
188213
* Update the store sequence id, e.g., upon executing in-memory compaction
189214
*/

0 commit comments

Comments
 (0)