Skip to content

Commit c1dacfd

Browse files
authored
HBASE-25547 (addendum): Roll ExecutorType into ExecutorConfig (#2996)
Signed-off-by: Viraj Jasani <vjasani@apache.org>
1 parent 53128fe commit c1dacfd

File tree

8 files changed

+81
-78
lines changed

8 files changed

+81
-78
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -127,25 +127,14 @@ public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
127127
return getExecutor(type).getThreadPoolExecutor();
128128
}
129129

130-
public void startExecutorService(final ExecutorType type, final ExecutorConfig config) {
131-
String name = type.getExecutorName(this.servername);
132-
if (isExecutorServiceRunning(name)) {
133-
LOG.debug("Executor service {} already running on {}", this,
134-
this.servername);
135-
return;
136-
}
137-
startExecutorService(config.setName(name));
138-
}
139-
140130
/**
141131
* Initialize the executor lazily, Note if an executor need to be initialized lazily, then all
142132
* paths should use this method to get the executor, should not start executor by using
143133
* {@link ExecutorService#startExecutorService(ExecutorConfig)}
144134
*/
145-
public ThreadPoolExecutor getExecutorLazily(ExecutorType type, ExecutorConfig config) {
146-
String name = type.getExecutorName(this.servername);
147-
return executorMap.computeIfAbsent(name, (executorName) ->
148-
new Executor(config.setName(name))).getThreadPoolExecutor();
135+
public ThreadPoolExecutor getExecutorLazily(ExecutorConfig config) {
136+
return executorMap.computeIfAbsent(config.getName(), (executorName) ->
137+
new Executor(config)).getThreadPoolExecutor();
149138
}
150139

151140
public void submit(final EventHandler eh) {
@@ -184,15 +173,24 @@ public Map<String, ExecutorStatus> getAllExecutorStatuses() {
184173
/**
185174
* Configuration wrapper for {@link Executor}.
186175
*/
187-
public static class ExecutorConfig {
176+
public class ExecutorConfig {
188177
// Refer to ThreadPoolExecutor javadoc for details of these configuration.
189178
// Argument validation and bound checks delegated to the underlying ThreadPoolExecutor
190179
// implementation.
191180
public static final long KEEP_ALIVE_TIME_MILLIS_DEFAULT = 1000;
192181
private int corePoolSize = -1;
193182
private boolean allowCoreThreadTimeout = false;
194183
private long keepAliveTimeMillis = KEEP_ALIVE_TIME_MILLIS_DEFAULT;
195-
private String name;
184+
private ExecutorType executorType;
185+
186+
public ExecutorConfig setExecutorType(ExecutorType type) {
187+
this.executorType = type;
188+
return this;
189+
}
190+
191+
private ExecutorType getExecutorType() {
192+
return Preconditions.checkNotNull(executorType, "ExecutorType not set.");
193+
}
196194

197195
public int getCorePoolSize() {
198196
return corePoolSize;
@@ -217,13 +215,11 @@ public ExecutorConfig setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout)
217215
return this;
218216
}
219217

218+
/**
219+
* @return the executor name inferred from the type and the servername on which this is running.
220+
*/
220221
public String getName() {
221-
return Preconditions.checkNotNull(name);
222-
}
223-
224-
public ExecutorConfig setName(String name) {
225-
this.name = name;
226-
return this;
222+
return getExecutorType().getExecutorName(servername);
227223
}
228224

229225
public long getKeepAliveTimeMillis() {

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,42 +1313,43 @@ private void startServiceThreads() throws IOException {
13131313
// Start the executor service pools
13141314
final int masterOpenRegionPoolSize = conf.getInt(
13151315
HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT);
1316-
this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
1317-
new ExecutorConfig().setCorePoolSize(masterOpenRegionPoolSize));
1316+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1317+
ExecutorType.MASTER_OPEN_REGION).setCorePoolSize(masterOpenRegionPoolSize));
13181318
final int masterCloseRegionPoolSize = conf.getInt(
13191319
HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT);
1320-
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
1321-
new ExecutorConfig().setCorePoolSize(masterCloseRegionPoolSize));
1320+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1321+
ExecutorType.MASTER_CLOSE_REGION).setCorePoolSize(masterCloseRegionPoolSize));
13221322
final int masterServerOpThreads = conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
13231323
HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT);
1324-
this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1325-
new ExecutorConfig().setCorePoolSize(masterServerOpThreads));
1324+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1325+
ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(masterServerOpThreads));
13261326
final int masterServerMetaOpsThreads = conf.getInt(
13271327
HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
13281328
HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT);
1329-
this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1330-
new ExecutorConfig().setCorePoolSize(masterServerMetaOpsThreads));
1329+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1330+
ExecutorType.MASTER_META_SERVER_OPERATIONS).setCorePoolSize(masterServerMetaOpsThreads));
13311331
final int masterLogReplayThreads = conf.getInt(
13321332
HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT);
1333-
this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
1334-
new ExecutorConfig().setCorePoolSize(masterLogReplayThreads));
1333+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1334+
ExecutorType.M_LOG_REPLAY_OPS).setCorePoolSize(masterLogReplayThreads));
13351335
final int masterSnapshotThreads = conf.getInt(
13361336
SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT);
1337-
this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS,
1338-
new ExecutorConfig().setCorePoolSize(masterSnapshotThreads).setAllowCoreThreadTimeout(true));
1337+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1338+
ExecutorType.MASTER_SNAPSHOT_OPERATIONS).setCorePoolSize(masterSnapshotThreads)
1339+
.setAllowCoreThreadTimeout(true));
13391340
final int masterMergeDispatchThreads = conf.getInt(HConstants.MASTER_MERGE_DISPATCH_THREADS,
13401341
HConstants.MASTER_MERGE_DISPATCH_THREADS_DEFAULT);
1341-
this.executorService.startExecutorService(ExecutorType.MASTER_MERGE_OPERATIONS,
1342-
new ExecutorConfig().setCorePoolSize(masterMergeDispatchThreads)
1343-
.setAllowCoreThreadTimeout(true));
1342+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1343+
ExecutorType.MASTER_MERGE_OPERATIONS).setCorePoolSize(masterMergeDispatchThreads)
1344+
.setAllowCoreThreadTimeout(true));
13441345

