Skip to content

Commit d9cc51f

Browse files
bbeaudreault
authored and committed
HBASE-26679 Wait on the future returned by FanOutOneBlockAsyncDFSOutput.flush would be stuck (apache#4077)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
1 parent 973492a commit d9cc51f

File tree

2 files changed

+225
-4
lines changed

2 files changed

+225
-4
lines changed

hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@
2222
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
2323
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
2424
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
25+
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
2526
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
2627
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;
27-
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
2828

29+
import com.google.errorprone.annotations.RestrictedApi;
2930
import java.io.IOException;
3031
import java.io.InterruptedIOException;
3132
import java.nio.ByteBuffer;
@@ -40,7 +41,6 @@
4041
import java.util.concurrent.ExecutionException;
4142
import java.util.concurrent.TimeUnit;
4243
import java.util.function.Supplier;
43-
4444
import org.apache.hadoop.conf.Configuration;
4545
import org.apache.hadoop.crypto.Encryptor;
4646
import org.apache.hadoop.fs.Path;
@@ -218,7 +218,11 @@ private void completed(Channel channel) {
218218
// so that the implementation will not burn up our brain as there are multiple state changes and
219219
// checks.
220220
private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
221-
if (state == State.BROKEN || state == State.CLOSED) {
221+
if (state == State.CLOSED) {
222+
return;
223+
}
224+
if (state == State.BROKEN) {
225+
failWaitingAckQueue(channel, errorSupplier);
222226
return;
223227
}
224228
if (state == State.CLOSING) {
@@ -230,6 +234,11 @@ private synchronized void failed(Channel channel, Supplier<Throwable> errorSuppl
230234
}
231235
// disable further write, and fail all pending ack.
232236
state = State.BROKEN;
237+
failWaitingAckQueue(channel, errorSupplier);
238+
datanodeList.forEach(ch -> ch.close());
239+
}
240+
241+
private void failWaitingAckQueue(Channel channel, Supplier<Throwable> errorSupplier) {
233242
Throwable error = errorSupplier.get();
234243
for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
235244
Callback c = iter.next();
@@ -246,7 +255,6 @@ private synchronized void failed(Channel channel, Supplier<Throwable> errorSuppl
246255
}
247256
break;
248257
}
249-
datanodeList.forEach(ch -> ch.close());
250258
}
251259

252260
@Sharable
@@ -579,4 +587,10 @@ public boolean isBroken() {
579587
public long getSyncedLength() {
580588
return this.ackedBlockLength;
581589
}
590+
591+
/**
 * Exposes the datanode channels backing this output stream so tests can inject
 * handlers into their pipelines or close them to simulate datanode failures.
 */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
  allowedOnPath = ".*/src/test/.*")
List<Channel> getDatanodeList() {
  return this.datanodeList;
}
582596
}
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Testcase for HBASE-26679. Here we introduce a separate test class and do not put the testcase in
 * {@link TestFanOutOneBlockAsyncDFSOutput} because we will send heartbeat to DN when there is no
 * outgoing packet; the timeout there is controlled by
 * {@link TestFanOutOneBlockAsyncDFSOutput#READ_TIMEOUT_MS}, which is 2 seconds, so it will keep
 * sending packets out and DN will respond immediately and then mess up the testing handler added by
 * us. So in this test class we use the default value for the timeout, which is 60 seconds — enough
 * for this test.
 */
@Category({ MiscTests.class, MediumTests.class })
public class TestFanOutOneBlockAsyncDFSOutputHang extends AsyncFSTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutputHang.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutputHang.class);

  private static DistributedFileSystem FS;

  private static EventLoopGroup EVENT_LOOP_GROUP;

  private static Class<? extends Channel> CHANNEL_CLASS;

  // Output stream under test, replicated to exactly 2 datanodes so the test can
  // distinguish a "fast" DN (dn1) from a "slow" DN (dn2).
  private static FanOutOneBlockAsyncDFSOutput OUT;

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUp() throws Exception {
    startMiniDFSCluster(2);
    FS = CLUSTER.getFileSystem();
    EVENT_LOOP_GROUP = new NioEventLoopGroup();
    CHANNEL_CLASS = NioSocketChannel.class;
    Path f = new Path("/testHang");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    OUT = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 2,
      FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    if (OUT != null) {
      OUT.recoverAndClose(null);
    }
    if (EVENT_LOOP_GROUP != null) {
      EVENT_LOOP_GROUP.shutdownGracefully().sync().get();
    }
    shutdownMiniDFSCluster();
  }

  /**
   * <pre>
   * This test is for HBASE-26679. Consider there are two dataNodes: dn1 and dn2; dn2 is a slow DN.
   * The thread sequence before HBASE-26679 is:
   * 1. We write some data to {@link FanOutOneBlockAsyncDFSOutput} and then flush it; there is one
   *    {@link FanOutOneBlockAsyncDFSOutput.Callback} in
   *    {@link FanOutOneBlockAsyncDFSOutput#waitingAckQueue}.
   * 2. The ack from dn1 arrives first and triggers Netty to invoke
   *    {@link FanOutOneBlockAsyncDFSOutput#completed} with dn1's channel; then in
   *    {@link FanOutOneBlockAsyncDFSOutput#completed}, dn1's channel is removed from
   *    {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas}.
   * 3. But dn2 responds slowly. Before dn2 sends its ack, dn1 is shut down or hits an exception,
   *    so {@link FanOutOneBlockAsyncDFSOutput#failed} is triggered by Netty with dn1's channel,
   *    and because {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas} does not
   *    contain dn1's channel, the {@link FanOutOneBlockAsyncDFSOutput.Callback} is skipped in
   *    the {@link FanOutOneBlockAsyncDFSOutput#failed} method,
   *    {@link FanOutOneBlockAsyncDFSOutput#state} is set to
   *    {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN}, and dn1, dn2 are all closed at the end
   *    of {@link FanOutOneBlockAsyncDFSOutput#failed}.
   * 4. {@link FanOutOneBlockAsyncDFSOutput#failed} is triggered again by dn2 because it is closed,
   *    but because {@link FanOutOneBlockAsyncDFSOutput#state} is already
   *    {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN}, the whole
   *    {@link FanOutOneBlockAsyncDFSOutput#failed} is skipped. So waiting on the future
   *    returned by {@link FanOutOneBlockAsyncDFSOutput#flush} would be stuck forever.
   * After HBASE-26679, for step 4 above, even if {@link FanOutOneBlockAsyncDFSOutput#state}
   * is already {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN}, we still try to complete
   * {@link FanOutOneBlockAsyncDFSOutput.Callback#future}.
   * </pre>
   */
  @Test
  public void testFlushHangWhenOneDataNodeFailedBeforeOtherDataNodeAck() throws Exception {
    final CyclicBarrier dn1AckReceivedCyclicBarrier = new CyclicBarrier(2);
    List<Channel> channels = OUT.getDatanodeList();
    Channel dn1Channel = channels.get(0);
    // Locate the single ProtobufDecoder in dn1's pipeline so we can hook in right after it
    // and learn exactly when dn1's ack has been decoded.
    final List<String> protobufDecoderNames = new ArrayList<String>();
    dn1Channel.pipeline().forEach((entry) -> {
      if (ProtobufDecoder.class.isInstance(entry.getValue())) {
        protobufDecoderNames.add(entry.getKey());
      }
    });
    assertEquals(1, protobufDecoderNames.size());
    dn1Channel.pipeline().addAfter(protobufDecoderNames.get(0), "dn1AckReceivedHandler",
      new ChannelInboundHandlerAdapter() {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          super.channelRead(ctx, msg);
          dn1AckReceivedCyclicBarrier.await();
        }
      });

    Channel dn2Channel = channels.get(1);
    /**
     * Here we add a {@link ChannelInboundHandlerAdapter} to eat all the responses to simulate a
     * slow dn2.
     */
    dn2Channel.pipeline().addFirst(new ChannelInboundHandlerAdapter() {

      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof ByteBuf)) {
          ctx.fireChannelRead(msg);
        }
      }
    });

    byte[] b = new byte[10];
    ThreadLocalRandom.current().nextBytes(b);
    OUT.write(b, 0, b.length);
    CompletableFuture<Long> future = OUT.flush(false);
    /**
     * Wait for ack from dn1.
     */
    dn1AckReceivedCyclicBarrier.await();
    /**
     * First ack is received from dn1; close dn1Channel to simulate dn1 shutting down or hitting an
     * exception.
     */
    dn1Channel.close().get();
    try {
      /**
       * Before HBASE-26679 we would be stuck here; after HBASE-26679 we fail soon with an
       * {@link ExecutionException}.
       */
      future.get();
      fail();
    } catch (ExecutionException e) {
      // The original assertTrue(e != null) was a tautology: inside this catch block, e can
      // never be null. Assert on the wrapped cause instead, which is the actual flush failure.
      assertNotNull(e.getCause());
      LOG.info("expected exception caught when get future", e);
    }
    /**
     * Make sure all the data node channels are closed.
     */
    channels.forEach(ch -> {
      try {
        ch.closeFuture().get();
      } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
      }
    });
  }
}

0 commit comments

Comments
 (0)