diff --git a/tests/src/test/java/alluxio/client/fs/concurrent/ConcurrentFileSystemMasterSetTtlIntegrationTest.java b/tests/src/test/java/alluxio/client/fs/concurrent/ConcurrentFileSystemMasterSetTtlIntegrationTest.java index b1e791f61fcc..382195a047a5 100644 --- a/tests/src/test/java/alluxio/client/fs/concurrent/ConcurrentFileSystemMasterSetTtlIntegrationTest.java +++ b/tests/src/test/java/alluxio/client/fs/concurrent/ConcurrentFileSystemMasterSetTtlIntegrationTest.java @@ -18,6 +18,7 @@ import alluxio.collections.ConcurrentHashSet; import alluxio.conf.Configuration; import alluxio.conf.PropertyKey; +import alluxio.exception.AlluxioException; import alluxio.grpc.CreateFilePOptions; import alluxio.grpc.FileSystemMasterCommonPOptions; import alluxio.grpc.SetAttributePOptions; @@ -39,6 +40,7 @@ import org.junit.Rule; import org.junit.Test; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -133,6 +135,7 @@ public void uncaughtException(Thread th, Throwable ex) { errors.add(ex); } }; + for (int i = 0; i < numFiles; i++) { final int iteration = i; Thread t = new Thread(new Runnable() { @@ -141,6 +144,8 @@ public void run() { try { AuthenticatedClientUser.set(TEST_USER); barrier.await(); + HeartbeatScheduler.execute(HeartbeatContext.MASTER_TTL_CHECK); + mFileSystem.setAttribute(paths[iteration], SetAttributePOptions.newBuilder() .setCommonOptions(FileSystemMasterCommonPOptions.newBuilder() .setTtl(ttls[iteration]).setTtlAction(TtlAction.DELETE)) @@ -173,4 +178,79 @@ private void assertErrorsSizeEquals(ConcurrentHashSet errors, int exp expected, errors.size()) + Joiner.on("\n").join(errors)); } } + + @Test + public void testConcurrentInsertAndExpire() throws Exception { + /* (ALLUXIO-2821) When an inode is concurrently added to ttlbucket and inodettlchecker + has been processing this particular ttlbucket. The inode should not be left out forever + without being processed by inodettlchecker further. */ + // Create two files + String fileNamePrefix = "file"; + AlluxioURI fileUri1 = new AlluxioURI(fileNamePrefix + "1"); + AlluxioURI fileUri2 = new AlluxioURI(fileNamePrefix + "2"); + mFileSystem.createFile(fileUri1, + CreateFilePOptions.newBuilder().setWriteType(WritePType.MUST_CACHE).build()); + mFileSystem.createFile(fileUri2, + CreateFilePOptions.newBuilder().setWriteType(WritePType.MUST_CACHE).build()); + // Set ttl on file1. + SetAttributePOptions setTTlOptions = SetAttributePOptions.newBuilder().setRecursive(false) + .setCommonOptions(FileSystemMasterCommonPOptions.newBuilder() + .setTtl(TTL_INTERVAL_MS).setTtlAction(TtlAction.DELETE).build()) + .build(); + mFileSystem.setAttribute((fileUri1), setTTlOptions); + + CommonUtils.sleepMs(4 * TTL_INTERVAL_MS); + // One thread to run InodeTtlChecker, file1 should be expired, another thread + // to set the ttl of file2 which with same ttl as file1, which is supposed to + // land in the bucket that's being processed by ttlchecker at the same time. + final CyclicBarrier barrier = new CyclicBarrier(2); + List threads = new ArrayList<>(2); + final ConcurrentHashSet errors = new ConcurrentHashSet<>(); + Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread th, Throwable ex) { + errors.add(ex); + } + }; + Thread ttlCheckerThread = new Thread(new Runnable() { + @Override + public void run() { + try { + AuthenticatedClientUser.set(TEST_USER); + barrier.await(); + HeartbeatScheduler.execute(HeartbeatContext.MASTER_TTL_CHECK); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + ttlCheckerThread.setUncaughtExceptionHandler(exceptionHandler); + threads.add(ttlCheckerThread); + + Thread setTtlFile2Thread = new Thread(new Runnable() { + @Override + public void run() { + try { + AuthenticatedClientUser.set(TEST_USER); + barrier.await(); + SetAttributePOptions setTTlOptions = SetAttributePOptions.newBuilder().setRecursive(false) + .setCommonOptions(FileSystemMasterCommonPOptions.newBuilder() + .setTtl(TTL_INTERVAL_MS).setTtlAction(TtlAction.DELETE).build()) + .build(); + mFileSystem.setAttribute(fileUri2, setTTlOptions); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + setTtlFile2Thread.setUncaughtExceptionHandler(exceptionHandler); + threads.add(setTtlFile2Thread); + Collections.shuffle(threads); + long startMs = CommonUtils.getCurrentMs(); + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } + } }