Skip to content

Commit b197a63

Browse files
committed
HBASE-22539 WAL corruption due to early DBBs re-use when Durability.ASYNC_WAL is used
1 parent 8cfc46d commit b197a63

File tree

11 files changed

+378
-22
lines changed

11 files changed

+378
-22
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import java.util.ArrayList;
2424
import java.util.List;
2525
import java.util.Optional;
26-
26+
import java.util.concurrent.atomic.AtomicInteger;
2727
import org.apache.hadoop.hbase.CellScanner;
2828
import org.apache.hadoop.hbase.DoNotRetryIOException;
2929
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
@@ -51,7 +51,7 @@
5151
* the result.
5252
*/
5353
@InterfaceAudience.Private
54-
abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
54+
public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
5555

5656
protected final int id; // the client's call id
5757
protected final BlockingService service;
@@ -91,6 +91,12 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
9191
private long exceptionSize = 0;
9292
private final boolean retryImmediatelySupported;
9393

94+
// This is a dirty hack to address HBASE-22539. The lowest bit is for normal rpc cleanup, and the
95+
// second bit is for WAL reference. We can only call release if both of them are zero. The reason
96+
// why we can not use a general reference counting is that, we may call cleanup multiple times in
97+
// the current implementation. We should fix this in the future.
98+
private final AtomicInteger reference = new AtomicInteger(0b01);
99+
94100
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
95101
justification = "Can't figure why this complaint is happening... see below")
96102
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
@@ -141,14 +147,43 @@ public void done() {
141147
cleanup();
142148
}
143149

