48
48
import java .util .concurrent .ExecutorService ;
49
49
import java .util .concurrent .Executors ;
50
50
import java .util .concurrent .Future ;
51
+ import java .util .concurrent .LinkedBlockingQueue ;
52
+ import java .util .concurrent .ThreadPoolExecutor ;
51
53
import java .util .concurrent .TimeUnit ;
52
54
import java .util .concurrent .TimeoutException ;
53
55
import java .util .concurrent .atomic .AtomicBoolean ;
@@ -345,8 +347,12 @@ public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
345
347
346
348
protected final AtomicBoolean rollRequested = new AtomicBoolean (false );
347
349
348
- private final ExecutorService logArchiveOrShutdownExecutor = Executors .newSingleThreadExecutor (
349
- new ThreadFactoryBuilder ().setDaemon (true ).setNameFormat ("WAL-Archive-Or-Shutdown-%d" ).build ());
350
+ // Run in caller if we get reject execution exception, to avoid aborting region server when we get
351
+ // reject execution exception. Usually this should not happen but let's make it more robust.
352
+ private final ExecutorService logArchiveExecutor =
353
+ new ThreadPoolExecutor (1 , 1 , 1L , TimeUnit .MINUTES , new LinkedBlockingQueue <Runnable >(),
354
+ new ThreadFactoryBuilder ().setDaemon (true ).setNameFormat ("WAL-Archive-%d" ).build (),
355
+ new ThreadPoolExecutor .CallerRunsPolicy ());
350
356
351
357
private final int archiveRetries ;
352
358
@@ -770,7 +776,7 @@ private void cleanOldLogs() throws IOException {
770
776
final List <Pair <Path , Long >> localLogsToArchive = logsToArchive ;
771
777
// make it async
772
778
for (Pair <Path , Long > log : localLogsToArchive ) {
773
- logArchiveOrShutdownExecutor .execute (() -> {
779
+ logArchiveExecutor .execute (() -> {
774
780
archive (log );
775
781
});
776
782
this .walFile2Props .remove (log .getFirst ());
@@ -985,7 +991,10 @@ public void shutdown() throws IOException {
985
991
}
986
992
}
987
993
988
- Future <Void > future = logArchiveOrShutdownExecutor .submit (new Callable <Void >() {
994
+ ExecutorService shutdownExecutor = Executors .newSingleThreadExecutor (
995
+ new ThreadFactoryBuilder ().setDaemon (true ).setNameFormat ("WAL-Shutdown-%d" ).build ());
996
+
997
+ Future <Void > future = shutdownExecutor .submit (new Callable <Void >() {
989
998
@ Override
990
999
public Void call () throws Exception {
991
1000
if (rollWriterLock .tryLock (walShutdownTimeout , TimeUnit .SECONDS )) {
@@ -1003,7 +1012,7 @@ public Void call() throws Exception {
1003
1012
return null ;
1004
1013
}
1005
1014
});
1006
- logArchiveOrShutdownExecutor .shutdown ();
1015
+ shutdownExecutor .shutdown ();
1007
1016
1008
1017
try {
1009
1018
future .get (walShutdownTimeout , TimeUnit .MILLISECONDS );
@@ -1020,6 +1029,12 @@ public Void call() throws Exception {
1020
1029
} else {
1021
1030
throw new IOException (e .getCause ());
1022
1031
}
1032
+ } finally {
1033
+ // in shutdown we may call cleanOldLogs so shutdown this executor in the end.
1034
+ // In sync replication implementation, we may shutdown a WAL without shutting down the whole
1035
+ // region server, if we shutdown this executor earlier we may get reject execution exception
1036
+ // and abort the region server
1037
+ logArchiveExecutor .shutdown ();
1023
1038
}
1024
1039
}
1025
1040
0 commit comments