Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Switch usage to DataFileSet / DeleteFileSet #11158

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@
*/
package org.apache.iceberg;

import java.util.Set;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.DataFileSet;

public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
implements OverwriteFiles {
private final Set<DataFile> deletedDataFiles = Sets.newHashSet();
private final DataFileSet deletedDataFiles = DataFileSet.create();
private boolean validateAddedFilesMatchOverwriteFilter = false;
private Long startingSnapshotId = null;
private Expression conflictDetectionFilter = null;
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.DataFileSet;

class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
private final Set<DataFile> replacedDataFiles = Sets.newHashSet();
private final DataFileSet replacedDataFiles = DataFileSet.create();
private Long startingSnapshotId = null;

BaseRewriteFiles(String tableName, TableOperations ops) {
Expand Down
8 changes: 3 additions & 5 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.DataFileSet;

/**
* {@link AppendFiles Append} implementation that adds a new manifest file for the write.
Expand All @@ -43,8 +43,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final TableOperations ops;
private final PartitionSpec spec;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final List<DataFile> newFiles = Lists.newArrayList();
private final CharSequenceSet newFilePaths = CharSequenceSet.empty();
private final DataFileSet newFiles = DataFileSet.create();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private List<ManifestFile> newManifests = null;
Expand Down Expand Up @@ -86,9 +85,8 @@ protected Map<String, String> summary() {
@Override
public FastAppend appendFile(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
if (newFilePaths.add(file.path())) {
if (newFiles.add(file)) {
this.hasNewFiles = true;
newFiles.add(file);
summaryBuilder.addedFile(spec, file);
}

Expand Down
58 changes: 36 additions & 22 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
Expand All @@ -39,9 +40,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.CharSequenceWrapper;
import org.apache.iceberg.util.ManifestFileUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
Expand Down Expand Up @@ -71,9 +70,9 @@ public String partition() {
private final PartitionSet deleteFilePartitions;
private final PartitionSet dropPartitions;
private final CharSequenceSet deletePaths = CharSequenceSet.empty();
private final Set<F> deleteFiles = newFileSet();
private Expression deleteExpression = Expressions.alwaysFalse();
private long minSequenceNumber = 0;
private boolean hasPathOnlyDeletes = false;
private boolean failAnyDelete = false;
private boolean failMissingDeletePaths = false;
private int duplicateDeleteCount = 0;
Expand Down Expand Up @@ -102,6 +101,8 @@ protected ManifestFilterManager(

protected abstract ManifestReader<F> newManifestReader(ManifestFile manifest);

protected abstract Set<F> newFileSet();

protected void failAnyDelete() {
this.failAnyDelete = true;
}
Expand Down Expand Up @@ -153,20 +154,20 @@ void caseSensitive(boolean newCaseSensitive) {
void delete(F file) {
Preconditions.checkNotNull(file, "Cannot delete file: null");
invalidateFilteredCache();
deletePaths.add(file.path());
deleteFiles.add(file);
deleteFilePartitions.add(file.specId(), file.partition());
}

/** Add a specific path to be deleted in the new snapshot. */
void delete(CharSequence path) {
Preconditions.checkNotNull(path, "Cannot delete file path: null");
invalidateFilteredCache();
this.hasPathOnlyDeletes = true;
deletePaths.add(path);
}

boolean containsDeletes() {
return !deletePaths.isEmpty()
|| !deleteFiles.isEmpty()
|| deleteExpression != Expressions.alwaysFalse()
|| !dropPartitions.isEmpty();
}
Expand Down Expand Up @@ -233,23 +234,37 @@ SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
@SuppressWarnings("CollectionUndefinedEquality")
private void validateRequiredDeletes(ManifestFile... manifests) {
if (failMissingDeletePaths) {
CharSequenceSet deletedFiles = deletedFiles(manifests);
Set<F> deletedFiles = deletedFiles(manifests);
ValidationException.check(
deletedFiles.containsAll(deleteFiles),
"Missing required files to delete: %s",
COMMA.join(
deleteFiles.stream()
.filter(f -> !deletedFiles.contains(f))
.map(ContentFile::location)
.collect(Collectors.toList())));

CharSequenceSet deletedFilePaths =
deletedFiles.stream()
.map(ContentFile::path)
.collect(Collectors.toCollection(CharSequenceSet::empty));

ValidationException.check(
deletedFiles.containsAll(deletePaths),
deletedFilePaths.containsAll(deletePaths),
"Missing required files to delete: %s",
COMMA.join(Iterables.filter(deletePaths, path -> !deletedFiles.contains(path))));
COMMA.join(Iterables.filter(deletePaths, path -> !deletedFilePaths.contains(path))));
}
}

private CharSequenceSet deletedFiles(ManifestFile[] manifests) {
CharSequenceSet deletedFiles = CharSequenceSet.empty();
private Set<F> deletedFiles(ManifestFile[] manifests) {
Set<F> deletedFiles = newFileSet();

if (manifests != null) {
for (ManifestFile manifest : manifests) {
Iterable<F> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
if (manifestDeletes != null) {
for (F file : manifestDeletes) {
deletedFiles.add(file.path());
deletedFiles.add(file);
}
}
}
Expand Down Expand Up @@ -345,9 +360,9 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
}

boolean canContainDroppedFiles;
if (hasPathOnlyDeletes) {
if (!deletePaths.isEmpty()) {
canContainDroppedFiles = true;
} else if (!deletePaths.isEmpty()) {
} else if (!deleteFiles.isEmpty()) {
// because there were no path-only deletes, the set of deleted file partitions is valid
canContainDroppedFiles =
ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById);
Expand All @@ -374,6 +389,7 @@ private boolean manifestHasDeletedFiles(
F file = entry.file();
boolean markedForDelete =
deletePaths.contains(file.path())
|| deleteFiles.contains(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand All @@ -387,7 +403,7 @@ private boolean manifestHasDeletedFiles(
|| isDelete, // ignore delete files where some records may not match the expression
"Cannot delete file where some, but not all, rows match filter %s: %s",
this.deleteExpression,
file.path());
file.location());

if (allRowsMatch) {
if (failAnyDelete) {
Expand All @@ -409,8 +425,7 @@ private ManifestFile filterManifestWithDeletedFiles(
boolean isDelete = reader.isDeleteManifestReader();
// when this point is reached, there is at least one file that will be deleted in the
// manifest. produce a copy of the manifest with all deleted files removed.
List<F> deletedFiles = Lists.newArrayList();
Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
Set<F> deletedFiles = newFileSet();

try {
ManifestWriter<F> writer = newManifestWriter(reader.spec());
Expand All @@ -422,6 +437,7 @@ private ManifestFile filterManifestWithDeletedFiles(
F file = entry.file();
boolean markedForDelete =
deletePaths.contains(file.path())
|| deleteFiles.contains(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand All @@ -436,23 +452,21 @@ private ManifestFile filterManifestWithDeletedFiles(
// the expression
"Cannot delete file where some, but not all, rows match filter %s: %s",
this.deleteExpression,
file.path());
file.location());

if (allRowsMatch) {
writer.delete(entry);

CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(entry.file().path());
if (deletedPaths.contains(wrapper)) {
if (deletedFiles.contains(file)) {
LOG.warn(
"Deleting a duplicate path from manifest {}: {}",
manifest.path(),
wrapper.get());
file.location());
duplicateDeleteCount += 1;
} else {
// only add the file to deletes if it is a new delete
// this keeps the snapshot summary accurate for non-duplicate data
deletedFiles.add(entry.file().copyWithoutStats());
deletedPaths.add(wrapper);
deletedFiles.add(file.copyWithoutStats());
}
} else {
writer.existing(entry);
Expand Down
28 changes: 20 additions & 8 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.SnapshotUtil;
Expand Down Expand Up @@ -81,8 +83,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {

// update data
private final Map<PartitionSpec, List<DataFile>> newDataFilesBySpec = Maps.newHashMap();
private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty();
private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty();
private final DataFileSet newDataFiles = DataFileSet.create();
private final DeleteFileSet newDeleteFiles = DeleteFileSet.create();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need these extra collections? Can't we use sets in newDataFilesBySpec and newDeleteFilesBySpec?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm handling this already in 4d42f18. I just didn't want to introduce too many changes/refactorings as the PR is already quite large

private Long newDataFilesDataSequenceNumber;
private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
Expand Down Expand Up @@ -234,7 +236,7 @@ protected boolean addsDeleteFiles() {
/** Add a data file to the new snapshot. */
protected void add(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
if (newDataFilePaths.add(file.path())) {
if (newDataFiles.add(file)) {
PartitionSpec fileSpec = ops.current().spec(file.specId());
Preconditions.checkArgument(
fileSpec != null,
Expand All @@ -244,9 +246,9 @@ protected void add(DataFile file) {

addedFilesSummary.addedFile(fileSpec, file);
hasNewDataFiles = true;
List<DataFile> newDataFiles =
List<DataFile> dataFiles =
newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> Lists.newArrayList());
newDataFiles.add(file);
dataFiles.add(file);
}
}

Expand All @@ -268,7 +270,7 @@ private void add(DeleteFileHolder fileHolder) {
List<DeleteFileHolder> deleteFiles =
newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList());

if (newDeleteFilePaths.add(fileHolder.deleteFile().path())) {
if (newDeleteFiles.add(fileHolder.deleteFile())) {
deleteFiles.add(fileHolder);
addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile());
hasNewDeleteFiles = true;
Expand Down Expand Up @@ -970,9 +972,9 @@ private List<ManifestFile> newDataFilesAsManifests() {

if (cachedNewDataManifests.isEmpty()) {
newDataFilesBySpec.forEach(
(dataSpec, newDataFiles) -> {
(dataSpec, dataFiles) -> {
List<ManifestFile> newDataManifests =
writeDataManifests(newDataFiles, newDataFilesDataSequenceNumber, dataSpec);
writeDataManifests(dataFiles, newDataFilesDataSequenceNumber, dataSpec);
cachedNewDataManifests.addAll(newDataManifests);
});
this.hasNewDataFiles = false;
Expand Down Expand Up @@ -1032,6 +1034,11 @@ protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec manifestSpec)
protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
return MergingSnapshotProducer.this.newManifestReader(manifest);
}

@Override
protected Set<DataFile> newFileSet() {
return DataFileSet.create();
}
}

private class DataFileMergeManager extends ManifestMergeManager<DataFile> {
Expand Down Expand Up @@ -1085,6 +1092,11 @@ protected ManifestWriter<DeleteFile> newManifestWriter(PartitionSpec manifestSpe
protected ManifestReader<DeleteFile> newManifestReader(ManifestFile manifest) {
return MergingSnapshotProducer.this.newDeleteManifestReader(manifest);
}

@Override
protected Set<DeleteFile> newFileSet() {
return DeleteFileSet.create();
}
}

private class DeleteFileMergeManager extends ManifestMergeManager<DeleteFile> {
Expand Down
16 changes: 9 additions & 7 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.io.IOException;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -567,17 +568,17 @@ protected boolean cleanupAfterCommit() {
return true;
}

protected List<ManifestFile> writeDataManifests(List<DataFile> files, PartitionSpec spec) {
protected List<ManifestFile> writeDataManifests(Collection<DataFile> files, PartitionSpec spec) {
return writeDataManifests(files, null /* inherit data seq */, spec);
}

protected List<ManifestFile> writeDataManifests(
List<DataFile> files, Long dataSeq, PartitionSpec spec) {
Collection<DataFile> files, Long dataSeq, PartitionSpec spec) {
return writeManifests(files, group -> writeDataFileGroup(group, dataSeq, spec));
}

private List<ManifestFile> writeDataFileGroup(
List<DataFile> files, Long dataSeq, PartitionSpec spec) {
Collection<DataFile> files, Long dataSeq, PartitionSpec spec) {
RollingManifestWriter<DataFile> writer = newRollingManifestWriter(spec);

try (RollingManifestWriter<DataFile> closableWriter = writer) {
Expand All @@ -594,12 +595,12 @@ private List<ManifestFile> writeDataFileGroup(
}

protected List<ManifestFile> writeDeleteManifests(
List<DeleteFileHolder> files, PartitionSpec spec) {
Collection<DeleteFileHolder> files, PartitionSpec spec) {
return writeManifests(files, group -> writeDeleteFileGroup(group, spec));
}

private List<ManifestFile> writeDeleteFileGroup(
List<DeleteFileHolder> files, PartitionSpec spec) {
Collection<DeleteFileHolder> files, PartitionSpec spec) {
RollingManifestWriter<DeleteFile> writer = newRollingDeleteManifestWriter(spec);

try (RollingManifestWriter<DeleteFile> closableWriter = writer) {
Expand All @@ -618,7 +619,7 @@ private List<ManifestFile> writeDeleteFileGroup(
}

private static <F> List<ManifestFile> writeManifests(
List<F> files, Function<List<F>, List<ManifestFile>> writeFunc) {
Collection<F> files, Function<List<F>, List<ManifestFile>> writeFunc) {
int parallelism = manifestWriterCount(ThreadPools.WORKER_THREAD_POOL_SIZE, files.size());
List<List<F>> groups = divide(files, parallelism);
Queue<ManifestFile> manifests = Queues.newConcurrentLinkedQueue();
Expand All @@ -630,7 +631,8 @@ private static <F> List<ManifestFile> writeManifests(
return ImmutableList.copyOf(manifests);
}

private static <T> List<List<T>> divide(List<T> list, int groupCount) {
private static <T> List<List<T>> divide(Collection<T> collection, int groupCount) {
List<T> list = Lists.newArrayList(collection);
int groupSize = IntMath.divide(list.size(), groupCount, RoundingMode.CEILING);
return Lists.partition(list, groupSize);
}
Expand Down
Loading