@@ -217,6 +217,15 @@ public interface ProcedureExecutorListener {
217
217
*/
218
218
private TimeoutExecutorThread <TEnvironment > timeoutExecutor ;
219
219
220
+ /**
221
+ * WorkerMonitor check for stuck workers and new worker thread when necessary, for example if
222
+ * there is no worker to assign meta, it will new worker thread for it, so it is very important.
223
+ * TimeoutExecutor execute many tasks like DeadServerMetricRegionChore RegionInTransitionChore
224
+ * and so on, some tasks may execute for a long time so will block other tasks like
225
+ * WorkerMonitor, so use a dedicated thread for executing WorkerMonitor.
226
+ */
227
+ private TimeoutExecutorThread <TEnvironment > workerMonitorExecutor ;
228
+
220
229
private int corePoolSize ;
221
230
private int maxPoolSize ;
222
231
@@ -560,7 +569,8 @@ public void init(int numThreads, boolean abortOnCorruption) throws IOException {
560
569
corePoolSize , maxPoolSize );
561
570
562
571
this .threadGroup = new ThreadGroup ("PEWorkerGroup" );
563
- this .timeoutExecutor = new TimeoutExecutorThread <>(this , threadGroup );
572
+ this .timeoutExecutor = new TimeoutExecutorThread <>(this , threadGroup , "ProcExecTimeout" );
573
+ this .workerMonitorExecutor = new TimeoutExecutorThread <>(this , threadGroup , "WorkerMonitor" );
564
574
565
575
// Create the workers
566
576
workerId .set (0 );
@@ -604,12 +614,13 @@ public void startWorkers() throws IOException {
604
614
// Start the executors. Here we must have the lastProcId set.
605
615
LOG .trace ("Start workers {}" , workerThreads .size ());
606
616
timeoutExecutor .start ();
617
+ workerMonitorExecutor .start ();
607
618
for (WorkerThread worker : workerThreads ) {
608
619
worker .start ();
609
620
}
610
621
611
622
// Internal chores
612
- timeoutExecutor .add (new WorkerMonitor ());
623
+ workerMonitorExecutor .add (new WorkerMonitor ());
613
624
614
625
// Add completed cleaner chore
615
626
addChore (new CompletedProcedureCleaner <>(conf , store , procExecutionLock , completed ,
@@ -624,6 +635,7 @@ public void stop() {
624
635
LOG .info ("Stopping" );
625
636
scheduler .stop ();
626
637
timeoutExecutor .sendStopSignal ();
638
+ workerMonitorExecutor .sendStopSignal ();
627
639
}
628
640
629
641
@ VisibleForTesting
@@ -632,6 +644,8 @@ public void join() {
632
644
633
645
// stop the timeout executor
634
646
timeoutExecutor .awaitTermination ();
647
+ // stop the work monitor executor
648
+ workerMonitorExecutor .awaitTermination ();
635
649
636
650
// stop the worker threads
637
651
for (WorkerThread worker : workerThreads ) {
0 commit comments