Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix setTtl workflow and ttlbucket bugs #16933

Merged
merged 17 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@
import alluxio.master.journal.JournalContext;
import alluxio.master.journal.NoopJournalContext;
import alluxio.proto.journal.File.UpdateInodeEntry;
import alluxio.util.ThreadUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.annotation.concurrent.NotThreadSafe;

Expand All @@ -59,28 +62,42 @@ public InodeTtlChecker(FileSystemMaster fileSystemMaster, InodeTree inodeTree) {

@Override
public void heartbeat() throws InterruptedException {
Set<TtlBucket> expiredBuckets = mTtlBuckets.getExpiredBuckets(System.currentTimeMillis());
Set<TtlBucket> expiredBuckets = mTtlBuckets.pollExpiredBuckets(System.currentTimeMillis());
Map<Inode, Integer> failedInodesToRetryNum = new HashMap<>();
for (TtlBucket bucket : expiredBuckets) {
for (Inode inode : bucket.getInodes()) {
for (Map.Entry<Long, Integer> inodeExpiryEntry : bucket.getInodeExpiries()) {
// Throw if interrupted.
if (Thread.interrupted()) {
throw new InterruptedException("InodeTtlChecker interrupted.");
}
long inodeId = inodeExpiryEntry.getKey();
int leftRetries = inodeExpiryEntry.getValue();
// Exhausted retry attempt to expire this inode, bail.
if (leftRetries <= 0) {
continue;
}
AlluxioURI path = null;
try (LockedInodePath inodePath =
mInodeTree.lockFullInodePath(
inode.getId(), LockPattern.READ, NoopJournalContext.INSTANCE)
inodeId, LockPattern.READ, NoopJournalContext.INSTANCE)
) {
path = inodePath.getUri();
} catch (FileDoesNotExistException e) {
// The inode has already been deleted, nothing needs to be done.
continue;
} catch (Exception e) {
LOG.error("Exception trying to clean up {} for ttl check: {}", inode.toString(),
e.toString());
LOG.error("Exception trying to clean up inode:{},path:{} for ttl check: {}", inodeId,
path, e.toString());
}
if (path != null) {
Inode inode = null;
try {
inode = mTtlBuckets.loadInode(inodeId);
// Check again if this inode is indeed expired.
if (inode == null || inode.getTtl() == Constants.NO_TTL
|| inode.getCreationTimeMs() + inode.getTtl() > System.currentTimeMillis()) {
continue;
}
TtlAction ttlAction = inode.getTtlAction();
LOG.info("Path {} TTL has expired, performing action {}", path.getPath(), ttlAction);
switch (ttlAction) {
Expand All @@ -102,7 +119,6 @@ public void heartbeat() throws InterruptedException {
.setTtlAction(ProtobufUtils.toProtobuf(TtlAction.DELETE))
.build());
}
mTtlBuckets.remove(inode);
break;
case DELETE:
// public delete method will lock the path, and check WRITE permission required at
Expand Down Expand Up @@ -131,12 +147,23 @@ public void heartbeat() throws InterruptedException {
LOG.error("Unknown ttl action {}", ttlAction);
}
} catch (Exception e) {
LOG.error("Exception trying to clean up {} for ttl check", inode, e);
boolean retryExhausted = --leftRetries <= 0;
if (retryExhausted) {
LOG.error("Retry exhausted to clean up {} for ttl check. {}",
path, ThreadUtils.formatStackTrace(e));
} else if (inode != null) {
failedInodesToRetryNum.put(inode, leftRetries);
}
}
}
}
}
mTtlBuckets.removeBuckets(expiredBuckets);
// Put back those failed-to-expire inodes for next round retry.
if (!failedInodesToRetryNum.isEmpty()) {
for (Map.Entry<Inode, Integer> failedInodeEntry : failedInodesToRetryNum.entrySet()) {
mTtlBuckets.insert(failedInodeEntry.getKey(), failedInodeEntry.getValue());
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.google.common.base.Objects;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;

Expand All @@ -34,16 +36,18 @@ public final class TtlBucket implements Comparable<TtlBucket> {
*/
private static long sTtlIntervalMs =
Configuration.getMs(PropertyKey.MASTER_TTL_CHECKER_INTERVAL_MS);
public static final int DEFAULT_RETRY_ATTEMPTS = 5;
/**
* Each bucket has a time to live interval, this value is the start of the interval, interval
* value is the same as the configuration of {@link PropertyKey#MASTER_TTL_CHECKER_INTERVAL_MS}.
*/
private final long mTtlIntervalStartTimeMs;
/**
* A collection of inodes whose ttl value is in the range of this bucket's interval. The mapping
* is from inode id to inode.
* A collection containing those inodes whose ttl value is
* in the range of this bucket's interval. The mapping
* is from inode id to the number of remaining retry attempts.
*/
private final ConcurrentHashMap<Long, Inode> mInodes;
private final ConcurrentHashMap<Long, Integer> mInodeToRetryMap;

/**
* Creates a new instance of {@link TtlBucket}.
Expand All @@ -52,7 +56,7 @@ public final class TtlBucket implements Comparable<TtlBucket> {
*/
public TtlBucket(long startTimeMs) {
mTtlIntervalStartTimeMs = startTimeMs;
mInodes = new ConcurrentHashMap<>();
mInodeToRetryMap = new ConcurrentHashMap<>();
}

/**
Expand All @@ -78,38 +82,57 @@ public static long getTtlIntervalMs() {
}

/**
* @return the set of all inodes in the bucket backed by the internal set, changes made to the
* returned set will be shown in the internal set, and vice versa
* @return an unmodifiable view of all inodes ids in the bucket
*/
public Collection<Inode> getInodes() {
return mInodes.values();
public Collection<Long> getInodeIds() {
// Read-only view backed by the live map; callers cannot mutate the bucket through it.
return Collections.unmodifiableSet(mInodeToRetryMap.keySet());
}

/**
* Adds a inode to the bucket.
* Gets the collection mapping each inode id to its remaining ttl-processing retry attempts.
* @return entries of inode id to remaining retry attempts
*/
public Collection<Map.Entry<Long, Integer>> getInodeExpiries() {
// Each entry maps an inode id to its remaining TTL-processing retry attempts;
// the returned set is an unmodifiable view of the live map.
return Collections.unmodifiableSet(mInodeToRetryMap.entrySet());
}

/**
* Adds an inode with the default number of retry attempts
* ({@code DEFAULT_RETRY_ATTEMPTS}) left for TTL expiry processing.
*
* @param inode the inode to add to this bucket
*/
public void addInode(Inode inode) {
addInode(inode, DEFAULT_RETRY_ATTEMPTS);
}

/**
* Adds an inode to the bucket with a specific number of remaining retries.
*
* @param inode the inode to be added
* @return true if a new inode was added to the bucket
* @param numOfRetry the number of retries left when added to the ttl bucket
*/
public boolean addInode(Inode inode) {
return mInodes.put(inode.getId(), inode) == null;
public void addInode(Inode inode, int numOfRetry) {
// Atomically record the inode. If it is already present, keep the smaller
// remaining-retry count so that re-inserting an inode never replenishes
// retry attempts that were already consumed.
mInodeToRetryMap.compute(inode.getId(), (k, v) -> {
if (v != null) {
return Math.min(v, numOfRetry);
}
return numOfRetry;
});
}

/**
* Removes a inode from the bucket.
* Removes an inode from the bucket.
*
* @param inode the inode to be removed
* @return true if a inode was removed
*/
public boolean removeInode(InodeView inode) {
return mInodes.remove(inode.getId()) != null;
public void removeInode(InodeView inode) {
// No-op if the inode is not present in this bucket.
mInodeToRetryMap.remove(inode.getId());
}

/**
* @return the number of inodes in the bucket
*/
public int size() {
return mInodes.size();
return mInodeToRetryMap.size();
}

/**
Expand Down
Loading