Skip to content

Commit

Permalink
Core: Switch usage to DataFileSet / DeleteFileSet (#11158)
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra authored Oct 14, 2024
1 parent 919387f commit 6a5ae1a
Show file tree
Hide file tree
Showing 17 changed files with 115 additions and 72 deletions.
5 changes: 2 additions & 3 deletions core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@
*/
package org.apache.iceberg;

import java.util.Set;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.DataFileSet;

public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
implements OverwriteFiles {
private final Set<DataFile> deletedDataFiles = Sets.newHashSet();
private final DataFileSet deletedDataFiles = DataFileSet.create();
private boolean validateAddedFilesMatchOverwriteFilter = false;
private Long startingSnapshotId = null;
private Expression conflictDetectionFilter = null;
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.DataFileSet;

class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
private final Set<DataFile> replacedDataFiles = Sets.newHashSet();
private final DataFileSet replacedDataFiles = DataFileSet.create();
private Long startingSnapshotId = null;

BaseRewriteFiles(String tableName, TableOperations ops) {
Expand Down
8 changes: 3 additions & 5 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.DataFileSet;

/**
* {@link AppendFiles Append} implementation that adds a new manifest file for the write.
Expand All @@ -43,8 +43,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final TableOperations ops;
private final PartitionSpec spec;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final List<DataFile> newFiles = Lists.newArrayList();
private final CharSequenceSet newFilePaths = CharSequenceSet.empty();
private final DataFileSet newFiles = DataFileSet.create();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private List<ManifestFile> newManifests = null;
Expand Down Expand Up @@ -86,9 +85,8 @@ protected Map<String, String> summary() {
@Override
public FastAppend appendFile(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
if (newFilePaths.add(file.path())) {
if (newFiles.add(file)) {
this.hasNewFiles = true;
newFiles.add(file);
summaryBuilder.addedFile(spec, file);
}

Expand Down
58 changes: 36 additions & 22 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
Expand All @@ -39,9 +40,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.CharSequenceWrapper;
import org.apache.iceberg.util.ManifestFileUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
Expand Down Expand Up @@ -71,9 +70,9 @@ public String partition() {
private final PartitionSet deleteFilePartitions;
private final PartitionSet dropPartitions;
private final CharSequenceSet deletePaths = CharSequenceSet.empty();
private final Set<F> deleteFiles = newFileSet();
private Expression deleteExpression = Expressions.alwaysFalse();
private long minSequenceNumber = 0;
private boolean hasPathOnlyDeletes = false;
private boolean failAnyDelete = false;
private boolean failMissingDeletePaths = false;
private int duplicateDeleteCount = 0;
Expand Down Expand Up @@ -102,6 +101,8 @@ protected ManifestFilterManager(

protected abstract ManifestReader<F> newManifestReader(ManifestFile manifest);

protected abstract Set<F> newFileSet();

protected void failAnyDelete() {
this.failAnyDelete = true;
}
Expand Down Expand Up @@ -153,20 +154,20 @@ void caseSensitive(boolean newCaseSensitive) {
void delete(F file) {
Preconditions.checkNotNull(file, "Cannot delete file: null");
invalidateFilteredCache();
deletePaths.add(file.path());
deleteFiles.add(file);
deleteFilePartitions.add(file.specId(), file.partition());
}

/** Add a specific path to be deleted in the new snapshot. */
void delete(CharSequence path) {
Preconditions.checkNotNull(path, "Cannot delete file path: null");
invalidateFilteredCache();
this.hasPathOnlyDeletes = true;
deletePaths.add(path);
}

boolean containsDeletes() {
return !deletePaths.isEmpty()
|| !deleteFiles.isEmpty()
|| deleteExpression != Expressions.alwaysFalse()
|| !dropPartitions.isEmpty();
}
Expand Down Expand Up @@ -233,23 +234,37 @@ SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
@SuppressWarnings("CollectionUndefinedEquality")
private void validateRequiredDeletes(ManifestFile... manifests) {
if (failMissingDeletePaths) {
CharSequenceSet deletedFiles = deletedFiles(manifests);
Set<F> deletedFiles = deletedFiles(manifests);
ValidationException.check(
deletedFiles.containsAll(deleteFiles),
"Missing required files to delete: %s",
COMMA.join(
deleteFiles.stream()
.filter(f -> !deletedFiles.contains(f))
.map(ContentFile::location)
.collect(Collectors.toList())));

CharSequenceSet deletedFilePaths =
deletedFiles.stream()
.map(ContentFile::path)
.collect(Collectors.toCollection(CharSequenceSet::empty));

ValidationException.check(
deletedFiles.containsAll(deletePaths),
deletedFilePaths.containsAll(deletePaths),
"Missing required files to delete: %s",
COMMA.join(Iterables.filter(deletePaths, path -> !deletedFiles.contains(path))));
COMMA.join(Iterables.filter(deletePaths, path -> !deletedFilePaths.contains(path))));
}
}

private CharSequenceSet deletedFiles(ManifestFile[] manifests) {
CharSequenceSet deletedFiles = CharSequenceSet.empty();
private Set<F> deletedFiles(ManifestFile[] manifests) {
Set<F> deletedFiles = newFileSet();

if (manifests != null) {
for (ManifestFile manifest : manifests) {
Iterable<F> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
if (manifestDeletes != null) {
for (F file : manifestDeletes) {
deletedFiles.add(file.path());
deletedFiles.add(file);
}
}
}
Expand Down Expand Up @@ -345,9 +360,9 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
}

boolean canContainDroppedFiles;
if (hasPathOnlyDeletes) {
if (!deletePaths.isEmpty()) {
canContainDroppedFiles = true;
} else if (!deletePaths.isEmpty()) {
} else if (!deleteFiles.isEmpty()) {
// because there were no path-only deletes, the set of deleted file partitions is valid
canContainDroppedFiles =
ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById);
Expand All @@ -374,6 +389,7 @@ private boolean manifestHasDeletedFiles(
F file = entry.file();
boolean markedForDelete =
deletePaths.contains(file.path())
|| deleteFiles.contains(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand All @@ -387,7 +403,7 @@ private boolean manifestHasDeletedFiles(
|| isDelete, // ignore delete files where some records may not match the expression
"Cannot delete file where some, but not all, rows match filter %s: %s",
this.deleteExpression,
file.path());
file.location());

if (allRowsMatch) {
if (failAnyDelete) {
Expand All @@ -409,8 +425,7 @@ private ManifestFile filterManifestWithDeletedFiles(
boolean isDelete = reader.isDeleteManifestReader();
// when this point is reached, there is at least one file that will be deleted in the
// manifest. produce a copy of the manifest with all deleted files removed.
List<F> deletedFiles = Lists.newArrayList();
Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
Set<F> deletedFiles = newFileSet();

try {
ManifestWriter<F> writer = newManifestWriter(reader.spec());
Expand All @@ -422,6 +437,7 @@ private ManifestFile filterManifestWithDeletedFiles(
F file = entry.file();
boolean markedForDelete =
deletePaths.contains(file.path())
|| deleteFiles.contains(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand All @@ -436,23 +452,21 @@ private ManifestFile filterManifestWithDeletedFiles(
// the expression
"Cannot delete file where some, but not all, rows match filter %s: %s",
this.deleteExpression,
file.path());
file.location());

if (allRowsMatch) {
writer.delete(entry);

CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(entry.file().path());
if (deletedPaths.contains(wrapper)) {
if (deletedFiles.contains(file)) {
LOG.warn(
"Deleting a duplicate path from manifest {}: {}",
manifest.path(),
wrapper.get());
file.location());
duplicateDeleteCount += 1;
} else {
// only add the file to deletes if it is a new delete
// this keeps the snapshot summary accurate for non-duplicate data
deletedFiles.add(entry.file().copyWithoutStats());
deletedPaths.add(wrapper);
deletedFiles.add(file.copyWithoutStats());
}
} else {
writer.existing(entry);
Expand Down
28 changes: 20 additions & 8 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.SnapshotUtil;
Expand Down Expand Up @@ -81,8 +83,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {

// update data
private final Map<PartitionSpec, List<DataFile>> newDataFilesBySpec = Maps.newHashMap();
private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty();
private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty();
private final DataFileSet newDataFiles = DataFileSet.create();
private final DeleteFileSet newDeleteFiles = DeleteFileSet.create();
private Long newDataFilesDataSequenceNumber;
private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
Expand Down Expand Up @@ -234,7 +236,7 @@ protected boolean addsDeleteFiles() {
/** Add a data file to the new snapshot. */
protected void add(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
if (newDataFilePaths.add(file.path())) {
if (newDataFiles.add(file)) {
PartitionSpec fileSpec = ops.current().spec(file.specId());
Preconditions.checkArgument(
fileSpec != null,
Expand All @@ -244,9 +246,9 @@ protected void add(DataFile file) {

addedFilesSummary.addedFile(fileSpec, file);
hasNewDataFiles = true;
List<DataFile> newDataFiles =
List<DataFile> dataFiles =
newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> Lists.newArrayList());
newDataFiles.add(file);
dataFiles.add(file);
}
}

Expand All @@ -268,7 +270,7 @@ private void add(DeleteFileHolder fileHolder) {
List<DeleteFileHolder> deleteFiles =
newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList());

if (newDeleteFilePaths.add(fileHolder.deleteFile().path())) {
if (newDeleteFiles.add(fileHolder.deleteFile())) {
deleteFiles.add(fileHolder);
addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile());
hasNewDeleteFiles = true;
Expand Down Expand Up @@ -970,9 +972,9 @@ private List<ManifestFile> newDataFilesAsManifests() {

if (cachedNewDataManifests.isEmpty()) {
newDataFilesBySpec.forEach(
(dataSpec, newDataFiles) -> {
(dataSpec, dataFiles) -> {
List<ManifestFile> newDataManifests =
writeDataManifests(newDataFiles, newDataFilesDataSequenceNumber, dataSpec);
writeDataManifests(dataFiles, newDataFilesDataSequenceNumber, dataSpec);
cachedNewDataManifests.addAll(newDataManifests);
});
this.hasNewDataFiles = false;
Expand Down Expand Up @@ -1032,6 +1034,11 @@ protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec manifestSpec)
protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
return MergingSnapshotProducer.this.newManifestReader(manifest);
}

@Override
protected Set<DataFile> newFileSet() {
return DataFileSet.create();
}
}

private class DataFileMergeManager extends ManifestMergeManager<DataFile> {
Expand Down Expand Up @@ -1085,6 +1092,11 @@ protected ManifestWriter<DeleteFile> newManifestWriter(PartitionSpec manifestSpe
protected ManifestReader<DeleteFile> newManifestReader(ManifestFile manifest) {
return MergingSnapshotProducer.this.newDeleteManifestReader(manifest);
}

@Override
protected Set<DeleteFile> newFileSet() {
return DeleteFileSet.create();
}
}

private class DeleteFileMergeManager extends ManifestMergeManager<DeleteFile> {
Expand Down
16 changes: 9 additions & 7 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.io.IOException;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -567,17 +568,17 @@ protected boolean cleanupAfterCommit() {
return true;
}

protected List<ManifestFile> writeDataManifests(List<DataFile> files, PartitionSpec spec) {
protected List<ManifestFile> writeDataManifests(Collection<DataFile> files, PartitionSpec spec) {
return writeDataManifests(files, null /* inherit data seq */, spec);
}

protected List<ManifestFile> writeDataManifests(
List<DataFile> files, Long dataSeq, PartitionSpec spec) {
Collection<DataFile> files, Long dataSeq, PartitionSpec spec) {
return writeManifests(files, group -> writeDataFileGroup(group, dataSeq, spec));
}

private List<ManifestFile> writeDataFileGroup(
List<DataFile> files, Long dataSeq, PartitionSpec spec) {
Collection<DataFile> files, Long dataSeq, PartitionSpec spec) {
RollingManifestWriter<DataFile> writer = newRollingManifestWriter(spec);

try (RollingManifestWriter<DataFile> closableWriter = writer) {
Expand All @@ -594,12 +595,12 @@ private List<ManifestFile> writeDataFileGroup(
}

protected List<ManifestFile> writeDeleteManifests(
List<DeleteFileHolder> files, PartitionSpec spec) {
Collection<DeleteFileHolder> files, PartitionSpec spec) {
return writeManifests(files, group -> writeDeleteFileGroup(group, spec));
}

private List<ManifestFile> writeDeleteFileGroup(
List<DeleteFileHolder> files, PartitionSpec spec) {
Collection<DeleteFileHolder> files, PartitionSpec spec) {
RollingManifestWriter<DeleteFile> writer = newRollingDeleteManifestWriter(spec);

try (RollingManifestWriter<DeleteFile> closableWriter = writer) {
Expand All @@ -618,7 +619,7 @@ private List<ManifestFile> writeDeleteFileGroup(
}

private static <F> List<ManifestFile> writeManifests(
List<F> files, Function<List<F>, List<ManifestFile>> writeFunc) {
Collection<F> files, Function<List<F>, List<ManifestFile>> writeFunc) {
int parallelism = manifestWriterCount(ThreadPools.WORKER_THREAD_POOL_SIZE, files.size());
List<List<F>> groups = divide(files, parallelism);
Queue<ManifestFile> manifests = Queues.newConcurrentLinkedQueue();
Expand All @@ -630,7 +631,8 @@ private static <F> List<ManifestFile> writeManifests(
return ImmutableList.copyOf(manifests);
}

private static <T> List<List<T>> divide(List<T> list, int groupCount) {
private static <T> List<List<T>> divide(Collection<T> collection, int groupCount) {
List<T> list = Lists.newArrayList(collection);
int groupSize = IntMath.divide(list.size(), groupCount, RoundingMode.CEILING);
return Lists.partition(list, groupSize);
}
Expand Down
Loading

0 comments on commit 6a5ae1a

Please sign in to comment.