Fix setTtl workflow and ttlbucket bugs #16933

Merged (17 commits) on Mar 13, 2023
Changes from 1 commit
Add retry attempt in ttlbucket to prevent inodettlchecker from processing inodes forever
lucyge2022 committed Mar 6, 2023
commit 2a3a5f9ebb2de9ba7e1220d716d580bc1f0b5145
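For context on the commit message: the idea is to give each inode a bounded retry budget so the ttl checker eventually gives up on an inode that keeps failing to expire, instead of re-processing it on every heartbeat. A minimal standalone sketch of that retry-budget behavior (the constant name mirrors DEFAULT_RETRY_ATTEMPTS introduced below; the always-failing expire() action is hypothetical):

public class RetryBudgetSketch {
  private static final int DEFAULT_RETRY_ATTEMPTS = 5;

  public static void main(String[] args) {
    int leftRetries = DEFAULT_RETRY_ATTEMPTS;
    while (leftRetries > 0) {
      try {
        expire();         // hypothetical expiry action that keeps failing
        return;           // success: nothing left to retry
      } catch (Exception e) {
        leftRetries--;    // burn one attempt per failed round
      }
    }
    // Budget exhausted: the inode is dropped instead of being retried forever.
    System.out.println("Gave up after " + DEFAULT_RETRY_ATTEMPTS + " attempts");
  }

  private static void expire() throws Exception {
    throw new Exception("simulated ttl-expiry failure");
  }
}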
@@ -34,7 +34,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.annotation.concurrent.NotThreadSafe;

@@ -61,13 +62,19 @@ public InodeTtlChecker(FileSystemMaster fileSystemMaster, InodeTree inodeTree) {
@Override
public void heartbeat() throws InterruptedException {
Set<TtlBucket> expiredBuckets = mTtlBuckets.pollExpiredBuckets(System.currentTimeMillis());
Set<Inode> failedInodes = new HashSet();
Map<Inode, Integer> failedInodesToRetryNum = new HashMap<>();
for (TtlBucket bucket : expiredBuckets) {
for (long inodeId : bucket.getInodeIds()) {
for (Map.Entry<Long, Integer> inodeExpiryEntry : bucket.getInodeExpiries()) {
// Throw if interrupted.
if (Thread.interrupted()) {
throw new InterruptedException("InodeTtlChecker interrupted.");
}
long inodeId = inodeExpiryEntry.getKey();
int leftRetries = inodeExpiryEntry.getValue();
// Exhausted retry attempts to expire this inode, bail.
if (leftRetries <= 0) {
continue;
}
AlluxioURI path = null;
try (LockedInodePath inodePath =
mInodeTree.lockFullInodePath(
@@ -139,18 +146,19 @@ public void heartbeat() throws InterruptedException {
LOG.error("Unknown ttl action {}", ttlAction);
}
} catch (Exception e) {
LOG.error("Exception trying to clean up {} for ttl check", path, e);
if (inode != null) {
failedInodes.add(inode);
LOG.error("Exception trying to clean up {} for ttl check. Left retries:{}. {}",
path, leftRetries - 1, e);
if (inode != null && --leftRetries > 0) {
failedInodesToRetryNum.put(inode, leftRetries);
}
}
}
}
}
// Put back the failed-to-expire inodes so they are retried in the next round.
if (!failedInodes.isEmpty()) {
for (Inode inode : failedInodes) {
mTtlBuckets.insert(inode);
if (!failedInodesToRetryNum.isEmpty()) {
for (Map.Entry<Inode, Integer> failedInodeEntry : failedInodesToRetryNum.entrySet()) {
mTtlBuckets.insert(failedInodeEntry.getKey(), failedInodeEntry.getValue());
}
}
}
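To summarize the heartbeat change above: instead of collecting failed inodes in a plain set, the checker now tracks how many retries each failed inode has left and only re-inserts it while that budget is still positive. A condensed, self-contained sketch of that bookkeeping (plain long ids stand in for Inode objects and the cleanup failure is simulated; both are assumptions for illustration):

import java.util.HashMap;
import java.util.Map;

public class HeartbeatRetrySketch {
  public static void main(String[] args) {
    // inode id -> retries left, as an expired bucket would report via getInodeExpiries()
    Map<Long, Integer> expiredBucket = new HashMap<>();
    expiredBucket.put(100L, 5);
    expiredBucket.put(200L, 0);

    Map<Long, Integer> failedInodesToRetryNum = new HashMap<>();
    for (Map.Entry<Long, Integer> entry : expiredBucket.entrySet()) {
      long inodeId = entry.getKey();
      int leftRetries = entry.getValue();
      if (leftRetries <= 0) {
        continue; // budget exhausted: drop the inode rather than retrying forever
      }
      try {
        throw new Exception("simulated cleanup failure for inode " + inodeId);
      } catch (Exception e) {
        if (--leftRetries > 0) {
          failedInodesToRetryNum.put(inodeId, leftRetries); // retry on a later heartbeat
        }
      }
    }
    // In the checker, these entries would be re-inserted into the ttl bucket list.
    System.out.println("re-insert for retry: " + failedInodesToRetryNum);
  }
}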
@@ -11,13 +11,14 @@

package alluxio.master.file.meta;

import alluxio.collections.ConcurrentHashSet;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;

import com.google.common.base.Objects;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;

/**
@@ -34,16 +35,18 @@ public final class TtlBucket implements Comparable<TtlBucket> {
*/
private static long sTtlIntervalMs =
Configuration.getMs(PropertyKey.MASTER_TTL_CHECKER_INTERVAL_MS);
public static final int DEFAULT_RETRY_ATTEMPTS = 5;
/**
* Each bucket has a time to live interval, this value is the start of the interval, interval
* value is the same as the configuration of {@link PropertyKey#MASTER_TTL_CHECKER_INTERVAL_MS}.
*/
private final long mTtlIntervalStartTimeMs;
/**
* A collection of inodes whose ttl value is in the range of this bucket's interval. The mapping
* is from inode id to inode.
* A collection containing those inodes whose ttl value is
* in the range of this bucket's interval. The mapping
* is from inode id to the number of retries left to process it.
*/
private final ConcurrentHashSet<Long> mInodeList;
private final ConcurrentHashMap<Long, Integer> mInodeToRetryMap;

/**
* Creates a new instance of {@link TtlBucket}.
@@ -52,7 +55,7 @@ public final class TtlBucket implements Comparable<TtlBucket> {
*/
public TtlBucket(long startTimeMs) {
mTtlIntervalStartTimeMs = startTimeMs;
mInodeList = new ConcurrentHashSet<>();
mInodeToRetryMap = new ConcurrentHashMap<>();
}

/**
@@ -82,34 +85,54 @@ public static long getTtlIntervalMs() {
* changes made to the returned set will be shown in the internal set, and vice versa.
*/
public Collection<Long> getInodeIds() {
return mInodeList;
return mInodeToRetryMap.keySet();
}

/**
* Adds a inode to the bucket.
* Gets the collection of entries mapping each inode id to its remaining ttl-processing retry attempts.
* @return the collection of entries from inode id to retries left
*/
public Collection<Map.Entry<Long, Integer>> getInodeExpiries() {
return mInodeToRetryMap.entrySet();
}

/**
* Adds an inode with the default number of retry attempts to expire it.
* @param inode the inode to be added
*/
public void addInode(Inode inode) {
addInode(inode, DEFAULT_RETRY_ATTEMPTS);
}

/**
* Adds an inode to the bucket with a specific number of retries left.
*
* @param inode the inode to be added
* @return true if a new inode was added to the bucket
* @param numOfRetry the number of retries left when the inode is added to the ttl bucket
*/
public boolean addInode(Inode inode) {
return mInodeList.add(inode.getId());
public void addInode(Inode inode, int numOfRetry) {
mInodeToRetryMap.compute(inode.getId(), (k, v) -> {
if (v != null) {
return Math.min(v, numOfRetry);
}
return numOfRetry;
});
}

/**
* Removes an inode from the bucket.
*
* @param inode the inode to be removed
* @return true if an inode was removed
*/
public boolean removeInode(InodeView inode) {
return mInodeList.remove(inode.getId());
public void removeInode(InodeView inode) {
mInodeToRetryMap.remove(inode.getId());
}

/**
* @return the number of inodes in the bucket
*/
public int size() {
return mInodeList.size();
return mInodeToRetryMap.size();
}

/**
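One detail worth calling out from the TtlBucket changes above: addInode merges retry counts with ConcurrentHashMap.compute, keeping the minimum of the existing and offered budgets, so re-adding an inode can only shrink its remaining attempts, never reset them. A small standalone sketch of that merge rule (the budget values 5 and 2 are chosen arbitrarily for illustration):

import java.util.concurrent.ConcurrentHashMap;

public class RetryMergeSketch {
  public static void main(String[] args) {
    ConcurrentHashMap<Long, Integer> inodeToRetry = new ConcurrentHashMap<>();
    long inodeId = 7L;

    // First add: no existing entry, so the offered budget (the default of 5) is stored.
    inodeToRetry.compute(inodeId, (k, v) -> v == null ? 5 : Math.min(v, 5));
    // Re-add with a smaller budget, e.g. after a failed expiry attempt: the map keeps
    // the minimum, so a re-insert never bumps the remaining attempts back up.
    inodeToRetry.compute(inodeId, (k, v) -> v == null ? 2 : Math.min(v, 2));

    System.out.println(inodeToRetry.get(inodeId)); // prints 2
  }
}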
@@ -112,15 +112,24 @@ private TtlBucket getBucketContaining(InodeView inode) {
return bucket;
}

/**
* Inserts an inode into its ttl bucket with the default number of retry attempts.
* @param inode the inode to be inserted
*/
public void insert(Inode inode) {
insert(inode, TtlBucket.DEFAULT_RETRY_ATTEMPTS);
}

/**
* Inserts an inode into the appropriate bucket, i.e. the one whose interval contains the
* inode's ttl end time. If no appropriate bucket exists, a new bucket is created to contain
* this inode; if the ttl value is {@link Constants#NO_TTL}, the inode is not inserted into
* any bucket and nothing happens.
*
* @param inode the inode to be inserted
* @param numOfRetry number of retries left to process this inode
*/
public void insert(Inode inode) {
public void insert(Inode inode, int numOfRetry) {
if (inode.getTtl() == Constants.NO_TTL) {
return;
}
@@ -143,7 +152,7 @@ public void insert(Inode inode) {
continue;
}
}
bucket.addInode(inode);
bucket.addInode(inode, numOfRetry);
/* if we added to the bucket but it got concurrently polled by InodeTtlChecker,
we're not sure this newly-added inode will be processed by the checker,
so we need to try inserting again (c.f. ALLUXIO-2821). */
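For readers unfamiliar with how an inode lands in a particular bucket: each bucket covers a fixed-width time interval, and an inode's ttl end time is floored to an interval boundary to pick the bucket. The flooring arithmetic below is a common way to map a timestamp onto a fixed-width window and is shown as an assumption for illustration, not copied from this diff; the interval and timestamp values are made up.

public class BucketIntervalSketch {
  public static void main(String[] args) {
    long intervalMs = 3_600_000L;           // e.g. a one-hour MASTER_TTL_CHECKER_INTERVAL_MS
    long ttlEndTimeMs = 1_678_060_842_000L; // creation time plus ttl, in epoch millis
    // Floor the expiry time to an interval boundary; inodes whose ttl ends in the same
    // window share one bucket, and the checker polls whole buckets once they expire.
    long bucketStartMs = ttlEndTimeMs / intervalMs * intervalMs;
    System.out.println("bucket interval start: " + bucketStartMs);
  }
}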
@@ -63,7 +63,7 @@ private List<TtlBucket> pollSortedExpiredBuckets(long expireTime) {
private void assertExpired(List<TtlBucket> expiredBuckets, int bucketIndex,
Inode... inodes) {
TtlBucket bucket = expiredBuckets.get(bucketIndex);
Assert.assertEquals(inodes.length, bucket.getInodeIds().size());
Assert.assertEquals(inodes.length, bucket.size());
List<Long> inodeIds = Lists.newArrayList(inodes).stream().map(Inode::getId)
.collect(Collectors.toList());
Assert.assertTrue(bucket.getInodeIds().containsAll(inodeIds));
@@ -77,22 +77,32 @@ public void addAndRemoveInodeFile() {
Assert.assertTrue(mBucket.getInodeIds().isEmpty());

mBucket.addInode(fileTtl1);
Assert.assertEquals(1, mBucket.getInodeIds().size());
Assert.assertEquals(1, mBucket.size());

// The same file won't be added again.
mBucket.addInode(fileTtl1);
Assert.assertEquals(1, mBucket.getInodeIds().size());
Assert.assertEquals(1, mBucket.size());

// Different file, will be added.
mBucket.addInode(fileTtl2);
Assert.assertEquals(2, mBucket.getInodeIds().size());
Assert.assertEquals(2, mBucket.size());

// Remove files;
mBucket.removeInode(fileTtl1);
Assert.assertEquals(1, mBucket.getInodeIds().size());
Assert.assertEquals(1, mBucket.size());
Assert.assertTrue(mBucket.getInodeIds().contains(fileTtl2.getId()));
mBucket.removeInode(fileTtl2);
Assert.assertEquals(0, mBucket.getInodeIds().size());
Assert.assertEquals(0, mBucket.size());

// Retry attempts;
mBucket.addInode(fileTtl1);
Assert.assertTrue(mBucket.getInodeIds().contains(fileTtl1.getId()));
int retryAttempt = mBucket.getInodeExpiries().iterator().next().getValue();
Assert.assertEquals(retryAttempt, TtlBucket.DEFAULT_RETRY_ATTEMPTS);
mBucket.addInode(fileTtl1, 2);
Assert.assertTrue(mBucket.getInodeIds().contains(fileTtl1.getId()));
int newRetryAttempt = mBucket.getInodeExpiries().iterator().next().getValue();
Assert.assertEquals(newRetryAttempt, 2);
}

/**
@@ -106,19 +116,19 @@ public void addAndRemoveInodeDirectory() {
Assert.assertTrue(mBucket.getInodeIds().isEmpty());

mBucket.addInode(directoryTtl1);
Assert.assertEquals(1, mBucket.getInodeIds().size());
Assert.assertEquals(1, mBucket.size());

// The same directory won't be added again.
mBucket.addInode(directoryTtl1);
Assert.assertEquals(1, mBucket.getInodeIds().size());
Assert.assertEquals(1, mBucket.size());

// Different directory, will be added.
mBucket.addInode(directoryTtl2);
Assert.assertEquals(2, mBucket.getInodeIds().size());
Assert.assertEquals(2, mBucket.size());

// Remove directories;
mBucket.removeInode(directoryTtl1);
Assert.assertEquals(1, mBucket.getInodeIds().size());
Assert.assertEquals(1, mBucket.size());
Assert.assertTrue(mBucket.getInodeIds().contains(directoryTtl2.getId()));
mBucket.removeInode(directoryTtl2);
Assert.assertEquals(0, mBucket.getInodeIds().size());