Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Oct 9, 2024
1 parent 8bbab8e commit ae3bcac
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 15 deletions.
38 changes: 24 additions & 14 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.CharSequenceWrapper;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.ManifestFileUtil;
Expand Down Expand Up @@ -75,7 +75,6 @@ public String partition() {
private final FilesToDeleteHolder filesToDelete = new FilesToDeleteHolder();
private Expression deleteExpression = Expressions.alwaysFalse();
private long minSequenceNumber = 0;
private boolean hasPathOnlyDeletes = false;
private boolean failAnyDelete = false;
private boolean failMissingDeletePaths = false;
private int duplicateDeleteCount = 0;
Expand Down Expand Up @@ -163,7 +162,6 @@ void delete(F file) {
void delete(CharSequence path) {
Preconditions.checkNotNull(path, "Cannot delete file path: null");
invalidateFilteredCache();
this.hasPathOnlyDeletes = true;
deletePaths.add(path);
}

Expand Down Expand Up @@ -234,7 +232,9 @@ SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
*/
private void validateRequiredDeletes(ManifestFile... manifests) {
if (failMissingDeletePaths) {
deletedFiles(manifests).validateRequiredDeletes(filesToDelete);
FilesToDeleteHolder deletedFiles = deletedFiles(manifests);
deletedFiles.validateRequiredDeletes(filesToDelete);
deletedFiles.validateRequiredDeletes(deletePaths);
}
}

Expand Down Expand Up @@ -340,7 +340,7 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
}

boolean canContainDroppedFiles;
if (hasPathOnlyDeletes) {
if (!deletePaths.isEmpty()) {
canContainDroppedFiles = true;
} else if (filesToDelete.containsDeletes()) {
// because there were no path-only deletes, the set of deleted file partitions is valid
Expand Down Expand Up @@ -368,13 +368,9 @@ private boolean manifestHasDeletedFiles(
for (ManifestEntry<F> entry : reader.liveEntries()) {
F file = entry.file();

// add path-based delete to set of files to be deleted
if (deletePaths.contains(CharSequenceWrapper.wrap(file.path()))) {
filesToDelete.delete(file);
}

boolean markedForDelete =
filesToDelete.markedForDelete(file)
deletePaths.contains(file.path())
|| filesToDelete.markedForDelete(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand All @@ -388,7 +384,7 @@ private boolean manifestHasDeletedFiles(
|| isDelete, // ignore delete files where some records may not match the expression
"Cannot delete file where some, but not all, rows match filter %s: %s",
this.deleteExpression,
file.path());
file.location());

if (allRowsMatch) {
if (failAnyDelete) {
Expand Down Expand Up @@ -421,7 +417,8 @@ private ManifestFile filterManifestWithDeletedFiles(
entry -> {
F file = entry.file();
boolean markedForDelete =
filesToDelete.markedForDelete(file)
deletePaths.contains(file.path())
|| filesToDelete.markedForDelete(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand All @@ -441,7 +438,8 @@ private ManifestFile filterManifestWithDeletedFiles(
if (allRowsMatch) {
writer.delete(entry);

if (deletedFiles.markedForDelete(file)) {
if (deletePaths.contains(file.path())
|| deletedFiles.markedForDelete(file)) {
LOG.warn(
"Deleting a duplicate path from manifest {}: {}",
manifest.path(),
Expand Down Expand Up @@ -577,5 +575,17 @@ private void validateRequiredDeletes(FilesToDeleteHolder filesToBeDeleted) {
.map(ContentFile::location)
.collect(Collectors.toList())));
}

@SuppressWarnings("CollectionUndefinedEquality")
private void validateRequiredDeletes(CharSequenceSet filePathsToBeDeleted) {
CharSequenceSet deletedFiles = CharSequenceSet.empty();
dataFiles.stream().map(ContentFile::path).forEach(deletedFiles::add);
deleteFiles.stream().map(ContentFile::path).forEach(deletedFiles::add);

ValidationException.check(
deletedFiles.containsAll(filePathsToBeDeleted),
"Missing required files to delete: %s",
COMMA.join(Iterables.filter(filePathsToBeDeleted, path -> !deletedFiles.contains(path))));
}
}
}
15 changes: 14 additions & 1 deletion core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,20 @@ public void testDeleteValidateFileExistence() {

assertThatThrownBy(
() -> commit(table, table.newDelete().deleteFile(FILE_B).validateFilesExist(), branch))
.isInstanceOf(ValidationException.class);
.isInstanceOf(ValidationException.class)
.hasMessage("Missing required files to delete: /path/to/data-b.parquet");

assertThatThrownBy(
() ->
commit(
table,
table
.newDelete()
.deleteFile("/path/to/non-existing.parquet")
.validateFilesExist(),
branch))
.isInstanceOf(ValidationException.class)
.hasMessage("Missing required files to delete: /path/to/non-existing.parquet");
}

@TestTemplate
Expand Down

0 comments on commit ae3bcac

Please sign in to comment.