Skip to content

Commit a8a57ad

Browse files
authored
HBASE-29397 Deadlock in BufferedMuratorOverAsyncBufferedMutator (#7107)
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
1 parent 28c757d commit a8a57ad

File tree

2 files changed

+79
-17
lines changed

2 files changed

+79
-17
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -135,16 +135,23 @@ public void mutate(List<? extends Mutation> mutations) throws IOException {
135135
long heapSize = mutation.heapSize();
136136
bufferedSize.addAndGet(heapSize);
137137
addListener(fs.get(i), (r, e) -> {
138-
synchronized (this) {
139-
futures.remove(toComplete);
140-
bufferedSize.addAndGet(-heapSize);
141-
if (e != null) {
142-
errors.add(Pair.newPair(mutation, e));
143-
toComplete.completeExceptionally(e);
144-
} else {
145-
toComplete.complete(r);
146-
}
138+
bufferedSize.addAndGet(-heapSize);
139+
if (e != null) {
140+
errors.add(Pair.newPair(mutation, e));
141+
toComplete.completeExceptionally(e);
142+
} else {
143+
toComplete.complete(r);
147144
}
145+
// Only remove future after completing, and add the error to errors field before completing,
146+
// this is used as a guard in internalFlush method, which is used to make sure that
147+
// 1. If the future is already completed and removed from futures, the error should have
148+
// already been in errors field, so in internalFlush method, even if we do not wait on the
149+
// future, we should still get this error in errors field at the end of the internalFlush
150+
// method
151+
// 2. If we get this future in the internalFlush method for waiting, then after the future
152+
// complete, we should get this error in the errors field at the end of the internalFlush
153+
// method
154+
futures.remove(toComplete);
148155
});
149156
}
150157
synchronized (this) {

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,14 @@
1818
package org.apache.hadoop.hbase.client;
1919

2020
import static org.junit.Assert.assertArrayEquals;
21+
import static org.junit.Assert.assertTrue;
22+
import static org.junit.Assert.fail;
2123

2224
import java.io.IOException;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.ThreadLocalRandom;
28+
import java.util.concurrent.TimeUnit;
2329
import java.util.stream.Collectors;
2430
import java.util.stream.IntStream;
2531
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -28,12 +34,16 @@
2834
import org.apache.hadoop.hbase.testclassification.ClientTests;
2935
import org.apache.hadoop.hbase.testclassification.MediumTests;
3036
import org.apache.hadoop.hbase.util.Bytes;
37+
import org.junit.After;
3138
import org.junit.AfterClass;
39+
import org.junit.Before;
3240
import org.junit.BeforeClass;
3341
import org.junit.ClassRule;
3442
import org.junit.Test;
3543
import org.junit.experimental.categories.Category;
3644

45+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
46+
3747
@Category({ MediumTests.class, ClientTests.class })
3848
public class TestBufferedMutator {
3949

@@ -49,40 +59,85 @@ public class TestBufferedMutator {
4959

5060
private static byte[] CQ = Bytes.toBytes("cq");
5161

52-
private static int COUNT = 1024;
53-
5462
private static byte[] VALUE = new byte[1024];
5563

5664
@BeforeClass
5765
public static void setUp() throws Exception {
5866
TEST_UTIL.startMiniCluster(1);
59-
TEST_UTIL.createTable(TABLE_NAME, CF);
67+
ThreadLocalRandom.current().nextBytes(VALUE);
6068
}
6169

6270
@AfterClass
6371
public static void tearDown() throws Exception {
6472
TEST_UTIL.shutdownMiniCluster();
6573
}
6674

75+
@Before
76+
public void setUpBeforeTest() throws IOException {
77+
TEST_UTIL.createTable(TABLE_NAME, CF);
78+
}
79+
80+
@After
81+
public void tearDownAfterTest() throws IOException {
82+
TEST_UTIL.deleteTable(TABLE_NAME);
83+
}
84+
6785
@Test
6886
public void test() throws Exception {
87+
int count = 1024;
6988
try (BufferedMutator mutator = TEST_UTIL.getConnection()
7089
.getBufferedMutator(new BufferedMutatorParams(TABLE_NAME).writeBufferSize(64 * 1024))) {
71-
mutator.mutate(IntStream.range(0, COUNT / 2)
90+
mutator.mutate(IntStream.range(0, count / 2)
7291
.mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
7392
.collect(Collectors.toList()));
7493
mutator.flush();
75-
mutator.mutate(IntStream.range(COUNT / 2, COUNT)
94+
mutator.mutate(IntStream.range(count / 2, count)
7695
.mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
7796
.collect(Collectors.toList()));
7897
mutator.close();
79-
verifyData();
98+
verifyData(count);
99+
}
100+
}
101+
102+
@Test
103+
public void testMultiThread() throws Exception {
104+
ExecutorService executor =
105+
Executors.newFixedThreadPool(16, new ThreadFactoryBuilder().setDaemon(true).build());
106+
// use a greater count and less write buffer size to trigger auto flush when mutate
107+
int count = 16384;
108+
try (BufferedMutator mutator = TEST_UTIL.getConnection()
109+
.getBufferedMutator(new BufferedMutatorParams(TABLE_NAME).writeBufferSize(4 * 1024))) {
110+
IntStream.range(0, count / 2)
111+
.mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
112+
.forEach(put -> executor.execute(() -> {
113+
try {
114+
mutator.mutate(put);
115+
} catch (IOException e) {
116+
fail("failed to mutate: " + e.getMessage());
117+
}
118+
}));
119+
mutator.flush();
120+
IntStream.range(count / 2, count)
121+
.mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
122+
.forEach(put -> executor.execute(() -> {
123+
try {
124+
mutator.mutate(put);
125+
} catch (IOException e) {
126+
fail("failed to mutate: " + e.getMessage());
127+
}
128+
}));
129+
executor.shutdown();
130+
assertTrue(executor.awaitTermination(15, TimeUnit.SECONDS));
131+
mutator.close();
132+
} finally {
133+
executor.shutdownNow();
80134
}
135+
verifyData(count);
81136
}
82137

83-
private void verifyData() throws IOException {
138+
private void verifyData(int count) throws IOException {
84139
try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
85-
for (int i = 0; i < COUNT; i++) {
140+
for (int i = 0; i < count; i++) {
86141
Result r = table.get(new Get(Bytes.toBytes(i)));
87142
assertArrayEquals(VALUE, ((Result) r).getValue(CF, CQ));
88143
}

0 commit comments

Comments
 (0)