46
46
import java .util .concurrent .ExecutorService ;
47
47
import java .util .concurrent .Executors ;
48
48
import java .util .concurrent .Future ;
49
+ import java .util .concurrent .LinkedBlockingQueue ;
50
+ import java .util .concurrent .ThreadPoolExecutor ;
49
51
import java .util .concurrent .TimeUnit ;
50
52
import java .util .concurrent .TimeoutException ;
51
53
import java .util .concurrent .atomic .AtomicBoolean ;
@@ -329,8 +331,12 @@ public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
329
331
330
332
protected final AtomicBoolean rollRequested = new AtomicBoolean (false );
331
333
332
- private final ExecutorService logArchiveOrShutdownExecutor = Executors .newSingleThreadExecutor (
333
- new ThreadFactoryBuilder ().setDaemon (true ).setNameFormat ("WAL-Archive-Or-Shutdown-%d" ).build ());
334
+ // Run the task in the caller thread if the executor rejects it, to avoid aborting the region server
335
+ // on a RejectedExecutionException. Usually this should not happen, but let's make it more robust.
336
+ private final ExecutorService logArchiveExecutor =
337
+ new ThreadPoolExecutor (1 , 1 , 1L , TimeUnit .MINUTES , new LinkedBlockingQueue <Runnable >(),
338
+ new ThreadFactoryBuilder ().setDaemon (true ).setNameFormat ("WAL-Archive-%d" ).build (),
339
+ new ThreadPoolExecutor .CallerRunsPolicy ());
334
340
335
341
private final int archiveRetries ;
336
342
@@ -696,7 +702,7 @@ private void cleanOldLogs() throws IOException {
696
702
final List <Pair <Path , Long >> localLogsToArchive = logsToArchive ;
697
703
// make it async
698
704
for (Pair <Path , Long > log : localLogsToArchive ) {
699
- logArchiveOrShutdownExecutor .execute (() -> {
705
+ logArchiveExecutor .execute (() -> {
700
706
archive (log );
701
707
});
702
708
this .walFile2Props .remove (log .getFirst ());
@@ -903,7 +909,10 @@ public void shutdown() throws IOException {
903
909
}
904
910
}
905
911
906
- Future <Void > future = logArchiveOrShutdownExecutor .submit (new Callable <Void >() {
912
+ ExecutorService shutdownExecutor = Executors .newSingleThreadExecutor (
913
+ new ThreadFactoryBuilder ().setDaemon (true ).setNameFormat ("WAL-Shutdown-%d" ).build ());
914
+
915
+ Future <Void > future = shutdownExecutor .submit (new Callable <Void >() {
907
916
@ Override
908
917
public Void call () throws Exception {
909
918
if (rollWriterLock .tryLock (walShutdownTimeout , TimeUnit .SECONDS )) {
@@ -921,7 +930,7 @@ public Void call() throws Exception {
921
930
return null ;
922
931
}
923
932
});
924
- logArchiveOrShutdownExecutor .shutdown ();
933
+ shutdownExecutor .shutdown ();
925
934
926
935
try {
927
936
future .get (walShutdownTimeout , TimeUnit .MILLISECONDS );
@@ -938,6 +947,12 @@ public Void call() throws Exception {
938
947
} else {
939
948
throw new IOException (e .getCause ());
940
949
}
950
+ } finally {
951
+ // In shutdown we may call cleanOldLogs, so shut down this executor at the end.
952
+ // In the sync replication implementation, we may shut down a WAL without shutting down the whole
953
+ // region server; if we shut down this executor earlier we may get a RejectedExecutionException
954
+ // and abort the region server.
955
+ logArchiveExecutor .shutdown ();
941
956
}
942
957
}
943
958
0 commit comments