Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Sep 19, 2024
1 parent 64c9c37 commit 73638a6
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 21 deletions.
22 changes: 22 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/Comparators.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.function.IntFunction;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.StructLike;
Expand Down Expand Up @@ -189,6 +190,10 @@ public static Comparator<DeleteFile> deleteFile() {
return DeleteFileComparator.INSTANCE;
}

/**
 * Returns a comparator that accepts any pair of {@link ContentFile} instances.
 *
 * @return the shared {@code ContentFileComparator} singleton
 */
public static Comparator<ContentFile<?>> contentFile() {
  Comparator<ContentFile<?>> shared = ContentFileComparator.INSTANCE;
  return shared;
}

private static class NullsFirst<T> implements Comparator<T> {
private static final NullsFirst<?> INSTANCE = new NullsFirst<>();

Expand Down Expand Up @@ -426,4 +431,21 @@ public int compare(DeleteFile s1, DeleteFile s2) {
return CharSeqComparator.INSTANCE.compare(s1.path(), s2.path());
}
}

/**
 * Comparator over arbitrary {@link ContentFile} instances.
 *
 * <p>Two data files are ordered by {@code DataFileComparator}, two delete files by
 * {@code DeleteFileComparator}, and any other combination by comparing the files' paths with
 * {@code CharSeqComparator}.
 */
private static class ContentFileComparator implements Comparator<ContentFile<?>> {
  private static final ContentFileComparator INSTANCE = new ContentFileComparator();

  // singleton: obtain the instance through INSTANCE
  private ContentFileComparator() {}

