|
41 | 41 |
|
42 | 42 | import java.io.IOException;
|
43 | 43 | import java.io.InterruptedIOException;
|
| 44 | +import java.lang.reflect.Field; |
44 | 45 | import java.math.BigDecimal;
|
45 | 46 | import java.nio.charset.StandardCharsets;
|
46 | 47 | import java.security.PrivilegedExceptionAction;
|
|
52 | 53 | import java.util.NavigableMap;
|
53 | 54 | import java.util.Objects;
|
54 | 55 | import java.util.TreeMap;
|
| 56 | +import java.util.concurrent.BlockingQueue; |
55 | 57 | import java.util.concurrent.Callable;
|
56 | 58 | import java.util.concurrent.CountDownLatch;
|
57 | 59 | import java.util.concurrent.ExecutorService;
|
58 | 60 | import java.util.concurrent.Executors;
|
59 | 61 | import java.util.concurrent.Future;
|
| 62 | +import java.util.concurrent.ScheduledExecutorService; |
| 63 | +import java.util.concurrent.ThreadPoolExecutor; |
60 | 64 | import java.util.concurrent.TimeUnit;
|
61 | 65 | import java.util.concurrent.atomic.AtomicBoolean;
|
62 | 66 | import java.util.concurrent.atomic.AtomicInteger;
|
|
161 | 165 | import org.apache.hadoop.hbase.wal.WALProvider;
|
162 | 166 | import org.apache.hadoop.hbase.wal.WALProvider.Writer;
|
163 | 167 | import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
| 168 | +import org.apache.hadoop.hbase.wal.WALSplitter; |
| 169 | +import org.apache.hadoop.metrics2.MetricsExecutor; |
164 | 170 | import org.junit.After;
|
165 | 171 | import org.junit.Assert;
|
166 | 172 | import org.junit.Before;
|
@@ -6281,6 +6287,45 @@ public void testBulkLoadReplicationEnabled() throws IOException {
|
6281 | 6287 | getCoprocessors().contains(ReplicationObserver.class.getSimpleName()));
|
6282 | 6288 | }
|
6283 | 6289 |
|
| 6290 | + // make sure region is success close when coprocessor wrong region open failed |
| 6291 | + @Test |
| 6292 | + public void testOpenRegionFailedMemoryLeak() throws Exception { |
| 6293 | + final ServerName serverName = ServerName.valueOf("testOpenRegionFailed", 100, 42); |
| 6294 | + final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName)); |
| 6295 | + |
| 6296 | + HTableDescriptor htd |
| 6297 | + = new HTableDescriptor(TableName.valueOf("testOpenRegionFailed")); |
| 6298 | + htd.addFamily(new HColumnDescriptor(fam1)); |
| 6299 | + htd.setValue("COPROCESSOR$1", "hdfs://test/test.jar|test||"); |
| 6300 | + |
| 6301 | + HRegionInfo hri = new HRegionInfo(htd.getTableName(), |
| 6302 | + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY); |
| 6303 | + ScheduledExecutorService executor = CompatibilitySingletonFactory. |
| 6304 | + getInstance(MetricsExecutor.class).getExecutor(); |
| 6305 | + for (int i = 0; i < 20 ; i++) { |
| 6306 | + try { |
| 6307 | + HRegion.openHRegion(hri, htd, rss.getWAL(hri), |
| 6308 | + TEST_UTIL.getConfiguration(), rss, null); |
| 6309 | + }catch(Throwable t){ |
| 6310 | + LOG.info("Expected exception, continue"); |
| 6311 | + } |
| 6312 | + } |
| 6313 | + TimeUnit.SECONDS.sleep(MetricsRegionWrapperImpl.PERIOD); |
| 6314 | + Field[] fields = ThreadPoolExecutor.class.getDeclaredFields(); |
| 6315 | + boolean found = false; |
| 6316 | + for(Field field : fields){ |
| 6317 | + if(field.getName().equals("workQueue")){ |
| 6318 | + field.setAccessible(true); |
| 6319 | + BlockingQueue<Runnable> workQueue = (BlockingQueue<Runnable>)field.get(executor); |
| 6320 | + //there are still two task not cancel, can not cause to memory lack |
| 6321 | + Assert.assertTrue("ScheduledExecutor#workQueue should equals 2, now is " + |
| 6322 | + workQueue.size() + ", please check region is close", 2 == workQueue.size()); |
| 6323 | + found = true; |
| 6324 | + } |
| 6325 | + } |
| 6326 | + Assert.assertTrue("can not find workQueue, test failed", found); |
| 6327 | + } |
| 6328 | + |
6284 | 6329 | /**
|
6285 | 6330 | * The same as HRegion class, the only difference is that instantiateHStore will
|
6286 | 6331 | * create a different HStore - HStoreForTesting. [HBASE-8518]
|
|
0 commit comments