Skip to content

Commit

Permalink
Core: Add ContentFile comparator
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Sep 18, 2024
1 parent 40ffcb9 commit 10150bf
Show file tree
Hide file tree
Showing 12 changed files with 70 additions and 29 deletions.
16 changes: 16 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/Comparators.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.function.IntFunction;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.UnicodeUtil;
Expand Down Expand Up @@ -179,6 +180,10 @@ public static Comparator<CharSequence> filePath() {
return FilePathComparator.INSTANCE;
}

public static Comparator<ContentFile<?>> contentFile() {
return ContentFileComparator.INSTANCE;
}

private static class NullsFirst<T> implements Comparator<T> {
private static final NullsFirst<?> INSTANCE = new NullsFirst<>();

Expand Down Expand Up @@ -394,4 +399,15 @@ public int compare(CharSequence s1, CharSequence s2) {
return 0;
}
}

private static class ContentFileComparator implements Comparator<ContentFile<?>> {
private static final ContentFileComparator INSTANCE = new ContentFileComparator();

private ContentFileComparator() {}

@Override
public int compare(ContentFile<?> s1, ContentFile<?> s2) {
return CharSeqComparator.INSTANCE.compare(s1.path(), s2.path());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;

public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
implements OverwriteFiles {
private final Set<DataFile> deletedDataFiles = Sets.newHashSet();
private final Set<DataFile> deletedDataFiles = Sets.newTreeSet(Comparators.contentFile());
private boolean validateAddedFilesMatchOverwriteFilter = false;
private Long startingSnapshotId = null;
private Expression conflictDetectionFilter = null;
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;

class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
private final Set<DataFile> replacedDataFiles = Sets.newHashSet();
private final Set<DataFile> replacedDataFiles = Sets.newTreeSet(Comparators.contentFile());
private Long startingSnapshotId = null;

BaseRewriteFiles(String tableName, TableOperations ops) {
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;

/**
* {@link AppendFiles Append} implementation that adds a new manifest file for the write.
Expand All @@ -44,7 +45,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final PartitionSpec spec;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final List<DataFile> newFiles = Lists.newArrayList();
private final CharSequenceSet newFilePaths = CharSequenceSet.empty();
private final Set<DataFile> newFileSet = Sets.newTreeSet(Comparators.contentFile());
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private List<ManifestFile> newManifests = null;
Expand Down Expand Up @@ -86,7 +87,7 @@ protected Map<String, String> summary() {
@Override
public FastAppend appendFile(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
if (newFilePaths.add(file.path())) {
if (newFileSet.add(file)) {
this.hasNewFiles = true;
newFiles.add(file);
summaryBuilder.addedFile(spec, file);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
Expand Down Expand Up @@ -81,8 +82,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {

// update data
private final Map<PartitionSpec, List<DataFile>> newDataFilesBySpec = Maps.newHashMap();
private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty();
private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty();
private final Set<DataFile> newDataFiles = Sets.newTreeSet(Comparators.contentFile());
private final Set<DeleteFile> newDeleteFiles = Sets.newTreeSet(Comparators.contentFile());
private Long newDataFilesDataSequenceNumber;
private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
Expand Down Expand Up @@ -234,7 +235,7 @@ protected boolean addsDeleteFiles() {
/** Add a data file to the new snapshot. */
protected void add(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
if (newDataFilePaths.add(file.path())) {
if (newDataFiles.add(file)) {
PartitionSpec fileSpec = ops.current().spec(file.specId());
Preconditions.checkArgument(
fileSpec != null,
Expand All @@ -244,9 +245,9 @@ protected void add(DataFile file) {

addedFilesSummary.addedFile(fileSpec, file);
hasNewDataFiles = true;
List<DataFile> newDataFiles =
List<DataFile> dataFiles =
newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> Lists.newArrayList());
newDataFiles.add(file);
dataFiles.add(file);
}
}

Expand All @@ -268,7 +269,7 @@ private void add(DeleteFileHolder fileHolder) {
List<DeleteFileHolder> deleteFiles =
newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList());

if (newDeleteFilePaths.add(fileHolder.deleteFile().path())) {
if (newDeleteFiles.add(fileHolder.deleteFile())) {
deleteFiles.add(fileHolder);
addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile());
hasNewDeleteFiles = true;
Expand Down Expand Up @@ -970,9 +971,9 @@ private List<ManifestFile> newDataFilesAsManifests() {

if (cachedNewDataManifests.isEmpty()) {
newDataFilesBySpec.forEach(
(dataSpec, newDataFiles) -> {
(dataSpec, dataFiles) -> {
List<ManifestFile> newDataManifests =
writeDataManifests(newDataFiles, newDataFilesDataSequenceNumber, dataSpec);
writeDataManifests(dataFiles, newDataFilesDataSequenceNumber, dataSpec);
cachedNewDataManifests.addAll(newDataManifests);
});
this.hasNewDataFiles = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -73,8 +74,8 @@ public RewriteDataFilesCommitManager(
* @param fileGroups fileSets to commit
*/
public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
Set<DataFile> rewrittenDataFiles = Sets.newHashSet();
Set<DataFile> addedDataFiles = Sets.newHashSet();
Set<DataFile> rewrittenDataFiles = Sets.newTreeSet(Comparators.contentFile());
Set<DataFile> addedDataFiles = Sets.newTreeSet(Comparators.contentFile());
for (RewriteFileGroup group : fileGroups) {
rewrittenDataFiles.addAll(group.rewrittenFiles());
addedDataFiles.addAll(group.addedFiles());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.iceberg.actions;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
Expand All @@ -29,6 +28,8 @@
import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;

/**
* Container class representing a set of files to be rewritten by a RewriteAction and the new files
Expand All @@ -38,7 +39,7 @@ public class RewriteFileGroup {
private final FileGroupInfo info;
private final List<FileScanTask> fileScanTasks;

private Set<DataFile> addedFiles = Collections.emptySet();
private Set<DataFile> addedFiles = Sets.newTreeSet(Comparators.contentFile());

public RewriteFileGroup(FileGroupInfo info, List<FileScanTask> fileScanTasks) {
this.info = info;
Expand All @@ -58,7 +59,9 @@ public void setOutputFiles(Set<DataFile> files) {
}

public Set<DataFile> rewrittenFiles() {
return fileScans().stream().map(FileScanTask::file).collect(Collectors.toSet());
return fileScans().stream()
.map(FileScanTask::file)
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile())));
}

public Set<DataFile> addedFiles() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.iceberg.actions;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
Expand All @@ -30,6 +29,8 @@
import org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupRewriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;

/**
* Container class representing a set of position delete files to be rewritten by a {@link
Expand All @@ -40,7 +41,7 @@ public class RewritePositionDeletesGroup {
private final List<PositionDeletesScanTask> tasks;
private final long maxRewrittenDataSequenceNumber;

private Set<DeleteFile> addedDeleteFiles = Collections.emptySet();
private Set<DeleteFile> addedDeleteFiles = Sets.newTreeSet(Comparators.contentFile());

public RewritePositionDeletesGroup(FileGroupInfo info, List<PositionDeletesScanTask> tasks) {
Preconditions.checkArgument(!tasks.isEmpty(), "Tasks must not be empty");
Expand All @@ -67,7 +68,9 @@ public long maxRewrittenDataSequenceNumber() {
}

public Set<DeleteFile> rewrittenDeleteFiles() {
return tasks().stream().map(PositionDeletesScanTask::file).collect(Collectors.toSet());
return tasks().stream()
.map(PositionDeletesScanTask::file)
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile())));
}

public Set<DeleteFile> addedDeleteFiles() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
Expand All @@ -36,11 +37,12 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;
import org.apache.iceberg.spark.ScanTaskSetManager;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.types.Comparators;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
Expand Down Expand Up @@ -148,7 +150,11 @@ public boolean useCommitCoordinator() {
@Override
public void commit(WriterCommitMessage[] messages) {
PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get();
coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages)));
coordinator.stageRewrite(
table,
fileSetId,
files(messages).stream()
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile()))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@
import org.apache.iceberg.io.PartitioningWriter;
import org.apache.iceberg.io.RollingDataWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.spark.SparkWriteRequirements;
import org.apache.iceberg.types.Comparators;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.api.java.JavaSparkContext;
Expand Down Expand Up @@ -491,7 +492,11 @@ private RewriteFiles(String fileSetID) {
@Override
public void commit(WriterCommitMessage[] messages) {
FileRewriteCoordinator coordinator = FileRewriteCoordinator.get();
coordinator.stageRewrite(table, fileSetID, ImmutableSet.copyOf(files(messages)));
coordinator.stageRewrite(
table,
fileSetID,
files(messages).stream()
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile()))));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.types.Comparators;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
Expand Down Expand Up @@ -93,7 +95,7 @@ public void testBinPackRewrite() throws NoSuchTableException, IOException {
Set<DataFile> rewrittenFiles =
taskSetManager.fetchTasks(table, fileSetID).stream()
.map(t -> t.asFileScanTask().file())
.collect(Collectors.toSet());
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile())));
Set<DataFile> addedFiles = rewriteCoordinator.fetchNewFiles(table, fileSetID);
table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
}
Expand Down Expand Up @@ -165,7 +167,7 @@ public void testSortRewrite() throws NoSuchTableException, IOException {
Set<DataFile> rewrittenFiles =
taskSetManager.fetchTasks(table, fileSetID).stream()
.map(t -> t.asFileScanTask().file())
.collect(Collectors.toSet());
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile())));
Set<DataFile> addedFiles = rewriteCoordinator.fetchNewFiles(table, fileSetID);
table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
}
Expand Down Expand Up @@ -247,7 +249,7 @@ public void testCommitMultipleRewrites() throws NoSuchTableException, IOExceptio
Set<DataFile> addedFiles =
fileSetIDs.stream()
.flatMap(fileSetID -> rewriteCoordinator.fetchNewFiles(table, fileSetID).stream())
.collect(Collectors.toSet());
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile())));
table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();

table.refresh();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.orc.storage.serde2.io.DateWritable;
Expand Down Expand Up @@ -787,7 +788,7 @@ public static List<DataFile> dataFiles(Table table, String branch) {
}

public static Set<DeleteFile> deleteFiles(Table table) {
Set<DeleteFile> deleteFiles = Sets.newHashSet();
Set<DeleteFile> deleteFiles = Sets.newTreeSet(Comparators.contentFile());

for (FileScanTask task : table.newScan().planFiles()) {
deleteFiles.addAll(task.deletes());
Expand Down

0 comments on commit 10150bf

Please sign in to comment.