Skip to content

Commit ce6db03

Browse files
author
sbarnoud060710
committed
HBASE-22634 Improve performance of BufferedMutator
Fix findBugs
1 parent 737bd2d commit ce6db03

File tree

3 files changed

+9
-1
lines changed

3 files changed

+9
-1
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,8 @@ private RegionLocations findAllLocationsOrFail(Action action, boolean useCache)
547547
* @param actionsForReplicaThread original actions for replica thread; null on non-first call.
548548
*/
549549
// Must be synchronized because of the background thread writeBufferPeriodicFlushTimer
550+
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SWL_SLEEP_WITH_LOCK_HELD",
551+
justification="Slots are freed while in the executor thread, so the thread is not yet available for the pool. In that case, we have a RejectedExecutionException. Sleep let some time to threads to be available again")
550552
synchronized void sendMultiAction(Map<ServerName, MultiAction> actionsByServer,
551553
int numAttempt, List<Action> actionsForReplicaThread, boolean reuseThread) {
552554
// Run the last item on the same thread if we are already on a send thread.

hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,14 +301,16 @@ public int getOperationTimeout() {
301301
public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
302302
// This will avoid concurrency between period flush, flush() and close()
303303
// mutate are not synchronized, because it use doFlush(false)
304+
boolean haveLocked = false;
304305
if (writeBufferPeriodicFlushTimer != null) {
305306
lock.lock();
307+
haveLocked = false; // make sure to unlock even if writeBufferPeriodicFlushTimer is set to null before the end
306308
}
307309
try {
308310
checkClose();
309311
doFlush(true);
310312
} finally {
311-
if (writeBufferPeriodicFlushTimer != null) {
313+
if (haveLocked) {
312314
lock.unlock();
313315
}
314316
}
@@ -320,6 +322,8 @@ public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsEx
320322
* @param flushAll - if true, sends all the writes and wait for all of them to finish before
321323
* returning. Otherwise, flush until buffer size is smaller than threshold
322324
*/
325+
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SWL_SLEEP_WITH_LOCK_HELD",
326+
justification="Backpressure, when we have to many pending Future, we wait under the lock to slow down the application")
323327
private void doFlush(boolean flushAll) throws InterruptedIOException,
324328
RetriesExhaustedWithDetailsException {
325329
List<RetriesExhaustedWithDetailsException> errors = new ArrayList<>();

hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorThreadPoolExecutor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
* - the amount of time spent in those background threads
3838
*/
3939

40+
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DLS_DEAD_LOCAL_STORE",
41+
justification="Class provided as illustration for application as an alternative to the default pool")
4042
@InterfaceAudience.Public
4143
@InterfaceStability.Evolving
4244
public class BufferedMutatorThreadPoolExecutor extends ThreadPoolExecutor {

0 commit comments

Comments
 (0)