  @Override
  public int compare(ContentFile<?> first, ContentFile<?> second) {
    boolean bothDataFiles = first instanceof DataFile && second instanceof DataFile;
    if (bothDataFiles) {
      return DataFileComparator.INSTANCE.compare((DataFile) first, (DataFile) second);
    }

    boolean bothDeleteFiles = first instanceof DeleteFile && second instanceof DeleteFile;
    if (bothDeleteFiles) {
      return DeleteFileComparator.INSTANCE.compare((DeleteFile) first, (DeleteFile) second);
    }

    // the two files are not of the same kind, so order them by path only
    CharSequence firstPath = first.path();
    CharSequence secondPath = second.path();
    return CharSeqComparator.INSTANCE.compare(firstPath, secondPath);
  }
}
}
52 changes: 31 additions & 21 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
Expand All @@ -36,10 +37,10 @@
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.CharSequenceWrapper;
import org.apache.iceberg.util.ManifestFileUtil;
Expand Down Expand Up @@ -71,6 +72,7 @@ public String partition() {
private final PartitionSet deleteFilePartitions;
private final PartitionSet dropPartitions;
private final CharSequenceSet deletePaths = CharSequenceSet.empty();
private final Set<F> filesToDelete = Sets.newTreeSet(Comparators.contentFile());
private Expression deleteExpression = Expressions.alwaysFalse();
private long minSequenceNumber = 0;
private boolean hasPathOnlyDeletes = false;
Expand All @@ -83,8 +85,7 @@ public String partition() {
private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();

// tracking where files were deleted to validate retries quickly
private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
Maps.newConcurrentMap();
private final Map<ManifestFile, Set<F>> filteredManifestToDeletedFiles = Maps.newConcurrentMap();

private final Supplier<ExecutorService> workerPoolSupplier;

Expand Down Expand Up @@ -153,7 +154,7 @@ void caseSensitive(boolean newCaseSensitive) {
void delete(F file) {
Preconditions.checkNotNull(file, "Cannot delete file: null");
invalidateFilteredCache();
deletePaths.add(file.path());
filesToDelete.add(file);
deleteFilePartitions.add(file.specId(), file.partition());
}

Expand All @@ -167,6 +168,7 @@ void delete(CharSequence path) {

/**
 * Reports whether any kind of delete has been registered.
 *
 * @return true if {@code deletePaths}, {@code filesToDelete} or {@code dropPartitions} is
 *     non-empty, or if {@code deleteExpression} is not the {@code Expressions.alwaysFalse()}
 *     instance (compared by reference)
 */
boolean containsDeletes() {
  boolean nothingRegistered =
      deletePaths.isEmpty()
          && filesToDelete.isEmpty()
          && deleteExpression == Expressions.alwaysFalse()
          && dropPartitions.isEmpty();
  return !nothingRegistered;
}
Expand Down Expand Up @@ -233,23 +235,28 @@ SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
@SuppressWarnings("CollectionUndefinedEquality")
private void validateRequiredDeletes(ManifestFile... manifests) {
if (failMissingDeletePaths) {
CharSequenceSet deletedFiles = deletedFiles(manifests);
Set<F> deletedFiles = deletedFiles(manifests);

ValidationException.check(
deletedFiles.containsAll(deletePaths),
deletedFiles.containsAll(filesToDelete),
"Missing required files to delete: %s",
COMMA.join(Iterables.filter(deletePaths, path -> !deletedFiles.contains(path))));
COMMA.join(
filesToDelete.stream()
.filter(f -> !deletedFiles.contains(f))
.map(ContentFile::path)
.collect(Collectors.toList())));
}
}

private CharSequenceSet deletedFiles(ManifestFile[] manifests) {
CharSequenceSet deletedFiles = CharSequenceSet.empty();
private Set<F> deletedFiles(ManifestFile[] manifests) {
Set<F> deletedFiles = Sets.newTreeSet(Comparators.contentFile());

if (manifests != null) {
for (ManifestFile manifest : manifests) {
Iterable<F> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
if (manifestDeletes != null) {
for (F file : manifestDeletes) {
deletedFiles.add(file.path());
deletedFiles.add(file);
}
}
}
Expand Down Expand Up @@ -347,7 +354,7 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
boolean canContainDroppedFiles;
if (hasPathOnlyDeletes) {
canContainDroppedFiles = true;
} else if (!deletePaths.isEmpty()) {
} else if (!filesToDelete.isEmpty()) {
// because there were no path-only deletes, the set of deleted file partitions is valid
canContainDroppedFiles =
ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById);
Expand All @@ -372,8 +379,14 @@ private boolean manifestHasDeletedFiles(

for (ManifestEntry<F> entry : reader.liveEntries()) {
F file = entry.file();

// add path-based delete to set of files to be deleted
if (deletePaths.contains(CharSequenceWrapper.wrap(file.path()))) {
filesToDelete.add(file);
}

boolean markedForDelete =
deletePaths.contains(file.path())
filesToDelete.contains(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand Down Expand Up @@ -409,8 +422,7 @@ private ManifestFile filterManifestWithDeletedFiles(
boolean isDelete = reader.isDeleteManifestReader();
// when this point is reached, there is at least one file that will be deleted in the
// manifest. produce a copy of the manifest with all deleted files removed.
List<F> deletedFiles = Lists.newArrayList();
Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
Set<F> deleted = Sets.newTreeSet(Comparators.contentFile());

try {
ManifestWriter<F> writer = newManifestWriter(reader.spec());
Expand All @@ -421,7 +433,7 @@ private ManifestFile filterManifestWithDeletedFiles(
entry -> {
F file = entry.file();
boolean markedForDelete =
deletePaths.contains(file.path())
filesToDelete.contains(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand All @@ -441,18 +453,16 @@ private ManifestFile filterManifestWithDeletedFiles(
if (allRowsMatch) {
writer.delete(entry);

CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(entry.file().path());
if (deletedPaths.contains(wrapper)) {
if (deleted.contains(entry.file())) {
LOG.warn(
"Deleting a duplicate path from manifest {}: {}",
manifest.path(),
wrapper.get());
entry.file().path());
duplicateDeleteCount += 1;
} else {
// only add the file to deletes if it is a new delete
// this keeps the snapshot summary accurate for non-duplicate data
deletedFiles.add(entry.file().copyWithoutStats());
deletedPaths.add(wrapper);
deleted.add(entry.file().copyWithoutStats());
}
} else {
writer.existing(entry);
Expand All @@ -472,7 +482,7 @@ private ManifestFile filterManifestWithDeletedFiles(

// update caches
filteredManifests.put(manifest, filtered);
filteredManifestToDeletedFiles.put(filtered, deletedFiles);
filteredManifestToDeletedFiles.put(filtered, deleted);

return filtered;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,26 @@ public void compareDataFiles() {
assertThat(Comparators.dataFile().compare(FILE_B, FILE_A)).isGreaterThan(0);
}

@Test
public void compareDataFilesWithContentFileComparator() {
  // build a second data file that points at the same path as FILE_A
  String pathOfFileA = FILE_A.path().toString();
  DataFile samePathAsFileA =
      DataFiles.builder(SPEC)
          .withPath(pathOfFileA)
          .withFileSizeInBytes(100)
          .withRecordCount(100)
          .build();

  // equal paths compare as equal, including a file compared with itself
  assertThat(Comparators.contentFile().compare(FILE_A, samePathAsFileA)).isZero();
  assertThat(Comparators.contentFile().compare(FILE_A, FILE_A)).isZero();

  // different paths are ordered, and swapping the arguments flips the sign
  assertThat(Comparators.contentFile().compare(FILE_A, FILE_B)).isNegative();
  assertThat(Comparators.contentFile().compare(FILE_B, FILE_A)).isPositive();

  // the content-file comparator also accepts a data file paired with a delete file, although
  // that combination is not expected to be used
  assertThat(Comparators.contentFile().compare(FILE_A, FILE_A_DELETES)).isPositive();
}

@Test
public void compareDeleteFiles() {
// same path as FILE_A_DELETES
Expand All @@ -64,4 +84,24 @@ public void compareDeleteFiles() {
assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, FILE_A2_DELETES)).isLessThan(0);
assertThat(Comparators.deleteFile().compare(FILE_A2_DELETES, FILE_A_DELETES)).isGreaterThan(0);
}

@Test
public void compareDeleteFilesWithContentFileComparator() {
  // build a second position-delete file that points at the same path as FILE_A_DELETES
  String pathOfFileADeletes = FILE_A_DELETES.path().toString();
  DeleteFile samePathAsFileADeletes =
      FileMetadata.deleteFileBuilder(SPEC)
          .ofPositionDeletes()
          .withPath(pathOfFileADeletes)
          .withFileSizeInBytes(100)
          .withRecordCount(100)
          .build();

  // equal paths compare as equal, including a file compared with itself
  assertThat(Comparators.contentFile().compare(FILE_A_DELETES, samePathAsFileADeletes)).isZero();
  assertThat(Comparators.contentFile().compare(FILE_A_DELETES, FILE_A_DELETES)).isZero();

  // different paths are ordered, and swapping the arguments flips the sign
  assertThat(Comparators.contentFile().compare(FILE_A_DELETES, FILE_B_DELETES)).isNegative();
  assertThat(Comparators.contentFile().compare(FILE_B_DELETES, FILE_A_DELETES)).isPositive();

  assertThat(Comparators.contentFile().compare(FILE_A_DELETES, FILE_A2_DELETES)).isNegative();
  assertThat(Comparators.contentFile().compare(FILE_A2_DELETES, FILE_A_DELETES)).isPositive();
}
}

0 comments on commit 73638a6

Please sign in to comment.