Skip to content

Commit

Permalink
Core: Switch all places to use DataFileSet/DeleteFileSet
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Sep 30, 2024
1 parent 25c5a82 commit a2688b8
Show file tree
Hide file tree
Showing 14 changed files with 121 additions and 73 deletions.
5 changes: 2 additions & 3 deletions core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@
*/
package org.apache.iceberg;

import java.util.Set;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.DataFileSet;

public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
implements OverwriteFiles {
private final Set<DataFile> deletedDataFiles = Sets.newHashSet();
private final DataFileSet deletedDataFiles = DataFileSet.create();
private boolean validateAddedFilesMatchOverwriteFilter = false;
private Long startingSnapshotId = null;
private Expression conflictDetectionFilter = null;
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.DataFileSet;

class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
private final Set<DataFile> replacedDataFiles = Sets.newHashSet();
private final DataFileSet replacedDataFiles = DataFileSet.create();
private Long startingSnapshotId = null;

BaseRewriteFiles(String tableName, TableOperations ops) {
Expand Down
10 changes: 4 additions & 6 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.DataFileSet;

/**
* {@link AppendFiles Append} implementation that adds a new manifest file for the write.
Expand All @@ -43,8 +43,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final TableOperations ops;
private final PartitionSpec spec;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final List<DataFile> newFiles = Lists.newArrayList();
private final CharSequenceSet newFilePaths = CharSequenceSet.empty();
private final DataFileSet newFiles = DataFileSet.create();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private List<ManifestFile> newManifests = null;
Expand Down Expand Up @@ -86,9 +85,8 @@ protected Map<String, String> summary() {
@Override
public FastAppend appendFile(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
if (newFilePaths.add(file.path())) {
if (newFiles.add(file)) {
this.hasNewFiles = true;
newFiles.add(file);
summaryBuilder.addedFile(spec, file);
}

Expand Down Expand Up @@ -215,7 +213,7 @@ private List<ManifestFile> writeNewManifests() throws IOException {
}

if (newManifests == null && !newFiles.isEmpty()) {
this.newManifests = writeDataManifests(newFiles, spec);
this.newManifests = writeDataManifests(Lists.newArrayList(newFiles), spec);
hasNewFiles = false;
}

Expand Down
107 changes: 76 additions & 31 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
Expand All @@ -36,12 +37,12 @@
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.CharSequenceWrapper;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.ManifestFileUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
Expand Down Expand Up @@ -71,6 +72,8 @@ public String partition() {
private final PartitionSet deleteFilePartitions;
private final PartitionSet dropPartitions;
private final CharSequenceSet deletePaths = CharSequenceSet.empty();
private final DataFileSet dataFilesToDelete = DataFileSet.create();
private final DeleteFileSet deleteFilesToDelete = DeleteFileSet.create();
private Expression deleteExpression = Expressions.alwaysFalse();
private long minSequenceNumber = 0;
private boolean hasPathOnlyDeletes = false;
Expand All @@ -83,7 +86,7 @@ public String partition() {
private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();

// tracking where files were deleted to validate retries quickly
private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
private final Map<ManifestFile, FilesToDeleteHolder> filteredManifestToDeletedFiles =
Maps.newConcurrentMap();

private final Supplier<ExecutorService> workerPoolSupplier;
Expand Down Expand Up @@ -153,7 +156,12 @@ void caseSensitive(boolean newCaseSensitive) {
void delete(F file) {
Preconditions.checkNotNull(file, "Cannot delete file: null");
invalidateFilteredCache();
deletePaths.add(file.path());
if (file instanceof DataFile) {
dataFilesToDelete.add((DataFile) file);
} else {
deleteFilesToDelete.add((DeleteFile) file);
}

deleteFilePartitions.add(file.specId(), file.partition());
}

Expand All @@ -167,6 +175,8 @@ void delete(CharSequence path) {

boolean containsDeletes() {
return !deletePaths.isEmpty()
|| !dataFilesToDelete.isEmpty()
|| !deleteFilesToDelete.isEmpty()
|| deleteExpression != Expressions.alwaysFalse()
|| !dropPartitions.isEmpty();
}
Expand Down Expand Up @@ -211,11 +221,10 @@ SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {

for (ManifestFile manifest : manifests) {
PartitionSpec manifestSpec = specsById.get(manifest.partitionSpecId());
Iterable<F> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
FilesToDeleteHolder manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
if (manifestDeletes != null) {
for (F file : manifestDeletes) {
summaryBuilder.deletedFile(manifestSpec, file);
}
manifestDeletes.dataFiles.forEach(file -> summaryBuilder.deletedFile(manifestSpec, file));
manifestDeletes.deleteFiles.forEach(file -> summaryBuilder.deletedFile(manifestSpec, file));
}
}

Expand All @@ -230,32 +239,43 @@ SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
*
* @param manifests a set of filtered manifests
*/
@SuppressWarnings("CollectionUndefinedEquality")
private void validateRequiredDeletes(ManifestFile... manifests) {
if (failMissingDeletePaths) {
CharSequenceSet deletedFiles = deletedFiles(manifests);
Pair<DataFileSet, DeleteFileSet> deletedFiles = deletedFiles(manifests);

ValidationException.check(
deletedFiles.containsAll(deletePaths),
deletedFiles.first().containsAll(dataFilesToDelete),
"Missing required files to delete: %s",
COMMA.join(Iterables.filter(deletePaths, path -> !deletedFiles.contains(path))));
COMMA.join(
dataFilesToDelete.stream()
.filter(f -> !deletedFiles.first().contains(f))
.map(ContentFile::location)
.collect(Collectors.toList())));

ValidationException.check(
deletedFiles.second().containsAll(deleteFilesToDelete),
"Missing required files to delete: %s",
COMMA.join(
deleteFilesToDelete.stream()
.filter(f -> !deletedFiles.second().contains(f))
.map(ContentFile::location)
.collect(Collectors.toList())));
}
}

private CharSequenceSet deletedFiles(ManifestFile[] manifests) {
CharSequenceSet deletedFiles = CharSequenceSet.empty();

private Pair<DataFileSet, DeleteFileSet> deletedFiles(ManifestFile[] manifests) {
Pair<DataFileSet, DeleteFileSet> result = Pair.of(DataFileSet.create(), DeleteFileSet.create());
if (manifests != null) {
for (ManifestFile manifest : manifests) {
Iterable<F> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
FilesToDeleteHolder manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
if (manifestDeletes != null) {
for (F file : manifestDeletes) {
deletedFiles.add(file.path());
}
manifestDeletes.dataFiles.forEach(result.first()::add);
manifestDeletes.deleteFiles.forEach(result.second()::add);
}
}
}

return deletedFiles;
return result;
}

/**
Expand Down Expand Up @@ -347,7 +367,7 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
boolean canContainDroppedFiles;
if (hasPathOnlyDeletes) {
canContainDroppedFiles = true;
} else if (!deletePaths.isEmpty()) {
} else if (!dataFilesToDelete.isEmpty() || !deleteFilesToDelete.isEmpty()) {
// because there were no path-only deletes, the set of deleted file partitions is valid
canContainDroppedFiles =
ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById);
Expand All @@ -372,8 +392,19 @@ private boolean manifestHasDeletedFiles(

for (ManifestEntry<F> entry : reader.liveEntries()) {
F file = entry.file();

// add path-based delete to set of files to be deleted
if (deletePaths.contains(CharSequenceWrapper.wrap(file.path()))) {
if (file instanceof DataFile) {
dataFilesToDelete.add((DataFile) file);
} else {
deleteFilesToDelete.add((DeleteFile) file);
}
}

boolean markedForDelete =
deletePaths.contains(file.path())
dataFilesToDelete.contains(file)
|| deleteFilesToDelete.contains(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand Down Expand Up @@ -409,8 +440,8 @@ private ManifestFile filterManifestWithDeletedFiles(
boolean isDelete = reader.isDeleteManifestReader();
// when this point is reached, there is at least one file that will be deleted in the
// manifest. produce a copy of the manifest with all deleted files removed.
List<F> deletedFiles = Lists.newArrayList();
Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
DataFileSet deletedDataFiles = DataFileSet.create();
DeleteFileSet deletedDeleteFiles = DeleteFileSet.create();

try {
ManifestWriter<F> writer = newManifestWriter(reader.spec());
Expand All @@ -421,7 +452,8 @@ private ManifestFile filterManifestWithDeletedFiles(
entry -> {
F file = entry.file();
boolean markedForDelete =
deletePaths.contains(file.path())
dataFilesToDelete.contains(file)
|| deleteFilesToDelete.contains(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand All @@ -441,18 +473,20 @@ private ManifestFile filterManifestWithDeletedFiles(
if (allRowsMatch) {
writer.delete(entry);

CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(entry.file().path());
if (deletedPaths.contains(wrapper)) {
if (deletedDataFiles.contains(file) || deletedDeleteFiles.contains(file)) {
LOG.warn(
"Deleting a duplicate path from manifest {}: {}",
manifest.path(),
wrapper.get());
file.path());
duplicateDeleteCount += 1;
} else {
// only add the file to deletes if it is a new delete
// this keeps the snapshot summary accurate for non-duplicate data
deletedFiles.add(entry.file().copyWithoutStats());
deletedPaths.add(wrapper);
if (file instanceof DataFile) {
deletedDataFiles.add((DataFile) file.copyWithoutStats());
} else {
deletedDeleteFiles.add((DeleteFile) file.copyWithoutStats());
}
}
} else {
writer.existing(entry);
Expand All @@ -472,7 +506,8 @@ private ManifestFile filterManifestWithDeletedFiles(

// update caches
filteredManifests.put(manifest, filtered);
filteredManifestToDeletedFiles.put(filtered, deletedFiles);
filteredManifestToDeletedFiles.put(
filtered, new FilesToDeleteHolder(deletedDataFiles, deletedDeleteFiles));

return filtered;

Expand Down Expand Up @@ -533,4 +568,14 @@ private Pair<InclusiveMetricsEvaluator, StrictMetricsEvaluator> metricsEvaluator
return metricsEvaluators.get(partition);
}
}

private static class FilesToDeleteHolder {
private final DataFileSet dataFiles;
private final DeleteFileSet deleteFiles;

private FilesToDeleteHolder(DataFileSet dataFiles, DeleteFileSet deleteFiles) {
this.dataFiles = dataFiles;
this.deleteFiles = deleteFiles;
}
}
}
18 changes: 10 additions & 8 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.SnapshotUtil;
Expand Down Expand Up @@ -81,8 +83,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {

// update data
private final Map<PartitionSpec, List<DataFile>> newDataFilesBySpec = Maps.newHashMap();
private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty();
private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty();
private final DataFileSet newDataFiles = DataFileSet.create();
private final DeleteFileSet newDeleteFiles = DeleteFileSet.create();
private Long newDataFilesDataSequenceNumber;
private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
Expand Down Expand Up @@ -234,7 +236,7 @@ protected boolean addsDeleteFiles() {
/** Add a data file to the new snapshot. */
protected void add(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
if (newDataFilePaths.add(file.path())) {
if (newDataFiles.add(file)) {
PartitionSpec fileSpec = ops.current().spec(file.specId());
Preconditions.checkArgument(
fileSpec != null,
Expand All @@ -244,9 +246,9 @@ protected void add(DataFile file) {

addedFilesSummary.addedFile(fileSpec, file);
hasNewDataFiles = true;
List<DataFile> newDataFiles =
List<DataFile> dataFiles =
newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> Lists.newArrayList());
newDataFiles.add(file);
dataFiles.add(file);
}
}

Expand All @@ -268,7 +270,7 @@ private void add(DeleteFileHolder fileHolder) {
List<DeleteFileHolder> deleteFiles =
newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList());

if (newDeleteFilePaths.add(fileHolder.deleteFile().path())) {
if (newDeleteFiles.add(fileHolder.deleteFile())) {
deleteFiles.add(fileHolder);
addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile());
hasNewDeleteFiles = true;
Expand Down Expand Up @@ -970,9 +972,9 @@ private List<ManifestFile> newDataFilesAsManifests() {

if (cachedNewDataManifests.isEmpty()) {
newDataFilesBySpec.forEach(
(dataSpec, newDataFiles) -> {
(dataSpec, dataFiles) -> {
List<ManifestFile> newDataManifests =
writeDataManifests(newDataFiles, newDataFilesDataSequenceNumber, dataSpec);
writeDataManifests(dataFiles, newDataFilesDataSequenceNumber, dataSpec);
cachedNewDataManifests.addAll(newDataManifests);
});
this.hasNewDataFiles = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@

import java.util.Map;
import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -73,8 +72,8 @@ public RewriteDataFilesCommitManager(
* @param fileGroups fileSets to commit
*/
public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
Set<DataFile> rewrittenDataFiles = Sets.newHashSet();
Set<DataFile> addedDataFiles = Sets.newHashSet();
DataFileSet rewrittenDataFiles = DataFileSet.create();
DataFileSet addedDataFiles = DataFileSet.create();
for (RewriteFileGroup group : fileGroups) {
rewrittenDataFiles.addAll(group.rewrittenFiles());
addedDataFiles.addAll(group.addedFiles());
Expand Down
Loading

0 comments on commit a2688b8

Please sign in to comment.