13451346
// We depend on there being only one instance of this executor running
13461347
// at a time. To do concurrency, would need fencing of enable/disable of
13471348
// tables.
13481349
// Any time changing this maxThreads to > 1, pls see the comment at
13491350
// AccessController#postCompletedCreateTableAction
1350-
this.executorService.startExecutorService(
1351-
ExecutorType.MASTER_TABLE_OPERATIONS, new ExecutorConfig().setCorePoolSize(1));
1351+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1352+
ExecutorType.MASTER_TABLE_OPERATIONS).setCorePoolSize(1));
13521353
startProcedureExecutor();
13531354

13541355
// Create cleaner thread pool

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2042,57 +2042,59 @@ private void startServices() throws IOException {
20422042

20432043
// Start executor services
20442044
final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
2045-
this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION,
2046-
new ExecutorConfig().setCorePoolSize(openRegionThreads));
2045+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2046+
ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
20472047
final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
2048-
this.executorService.startExecutorService(ExecutorType.RS_OPEN_META,
2049-
new ExecutorConfig().setCorePoolSize(openMetaThreads));
2048+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2049+
ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
20502050
final int openPriorityRegionThreads =
20512051
conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
2052-
this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
2053-
new ExecutorConfig().setCorePoolSize(openPriorityRegionThreads));
2052+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2053+
ExecutorType.RS_OPEN_PRIORITY_REGION).setCorePoolSize(openPriorityRegionThreads));
20542054
final int closeRegionThreads =
20552055
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
2056-
this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION,
2057-
new ExecutorConfig().setCorePoolSize(closeRegionThreads));
2056+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2057+
ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
20582058
final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
2059-
this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META,
2060-
new ExecutorConfig().setCorePoolSize(closeMetaThreads));
2059+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2060+
ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
20612061
if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
20622062
final int storeScannerParallelSeekThreads =
20632063
conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
2064-
this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
2065-
new ExecutorConfig().setCorePoolSize(storeScannerParallelSeekThreads)
2066-
.setAllowCoreThreadTimeout(true));
2064+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2065+
ExecutorType.RS_PARALLEL_SEEK).setCorePoolSize(storeScannerParallelSeekThreads)
2066+
.setAllowCoreThreadTimeout(true));
20672067
}
20682068
final int logReplayOpsThreads = conf.getInt(
20692069
HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
2070-
this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
2071-
new ExecutorConfig().setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
2070+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2071+
ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(logReplayOpsThreads)
2072+
.setAllowCoreThreadTimeout(true));
20722073
// Start the threads for compacted files discharger
20732074
final int compactionDischargerThreads =
20742075
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
2075-
this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
2076-
new ExecutorConfig().setCorePoolSize(compactionDischargerThreads));
2076+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2077+
ExecutorType.RS_COMPACTED_FILES_DISCHARGER).setCorePoolSize(compactionDischargerThreads));
20772078
if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
20782079
final int regionReplicaFlushThreads = conf.getInt(
20792080
"hbase.regionserver.region.replica.flusher.threads", conf.getInt(
20802081
"hbase.regionserver.executor.openregion.threads", 3));
2081-
this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
2082-
new ExecutorConfig().setCorePoolSize(regionReplicaFlushThreads));
2082+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2083+
ExecutorType.RS_REGION_REPLICA_FLUSH_OPS).setCorePoolSize(regionReplicaFlushThreads));
20832084
}
20842085
final int refreshPeerThreads =
20852086
conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
2086-
this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
2087-
new ExecutorConfig().setCorePoolSize(refreshPeerThreads));
2087+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2088+
ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
20882089
final int replaySyncReplicationWALThreads =
20892090
conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
2090-
this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL,
2091-
new ExecutorConfig().setCorePoolSize(replaySyncReplicationWALThreads));
2091+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2092+
ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL).setCorePoolSize(
2093+
replaySyncReplicationWALThreads));
20922094
final int switchRpcThrottleThreads =
20932095
conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
2094-
this.executorService.startExecutorService(ExecutorType.RS_SWITCH_RPC_THROTTLE,
2095-
new ExecutorConfig().setCorePoolSize(switchRpcThrottleThreads));
2096+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
2097+
ExecutorType.RS_SWITCH_RPC_THROTTLE).setCorePoolSize(switchRpcThrottleThreads));
20962098

