Skip to content

Commit

Permalink
WIP - concurrent testing on new changes
Browse files Browse the repository at this point in the history
  • Loading branch information
lucyge2022 committed Feb 23, 2023
1 parent b018aee commit 2a731f6
Showing 1 changed file with 80 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import alluxio.collections.ConcurrentHashSet;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.AlluxioException;
import alluxio.grpc.CreateFilePOptions;
import alluxio.grpc.FileSystemMasterCommonPOptions;
import alluxio.grpc.SetAttributePOptions;
Expand All @@ -39,6 +40,7 @@
import org.junit.Rule;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -133,6 +135,7 @@ public void uncaughtException(Thread th, Throwable ex) {
errors.add(ex);
}
};

for (int i = 0; i < numFiles; i++) {
final int iteration = i;
Thread t = new Thread(new Runnable() {
Expand All @@ -141,6 +144,8 @@ public void run() {
try {
AuthenticatedClientUser.set(TEST_USER);
barrier.await();
HeartbeatScheduler.execute(HeartbeatContext.MASTER_TTL_CHECK);

mFileSystem.setAttribute(paths[iteration], SetAttributePOptions.newBuilder()
.setCommonOptions(FileSystemMasterCommonPOptions.newBuilder()
.setTtl(ttls[iteration]).setTtlAction(TtlAction.DELETE))
Expand Down Expand Up @@ -173,4 +178,79 @@ private void assertErrorsSizeEquals(ConcurrentHashSet<Throwable> errors, int exp
expected, errors.size()) + Joiner.on("\n").join(errors));
}
}

@Test
public void testConcurrentInsertAndExpire() throws Exception {
/* (ALLUXIO-2821) When an inode is concurrently added to ttlbucket and inodettlchecker
has been processing this particular ttlbucket. The inode should not be left out forever
without being processed by inodettlchecker further. */
// Create two files
String fileNamePrefix = "file";
AlluxioURI fileUri1 = new AlluxioURI(fileNamePrefix + "1");
AlluxioURI fileUri2 = new AlluxioURI(fileNamePrefix + "2");
mFileSystem.createFile(fileUri1,
CreateFilePOptions.newBuilder().setWriteType(WritePType.MUST_CACHE).build());
mFileSystem.createFile(fileUri2,
CreateFilePOptions.newBuilder().setWriteType(WritePType.MUST_CACHE).build());
// Set ttl on file1.
SetAttributePOptions setTTlOptions = SetAttributePOptions.newBuilder().setRecursive(false)
.setCommonOptions(FileSystemMasterCommonPOptions.newBuilder()
.setTtl(TTL_INTERVAL_MS).setTtlAction(TtlAction.DELETE).build())
.build();
mFileSystem.setAttribute((fileUri1), setTTlOptions);

CommonUtils.sleepMs(4 * TTL_INTERVAL_MS);
// One thread to run InodeTtlChecker, file1 should be expired, another thread
// to set the ttl of file2 which with same ttl as file1, which is supposed to
// land in the bucket that's being processed by ttlchecker at the same time.
final CyclicBarrier barrier = new CyclicBarrier(2);
List<Thread> threads = new ArrayList<>(2);
final ConcurrentHashSet<Throwable> errors = new ConcurrentHashSet<>();
Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread th, Throwable ex) {
errors.add(ex);
}
};
Thread ttlCheckerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
AuthenticatedClientUser.set(TEST_USER);
barrier.await();
HeartbeatScheduler.execute(HeartbeatContext.MASTER_TTL_CHECK);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
ttlCheckerThread.setUncaughtExceptionHandler(exceptionHandler);
threads.add(ttlCheckerThread);

Thread setTtlFile2Thread = new Thread(new Runnable() {
@Override
public void run() {
try {
AuthenticatedClientUser.set(TEST_USER);
barrier.await();
SetAttributePOptions setTTlOptions = SetAttributePOptions.newBuilder().setRecursive(false)
.setCommonOptions(FileSystemMasterCommonPOptions.newBuilder()
.setTtl(TTL_INTERVAL_MS).setTtlAction(TtlAction.DELETE).build())
.build();
mFileSystem.setAttribute(fileUri2, setTTlOptions);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
setTtlFile2Thread.setUncaughtExceptionHandler(exceptionHandler);
threads.add(setTtlFile2Thread);
Collections.shuffle(threads);
long startMs = CommonUtils.getCurrentMs();
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
}
}

0 comments on commit 2a731f6

Please sign in to comment.