|
37 | 37 | import java.util.Map;
|
38 | 38 | import java.util.OptionalLong;
|
39 | 39 | import java.util.Set;
|
| 40 | +import java.util.concurrent.Callable; |
40 | 41 | import java.util.concurrent.ConcurrentHashMap;
|
41 | 42 | import java.util.concurrent.ConcurrentNavigableMap;
|
42 | 43 | import java.util.concurrent.ConcurrentSkipListMap;
|
43 | 44 | import java.util.concurrent.CopyOnWriteArrayList;
|
44 | 45 | import java.util.concurrent.ExecutionException;
|
45 | 46 | import java.util.concurrent.ExecutorService;
|
46 | 47 | import java.util.concurrent.Executors;
|
| 48 | +import java.util.concurrent.Future; |
47 | 49 | import java.util.concurrent.TimeUnit;
|
| 50 | +import java.util.concurrent.TimeoutException; |
48 | 51 | import java.util.concurrent.atomic.AtomicBoolean;
|
49 | 52 | import java.util.concurrent.atomic.AtomicInteger;
|
50 | 53 | import java.util.concurrent.atomic.AtomicLong;
|
@@ -142,6 +145,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
142 | 145 | public static final String RING_BUFFER_SLOT_COUNT =
|
143 | 146 | "hbase.regionserver.wal.disruptor.event.count";
|
144 | 147 |
|
| 148 | + public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms"; |
| 149 | + public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000; |
| 150 | + |
145 | 151 | /**
|
146 | 152 | * file system instance
|
147 | 153 | */
|
@@ -269,6 +275,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
269 | 275 | protected volatile boolean closed = false;
|
270 | 276 |
|
271 | 277 | protected final AtomicBoolean shutdown = new AtomicBoolean(false);
|
| 278 | + |
| 279 | + protected final long walShutdownTimeout; |
| 280 | + |
272 | 281 | /**
|
273 | 282 | * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
|
274 | 283 | * an IllegalArgumentException if used to compare paths from different wals.
|
@@ -320,8 +329,8 @@ public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
|
320 | 329 |
|
321 | 330 | protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
|
322 | 331 |
|
323 |
| - private final ExecutorService logArchiveExecutor = Executors.newSingleThreadExecutor( |
324 |
| - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Log-Archiver-%d").build()); |
| 332 | + private final ExecutorService logArchiveOrShutdownExecutor = Executors.newSingleThreadExecutor( |
| 333 | + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-Or-Shutdown-%d").build()); |
325 | 334 |
|
326 | 335 | private final int archiveRetries;
|
327 | 336 |
|
@@ -478,7 +487,9 @@ public boolean accept(final Path fileName) {
|
478 | 487 | this.syncFutureCache = new SyncFutureCache(conf);
|
479 | 488 | this.implClassName = getClass().getSimpleName();
|
480 | 489 | this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
|
481 |
| - archiveRetries = this.conf.getInt("hbase.regionserver.logroll.archive.retries", 0); |
| 490 | + archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0); |
| 491 | + this.walShutdownTimeout = |
| 492 | + conf.getLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS); |
482 | 493 | }
|
483 | 494 |
|
484 | 495 | /**
|
@@ -685,7 +696,7 @@ private void cleanOldLogs() throws IOException {
|
685 | 696 | final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
|
686 | 697 | // make it async
|
687 | 698 | for (Pair<Path, Long> log : localLogsToArchive) {
|
688 |
| - logArchiveExecutor.execute(() -> { |
| 699 | + logArchiveOrShutdownExecutor.execute(() -> { |
689 | 700 | archive(log);
|
690 | 701 | });
|
691 | 702 | this.walFile2Props.remove(log.getFirst());
|
@@ -891,17 +902,42 @@ public void shutdown() throws IOException {
|
891 | 902 | i.logCloseRequested();
|
892 | 903 | }
|
893 | 904 | }
|
894 |
| - rollWriterLock.lock(); |
895 |
| - try { |
896 |
| - doShutdown(); |
897 |
| - if (syncFutureCache != null) { |
898 |
| - syncFutureCache.clear(); |
| 905 | + |
| 906 | + Future<Void> future = logArchiveOrShutdownExecutor.submit(new Callable<Void>() { |
| 907 | + @Override |
| 908 | + public Void call() throws Exception { |
| 909 | + if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.SECONDS)) { |
| 910 | + try { |
| 911 | + doShutdown(); |
| 912 | + if (syncFutureCache != null) { |
| 913 | + syncFutureCache.clear(); |
| 914 | + } |
| 915 | + } finally { |
| 916 | + rollWriterLock.unlock(); |
| 917 | + } |
| 918 | + } else { |
| 919 | + throw new IOException("Waiting for rollWriterLock timeout"); |
| 920 | + } |
| 921 | + return null; |
899 | 922 | }
|
900 |
| - if (logArchiveExecutor != null) { |
901 |
| - logArchiveExecutor.shutdownNow(); |
| 923 | + }); |
| 924 | + logArchiveOrShutdownExecutor.shutdown(); |
| 925 | + |
| 926 | + try { |
| 927 | + future.get(walShutdownTimeout, TimeUnit.MILLISECONDS); |
| 928 | + } catch (InterruptedException e) { |
| 929 | + throw new InterruptedIOException("Interrupted when waiting for shutdown WAL"); |
| 930 | + } catch (TimeoutException e) { |
| 931 | + throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but" |
| 932 | + + " the shutdown of WAL doesn't complete! Please check the status of underlying " |
| 933 | + + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS |
| 934 | + + "\"", e); |
| 935 | + } catch (ExecutionException e) { |
| 936 | + if (e.getCause() instanceof IOException) { |
| 937 | + throw (IOException) e.getCause(); |
| 938 | + } else { |
| 939 | + throw new IOException(e.getCause()); |
902 | 940 | }
|
903 |
| - } finally { |
904 |
| - rollWriterLock.unlock(); |
905 | 941 | }
|
906 | 942 | }
|
907 | 943 |
|
|
0 commit comments