20972099
Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
20982100
uncaughtExceptionHandler);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.ThreadPoolExecutor;
2323
import java.util.concurrent.TimeUnit;
2424
import org.apache.hadoop.hbase.client.RegionInfo;
25+
import org.apache.hadoop.hbase.executor.ExecutorService;
2526
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
2627
import org.apache.hadoop.hbase.executor.ExecutorType;
2728
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
@@ -97,9 +98,10 @@ private static synchronized ThreadPoolExecutor getInMemoryCompactionPoolForTest(
9798

9899
ThreadPoolExecutor getInMemoryCompactionPool() {
99100
if (rsServices != null) {
100-
ExecutorConfig config = new ExecutorConfig().setCorePoolSize(inMemoryPoolSize);
101-
return rsServices.getExecutorService().getExecutorLazily(ExecutorType.RS_IN_MEMORY_COMPACTION,
102-
config);
101+
ExecutorService executorService = rsServices.getExecutorService();
102+
ExecutorConfig config = executorService.new ExecutorConfig().setExecutorType(
103+
ExecutorType.RS_IN_MEMORY_COMPACTION).setCorePoolSize(inMemoryPoolSize);
104+
return executorService.getExecutorLazily(config);
103105
} else {
104106
// this could only happen in tests
105107
return getInMemoryCompactionPoolForTest();

hbase-server/src/test/java/org/apache/hadoop/hbase/TestExecutorStatusChore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ public void testMetricsCollect() throws Exception {
5959

6060
// Start an executor service pool with max 5 threads
6161
ExecutorService executorService = new ExecutorService("unit_test");
62-
executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
63-
new ExecutorConfig().setCorePoolSize(maxThreads));
62+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
63+
ExecutorType.RS_PARALLEL_SEEK).setCorePoolSize(maxThreads));
6464

6565
MetricsRegionServerSource serverSource = CompatibilitySingletonFactory
6666
.getInstance(MetricsRegionServerSourceFactory.class).createServer(null);

hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ public void testExecutorService() throws Exception {
7171

7272
// Start an executor service pool with max 5 threads
7373
ExecutorService executorService = new ExecutorService("unit_test");
74-
executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
75-
new ExecutorConfig().setCorePoolSize(maxThreads));
74+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
75+
ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(maxThreads));
7676

7777
Executor executor =
7878
executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS);
@@ -197,8 +197,8 @@ public void testAborting() throws Exception {
197197
when(server.getConfiguration()).thenReturn(conf);
198198

199199
ExecutorService executorService = new ExecutorService("unit_test");
200-
executorService.startExecutorService(
201-
ExecutorType.MASTER_SERVER_OPERATIONS, new ExecutorConfig().setCorePoolSize(1));
200+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
201+
ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(1));
202202

203203

204204
executorService.submit(new EventHandler(server, EventType.M_SERVER_SHUTDOWN) {
@@ -230,8 +230,8 @@ public void testSnapshotHandlers() throws Exception {
230230
when(server.getConfiguration()).thenReturn(conf);
231231

232232
ExecutorService executorService = new ExecutorService("testSnapshotHandlers");
233-
executorService.startExecutorService(
234-
ExecutorType.MASTER_SNAPSHOT_OPERATIONS, new ExecutorConfig().setCorePoolSize(1));
233+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
234+
ExecutorType.MASTER_SNAPSHOT_OPERATIONS).setCorePoolSize(1));
235235

236236
CountDownLatch latch = new CountDownLatch(1);
237237
CountDownLatch waitForEventToStart = new CountDownLatch(1);

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
6464
import org.apache.hadoop.hbase.executor.ExecutorService;
6565
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
66+
import org.apache.hadoop.hbase.executor.ExecutorType;
6667
import org.apache.hadoop.hbase.io.hfile.HFile;
6768
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
6869
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
@@ -193,7 +194,8 @@ public void setUp() throws Exception {
193194
String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
194195
.toString();
195196
ExecutorService es = new ExecutorService(string);
196-
es.startExecutorService(new ExecutorConfig().setCorePoolSize(1).setName(string + "-" + string));
197+
es.startExecutorService(es.new ExecutorConfig().setCorePoolSize(1).setExecutorType(
198+
ExecutorType.RS_COMPACTED_FILES_DISCHARGER));
197199
when(rss.getExecutorService()).thenReturn(es);
198200
primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
199201
primaryRegion.close();

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,8 @@ public void setup() throws Exception {
212212

213213
SplitLogCounters.resetCounters();
214214
executorService = new ExecutorService("TestSplitLogWorker");
215-
executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
216-
new ExecutorConfig().setCorePoolSize(10));
215+
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
216+
ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(10));
217217
}
218218

219219
@After

0 commit comments

Comments
 (0)