Skip to content

Commit b6596d3

Browse files
committed
HBASE-23587 The FSYNC_WAL flag does not work on branch-2.x
1 parent 923ba77 commit b6596d3

File tree

5 files changed

+159
-37
lines changed

5 files changed

+159
-37
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,13 +347,31 @@ private void syncCompleted(AsyncWriter writer, long processedTxid, long startTim
347347
}
348348
}
349349

350+
// find all the sync futures between these two txids to see if we need to issue a hsync, if no
351+
// sync futures then just use the default one.
352+
private boolean shouldUseHsync(long beginTxid, long endTxid) {
353+
SortedSet<SyncFuture> futures =
354+
syncFutures.subSet(new SyncFuture().reset(beginTxid), new SyncFuture().reset(endTxid + 1));
355+
if (futures.isEmpty()) {
356+
return useHsync;
357+
}
358+
for (SyncFuture future : futures) {
359+
if (future.isForceSync()) {
360+
return true;
361+
}
362+
}
363+
return false;
364+
}
365+
350366
private void sync(AsyncWriter writer) {
351367
fileLengthAtLastSync = writer.getLength();
352368
long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
369+
boolean shouldUseHsync =
370+
shouldUseHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
353371
highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
354372
final long startTimeNs = System.nanoTime();
355373
final long epoch = (long) epochAndState >>> 2L;
356-
addListener(writer.sync(useHsync), (result, error) -> {
374+
addListener(writer.sync(shouldUseHsync), (result, error) -> {
357375
if (error != null) {
358376
syncFailed(epoch, error);
359377
} else {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -577,10 +577,9 @@ public void run() {
577577
//TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
578578
long start = System.nanoTime();
579579
Throwable lastException = null;
580-
boolean wasRollRequested = false;
581580
try {
582581
TraceUtil.addTimelineAnnotation("syncing writer");
583-
writer.sync(useHsync);
582+
writer.sync(takeSyncFuture.isForceSync());
584583
TraceUtil.addTimelineAnnotation("writer synced");
585584
currentSequence = updateHighestSyncedSequence(currentSequence);
586585
} catch (IOException e) {

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
package org.apache.hadoop.hbase.regionserver.wal;
1919

2020
import java.io.IOException;
21+
import java.util.concurrent.CompletableFuture;
2122
import org.apache.hadoop.conf.Configuration;
2223
import org.apache.hadoop.fs.FileSystem;
2324
import org.apache.hadoop.fs.Path;
2425
import org.apache.hadoop.hbase.HBaseClassTestRule;
2526
import org.apache.hadoop.hbase.HConstants;
2627
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
2728
import org.apache.hadoop.hbase.testclassification.MediumTests;
29+
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
2830
import org.junit.AfterClass;
2931
import org.junit.BeforeClass;
3032
import org.junit.ClassRule;
@@ -72,18 +74,54 @@ protected void resetSyncFlag(CustomAsyncFSWAL wal) {
7274
protected Boolean getSyncFlag(CustomAsyncFSWAL wal) {
7375
return wal.getSyncFlag();
7476
}
77+
78+
@Override
79+
protected Boolean getWriterSyncFlag(CustomAsyncFSWAL wal) {
80+
return wal.getWriterSyncFlag();
81+
}
7582
}
7683

7784
class CustomAsyncFSWAL extends AsyncFSWAL {
85+
7886
private Boolean syncFlag;
7987

88+
private Boolean writerSyncFlag;
89+
8090
public CustomAsyncFSWAL(FileSystem fs, Path rootDir, String logDir, Configuration conf,
8191
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
8292
throws FailedLogCloseException, IOException {
8393
super(fs, rootDir, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null,
8494
eventLoopGroup, channelClass);
8595
}
8696

97+
@Override
98+
protected AsyncWriter createWriterInstance(Path path) throws IOException {
99+
AsyncWriter writer = super.createWriterInstance(path);
100+
return new AsyncWriter() {
101+
102+
@Override
103+
public void close() throws IOException {
104+
writer.close();
105+
}
106+
107+
@Override
108+
public long getLength() {
109+
return writer.getLength();
110+
}
111+
112+
@Override
113+
public CompletableFuture<Long> sync(boolean forceSync) {
114+
writerSyncFlag = forceSync;
115+
return writer.sync(forceSync);
116+
}
117+
118+
@Override
119+
public void append(Entry entry) {
120+
writer.append(entry);
121+
}
122+
};
123+
}
124+
87125
@Override
88126
public void sync(boolean forceSync) throws IOException {
89127
syncFlag = forceSync;
@@ -98,9 +136,14 @@ public void sync(long txid, boolean forceSync) throws IOException {
98136

99137
void resetSyncFlag() {
100138
this.syncFlag = null;
139+
this.writerSyncFlag = null;
101140
}
102141

103142
Boolean getSyncFlag() {
104143
return syncFlag;
105144
}
145+
146+
Boolean getWriterSyncFlag() {
147+
return writerSyncFlag;
148+
}
106149
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.hadoop.hbase.HBaseClassTestRule;
2525
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
2626
import org.apache.hadoop.hbase.testclassification.MediumTests;
27+
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
2728
import org.junit.ClassRule;
2829
import org.junit.experimental.categories.Category;
2930

@@ -51,16 +52,51 @@ protected void resetSyncFlag(CustomFSHLog wal) {
5152
protected Boolean getSyncFlag(CustomFSHLog wal) {
5253
return wal.getSyncFlag();
5354
}
55+
56+
@Override
57+
protected Boolean getWriterSyncFlag(CustomFSHLog wal) {
58+
return wal.getWriterSyncFlag();
59+
}
5460
}
5561

5662
class CustomFSHLog extends FSHLog {
5763
private Boolean syncFlag;
5864

65+
private Boolean writerSyncFlag;
66+
5967
public CustomFSHLog(FileSystem fs, Path root, String logDir, Configuration conf)
6068
throws IOException {
6169
super(fs, root, logDir, conf);
6270
}
6371

72+
@Override
73+
protected Writer createWriterInstance(Path path) throws IOException {
74+
Writer writer = super.createWriterInstance(path);
75+
return new Writer() {
76+
77+
@Override
78+
public void close() throws IOException {
79+
writer.close();
80+
}
81+
82+
@Override
83+
public long getLength() {
84+
return writer.getLength();
85+
}
86+
87+
@Override
88+
public void sync(boolean forceSync) throws IOException {
89+
writerSyncFlag = forceSync;
90+
writer.sync(forceSync);
91+
}
92+
93+
@Override
94+
public void append(Entry entry) throws IOException {
95+
writer.append(entry);
96+
}
97+
};
98+
}
99+
64100
@Override
65101
public void sync(boolean forceSync) throws IOException {
66102
syncFlag = forceSync;
@@ -75,9 +111,14 @@ public void sync(long txid, boolean forceSync) throws IOException {
75111

76112
void resetSyncFlag() {
77113
this.syncFlag = null;
114+
this.writerSyncFlag = null;
78115
}
79116

80117
Boolean getSyncFlag() {
81118
return syncFlag;
82119
}
120+
121+
Boolean getWriterSyncFlag() {
122+
return writerSyncFlag;
123+
}
83124
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java

Lines changed: 55 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
*/
1818
package org.apache.hadoop.hbase.regionserver.wal;
1919

20-
import static org.junit.Assert.assertEquals;
2120
import static org.junit.Assert.assertFalse;
2221
import static org.junit.Assert.assertNull;
2322
import static org.junit.Assert.assertTrue;
@@ -77,54 +76,76 @@ protected abstract T getWAL(FileSystem fs, Path root, String logDir, Configurati
7776

7877
protected abstract Boolean getSyncFlag(T wal);
7978

79+
protected abstract Boolean getWriterSyncFlag(T wal);
80+
8081
@Test
8182
public void testWALDurability() throws IOException {
83+
byte[] bytes = Bytes.toBytes(getName());
84+
Put put = new Put(bytes);
85+
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
86+
8287
// global hbase.wal.hsync false, no override in put call - hflush
8388
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "false");
8489
FileSystem fs = FileSystem.get(conf);
8590
Path rootDir = new Path(dir + getName());
8691
T wal = getWAL(fs, rootDir, getName(), conf);
8792
HRegion region = initHRegion(tableName, null, null, wal);
88-
byte[] bytes = Bytes.toBytes(getName());
89-
Put put = new Put(bytes);
90-
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
91-
92-
resetSyncFlag(wal);
93-
assertNull(getSyncFlag(wal));
94-
region.put(put);
95-
assertFalse(getSyncFlag(wal));
96-
97-
region.close();
98-
wal.close();
93+
try {
94+
resetSyncFlag(wal);
95+
assertNull(getSyncFlag(wal));
96+
assertNull(getWriterSyncFlag(wal));
97+
region.put(put);
98+
assertFalse(getSyncFlag(wal));
99+
assertFalse(getWriterSyncFlag(wal));
100+
101+
// global hbase.wal.hsync false, durability set in put call - fsync
102+
put.setDurability(Durability.FSYNC_WAL);
103+
resetSyncFlag(wal);
104+
assertNull(getSyncFlag(wal));
105+
assertNull(getWriterSyncFlag(wal));
106+
region.put(put);
107+
assertTrue(getSyncFlag(wal));
108+
assertTrue(getWriterSyncFlag(wal));
109+
} finally {
110+
HBaseTestingUtility.closeRegionAndWAL(region);
111+
}
99112

100113
// global hbase.wal.hsync true, no override in put call
101114
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true");
102115
fs = FileSystem.get(conf);
103116
wal = getWAL(fs, rootDir, getName(), conf);
104117
region = initHRegion(tableName, null, null, wal);
105118

106-
resetSyncFlag(wal);
107-
assertNull(getSyncFlag(wal));
108-
region.put(put);
109-
assertEquals(getSyncFlag(wal), true);
110-
111-
// global hbase.wal.hsync true, durability set in put call - fsync
112-
put.setDurability(Durability.FSYNC_WAL);
113-
resetSyncFlag(wal);
114-
assertNull(getSyncFlag(wal));
115-
region.put(put);
116-
assertTrue(getSyncFlag(wal));
117-
118-
// global hbase.wal.hsync true, durability set in put call - sync
119-
put = new Put(bytes);
120-
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
121-
put.setDurability(Durability.SYNC_WAL);
122-
resetSyncFlag(wal);
123-
assertNull(getSyncFlag(wal));
124-
region.put(put);
125-
assertFalse(getSyncFlag(wal));
126-
127-
HBaseTestingUtility.closeRegionAndWAL(region);
119+
try {
120+
resetSyncFlag(wal);
121+
assertNull(getSyncFlag(wal));
122+
assertNull(getWriterSyncFlag(wal));
123+
region.put(put);
124+
assertTrue(getSyncFlag(wal));
125+
assertTrue(getWriterSyncFlag(wal));
126+
127+
// global hbase.wal.hsync true, durability set in put call - fsync
128+
put.setDurability(Durability.FSYNC_WAL);
129+
resetSyncFlag(wal);
130+
assertNull(getSyncFlag(wal));
131+
assertNull(getWriterSyncFlag(wal));
132+
region.put(put);
133+
assertTrue(getSyncFlag(wal));
134+
assertTrue(getWriterSyncFlag(wal));
135+
136+
// global hbase.wal.hsync true, durability set in put call - sync
137+
put = new Put(bytes);
138+
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
139+
put.setDurability(Durability.SYNC_WAL);
140+
resetSyncFlag(wal);
141+
assertNull(getSyncFlag(wal));
142+
assertNull(getWriterSyncFlag(wal));
143+
region.put(put);
144+
assertFalse(getSyncFlag(wal));
145+
assertFalse(getWriterSyncFlag(wal));
146+
} finally {
147+
HBaseTestingUtility.closeRegionAndWAL(region);
148+
}
128149
}
129150

130151
private String getName() {

0 commit comments

Comments
 (0)