Skip to content

Commit ac3d09e

Browse files
committed
HBASE-22577 BufferedMutatorOverAsyncBufferedMutator.tryCompleteFuture consume too much CPU time
1 parent 621dc88 commit ac3d09e

File tree

2 files changed

+52
-37
lines changed

2 files changed

+52
-37
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -23,34 +23,41 @@
2323
import java.util.ArrayList;
2424
import java.util.Collections;
2525
import java.util.List;
26+
import java.util.Set;
2627
import java.util.concurrent.CompletableFuture;
2728
import java.util.concurrent.CompletionException;
29+
import java.util.concurrent.ConcurrentHashMap;
2830
import java.util.concurrent.ConcurrentLinkedQueue;
31+
import java.util.concurrent.atomic.AtomicLong;
2932
import java.util.regex.Matcher;
3033
import java.util.regex.Pattern;
31-
import java.util.stream.Collectors;
3234
import org.apache.hadoop.conf.Configuration;
3335
import org.apache.hadoop.hbase.TableName;
3436
import org.apache.hadoop.hbase.util.Pair;
3537
import org.apache.yetus.audience.InterfaceAudience;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
3640

3741
/**
3842
* {@link BufferedMutator} implementation based on {@link AsyncBufferedMutator}.
3943
*/
4044
@InterfaceAudience.Private
4145
class BufferedMutatorOverAsyncBufferedMutator implements BufferedMutator {
4246

47+
private static final Logger LOG =
48+
LoggerFactory.getLogger(BufferedMutatorOverAsyncBufferedMutator.class);
49+
4350
private final AsyncBufferedMutator mutator;
4451

4552
private final ExceptionListener listener;
4653

47-
private List<CompletableFuture<Void>> futures = new ArrayList<>();
54+
private final Set<CompletableFuture<Void>> futures = ConcurrentHashMap.newKeySet();
55+
56+
private final AtomicLong bufferedSize = new AtomicLong(0);
4857

4958
private final ConcurrentLinkedQueue<Pair<Mutation, Throwable>> errors =
5059
new ConcurrentLinkedQueue<>();
5160

52-
private final static int BUFFERED_FUTURES_THRESHOLD = 1024;
53-
5461
BufferedMutatorOverAsyncBufferedMutator(AsyncBufferedMutator mutator,
5562
ExceptionListener listener) {
5663
this.mutator = mutator;
@@ -100,62 +107,69 @@ private RetriesExhaustedWithDetailsException makeError() {
100107
return new RetriesExhaustedWithDetailsException(throwables, rows, hostnameAndPorts);
101108
}
102109

110+
private void internalFlush() throws RetriesExhaustedWithDetailsException {
111+
// should get the future array before calling mutator.flush, otherwise we may hit an infinite
112+
// wait, since someone may add new future to the map after we calling the flush.
113+
CompletableFuture<?>[] toWait = futures.toArray(new CompletableFuture<?>[0]);
114+
mutator.flush();
115+
try {
116+
CompletableFuture.allOf(toWait).join();
117+
} catch (CompletionException e) {
118+
// just ignore, we will record the actual error in the errors field
119+
LOG.debug("Flush failed, you should get an exception thrown to your code", e);
120+
}
121+
if (!errors.isEmpty()) {
122+
RetriesExhaustedWithDetailsException error = makeError();
123+
listener.onException(error, this);
124+
}
125+
}
126+
103127
@Override
104128
public void mutate(List<? extends Mutation> mutations) throws IOException {
105-
List<CompletableFuture<Void>> toBuffered = new ArrayList<>();
106129
List<CompletableFuture<Void>> fs = mutator.mutate(mutations);
107130
for (int i = 0, n = fs.size(); i < n; i++) {
108131
CompletableFuture<Void> toComplete = new CompletableFuture<>();
109-
final int index = i;
110-
addListener(fs.get(index), (r, e) -> {
132+
futures.add(toComplete);
133+
Mutation mutation = mutations.get(i);
134+
long heapSize = mutation.heapSize();
135+
bufferedSize.addAndGet(heapSize);
136+
addListener(fs.get(i), (r, e) -> {
137+
futures.remove(toComplete);
138+
bufferedSize.addAndGet(-heapSize);
111139
if (e != null) {
112-
errors.add(Pair.newPair(mutations.get(index), e));
140+
errors.add(Pair.newPair(mutation, e));
113141
toComplete.completeExceptionally(e);
114142
} else {
115143
toComplete.complete(r);
116144
}
117145
});
118-
toBuffered.add(toComplete);
119146
}
120147
synchronized (this) {
121-
futures.addAll(toBuffered);
122-
if (futures.size() > BUFFERED_FUTURES_THRESHOLD) {
123-
tryCompleteFuture();
124-
}
125-
if (!errors.isEmpty()) {
148+
if (bufferedSize.get() > mutator.getWriteBufferSize() * 2) {
149+
// We have too many mutations which are not completed yet, let's call a flush to release the
150+
// memory to prevent OOM
151+
// We use buffer size * 2 is because that, the async buffered mutator will flush
152+
// automatically when the write buffer size limit is reached, so usually we do not need to
153+
// call flush explicitly if the buffered size is only a little larger than the buffer size
154+
// limit. But if the buffered size is too large(2 times of the buffer size), we still need
155+
// to block here to prevent OOM.
156+
internalFlush();
157+
} else if (!errors.isEmpty()) {
126158
RetriesExhaustedWithDetailsException error = makeError();
127159
listener.onException(error, this);
128160
}
129161
}
130162
}
131163

132-
private void tryCompleteFuture() {
133-
futures = futures.stream().filter(f -> !f.isDone()).collect(Collectors.toList());
134-
}
135-
136164
@Override
137-
public void close() throws IOException {
138-
flush();
165+
public synchronized void close() throws IOException {
166+
internalFlush();
139167
mutator.close();
140168
}
141169

142170
@Override
143-
public void flush() throws IOException {
144-
mutator.flush();
145-
synchronized (this) {
146-
List<CompletableFuture<Void>> toComplete = this.futures;
147-
this.futures = new ArrayList<>();
148-
try {
149-
CompletableFuture.allOf(toComplete.toArray(new CompletableFuture<?>[toComplete.size()]))
150-
.join();
151-
} catch (CompletionException e) {
152-
// just ignore, we will record the actual error in the errors field
153-
}
154-
if (!errors.isEmpty()) {
155-
RetriesExhaustedWithDetailsException error = makeError();
156-
listener.onException(error, this);
157-
}
158-
}
171+
public synchronized void flush() throws IOException {
172+
internalFlush();
159173
}
160174

161175
@Override

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ public static void tearDown() throws Exception {
6666

6767
@Test
6868
public void test() throws Exception {
69-
try (BufferedMutator mutator = TEST_UTIL.getConnection().getBufferedMutator(TABLE_NAME)) {
69+
try (BufferedMutator mutator = TEST_UTIL.getConnection()
70+
.getBufferedMutator(new BufferedMutatorParams(TABLE_NAME).writeBufferSize(64 * 1024))) {
7071
mutator.mutate(IntStream.range(0, COUNT / 2)
7172
.mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
7273
.collect(Collectors.toList()));

0 commit comments

Comments
 (0)