Skip to content

Commit

Permalink
[HUDI-7223] Cleaner KEEP_LATEST_BY_HOURS should retain latest commit …
Browse files Browse the repository at this point in the history
…before earliest commit to retain (#10307)
  • Loading branch information
the-other-tim-brown authored Dec 17, 2023
1 parent 27ac428 commit 50f0d9f
Show file tree
Hide file tree
Showing 4 changed files with 387 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
private final HoodieTimeline commitTimeline;
private final Map<HoodieFileGroupId, CompactionOperation> fgIdToPendingCompactionOperations;
private final Map<HoodieFileGroupId, CompactionOperation> fgIdToPendingLogCompactionOperations;
private HoodieTable<T, I, K, O> hoodieTable;
private HoodieWriteConfig config;
private final HoodieTable<T, I, K, O> hoodieTable;
private final HoodieWriteConfig config;
private transient HoodieEngineContext context;

public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config) {
Expand Down Expand Up @@ -314,6 +314,9 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(S
*/
private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath,
int commitsRetained, Option<HoodieInstant> earliestCommitToRetain, HoodieCleaningPolicy policy) {
if (policy != HoodieCleaningPolicy.KEEP_LATEST_COMMITS && policy != HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
throw new IllegalArgumentException("getFilesToCleanKeepingLatestCommits can only be used for KEEP_LATEST_COMMITS or KEEP_LATEST_BY_HOURS");
}
LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
List<CleanFileInfo> deletePaths = new ArrayList<>();

Expand Down Expand Up @@ -351,23 +354,13 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(S
continue;
}

if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
// Do not delete the latest commit and also the last commit before the earliest commit we
// are retaining
// The window of commit retain == max query run time. So a query could be running which
// still
// uses this file.
if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
// move on to the next file
continue;
}
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
// This block corresponds to KEEP_LATEST_BY_HOURS policy
// Do not delete the latest commit.
if (fileCommitTime.equals(lastVersion)) {
// move on to the next file
continue;
}
// Do not delete the latest commit and also the last commit before the earliest commit we
// are retaining
// The window of commit retain == max query run time. So a query could be running which
// still uses this file.
if (fileCommitTime.equals(lastVersion) || fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain)) {
// move on to the next file
continue;
}

// Always keep the last commit
Expand Down
Loading

0 comments on commit 50f0d9f

Please sign in to comment.