Skip to content

Commit

Permalink
add test to address ALLUXIO-2821 fix for concurrent insert and proces…
Browse files Browse the repository at this point in the history
…sing of ttlbucketlist
  • Loading branch information
lucyge2022 committed Feb 23, 2023
1 parent b018aee commit 2d5ce5e
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
import alluxio.AuthenticatedUserRule;
import alluxio.Constants;
import alluxio.client.file.FileSystem;
import alluxio.client.file.URIStatus;
import alluxio.collections.ConcurrentHashSet;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.AlluxioException;
import alluxio.grpc.CreateFilePOptions;
import alluxio.grpc.FileSystemMasterCommonPOptions;
import alluxio.grpc.SetAttributePOptions;
Expand All @@ -39,6 +41,7 @@
import org.junit.Rule;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -133,6 +136,7 @@ public void uncaughtException(Thread th, Throwable ex) {
errors.add(ex);
}
};

for (int i = 0; i < numFiles; i++) {
final int iteration = i;
Thread t = new Thread(new Runnable() {
Expand All @@ -141,6 +145,8 @@ public void run() {
try {
AuthenticatedClientUser.set(TEST_USER);
barrier.await();
HeartbeatScheduler.execute(HeartbeatContext.MASTER_TTL_CHECK);

mFileSystem.setAttribute(paths[iteration], SetAttributePOptions.newBuilder()
.setCommonOptions(FileSystemMasterCommonPOptions.newBuilder()
.setTtl(ttls[iteration]).setTtlAction(TtlAction.DELETE))
Expand Down Expand Up @@ -173,4 +179,93 @@ private void assertErrorsSizeEquals(ConcurrentHashSet<Throwable> errors, int exp
expected, errors.size()) + Joiner.on("\n").join(errors));
}
}

@Test
public void testConcurrentInsertAndExpire() throws Exception {
/* (ALLUXIO-2821) When an inode is concurrently added to ttlbucket and inodettlchecker
has been processing this particular ttlbucket. The inode should not be left out forever
without being processed by inodettlchecker further. */
// Create two files
String fileNamePrefix = "file";
AlluxioURI fileUri1 = new AlluxioURI("/" + fileNamePrefix + "1");
AlluxioURI fileUri2 = new AlluxioURI("/" + fileNamePrefix + "2");
mFileSystem.createFile(fileUri1,
CreateFilePOptions.newBuilder().setWriteType(WritePType.MUST_CACHE).build());
mFileSystem.createFile(fileUri2,
CreateFilePOptions.newBuilder().setWriteType(WritePType.MUST_CACHE).build());
// Set ttl on file1.
SetAttributePOptions setTTlOptions = SetAttributePOptions.newBuilder().setRecursive(false)
.setCommonOptions(FileSystemMasterCommonPOptions.newBuilder()
.setTtl(TTL_INTERVAL_MS).setTtlAction(TtlAction.DELETE).build())
.build();
mFileSystem.setAttribute((fileUri1), setTTlOptions);

CommonUtils.sleepMs(4 * TTL_INTERVAL_MS);
// One thread to run InodeTtlChecker, file1 should be expired, another thread
// to set the ttl of file2 which with same ttl as file1, which is supposed to
// land in the bucket that's being processed by ttlchecker at the same time.
final CyclicBarrier barrier = new CyclicBarrier(2);
List<Thread> threads = new ArrayList<>(2);
final ConcurrentHashSet<Throwable> errors = new ConcurrentHashSet<>();
Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread th, Throwable ex) {
errors.add(ex);
}
};
Thread ttlCheckerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
AuthenticatedClientUser.set(TEST_USER);
barrier.await();
HeartbeatScheduler.execute(HeartbeatContext.MASTER_TTL_CHECK);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
ttlCheckerThread.setUncaughtExceptionHandler(exceptionHandler);
threads.add(ttlCheckerThread);

Thread setTtlFile2Thread = new Thread(new Runnable() {
@Override
public void run() {
try {
AuthenticatedClientUser.set(TEST_USER);
barrier.await();
SetAttributePOptions setTTlOptions = SetAttributePOptions.newBuilder().setRecursive(false)
.setCommonOptions(FileSystemMasterCommonPOptions.newBuilder()
.setTtl(TTL_INTERVAL_MS).setTtlAction(TtlAction.DELETE).build())
.build();
mFileSystem.setAttribute(fileUri2, setTTlOptions);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
setTtlFile2Thread.setUncaughtExceptionHandler(exceptionHandler);
threads.add(setTtlFile2Thread);
Collections.shuffle(threads);
long startMs = CommonUtils.getCurrentMs();
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
// Now file2 inode should either be in ttlbucket or it is cleaned up as part of
// the ttlchecker processing
List<URIStatus> fileStatus = mFileSystem.listStatus(new AlluxioURI("/"));
assert(!fileStatus.stream().anyMatch(status -> new AlluxioURI(status.getFileInfo().getPath())
.equals(fileUri1)));
if (fileStatus.stream().anyMatch(status -> new AlluxioURI(status.getFileInfo().getPath())
.equals(fileUri2))) {
// The inode is not being processed during concurrent insertion into ttlbucket
assert (fileStatus.get(0).getFileInfo().getTtl() == TTL_INTERVAL_MS);
// Now run ttl checker again, it should be gone.
HeartbeatScheduler.execute(HeartbeatContext.MASTER_TTL_CHECK);
Assert.assertEquals("There are remaining file existing with expired TTLs",
0, mFileSystem.listStatus(new AlluxioURI("/")).size());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public void basicRenameTest7() throws Exception {
// Due to Hadoop 1 support we stick with the deprecated version. If we drop support for it
// FSDataOutputStream.hflush will be the new one.
//#ifdef HADOOP1
o.sync();
// o.sync();
//#else
o.hflush();
//#endif
Expand Down

0 comments on commit 2d5ce5e

Please sign in to comment.