Skip to content

Commit 4ad12e0

Browse files
authored
HBASE-23681 Add UT for procedure store region flusher (#1024)
Signed-off-by: stack <stack@apache.org>
1 parent 0a9e1f8 commit 4ad12e0

File tree

1 file changed

+150
-0
lines changed

1 file changed

+150
-0
lines changed
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.procedure2.store.region;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertTrue;
22+
import static org.mockito.ArgumentMatchers.anyBoolean;
23+
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.when;
25+
26+
import java.io.IOException;
27+
import java.util.Collections;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
import java.util.concurrent.atomic.AtomicLong;
30+
import org.apache.hadoop.conf.Configuration;
31+
import org.apache.hadoop.hbase.Abortable;
32+
import org.apache.hadoop.hbase.HBaseClassTestRule;
33+
import org.apache.hadoop.hbase.HBaseConfiguration;
34+
import org.apache.hadoop.hbase.Waiter;
35+
import org.apache.hadoop.hbase.regionserver.HRegion;
36+
import org.apache.hadoop.hbase.regionserver.HStore;
37+
import org.apache.hadoop.hbase.testclassification.MasterTests;
38+
import org.apache.hadoop.hbase.testclassification.MediumTests;
39+
import org.junit.After;
40+
import org.junit.Before;
41+
import org.junit.ClassRule;
42+
import org.junit.Test;
43+
import org.junit.experimental.categories.Category;
44+
45+
@Category({ MasterTests.class, MediumTests.class })
46+
public class TestRegionProcedureStoreFlush {
47+
48+
@ClassRule
49+
public static final HBaseClassTestRule CLASS_RULE =
50+
HBaseClassTestRule.forClass(TestRegionProcedureStoreFlush.class);
51+
52+
private Configuration conf;
53+
54+
private HRegion region;
55+
56+
private RegionFlusherAndCompactor flusher;
57+
58+
private AtomicInteger flushCalled;
59+
60+
private AtomicLong memstoreHeapSize;
61+
62+
private AtomicLong memstoreOffHeapSize;
63+
64+
@Before
65+
public void setUp() throws IOException {
66+
conf = HBaseConfiguration.create();
67+
region = mock(HRegion.class);
68+
HStore store = mock(HStore.class);
69+
when(store.getStorefilesCount()).thenReturn(1);
70+
when(region.getStores()).thenReturn(Collections.singletonList(store));
71+
flushCalled = new AtomicInteger(0);
72+
memstoreHeapSize = new AtomicLong(0);
73+
memstoreOffHeapSize = new AtomicLong(0);
74+
when(region.getMemStoreHeapSize()).thenAnswer(invocation -> memstoreHeapSize.get());
75+
when(region.getMemStoreOffHeapSize()).thenAnswer(invocation -> memstoreOffHeapSize.get());
76+
when(region.flush(anyBoolean())).thenAnswer(invocation -> {
77+
assertTrue(invocation.getArgument(0));
78+
memstoreHeapSize.set(0);
79+
memstoreOffHeapSize.set(0);
80+
flushCalled.incrementAndGet();
81+
return null;
82+
});
83+
}
84+
85+
@After
86+
public void tearDown() {
87+
if (flusher != null) {
88+
flusher.close();
89+
flusher = null;
90+
}
91+
}
92+
93+
private void initFlusher() {
94+
flusher = new RegionFlusherAndCompactor(conf, new Abortable() {
95+
96+
@Override
97+
public boolean isAborted() {
98+
return false;
99+
}
100+
101+
@Override
102+
public void abort(String why, Throwable e) {
103+
}
104+
}, region);
105+
}
106+
107+
@Test
108+
public void testTriggerFlushBySize() throws IOException, InterruptedException {
109+
conf.setLong(RegionFlusherAndCompactor.FLUSH_SIZE_KEY, 1024 * 1024);
110+
initFlusher();
111+
memstoreHeapSize.set(1000 * 1024);
112+
flusher.onUpdate();
113+
Thread.sleep(1000);
114+
assertEquals(0, flushCalled.get());
115+
memstoreOffHeapSize.set(1000 * 1024);
116+
flusher.onUpdate();
117+
Waiter.waitFor(conf, 2000, () -> flushCalled.get() == 1);
118+
}
119+
120+
private void assertTriggerFlushByChanges(int changes) throws InterruptedException {
121+
int currentFlushCalled = flushCalled.get();
122+
for (int i = 0; i < changes; i++) {
123+
flusher.onUpdate();
124+
}
125+
Thread.sleep(1000);
126+
assertEquals(currentFlushCalled, flushCalled.get());
127+
flusher.onUpdate();
128+
Waiter.waitFor(conf, 5000, () -> flushCalled.get() == currentFlushCalled + 1);
129+
}
130+
131+
@Test
132+
public void testTriggerFlushByChanges() throws InterruptedException {
133+
conf.setLong(RegionFlusherAndCompactor.FLUSH_PER_CHANGES_KEY, 10);
134+
initFlusher();
135+
assertTriggerFlushByChanges(10);
136+
assertTriggerFlushByChanges(10);
137+
}
138+
139+
@Test
140+
public void testPeriodicalFlush() throws InterruptedException {
141+
conf.setLong(RegionFlusherAndCompactor.FLUSH_INTERVAL_MS_KEY, 1000);
142+
initFlusher();
143+
assertEquals(0, flushCalled.get());
144+
Thread.sleep(1500);
145+
assertEquals(1, flushCalled.get());
146+
Thread.sleep(1000);
147+
assertEquals(2, flushCalled.get());
148+
149+
}
150+
}

0 commit comments

Comments
 (0)