Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Rename DeleteFileHolder to PendingDeleteFile / Optimize duplicate data/delete file detection #11254

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@
*/
package org.apache.iceberg;

import java.util.Set;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.DataFileSet;

public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
implements OverwriteFiles {
private final Set<DataFile> deletedDataFiles = Sets.newHashSet();
private final DataFileSet deletedDataFiles = DataFileSet.create();
private boolean validateAddedFilesMatchOverwriteFilter = false;
private Long startingSnapshotId = null;
private Expression conflictDetectionFilter = null;
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.DataFileSet;

class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
private final Set<DataFile> replacedDataFiles = Sets.newHashSet();
private final DataFileSet replacedDataFiles = DataFileSet.create();
private Long startingSnapshotId = null;

BaseRewriteFiles(String tableName, TableOperations ops) {
Expand Down
10 changes: 4 additions & 6 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.DataFileSet;

/**
* {@link AppendFiles Append} implementation that adds a new manifest file for the write.
Expand All @@ -43,8 +43,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final TableOperations ops;
private final PartitionSpec spec;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final List<DataFile> newFiles = Lists.newArrayList();
private final CharSequenceSet newFilePaths = CharSequenceSet.empty();
private final DataFileSet newFiles = DataFileSet.create();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private List<ManifestFile> newManifests = null;
Expand Down Expand Up @@ -86,9 +85,8 @@ protected Map<String, String> summary() {
@Override
public FastAppend appendFile(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
if (newFilePaths.add(file.path())) {
if (newFiles.add(file)) {
this.hasNewFiles = true;
newFiles.add(file);
summaryBuilder.addedFile(spec, file);
}

Expand Down Expand Up @@ -215,7 +213,7 @@ private List<ManifestFile> writeNewManifests() throws IOException {
}

if (newManifests == null && !newFiles.isEmpty()) {
this.newManifests = writeDataManifests(newFiles, spec);
this.newManifests = writeDataManifests(Lists.newArrayList(newFiles), spec);
hasNewFiles = false;
}

Expand Down
109 changes: 77 additions & 32 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
Expand All @@ -36,12 +37,12 @@
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.CharSequenceWrapper;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.ManifestFileUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
Expand Down Expand Up @@ -71,6 +72,7 @@ public String partition() {
private final PartitionSet deleteFilePartitions;
private final PartitionSet dropPartitions;
private final CharSequenceSet deletePaths = CharSequenceSet.empty();
private final FilesToDeleteHolder filesToDelete = new FilesToDeleteHolder();
private Expression deleteExpression = Expressions.alwaysFalse();
private long minSequenceNumber = 0;
private boolean hasPathOnlyDeletes = false;
Expand All @@ -83,7 +85,7 @@ public String partition() {
private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();

// tracking where files were deleted to validate retries quickly
private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
private final Map<ManifestFile, FilesToDeleteHolder> filteredManifestToDeletedFiles =
Maps.newConcurrentMap();

private final Supplier<ExecutorService> workerPoolSupplier;
Expand Down Expand Up @@ -153,7 +155,7 @@ void caseSensitive(boolean newCaseSensitive) {
void delete(F file) {
Preconditions.checkNotNull(file, "Cannot delete file: null");
invalidateFilteredCache();
deletePaths.add(file.path());
filesToDelete.delete(file);
deleteFilePartitions.add(file.specId(), file.partition());
}

Expand All @@ -167,6 +169,7 @@ void delete(CharSequence path) {

/**
 * Reports whether any kind of delete has been registered with this manager.
 *
 * @return true if there is at least one path-based delete, one file-based delete, a delete
 *     expression other than the {@code Expressions.alwaysFalse()} instance, or a dropped partition
 */
boolean containsDeletes() {
  // path-based and file-based deletes are checked first, in the same order as before
  if (!deletePaths.isEmpty() || filesToDelete.containsDeletes()) {
    return true;
  }

  // identity comparison is intentional: alwaysFalse() is compared by reference, not by equals()
  return deleteExpression != Expressions.alwaysFalse() || !dropPartitions.isEmpty();
}
Expand Down Expand Up @@ -211,11 +214,10 @@ SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {

for (ManifestFile manifest : manifests) {
PartitionSpec manifestSpec = specsById.get(manifest.partitionSpecId());
Iterable<F> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
FilesToDeleteHolder manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
if (manifestDeletes != null) {
for (F file : manifestDeletes) {
summaryBuilder.deletedFile(manifestSpec, file);
}
manifestDeletes.dataFiles.forEach(file -> summaryBuilder.deletedFile(manifestSpec, file));
manifestDeletes.deleteFiles.forEach(file -> summaryBuilder.deletedFile(manifestSpec, file));
}
}

Expand All @@ -230,27 +232,20 @@ SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
*
* @param manifests a set of filtered manifests
*/
@SuppressWarnings("CollectionUndefinedEquality")
private void validateRequiredDeletes(ManifestFile... manifests) {
if (failMissingDeletePaths) {
CharSequenceSet deletedFiles = deletedFiles(manifests);
ValidationException.check(
deletedFiles.containsAll(deletePaths),
"Missing required files to delete: %s",
COMMA.join(Iterables.filter(deletePaths, path -> !deletedFiles.contains(path))));
deletedFiles(manifests).validateRequiredDeletes(filesToDelete);
}
}

private CharSequenceSet deletedFiles(ManifestFile[] manifests) {
CharSequenceSet deletedFiles = CharSequenceSet.empty();

private FilesToDeleteHolder deletedFiles(ManifestFile[] manifests) {
FilesToDeleteHolder deletedFiles = new FilesToDeleteHolder();
if (manifests != null) {
for (ManifestFile manifest : manifests) {
Iterable<F> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
FilesToDeleteHolder manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
if (manifestDeletes != null) {
for (F file : manifestDeletes) {
deletedFiles.add(file.path());
}
deletedFiles.dataFiles.addAll(manifestDeletes.dataFiles);
deletedFiles.deleteFiles.addAll(manifestDeletes.deleteFiles);
}
}
}
Expand Down Expand Up @@ -347,7 +342,7 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
boolean canContainDroppedFiles;
if (hasPathOnlyDeletes) {
canContainDroppedFiles = true;
} else if (!deletePaths.isEmpty()) {
} else if (filesToDelete.containsDeletes()) {
// because there were no path-only deletes, the set of deleted file partitions is valid
canContainDroppedFiles =
ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById);
Expand All @@ -372,8 +367,14 @@ private boolean manifestHasDeletedFiles(

for (ManifestEntry<F> entry : reader.liveEntries()) {
F file = entry.file();

// add path-based delete to set of files to be deleted
if (deletePaths.contains(CharSequenceWrapper.wrap(file.path()))) {
filesToDelete.delete(file);
}

boolean markedForDelete =
deletePaths.contains(file.path())
filesToDelete.markedForDelete(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand Down Expand Up @@ -409,8 +410,7 @@ private ManifestFile filterManifestWithDeletedFiles(
boolean isDelete = reader.isDeleteManifestReader();
// when this point is reached, there is at least one file that will be deleted in the
// manifest. produce a copy of the manifest with all deleted files removed.
List<F> deletedFiles = Lists.newArrayList();
Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
FilesToDeleteHolder deletedFiles = new FilesToDeleteHolder();

try {
ManifestWriter<F> writer = newManifestWriter(reader.spec());
Expand All @@ -421,7 +421,7 @@ private ManifestFile filterManifestWithDeletedFiles(
entry -> {
F file = entry.file();
boolean markedForDelete =
deletePaths.contains(file.path())
filesToDelete.markedForDelete(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand All @@ -436,23 +436,21 @@ private ManifestFile filterManifestWithDeletedFiles(
// the expression
"Cannot delete file where some, but not all, rows match filter %s: %s",
this.deleteExpression,
file.path());
file.location());

if (allRowsMatch) {
writer.delete(entry);

CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(entry.file().path());
if (deletedPaths.contains(wrapper)) {
if (deletedFiles.markedForDelete(file)) {
LOG.warn(
"Deleting a duplicate path from manifest {}: {}",
manifest.path(),
wrapper.get());
file.location());
duplicateDeleteCount += 1;
} else {
// only add the file to deletes if it is a new delete
// this keeps the snapshot summary accurate for non-duplicate data
deletedFiles.add(entry.file().copyWithoutStats());
deletedPaths.add(wrapper);
deletedFiles.delete(file.copyWithoutStats());
}
} else {
writer.existing(entry);
Expand Down Expand Up @@ -533,4 +531,51 @@ private Pair<InclusiveMetricsEvaluator, StrictMetricsEvaluator> metricsEvaluator
return metricsEvaluators.get(partition);
}
}

/**
 * Tracks a group of files split by content type: {@link DataFile} instances are kept in a
 * {@link DataFileSet} and every other file is cast to {@link DeleteFile} and kept in a
 * {@link DeleteFileSet}.
 *
 * <p>This is an inner (non-static) class because it uses the enclosing class's file type
 * parameter {@code F}.
 */
private class FilesToDeleteHolder {
  private final DataFileSet dataFiles = DataFileSet.create();
  private final DeleteFileSet deleteFiles = DeleteFileSet.create();

  private FilesToDeleteHolder() {}

  /**
   * Adds a file to the set that matches its content type.
   *
   * @param file a data file or a delete file
   */
  private void delete(F file) {
    if (file instanceof DataFile) {
      dataFiles.add((DataFile) file);
    } else {
      deleteFiles.add((DeleteFile) file);
    }
  }

  /**
   * Returns whether this holder tracks at least one file.
   *
   * @return true if either the data file set or the delete file set is non-empty
   */
  private boolean containsDeletes() {
    return !dataFiles.isEmpty() || !deleteFiles.isEmpty();
  }

  /**
   * Returns whether the given file is tracked by this holder.
   *
   * @param file a data file or a delete file
   * @return true if the set matching the file's content type contains it
   */
  private boolean markedForDelete(F file) {
    if (file instanceof DataFile) {
      return dataFiles.contains((DataFile) file);
    } else {
      return deleteFiles.contains((DeleteFile) file);
    }
  }

  /**
   * Validates that every file in the given holder is also tracked by this holder.
   *
   * <p>Data files are validated before delete files, so when both kinds are missing the reported
   * failure lists the missing data files.
   *
   * @param filesToBeDeleted the files that are required to be present in this holder
   */
  private void validateRequiredDeletes(FilesToDeleteHolder filesToBeDeleted) {
    checkAllDeleted(dataFiles, filesToBeDeleted.dataFiles);
    checkAllDeleted(deleteFiles, filesToBeDeleted.deleteFiles);
  }

  /**
   * Runs {@code ValidationException.check} on whether {@code deleted} contains all of
   * {@code required}; the message argument lists the locations of the required files that are
   * not in {@code deleted}.
   *
   * @param deleted the files that were actually deleted
   * @param required the files that must have been deleted
   */
  private void checkAllDeleted(Set<?> deleted, Set<? extends ContentFile<?>> required) {
    // the message argument is built eagerly, exactly as the two inlined checks did
    ValidationException.check(
        deleted.containsAll(required),
        "Missing required files to delete: %s",
        COMMA.join(
            required.stream()
                .filter(file -> !deleted.contains(file))
                .map(ContentFile::location)
                .collect(Collectors.toList())));
  }
}
}
Loading