Skip to content

Commit

Permalink
checkstyle fix
Browse files Browse the repository at this point in the history
  • Loading branch information
lucyge2022 committed Feb 22, 2023
1 parent 8f4654f commit b018aee
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void heartbeat() throws InterruptedException {
Set<TtlBucket> expiredBuckets = mTtlBuckets.pollExpiredBuckets(System.currentTimeMillis());
Set<Inode> failedInodes = new HashSet();
for (TtlBucket bucket : expiredBuckets) {
for (long inodeId : bucket.getInodes()) {
for (long inodeId : bucket.getInodes()) {
// Throw if interrupted.
if (Thread.interrupted()) {
throw new InterruptedException("InodeTtlChecker interrupted.");
Expand All @@ -87,8 +87,9 @@ public void heartbeat() throws InterruptedException {
inode = mTtlBuckets.loadInode(inodeId);
// Check again if this inode is indeed expired.
if (inode == null || inode.getTtl() == Constants.NO_TTL
|| inode.getCreationTimeMs() + inode.getTtl() > System.currentTimeMillis())
|| inode.getCreationTimeMs() + inode.getTtl() > System.currentTimeMillis()) {
continue;
}
TtlAction ttlAction = inode.getTtlAction();
LOG.info("Path {} TTL has expired, performing action {}", path.getPath(), ttlAction);
switch (ttlAction) {
Expand Down Expand Up @@ -148,7 +149,7 @@ public void heartbeat() throws InterruptedException {
}
// Put back those failed-to-expire inodes for next round retry.
if (!failedInodes.isEmpty()) {
for(Inode inode : failedInodes) {
for (Inode inode : failedInodes) {
mTtlBuckets.insert(inode);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@
import com.google.common.base.Objects;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import javax.annotation.concurrent.ThreadSafe;

Expand Down Expand Up @@ -89,7 +85,6 @@ public Collection<Long> getInodes() {
return mInodeList;
}


/**
* Adds a inode to the bucket.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

Expand Down Expand Up @@ -61,6 +60,11 @@ public TtlBucketList(ReadOnlyInodeStore inodeStore) {
mBucketList = new ConcurrentSkipListSet<>();
}

/**
* Load inode from inode store on processing the provided inode id.
* @param inodeId
* @return Inode
*/
public Inode loadInode(long inodeId) {
return mInodeStore.get(inodeId).orElseGet(null);
}
Expand Down Expand Up @@ -126,10 +130,11 @@ public void insert(Inode inode) {
bucket = getBucketContaining(inode);
if (bucket == null) {
long ttlEndTimeMs = inode.getCreationTimeMs() + inode.getTtl();
// No bucket contains the inode, so a new bucket should be added with an appropriate interval
// start. Assume the list of buckets have continuous intervals, and the first interval starts
// at 0, then ttlEndTimeMs should be in number (ttlEndTimeMs / interval) interval, so the
// start time of this interval should be (ttlEndTimeMs / interval) * interval.
// No bucket contains the inode, so a new bucket should be added with an appropriate
// interval start. Assume the list of buckets have continuous intervals, and the
// first interval starts at 0, then ttlEndTimeMs should be in number
// (ttlEndTimeMs / interval) interval, so the start time of this interval should be
// (ttlEndTimeMs / interval) * interval.
long interval = TtlBucket.getTtlIntervalMs();
bucket = new TtlBucket(interval == 0 ? ttlEndTimeMs : ttlEndTimeMs / interval * interval);
if (!mBucketList.add(bucket)) {
Expand All @@ -139,23 +144,24 @@ public void insert(Inode inode) {
}
}
bucket.addInode(inode);
/* if we added to the bucket but it got concurrently polled by InodeTtlChecker, we're not sure
this newly-added inode will be processed by the checker, so we need to try insert again.
Resolve for (c.f. ALLUXIO-2821) */
/* if we added to the bucket but it got concurrently polled by InodeTtlChecker,
we're not sure this newly-added inode will be processed by the checker,
so we need to try insert again. Resolve for (c.f. ALLUXIO-2821) */
if (mBucketList.contains(bucket)) {
break;
}
}
}

/**
* Removes an inode from the bucket containing it if the inode is in one of the buckets, otherwise,
* do nothing.
* Removes an inode from the bucket containing it if the inode is in one
* of the buckets, otherwise, do nothing.
*
* <p>
* Assume that no inode in the buckets has ttl value that equals {@link Constants#NO_TTL}.
* If an inode with valid ttl value is inserted to the buckets and its ttl value is going to be set
* to {@link Constants#NO_TTL} later, be sure to remove the inode from the buckets first.
* If an inode with valid ttl value is inserted to the buckets and its ttl value is
* going to be set to {@link Constants#NO_TTL} later, be sure to remove the inode
* from the buckets first.
*
* @param inode the inode to be removed
*/
Expand All @@ -178,7 +184,7 @@ public void remove(InodeView inode) {
*/
public Set<TtlBucket> pollExpiredBuckets(long time) {
Set<TtlBucket> expiredBuckets = new HashSet<>();
TtlBucket upperBound = new TtlBucket(time- TtlBucket.getTtlIntervalMs());
TtlBucket upperBound = new TtlBucket(time - TtlBucket.getTtlIntervalMs());
while (!mBucketList.isEmpty() && mBucketList.first().compareTo(upperBound) <= 0) {
expiredBuckets.add(mBucketList.pollFirst());
}
Expand All @@ -198,7 +204,7 @@ public CheckpointName getCheckpointName() {
public void writeToCheckpoint(OutputStream output) throws IOException, InterruptedException {
CheckpointOutputStream cos = new CheckpointOutputStream(output, CheckpointType.LONGS);
for (TtlBucket bucket : mBucketList) {
for(long inodeId : bucket.getInodes()) {
for (long inodeId : bucket.getInodes()) {
cos.writeLong(inodeId);
}
}
Expand Down
8 changes: 4 additions & 4 deletions tests/src/test/java/alluxio/client/fs/TtlIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,10 @@ public void expireADirectory() throws Exception {
// Individual children file's ttl should not be changed.
Random random = new Random();
int fileNum = random.nextInt(numFiles);
URIStatus anyFileStatus = mFileSystem.getStatus(new AlluxioURI("/" + directoryName +
"/" + fileNamePrefix + fileNum));
assert(anyFileStatus.getFileInfo().getTtl() ==
(fileNum % 2 == 0 ? TTL_INTERVAL_MS * 2000 : TTL_INTERVAL_MS * 1000));
URIStatus anyFileStatus = mFileSystem.getStatus(new AlluxioURI("/" + directoryName
+ "/" + fileNamePrefix + fileNum));
assert (anyFileStatus.getFileInfo().getTtl()
== (fileNum % 2 == 0 ? TTL_INTERVAL_MS * 2000 : TTL_INTERVAL_MS * 1000));

CommonUtils.sleepMs(4 * TTL_INTERVAL_MS);
HeartbeatScheduler.execute(HeartbeatContext.MASTER_TTL_CHECK);
Expand Down

0 comments on commit b018aee

Please sign in to comment.