150+
private void release(int mask) {
151+
for (;;) {
152+
int ref = reference.get();
153+
if ((ref & mask) == 0) {
154+
return;
155+
}
156+
int nextRef = ref & (~mask);
157+
if (reference.compareAndSet(ref, nextRef)) {
158+
if (nextRef == 0) {
159+
if (this.reqCleanup != null) {
160+
this.reqCleanup.run();
161+
}
162+
}
163+
return;
164+
}
165+
}
166+
}
167+
144168
@Override
145169
public void cleanup() {
146-
if (this.reqCleanup != null) {
147-
this.reqCleanup.run();
148-
this.reqCleanup = null;
170+
release(0b01);
171+
}
172+
173+
public void retainByWAL() {
174+
for (;;) {
175+
int ref = reference.get();
176+
int nextRef = ref | 0b10;
177+
if (reference.compareAndSet(ref, nextRef)) {
178+
return;
179+
}
149180
}
150181
}
151182

183+
public void releaseByWAL() {
184+
release(0b10);
185+
}
186+
152187
@Override
153188
public String toString() {
154189
return toShortString() + " param: " +

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@
5959
import org.apache.hadoop.hbase.client.RegionInfo;
6060
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
6161
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
62+
import org.apache.hadoop.hbase.ipc.RpcCall;
63+
import org.apache.hadoop.hbase.ipc.RpcServer;
64+
import org.apache.hadoop.hbase.ipc.ServerCall;
6265
import org.apache.hadoop.hbase.log.HBaseMarkers;
6366
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
6467
import org.apache.hadoop.hbase.trace.TraceUtil;
@@ -971,7 +974,7 @@ boolean isUnflushedEntries() {
971974
* Exposed for testing only. Use to tricks like halt the ring buffer appending.
972975
*/
973976
@VisibleForTesting
974-
void atHeadOfRingBufferEventHandlerAppend() {
977+
protected void atHeadOfRingBufferEventHandlerAppend() {
975978
// Noop
976979
}
977980

@@ -1061,8 +1064,10 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
10611064
txidHolder.setValue(ringBuffer.next());
10621065
});
10631066
long txid = txidHolder.longValue();
1067+
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
1068+
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
10641069
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
1065-
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
1070+
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
10661071
entry.stampRegionSequenceId(we);
10671072
ringBuffer.get(txid).load(entry);
10681073
} finally {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
5151
import org.apache.hadoop.hbase.client.RegionInfo;
5252
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
53+
import org.apache.hadoop.hbase.ipc.ServerCall;
5354
import org.apache.hadoop.hbase.trace.TraceUtil;
5455
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
5556
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -323,7 +324,9 @@ private void syncFailed(long epochWhenSync, Throwable error) {
323324
private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
324325
highestSyncedTxid.set(processedTxid);
325326
for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
326-
if (iter.next().getTxid() <= processedTxid) {
327+
FSWALEntry entry = iter.next();
328+
if (entry.getTxid() <= processedTxid) {
329+
entry.getRpcCall().ifPresent(ServerCall::releaseByWAL);
327330
iter.remove();
328331
} else {
329332
break;
@@ -487,6 +490,7 @@ private void drainNonMarkerEditsAndFailSyncs() {
487490
while (iter.hasNext()) {
488491
FSWALEntry entry = iter.next();
489492
if (!entry.getEdit().isMetaEdit()) {
493+
entry.getRpcCall().ifPresent(ServerCall::releaseByWAL);
490494
hasNonMarkerEdits = true;
491495
break;
492496
}
@@ -497,7 +501,10 @@ private void drainNonMarkerEditsAndFailSyncs() {
497501
if (!iter.hasNext()) {
498502
break;
499503
}
500-
iter.next();
504+
iter.next().getRpcCall().ifPresent(ServerCall::releaseByWAL);
505+
}
506+
for (FSWALEntry entry : unackedAppends) {
507+
entry.getRpcCall().ifPresent(ServerCall::releaseByWAL);
501508
}
502509
unackedAppends.clear();
503510
// fail the sync futures which are under the txid of the first remaining edit, if none, fail

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import com.lmax.disruptor.TimeoutException;
3030
import com.lmax.disruptor.dsl.Disruptor;
3131
import com.lmax.disruptor.dsl.ProducerType;
32-
3332
import java.io.IOException;
3433
import java.io.OutputStream;
3534
import java.util.Arrays;
@@ -39,13 +38,13 @@
3938
import java.util.concurrent.LinkedBlockingQueue;
4039
import java.util.concurrent.TimeUnit;
4140
import java.util.concurrent.atomic.AtomicInteger;
42-
4341
import org.apache.hadoop.conf.Configuration;
4442
import org.apache.hadoop.fs.FSDataOutputStream;
4543
import org.apache.hadoop.fs.FileSystem;
4644
import org.apache.hadoop.fs.Path;
4745
import org.apache.hadoop.hbase.HConstants;
4846
import org.apache.hadoop.hbase.client.RegionInfo;
47+
import org.apache.hadoop.hbase.ipc.ServerCall;
4948
import org.apache.hadoop.hbase.regionserver.HRegion;
5049
import org.apache.hadoop.hbase.trace.TraceUtil;
5150
import org.apache.hadoop.hbase.util.Bytes;
@@ -64,6 +63,7 @@
6463
import org.apache.yetus.audience.InterfaceAudience;
6564
import org.slf4j.Logger;
6665
import org.slf4j.LoggerFactory;
66+
6767
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
6868

6969
/**
@@ -985,7 +985,6 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en
985985
//TODO handle htrace API change, see HBASE-18895
986986
//TraceScope scope = Trace.continueSpan(entry.detachSpan());
987987
try {
988-
989988
if (this.exception != null) {
990989
// Return to keep processing events coming off the ringbuffer
991990
return;
@@ -1002,6 +1001,8 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en
10021001
: new DamagedWALException("On sync", this.exception));
10031002
// Return to keep processing events coming off the ringbuffer
10041003
return;
1004+
} finally {
1005+
entry.getRpcCall().ifPresent(ServerCall::releaseByWAL);
10051006
}
10061007
} else {
10071008
// What is this if not an append or sync. Fail all up to this!!!

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.IOException;
2323
import java.util.Collections;
2424
import java.util.List;
25+
import java.util.Optional;
2526
import java.util.Set;
2627
import java.util.TreeSet;
2728

@@ -30,6 +31,7 @@
3031
import org.apache.hadoop.hbase.CellUtil;
3132
import org.apache.hadoop.hbase.PrivateCellUtil;
3233
import org.apache.hadoop.hbase.client.RegionInfo;
34+
import org.apache.hadoop.hbase.ipc.ServerCall;
3335
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
3436
import org.apache.hadoop.hbase.util.Bytes;
3537
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -56,19 +58,24 @@ class FSWALEntry extends Entry {
5658
private final transient boolean inMemstore;
5759
private final transient RegionInfo regionInfo;
5860
private final transient Set<byte[]> familyNames;
61+
private final transient Optional<ServerCall<?>> rpcCall;
5962

60-
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit,
61-
final RegionInfo regionInfo, final boolean inMemstore) {
63+
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
64+
final boolean inMemstore, ServerCall<?> rpcCall) {
6265
super(key, edit);
6366
this.inMemstore = inMemstore;
6467
this.regionInfo = regionInfo;
6568
this.txid = txid;
6669
if (inMemstore) {
6770
// construct familyNames here to reduce the work of log sinker.
68-
Set<byte []> families = edit.getFamilies();
69-
this.familyNames = families != null? families: collectFamilies(edit.getCells());
71+
Set<byte[]> families = edit.getFamilies();
72+
this.familyNames = families != null ? families : collectFamilies(edit.getCells());
7073
} else {
71-
this.familyNames = Collections.<byte[]>emptySet();
74+
this.familyNames = Collections.<byte[]> emptySet();
75+
}
76+
this.rpcCall = Optional.ofNullable(rpcCall);
77+
if (rpcCall != null) {
78+
rpcCall.retainByWAL();
7279
}
7380
}
7481

@@ -129,4 +136,8 @@ long stampRegionSequenceId(MultiVersionConcurrencyControl.WriteEntry we) throws
129136
Set<byte[]> getFamilyNames() {
130137
return familyNames;
131138
}
139+
140+
Optional<ServerCall<?>> getRpcCall() {
141+
return rpcCall;
142+
}
132143
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1156,9 +1156,8 @@ private WALEdit createWALEdit(final byte[] rowName, final byte[] family, Environ
11561156
private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
11571157
byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
11581158
int index, NavigableMap<byte[], Integer> scopes) throws IOException {
1159-
FSWALEntry entry =
1160-
new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit(
1161-
rowName, family, ee, index), hri, true);
1159+
FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
1160+
createWALEdit(rowName, family, ee, index), hri, true, null);
11621161
entry.stampRegionSequenceId(mvcc.begin());
11631162
return entry;
11641163
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String logDir
113113
failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS) {
114114

115115
@Override
116-
void atHeadOfRingBufferEventHandlerAppend() {
116+
protected void atHeadOfRingBufferEventHandlerAppend() {
117117
action.run();
118118
super.atHeadOfRingBufferEventHandlerAppend();
119119
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir
8989
prefix, suffix) {
9090

9191
@Override
92-
void atHeadOfRingBufferEventHandlerAppend() {
92+
protected void atHeadOfRingBufferEventHandlerAppend() {
9393
action.run();
9494
super.atHeadOfRingBufferEventHandlerAppend();
9595
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.wal;
19+
20+
import java.io.IOException;
21+
import java.util.List;
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.fs.FileSystem;
24+
import org.apache.hadoop.fs.Path;
25+
import org.apache.hadoop.hbase.HBaseClassTestRule;
26+
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
27+
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
28+
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
29+
import org.apache.hadoop.hbase.testclassification.MediumTests;
30+
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
31+
import org.apache.hadoop.hbase.util.CommonFSUtils;
32+
import org.apache.hadoop.hbase.util.Pair;
33+
import org.junit.AfterClass;
34+
import org.junit.BeforeClass;
35+
import org.junit.ClassRule;
36+
import org.junit.experimental.categories.Category;
37+
38+
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
39+
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
40+
41+
/**
42+
* Testcase for HBASE-22539
43+
*/
44+
@Category({ RegionServerTests.class, MediumTests.class })
45+
public class TestAsyncFSWALCorruptionDueToDanglingByteBuffer
46+
extends WALCorruptionDueToDanglingByteBufferTestBase {
47+
48+
@ClassRule
49+
public static final HBaseClassTestRule CLASS_RULE =
50+
HBaseClassTestRule.forClass(TestAsyncFSWALCorruptionDueToDanglingByteBuffer.class);
51+
52+
public static final class PauseWAL extends AsyncFSWAL {
53+
54+
public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
55+
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
56+
String prefix, String suffix, EventLoopGroup eventLoopGroup,
57+
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
58+
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
59+
eventLoopGroup, channelClass);
60+
}
61+
62+
@Override
63+
protected void atHeadOfRingBufferEventHandlerAppend() {
64+
if (ARRIVE != null) {
65+
ARRIVE.countDown();
66+
try {
67+
RESUME.await();
68+
} catch (InterruptedException e) {
69+
}
70+
}
71+
}
72+
}
73+
74+
public static final class PauseWALProvider extends AbstractFSWALProvider<PauseWAL> {
75+
76+
private EventLoopGroup eventLoopGroup;
77+
78+
private Class<? extends Channel> channelClass;
79+
80+
@Override
81+
protected PauseWAL createWAL() throws IOException {
82+
return new PauseWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
83+
getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
84+
conf, listeners, true, logPrefix,
85+
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
86+
channelClass);
87+
}
88+
89+
@Override
90+
protected void doInit(Configuration conf) throws IOException {
91+
Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
92+
NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
93+
eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
94+
channelClass = eventLoopGroupAndChannelClass.getSecond();
95+
}
96+
}
97+
98+
@BeforeClass
99+
public static void setUp() throws Exception {
100+
UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, PauseWALProvider.class,
101+
WALProvider.class);
102+
UTIL.startMiniCluster(1);
103+
UTIL.createTable(TABLE_NAME, CF);
104+
UTIL.waitTableAvailable(TABLE_NAME);
105+
}
106+
107+
@AfterClass
108+
public static void tearDown() throws Exception {
109+
UTIL.shutdownMiniCluster();
110+
}
111+
}

0 commit comments

Comments
 (0)