Skip to content

Commit a2406f3

Browse files
authored
HBASE-26660 delayed FlushRegionEntry should be removed when we need a non-delayed one (#4042)
Signed-off-by: Viraj Jasani <vjasani@apache.org>
1 parent a3e7d36 commit a2406f3

File tree

3 files changed

+113
-10
lines changed

3 files changed

+113
-10
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9227,6 +9227,10 @@ public void incrementFlushesQueuedCount() {
92279227
flushesQueued.incrementAndGet();
92289228
}
92299229

9230+
protected void decrementFlushesQueuedCount() {
9231+
flushesQueued.decrementAndGet();
9232+
}
9233+
92309234
/**
92319235
* Do not change this sequence id.
92329236
* @return sequenceId

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class MemStoreFlusher implements FlushRequester {
8181
// a corresponding entry in the other.
8282
private final BlockingQueue<FlushQueueEntry> flushQueue =
8383
new DelayQueue<FlushQueueEntry>();
84-
private final Map<Region, FlushRegionEntry> regionsInQueue =
84+
protected final Map<Region, FlushRegionEntry> regionsInQueue =
8585
new HashMap<Region, FlushRegionEntry>();
8686
private AtomicBoolean wakeupPending = new AtomicBoolean();
8787

@@ -363,16 +363,28 @@ private boolean isAboveLowWaterMark() {
363363
@Override
364364
public boolean requestFlush(Region r, boolean forceFlushAllStores) {
365365
synchronized (regionsInQueue) {
366-
if (!regionsInQueue.containsKey(r)) {
367-
// This entry has no delay so it will be added at the top of the flush
368-
// queue. It'll come out near immediately.
369-
FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
370-
this.regionsInQueue.put(r, fqe);
371-
this.flushQueue.add(fqe);
372-
((HRegion)r).incrementFlushesQueuedCount();
373-
return true;
366+
FlushRegionEntry existFqe = regionsInQueue.get(r);
367+
if (existFqe != null) {
368+
// if a delayed one exists and not reach the time to execute, just remove it
369+
if (existFqe.isDelay() && existFqe.whenToExpire > EnvironmentEdgeManager.currentTime()) {
370+
LOG.info("Remove the existing delayed flush entry for " + r + ", "
371+
+ "because we need to flush it immediately");
372+
this.regionsInQueue.remove(r);
373+
this.flushQueue.remove(existFqe);
374+
((HRegion)r).decrementFlushesQueuedCount();
375+
} else {
376+
LOG.info("Flush already requested on " + r);
377+
return false;
378+
}
374379
}
375-
return false;
380+
381+
// This entry has no delay so it will be added at the top of the flush
382+
// queue. It'll come out near immediately.
383+
FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
384+
this.regionsInQueue.put(r, fqe);
385+
this.flushQueue.add(fqe);
386+
((HRegion)r).incrementFlushesQueuedCount();
387+
return true;
376388
}
377389
}
378390

@@ -752,6 +764,13 @@ public boolean isMaximumWait(final long maximumWait) {
752764
return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
753765
}
754766

767+
/**
768+
* @return True if the entry is a delay flush task
769+
*/
770+
protected boolean isDelay() {
771+
return this.whenToExpire > this.createTime;
772+
}
773+
755774
/**
756775
* @return Count of times {@link #requeue(long)} was called; i.e this is
757776
* number of times we've been requeued.
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.regionserver;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertTrue;
23+
import static org.mockito.Mockito.doReturn;
24+
import static org.mockito.Mockito.mock;
25+
import org.apache.hadoop.conf.Configuration;
26+
import org.apache.hadoop.hbase.HRegionInfo;
27+
import org.apache.hadoop.hbase.TableName;
28+
import org.apache.hadoop.hbase.testclassification.SmallTests;
29+
import org.apache.hadoop.hbase.util.Threads;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
import org.junit.experimental.categories.Category;
33+
34+
@Category(SmallTests.class)
35+
public class TestMemStoreFlusher {
36+
private MemStoreFlusher msf;
37+
38+
@Before
39+
public void setUp() throws Exception {
40+
Configuration conf = new Configuration();
41+
conf.set("hbase.hstore.flusher.count", "0");
42+
msf = new MemStoreFlusher(conf, null);
43+
}
44+
45+
@Test
46+
public void testReplaceDelayedFlushEntry() {
47+
HRegionInfo hri = new HRegionInfo(1, TableName.valueOf("TestTable"), 0);
48+
HRegion r = mock(HRegion.class);
49+
doReturn(hri).when(r).getRegionInfo();
50+
51+
// put a delayed task with 30s delay
52+
msf.requestDelayedFlush(r, 30000, false);
53+
assertEquals(1, msf.getFlushQueueSize());
54+
assertTrue(msf.regionsInQueue.get(r).isDelay());
55+
56+
// put a non-delayed task, then the delayed one should be replaced
57+
assertTrue(msf.requestFlush(r, false));
58+
assertEquals(1, msf.getFlushQueueSize());
59+
assertFalse(msf.regionsInQueue.get(r).isDelay());
60+
}
61+
62+
@Test
63+
public void testNotReplaceDelayedFlushEntryWhichExpired() {
64+
HRegionInfo hri = new HRegionInfo(1, TableName.valueOf("TestTable"), 0);
65+
HRegion r = mock(HRegion.class);
66+
doReturn(hri).when(r).getRegionInfo();
67+
68+
// put a delayed task with 100ms delay
69+
msf.requestDelayedFlush(r, 100, false);
70+
assertEquals(1, msf.getFlushQueueSize());
71+
assertTrue(msf.regionsInQueue.get(r).isDelay());
72+
73+
Threads.sleep(200);
74+
75+
// put a non-delayed task, and the delayed one is expired, so it should not be replaced
76+
assertFalse(msf.requestFlush(r, false));
77+
assertEquals(1, msf.getFlushQueueSize());
78+
assertTrue(msf.regionsInQueue.get(r).isDelay());
79+
}
80+
}

0 commit comments

Comments
 (0)