diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index efbca863e507..0dd516a88d15 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -83,8 +83,8 @@ public class CleanPlanner implements Serializable {
   private final HoodieTimeline commitTimeline;
   private final Map fgIdToPendingCompactionOperations;
   private final Map fgIdToPendingLogCompactionOperations;
-  private HoodieTable hoodieTable;
-  private HoodieWriteConfig config;
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
   private transient HoodieEngineContext context;
 
   public CleanPlanner(HoodieEngineContext context, HoodieTable hoodieTable, HoodieWriteConfig config) {
@@ -314,6 +314,9 @@ private Pair> getFilesToCleanKeepingLatestCommits(S
    */
   private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained,
       Option<HoodieInstant> earliestCommitToRetain, HoodieCleaningPolicy policy) {
+    if (policy != HoodieCleaningPolicy.KEEP_LATEST_COMMITS && policy != HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
+      throw new IllegalArgumentException("getFilesToCleanKeepingLatestCommits can only be used for KEEP_LATEST_COMMITS or KEEP_LATEST_BY_HOURS");
+    }
     LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
     List<CleanFileInfo> deletePaths = new ArrayList<>();
@@ -351,23 +354,13 @@ private Pair> getFilesToCleanKeepingLatestCommits(S
           continue;
         }
-        if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
-          // Do not delete the latest commit and also the last commit before the earliest commit we
-          // are retaining
-          // The window of commit retain == max query run time. So a query could be running which
-          // still
-          // uses this file.
-          if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
-            // move on to the next file
-            continue;
-          }
-        } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
-          // This block corresponds to KEEP_LATEST_BY_HOURS policy
-          // Do not delete the latest commit.
-          if (fileCommitTime.equals(lastVersion)) {
-            // move on to the next file
-            continue;
-          }
+        // Do not delete the latest commit and also the last commit before the earliest commit we
+        // are retaining
+        // The window of commit retain == max query run time. So a query could be running which
+        // still uses this file.
+        if (fileCommitTime.equals(lastVersion) || fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain)) {
+          // move on to the next file
+          continue;
         }
         // Always keep the last commit
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
new file mode 100644
index 000000000000..e5a528b9382e
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action;
+
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.model.CleanFileInfo;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.clean.CleanPlanner;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestCleanPlanner {
+  private static final Configuration CONF = new Configuration();
+  private final HoodieEngineContext context = new HoodieLocalEngineContext(CONF);
+
+  private final HoodieTable mockHoodieTable = mock(HoodieTable.class);
+
+  private SyncableFileSystemView mockFsView;
+
+  @BeforeEach
+  void setUp() {
+    mockFsView = mock(SyncableFileSystemView.class);
+    when(mockHoodieTable.getHoodieView()).thenReturn(mockFsView);
+    SyncableFileSystemView sliceView = mock(SyncableFileSystemView.class);
+    when(mockHoodieTable.getSliceView()).thenReturn(sliceView);
+    when(sliceView.getPendingCompactionOperations()).thenReturn(Stream.empty());
+    when(sliceView.getPendingLogCompactionOperations()).thenReturn(Stream.empty());
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    when(mockHoodieTable.getMetaClient()).thenReturn(metaClient);
+    HoodieTableConfig tableConfig = new HoodieTableConfig();
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    HoodieTimeline mockCompletedCommitsTimeline = mock(HoodieTimeline.class);
+    when(mockCompletedCommitsTimeline.countInstants()).thenReturn(10);
+    when(mockHoodieTable.getCompletedCommitsTimeline()).thenReturn(mockCompletedCommitsTimeline);
+  }
+
+  @ParameterizedTest
+  @MethodSource("testCases")
+  void testGetDeletePaths(HoodieWriteConfig config, String earliestInstant, List<HoodieFileGroup> allFileGroups, List<Pair<String, Option<byte[]>>> savepoints,
+      List<HoodieFileGroup> replacedFileGroups, Pair<Boolean, List<CleanFileInfo>> expected) {
+
+    // setup savepoint mocks
+    Set<String> savepointTimestamps = savepoints.stream().map(Pair::getLeft).collect(Collectors.toSet());
+    when(mockHoodieTable.getSavepointTimestamps()).thenReturn(savepointTimestamps);
+    if (!savepoints.isEmpty()) {
+      HoodieActiveTimeline activeTimeline = mock(HoodieActiveTimeline.class);
+      when(mockHoodieTable.getActiveTimeline()).thenReturn(activeTimeline);
+      for (Pair<String, Option<byte[]>> savepoint : savepoints) {
+        HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepoint.getLeft());
+        when(activeTimeline.getInstantDetails(instant)).thenReturn(savepoint.getRight());
+      }
+    }
+    String partitionPath = "partition1";
+    // setup replaced file groups mocks
+    if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
+      when(mockFsView.getAllReplacedFileGroups(partitionPath)).thenReturn(replacedFileGroups.stream());
+    } else {
+      when(mockFsView.getReplacedFileGroupsBefore(earliestInstant, partitionPath)).thenReturn(replacedFileGroups.stream());
+    }
+    // setup current file groups mocks
+    when(mockFsView.getAllFileGroupsStateless(partitionPath)).thenReturn(allFileGroups.stream());
+
+    CleanPlanner cleanPlanner = new CleanPlanner<>(context, mockHoodieTable, config);
+    HoodieInstant earliestCommitToRetain = new HoodieInstant(HoodieInstant.State.COMPLETED, "COMMIT", earliestInstant);
+    Pair<Boolean, List<CleanFileInfo>> actual = cleanPlanner.getDeletePaths(partitionPath, Option.of(earliestCommitToRetain));
+    assertEquals(expected, actual);
+  }
+
+  static Stream<Arguments> testCases() {
+    return Stream.concat(keepLatestByHoursOrCommitsArgs(), keepLatestVersionsArgs());
+  }
+
+  static Stream<Arguments> keepLatestVersionsArgs() {
+    HoodieWriteConfig keepLatestVersionsConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .retainFileVersions(2)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
+            .build())
+        .build();
+    String instant1 = "20231205194919610";
+    String instant2 = "20231204194919610";
+    String instant3 = "20231201194919610";
+    String instant4 = "20231127194919610";
+    List<Arguments> arguments = new ArrayList<>();
+    // Two file slices in the group: both should be retained
+    arguments.add(Arguments.of(
+        keepLatestVersionsConfig,
+        instant1,
+        Collections.singletonList(buildFileGroup(Arrays.asList(instant2, instant1))),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Pair.of(false, Collections.emptyList())));
+    // Four file slices in the group: only the latest two should be retained
+    HoodieFileGroup fileGroup = buildFileGroup(Arrays.asList(instant4, instant3, instant2, instant1));
+    String instant3Path = fileGroup.getAllBaseFiles()
+        .filter(baseFile -> baseFile.getCommitTime().equals(instant3)).findFirst().get().getPath();
+    CleanFileInfo expectedCleanFileInfoForInstant3 = new CleanFileInfo(instant3Path, false);
+    String instant4Path = fileGroup.getAllBaseFiles()
+        .filter(baseFile -> baseFile.getCommitTime().equals(instant4)).findFirst().get().getPath();
+    CleanFileInfo expectedCleanFileInfoForInstant4 = new CleanFileInfo(instant4Path, false);
+    arguments.add(Arguments.of(
+        keepLatestVersionsConfig,
+        instant1,
+        Collections.singletonList(fileGroup),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Pair.of(false, Arrays.asList(expectedCleanFileInfoForInstant3, expectedCleanFileInfoForInstant4))));
+    // Four file slices in group but instant4 is part of savepoint: only instant 3's files should be cleaned
+    List<Pair<String, Option<byte[]>>> savepoints = Collections.singletonList(Pair.of(instant4, getSavepointBytes("partition1", Collections.singletonList(instant4Path))));
+    arguments.add(Arguments.of(
+        keepLatestVersionsConfig,
+        instant1,
+        Collections.singletonList(fileGroup),
+        savepoints,
+        Collections.emptyList(),
+        Pair.of(false, Arrays.asList(expectedCleanFileInfoForInstant3))));
+    // Two file slices with a replaced file group: only replaced files cleaned up
+    HoodieFileGroup replacedFileGroup = buildFileGroup(Collections.singletonList(instant4));
+    String replacedFilePath = replacedFileGroup.getAllBaseFiles().findFirst().get().getPath();
+    CleanFileInfo expectedReplaceCleanFileInfo = new CleanFileInfo(replacedFilePath, false);
+    arguments.add(Arguments.of(
+        keepLatestVersionsConfig,
+        instant1,
+        Collections.singletonList(buildFileGroup(Arrays.asList(instant2, instant1))),
+        Collections.emptyList(),
+        Collections.singletonList(replacedFileGroup),
+        Pair.of(false, Collections.singletonList(expectedReplaceCleanFileInfo))));
+    // replaced file groups referenced by savepoint should not be cleaned up
+    List<Pair<String, Option<byte[]>>> replacedFileGroupSavepoint = Collections.singletonList(Pair.of(instant4, getSavepointBytes("partition1", Collections.singletonList(replacedFilePath))));
+    arguments.add(Arguments.of(
+        keepLatestVersionsConfig,
+        instant1,
+        Collections.singletonList(buildFileGroup(Arrays.asList(instant2, instant1))),
+        replacedFileGroupSavepoint,
+        Collections.singletonList(replacedFileGroup),
+        Pair.of(false, Collections.emptyList())));
+    return arguments.stream();
+  }
+
+  static Stream<Arguments> keepLatestByHoursOrCommitsArgs() {
+    String earliestInstant = "20231204194919610";
+    String earliestInstantPlusTwoDays = "20231205194919610";
+    String earliestInstantMinusThreeDays = "20231201194919610";
+    String earliestInstantMinusOneWeek = "20231127194919610";
+    String earliestInstantMinusOneMonth = "20231104194919610";
+    List<Arguments> arguments = new ArrayList<>();
+    // Only one file slice in the group: should still be kept even with commit earlier than "earliestInstant"
+    arguments.addAll(buildArgumentsForCleanByHoursAndCommitsCases(
+        earliestInstant,
+        Collections.singletonList(buildFileGroup(Collections.singletonList(earliestInstantMinusOneMonth))),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Pair.of(false, Collections.emptyList())));
+    // File group with two slices, both are before the earliestInstant. Only the latest slice should be kept.
+    HoodieFileGroup fileGroupsBeforeInstant = buildFileGroup(Arrays.asList(earliestInstantMinusOneMonth, earliestInstantMinusOneWeek));
+    CleanFileInfo expectedCleanFileInfoForFirstFile = new CleanFileInfo(fileGroupsBeforeInstant.getAllBaseFiles()
+        .filter(baseFile -> baseFile.getCommitTime().equals(earliestInstantMinusOneMonth)).findFirst().get().getPath(), false);
+    arguments.addAll(buildArgumentsForCleanByHoursAndCommitsCases(
+        earliestInstant,
+        Collections.singletonList(fileGroupsBeforeInstant),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Pair.of(false, Collections.singletonList(expectedCleanFileInfoForFirstFile))));
+    // File group with two slices, one is after the earliestInstant and the other is before the earliestInstant.
+    // We should keep both since base files are required for queries evaluating the table at time NOW - 24hrs (24hrs is configured for test)
+    arguments.addAll(buildArgumentsForCleanByHoursAndCommitsCases(
+        earliestInstant,
+        Collections.singletonList(buildFileGroup(Arrays.asList(earliestInstantMinusOneMonth, earliestInstantPlusTwoDays))),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Pair.of(false, Collections.emptyList())));
+    // File group with three slices, one is after the earliestInstant and the other two are before the earliestInstant.
+    // Oldest slice will be removed since it is not required for queries evaluating the table at time NOW - 24hrs
+    String oldestFileInstant = earliestInstantMinusOneMonth;
+    HoodieFileGroup fileGroup = buildFileGroup(Arrays.asList(oldestFileInstant, earliestInstantMinusThreeDays, earliestInstantPlusTwoDays));
+    String oldestFilePath = fileGroup.getAllBaseFiles().filter(baseFile -> baseFile.getCommitTime().equals(oldestFileInstant)).findFirst().get().getPath();
+    CleanFileInfo expectedCleanFileInfo = new CleanFileInfo(oldestFilePath, false);
+    arguments.addAll(buildArgumentsForCleanByHoursAndCommitsCases(
+        earliestInstant,
+        Collections.singletonList(fileGroup),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Pair.of(false, Collections.singletonList(expectedCleanFileInfo))));
+    // File group with three slices, one is after the earliestInstant and the other two are before the earliestInstant. Oldest slice is also in savepoint so should not be removed.
+    List<Pair<String, Option<byte[]>>> savepoints = Collections.singletonList(Pair.of(oldestFileInstant, getSavepointBytes("partition1", Collections.singletonList(oldestFilePath))));
+    arguments.addAll(buildArgumentsForCleanByHoursAndCommitsCases(
+        earliestInstant,
+        Collections.singletonList(fileGroup),
+        savepoints,
+        Collections.emptyList(),
+        Pair.of(false, Collections.emptyList())));
+    // File group is replaced before the earliestInstant. Should be removed.
+    HoodieFileGroup replacedFileGroup = buildFileGroup(Collections.singletonList(earliestInstantMinusOneMonth));
+    String replacedFilePath = replacedFileGroup.getAllBaseFiles().findFirst().get().getPath();
+    CleanFileInfo expectedReplaceCleanFileInfo = new CleanFileInfo(replacedFilePath, false);
+    arguments.addAll(buildArgumentsForCleanByHoursAndCommitsCases(
+        earliestInstant,
+        Collections.singletonList(buildFileGroup(Collections.singletonList(earliestInstantMinusOneMonth))),
+        Collections.emptyList(),
+        Collections.singletonList(replacedFileGroup),
+        Pair.of(false, Collections.singletonList(expectedReplaceCleanFileInfo))));
+    // File group is replaced before the earliestInstant but referenced in a savepoint. Should be retained.
+    List<Pair<String, Option<byte[]>>> savepointsForReplacedGroup = Collections.singletonList(Pair.of(oldestFileInstant,
+        getSavepointBytes("partition1", Collections.singletonList(replacedFilePath))));
+    arguments.addAll(buildArgumentsForCleanByHoursAndCommitsCases(
+        earliestInstant,
+        Collections.singletonList(buildFileGroup(Collections.singletonList(earliestInstantMinusOneMonth))),
+        savepointsForReplacedGroup,
+        Collections.singletonList(replacedFileGroup),
+        Pair.of(false, Collections.emptyList())));
+    // Clean by commits but there are not enough commits in timeline to trigger cleaner
+    HoodieWriteConfig writeConfigWithLargerRetention = HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .retainCommits(50)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .build())
+        .build();
+    arguments.add(Arguments.of(
+        writeConfigWithLargerRetention,
+        earliestInstant,
+        Collections.singletonList(buildFileGroup(Collections.singletonList(earliestInstantMinusOneMonth))),
+        Collections.emptyList(),
+        Collections.singletonList(replacedFileGroup),
+        Pair.of(false, Collections.emptyList())));
+
+    return arguments.stream();
+  }
+
+  private static HoodieWriteConfig getCleanByHoursConfig() {
+    return HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .cleanerNumHoursRetained(24)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS)
+            .build())
+        .build();
+  }
+
+  private static HoodieWriteConfig getCleanByCommitsConfig() {
+    return HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .retainCommits(5)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .build())
+        .build();
+  }
+
+  // helper to build common cases for the two policies
+  private static List<Arguments> buildArgumentsForCleanByHoursAndCommitsCases(String earliestInstant, List<HoodieFileGroup> allFileGroups, List<Pair<String, Option<byte[]>>> savepoints,
+      List<HoodieFileGroup> replacedFileGroups, Pair<Boolean, List<CleanFileInfo>> expected) {
+    return Arrays.asList(Arguments.of(getCleanByHoursConfig(), earliestInstant, allFileGroups, savepoints, replacedFileGroups, expected),
+        Arguments.of(getCleanByCommitsConfig(), earliestInstant, allFileGroups, savepoints, replacedFileGroups, expected));
+  }
+
+  private static HoodieFileGroup buildFileGroup(List<String> baseFileCommitTimes) {
+    String fileGroup = UUID.randomUUID() + "-0";
+    HoodieFileGroupId fileGroupId = new HoodieFileGroupId("partition1", UUID.randomUUID().toString());
+    HoodieTimeline timeline = mock(HoodieTimeline.class);
+    when(timeline.lastInstant()).thenReturn(Option.of(new HoodieInstant(HoodieInstant.State.COMPLETED, "COMMIT", baseFileCommitTimes.get(baseFileCommitTimes.size() - 1))));
+    HoodieFileGroup group = new HoodieFileGroup(fileGroupId, timeline);
+    for (String baseFileCommitTime : baseFileCommitTimes) {
+      when(timeline.containsOrBeforeTimelineStarts(baseFileCommitTime)).thenReturn(true);
+      HoodieBaseFile baseFile = new HoodieBaseFile(String.format("file:///tmp/base/%s_1-0-1_%s.parquet", fileGroup, baseFileCommitTime));
+      group.addBaseFile(baseFile);
+    }
+    return group;
+  }
+
+  private static Option<byte[]> getSavepointBytes(String partition, List<String> paths) {
+    try {
+      Map<String, HoodieSavepointPartitionMetadata> partitionMetadata = new HashMap<>();
+      List<String> fileNames = paths.stream().map(path -> path.substring(path.lastIndexOf("/") + 1)).collect(Collectors.toList());
+      partitionMetadata.put(partition, new HoodieSavepointPartitionMetadata(partition, fileNames));
+      HoodieSavepointMetadata savepointMetadata =
+          new HoodieSavepointMetadata("user", 1L, "comments", partitionMetadata, 1);
+      return TimelineMetadataUtils.serializeSavepointMetadata(savepointMetadata);
+    } catch (IOException ex) {
+      throw new UncheckedIOException(ex);
+    }
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
index 1db3360bd25d..aa35e67bb94e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
@@ -646,7 +646,7 @@ public void testKeepXHoursWithCleaning(
         : UUID.randomUUID().toString();
     Instant instant = Instant.now();
     ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
-    int minutesForFirstCommit = 150;
+    int minutesForFirstCommit = 180;
     String firstCommitTs = HoodieActiveTimeline.formatDate(Date.from(commitDateTime.minusMinutes(minutesForFirstCommit).toInstant()));
     Map<String, List<String>> part1ToFileId = Collections.unmodifiableMap(new HashMap<String, List<String>>() {
       {
@@ -664,7 +664,7 @@ public void testKeepXHoursWithCleaning(
     assertTrue(testTable.baseFileExists(p1, firstCommitTs, file1P1C0));
 
     // make next commit, with 1 insert & 1 update per partition
-    int minutesForSecondCommit = 90;
+    int minutesForSecondCommit = 150;
     String secondCommitTs = HoodieActiveTimeline.formatDate(Date.from(commitDateTime.minusMinutes(minutesForSecondCommit).toInstant()));
     Map<String, String> partitionAndFileId002 = testTable.addInflightCommit(secondCommitTs).getFileIdsWithBaseFilesInPartitions(p0, p1);
     String file2P0C1 = partitionAndFileId002.get(p0);
@@ -678,10 +678,27 @@ public void testKeepXHoursWithCleaning(
     commitWithMdt(secondCommitTs, part2ToFileId, testTable, metadataWriter, true, true);
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
-    List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry, simulateMetadataFailure);
+    // make next commit, with 1 insert per partition
+    int minutesForThirdCommit = 90;
+    String thirdCommitTs = HoodieActiveTimeline.formatDate(Date.from(commitDateTime.minusMinutes(minutesForThirdCommit).toInstant()));
+    Map<String, String> partitionAndFileId003 = testTable.addInflightCommit(thirdCommitTs).getFileIdsWithBaseFilesInPartitions(p0, p1);
+    String file3P0C1 = partitionAndFileId003.get(p0);
+    String file3P1C1 = partitionAndFileId003.get(p1);
+    Map<String, List<String>> part3ToFileId = Collections.unmodifiableMap(new HashMap<String, List<String>>() {
+      {
+        put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C1));
+        put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1, file3P1C1));
+      }
+    });
+    commitWithMdt(thirdCommitTs, part3ToFileId, testTable, metadataWriter, true, true);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry, simulateMetadataFailure);
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
-    assertEquals(2, hoodieCleanStatsTwo.size(), "Should clean one file each from both the partitions");
+    assertEquals(2, hoodieCleanStatsThree.size(), "Should clean one file each from both the partitions");
+    assertTrue(testTable.baseFileExists(p0, thirdCommitTs, file3P0C1));
+    assertTrue(testTable.baseFileExists(p1, thirdCommitTs, file3P1C1));
     assertTrue(testTable.baseFileExists(p0, secondCommitTs, file2P0C1));
     assertTrue(testTable.baseFileExists(p1, secondCommitTs, file2P1C1));
    assertTrue(testTable.baseFileExists(p0, secondCommitTs, file1P0C0));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/CleanFileInfo.java b/hudi-common/src/main/java/org/apache/hudi/common/model/CleanFileInfo.java
index 22939a2aee7d..b00918d555fa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/CleanFileInfo.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/CleanFileInfo.java
@@ -21,6 +21,7 @@
 import org.apache.hudi.avro.model.HoodieCleanFileInfo;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * File info for clean action.
@@ -46,5 +47,22 @@ public boolean isBootstrapBaseFile() {
   public HoodieCleanFileInfo toHoodieFileCleanInfo() {
     return new HoodieCleanFileInfo(filePath, isBootstrapBaseFile);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CleanFileInfo that = (CleanFileInfo) o;
+    return isBootstrapBaseFile == that.isBootstrapBaseFile && Objects.equals(filePath, that.filePath);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(filePath, isBootstrapBaseFile);
+  }
 }