Skip to content

Commit 085feaf

Browse files
openinxJenkins
authored andcommitted
HBASE-22810 Initialize an separate ThreadPoolExecutor for taking/restoring snapshot (apache#486)
(cherry picked from commit b4734e7) Change-Id: Id5c952da1949f7039d6caa5329c6599ca54fbba4
1 parent 3c8bdd4 commit 085feaf

File tree

5 files changed

+80
-19
lines changed

5 files changed

+80
-19
lines changed

hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1456,6 +1456,33 @@ public enum OperationStatusCode {
14561456
"hbase.util.default.lossycounting.errorrate";
14571457
public static final String NOT_IMPLEMENTED = "Not implemented";
14581458

1459+
/**
1460+
* Configurations for master executor services.
1461+
*/
1462+
public static final String MASTER_OPEN_REGION_THREADS =
1463+
"hbase.master.executor.openregion.threads";
1464+
public static final int MASTER_OPEN_REGION_THREADS_DEFAULT = 5;
1465+
1466+
public static final String MASTER_CLOSE_REGION_THREADS =
1467+
"hbase.master.executor.closeregion.threads";
1468+
public static final int MASTER_CLOSE_REGION_THREADS_DEFAULT = 5;
1469+
1470+
public static final String MASTER_SERVER_OPERATIONS_THREADS =
1471+
"hbase.master.executor.serverops.threads";
1472+
public static final int MASTER_SERVER_OPERATIONS_THREADS_DEFAULT = 5;
1473+
1474+
public static final String MASTER_META_SERVER_OPERATIONS_THREADS =
1475+
"hbase.master.executor.meta.serverops.threads";
1476+
public static final int MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT = 5;
1477+
1478+
public static final String MASTER_LOG_REPLAY_OPS_THREADS =
1479+
"hbase.master.executor.logreplayops.threads";
1480+
public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10;
1481+
1482+
public static final String MASTER_SNAPSHOT_OPERATIONS_THREADS =
1483+
"hbase.master.executor.snapshot.threads";
1484+
public static final int MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT = 3;
1485+
14591486
private HConstants() {
14601487
// Can't be instantiated with this ctor.
14611488
}

hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -206,13 +206,13 @@ public enum EventType {
206206
* C_M_SNAPSHOT_TABLE<br>
207207
* Client asking Master to snapshot an offline table.
208208
*/
209-
C_M_SNAPSHOT_TABLE (48, ExecutorType.MASTER_TABLE_OPERATIONS),
209+
C_M_SNAPSHOT_TABLE (48, ExecutorType.MASTER_SNAPSHOT_OPERATIONS),
210210
/**
211211
* Messages originating from Client to Master.<br>
212212
* C_M_RESTORE_SNAPSHOT<br>
213213
* Client asking Master to restore a snapshot.
214214
*/
215-
C_M_RESTORE_SNAPSHOT (49, ExecutorType.MASTER_TABLE_OPERATIONS),
215+
C_M_RESTORE_SNAPSHOT (49, ExecutorType.MASTER_SNAPSHOT_OPERATIONS),
216216

217217
// Updates from master to ZK. This is done by the master and there is
218218
// nothing to process by either Master or RS
@@ -314,11 +314,6 @@ public static EventType get(final int code) {
314314
throw new IllegalArgumentException("Unknown code " + code);
315315
}
316316

317-
public boolean isOnlineSchemaChangeSupported() {
318-
return this.equals(EventType.C_M_ADD_FAMILY) || this.equals(EventType.C_M_DELETE_FAMILY) ||
319-
this.equals(EventType.C_M_MODIFY_FAMILY) || this.equals(EventType.C_M_MODIFY_TABLE);
320-
}
321-
322317
ExecutorType getExecutorServiceType() {
323318
return this.executor;
324319
}

hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public enum ExecutorType {
3434
MASTER_RS_SHUTDOWN (5),
3535
MASTER_META_SERVER_OPERATIONS (6),
3636
M_LOG_REPLAY_OPS (7),
37+
MASTER_SNAPSHOT_OPERATIONS (8),
3738

3839
// RegionServer executor services
3940
RS_OPEN_REGION (20),

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1392,18 +1392,23 @@ public TableStateManager getTableStateManager() {
13921392
* as OOMEs; it should be lightly loaded. See what HRegionServer does if
13931393
* need to install an unexpected exception handler.
13941394
*/
1395-
private void startServiceThreads() throws IOException{
1396-
// Start the executor service pools
1397-
this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
1398-
conf.getInt("hbase.master.executor.openregion.threads", 5));
1399-
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
1400-
conf.getInt("hbase.master.executor.closeregion.threads", 5));
1401-
this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1402-
conf.getInt("hbase.master.executor.serverops.threads", 5));
1403-
this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1404-
conf.getInt("hbase.master.executor.meta.serverops.threads", 5));
1405-
this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
1406-
conf.getInt("hbase.master.executor.logreplayops.threads", 10));
1395+
private void startServiceThreads() throws IOException {
1396+
// Start the executor service pools
1397+
this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt(
1398+
HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT));
1399+
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt(
1400+
HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT));
1401+
this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1402+
conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
1403+
HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT));
1404+
this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1405+
conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
1406+
HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT));
1407+
this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt(
1408+
HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT));
1409+
this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS,
1410+
conf.getInt(HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS,
1411+
HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT));
14071412

14081413
// We depend on there being only one instance of this executor running
14091414
// at a time. To do concurrency, would need fencing of enable/disable of

hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import java.io.IOException;
3030
import java.io.StringWriter;
31+
import java.util.concurrent.CountDownLatch;
3132
import java.util.concurrent.ThreadPoolExecutor;
3233
import java.util.concurrent.atomic.AtomicBoolean;
3334
import java.util.concurrent.atomic.AtomicInteger;
@@ -41,6 +42,7 @@
4142
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus;
4243
import org.apache.hadoop.hbase.testclassification.MiscTests;
4344
import org.apache.hadoop.hbase.testclassification.SmallTests;
45+
import org.junit.Assert;
4446
import org.junit.ClassRule;
4547
import org.junit.Test;
4648
import org.junit.experimental.categories.Category;
@@ -219,5 +221,36 @@ public boolean evaluate() throws Exception {
219221
executorService.shutdown();
220222
}
221223

224+
@Test
225+
public void testSnapshotHandlers() throws Exception {
226+
final Configuration conf = HBaseConfiguration.create();
227+
final Server server = mock(Server.class);
228+
when(server.getConfiguration()).thenReturn(conf);
229+
230+
ExecutorService executorService = new ExecutorService("testSnapshotHandlers");
231+
executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, 1);
232+
233+
CountDownLatch latch = new CountDownLatch(1);
234+
executorService.submit(new EventHandler(server, EventType.C_M_SNAPSHOT_TABLE) {
235+
@Override
236+
public void process() throws IOException {
237+
try {
238+
latch.await();
239+
} catch (InterruptedException e) {
240+
Thread.currentThread().interrupt();
241+
}
242+
}
243+
});
244+
245+
int activeCount = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS)
246+
.getThreadPoolExecutor().getActiveCount();
247+
Assert.assertEquals(activeCount, 1);
248+
latch.countDown();
249+
Waiter.waitFor(conf, 3000, () -> {
250+
int count = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS)
251+
.getThreadPoolExecutor().getActiveCount();
252+
return count == 0;
253+
});
254+
}
222255
}
223256

0 commit comments

Comments
 (0)