Skip to content

Commit

Permalink
HBASE-22810 Initialize an separate ThreadPoolExecutor for taking/rest…
Browse files Browse the repository at this point in the history
…oring snapshot (#486)
  • Loading branch information
openinx committed Aug 15, 2019
1 parent 1500d9a commit b97621b
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,13 @@ public enum EventType {
* C_M_SNAPSHOT_TABLE<br>
* Client asking Master to snapshot an offline table.
*/
C_M_SNAPSHOT_TABLE (48, ExecutorType.MASTER_TABLE_OPERATIONS),
C_M_SNAPSHOT_TABLE (48, ExecutorType.MASTER_SNAPSHOT_OPERATIONS),
/**
* Messages originating from Client to Master.<br>
* C_M_RESTORE_SNAPSHOT<br>
* Client asking Master to restore a snapshot.
*/
C_M_RESTORE_SNAPSHOT (49, ExecutorType.MASTER_TABLE_OPERATIONS),
C_M_RESTORE_SNAPSHOT (49, ExecutorType.MASTER_SNAPSHOT_OPERATIONS),

// Updates from master to ZK. This is done by the master and there is
// nothing to process by either Master or RS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public enum ExecutorType {
MASTER_RS_SHUTDOWN (5),
MASTER_META_SERVER_OPERATIONS (6),
M_LOG_REPLAY_OPS (7),
MASTER_SNAPSHOT_OPERATIONS (8),

// RegionServer executor services
RS_OPEN_REGION (20),
Expand Down
27 changes: 27 additions & 0 deletions hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,33 @@ public static enum Modify {
public static final String DEFAULT_TEMPORARY_HDFS_DIRECTORY = "/user/"
+ System.getProperty("user.name") + "/hbase-staging";

/**
* Configurations for master executor services.
*/
public static final String MASTER_OPEN_REGION_THREADS =
"hbase.master.executor.openregion.threads";
public static final int MASTER_OPEN_REGION_THREADS_DEFAULT = 5;

public static final String MASTER_CLOSE_REGION_THREADS =
"hbase.master.executor.closeregion.threads";
public static final int MASTER_CLOSE_REGION_THREADS_DEFAULT = 5;

public static final String MASTER_SERVER_OPERATIONS_THREADS =
"hbase.master.executor.serverops.threads";
public static final int MASTER_SERVER_OPERATIONS_THREADS_DEFAULT = 5;

public static final String MASTER_META_SERVER_OPERATIONS_THREADS =
"hbase.master.executor.meta.serverops.threads";
public static final int MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT = 5;

public static final String MASTER_LOG_REPLAY_OPS_THREADS =
"hbase.master.executor.logreplayops.threads";
public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10;

public static final String MASTER_SNAPSHOT_OPERATIONS_THREADS =
"hbase.master.executor.snapshot.threads";
public static final int MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT = 3;

private HConstants() {
// Can't be instantiated with this ctor.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1159,18 +1159,23 @@ public TableNamespaceManager getTableNamespaceManager() {
* as OOMEs; it should be lightly loaded. See what HRegionServer does if
* need to install an unexpected exception handler.
*/
private void startServiceThreads() throws IOException{
// Start the executor service pools
this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
conf.getInt("hbase.master.executor.openregion.threads", 5));
this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
conf.getInt("hbase.master.executor.closeregion.threads", 5));
this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
conf.getInt("hbase.master.executor.serverops.threads", 5));
this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
conf.getInt("hbase.master.executor.serverops.threads", 5));
this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
conf.getInt("hbase.master.executor.logreplayops.threads", 10));
private void startServiceThreads() throws IOException {
// Start the executor service pools
this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt(
HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT));
this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt(
HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT));
this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT));
this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT));
this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt(
HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT));
this.service.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS,
conf.getInt(HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS,
HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT));

// We depend on there being only one instance of this executor running
// at a time. To do concurrency, would need fencing of enable/disable of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.*;
import java.io.IOException;
import java.io.StringWriter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -33,6 +34,7 @@
import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

Expand Down Expand Up @@ -205,5 +207,39 @@ public boolean evaluate() throws Exception {
executorService.shutdown();
}

@Test
public void testSnapshotHandlers() throws Exception {
final Configuration conf = HBaseConfiguration.create();
final Server server = mock(Server.class);
when(server.getConfiguration()).thenReturn(conf);

final ExecutorService executorService = new ExecutorService("testSnapshotHandlers");
executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, 1);

final CountDownLatch latch = new CountDownLatch(1);
executorService.submit(new EventHandler(server, EventType.C_M_SNAPSHOT_TABLE) {
@Override
public void process() throws IOException {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});

int activeCount = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS)
.getThreadPoolExecutor().getActiveCount();
Assert.assertEquals(activeCount, 1);
latch.countDown();
Waiter.waitFor(conf, 3000, new Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
int count = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS)
.getThreadPoolExecutor().getActiveCount();
return count == 0;
}
});
}
}

0 comments on commit b97621b

Please sign in to comment.