Skip to content

Commit

Permalink
Core: Prevent duplicate data/delete files (apache#10007)
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra authored Jun 20, 2024
1 parent 10d7ab1 commit 23a578e
Show file tree
Hide file tree
Showing 7 changed files with 321 additions and 33 deletions.
12 changes: 9 additions & 3 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;

/**
* {@link AppendFiles Append} implementation that adds a new manifest file for the write.
Expand All @@ -43,6 +44,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final PartitionSpec spec;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final List<DataFile> newFiles = Lists.newArrayList();
private final CharSequenceSet newFilePaths = CharSequenceSet.empty();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private List<ManifestFile> newManifests = null;
Expand Down Expand Up @@ -83,9 +85,13 @@ protected Map<String, String> summary() {

/**
 * Appends a data file to this operation, skipping files whose path was already appended.
 *
 * @param file the data file to append; must not be null
 * @return this append operation, for method chaining
 * @throws NullPointerException if {@code file} is null
 */
@Override
public FastAppend appendFile(DataFile file) {
  // Validate before any use: every statement below dereferences or records the file.
  Preconditions.checkNotNull(file, "Invalid data file: null");

  // newFilePaths tracks paths seen so far; only a path that was not yet present is recorded,
  // so the file list and the snapshot summary count each file at most once.
  if (newFilePaths.add(file.path())) {
    this.hasNewFiles = true;
    newFiles.add(file);
    summaryBuilder.addedFile(spec, file);
  }

  return this;
}

Expand Down
21 changes: 14 additions & 7 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {

// update data
private final List<DataFile> newDataFiles = Lists.newArrayList();
private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty();
private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty();
private Long newDataFilesDataSequenceNumber;
private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
Expand Down Expand Up @@ -220,10 +222,12 @@ protected boolean addsDeleteFiles() {
/**
 * Add a data file to the new snapshot.
 *
 * <p>A file whose path has already been added through this method is ignored, so it is not
 * tracked or counted in the added-files summary more than once.
 *
 * @param file the data file to add; must not be null
 * @throws NullPointerException if {@code file} is null
 */
protected void add(DataFile file) {
  Preconditions.checkNotNull(file, "Invalid data file: null");

  // Only the first occurrence of a path updates the data spec, the summary and the file list.
  if (newDataFilePaths.add(file.path())) {
    setDataSpec(file);
    addedFilesSummary.addedFile(dataSpec(), file);
    hasNewDataFiles = true;
    newDataFiles.add(file);
  }
}

/** Add a delete file to the new snapshot. */
Expand All @@ -243,9 +247,12 @@ private void add(DeleteFileHolder fileHolder) {
PartitionSpec fileSpec = ops.current().spec(specId);
List<DeleteFileHolder> deleteFiles =
newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList());
deleteFiles.add(fileHolder);
addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile());
hasNewDeleteFiles = true;

if (newDeleteFilePaths.add(fileHolder.deleteFile().path())) {
deleteFiles.add(fileHolder);
addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile());
hasNewDeleteFiles = true;
}
}

private void setDataSpec(DataFile file) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ public void fromSnapshotInclusiveWithTag() {
table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();

String tagSnapshotBName = "t2";
table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
table.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
long snapshotBId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag(tagSnapshotBName, snapshotBId).commit();
table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit();
table.newFastAppend().appendFile(FILE_D).appendFile(FILE_A2).commit();

/*
files:FILE_A files:FILE_B FILE_B files:FILE_C FILE_C
files:FILE_A files:FILE_B FILE_C files:FILE_D FILE_A2
---- snapshotAId(tag:t1) ---- snapshotMainB(tag:t2) ---- currentSnapshot
*/
IncrementalAppendScan scan = newScan().fromSnapshotInclusive(tagSnapshotAName);
Expand Down Expand Up @@ -111,11 +111,11 @@ public void testUseBranch() {
table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();

String tagName2 = "t2";
table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
table.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
long snapshotMainBId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag(tagName2, snapshotMainBId).commit();

table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
table.newFastAppend().appendFile(FILE_D).appendFile(FILE_A2).commit();

table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
long snapshotBranchBId = table.snapshot(branchName).snapshotId();
Expand All @@ -125,7 +125,7 @@ public void testUseBranch() {

/*
files:FILE_A files:FILE_B FILE_B files:FILE_B FILE_B
files:FILE_A files:FILE_B FILE_C files:FILE_D FILE_A2
---- snapshotA(tag:t1) ---- snapshotMainB(tag:t2) ---- currentSnapshot
\
\
Expand Down Expand Up @@ -175,21 +175,21 @@ public void testUseBranchWithInvalidSnapshotShouldFail() {
table.newFastAppend().appendFile(FILE_A).commit();
long snapshotAId = table.currentSnapshot().snapshotId();

table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
table.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
long snapshotMainBId = table.currentSnapshot().snapshotId();

String branchName = "b1";
table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
table.newFastAppend().appendFile(FILE_D).toBranch(branchName).commit();
long snapshotBranchBId = table.snapshot(branchName).snapshotId();

/*
files:FILE_A files:FILE_B FILE_B
files:FILE_A files:FILE_B FILE_C
---- snapshotA ------ snapshotMainB
\
\
\files:FILE_C
\files:FILE_D
snapshotBranchB(branch:b1)
*/
assertThatThrownBy(
Expand Down Expand Up @@ -267,13 +267,13 @@ public void testFromSnapshotExclusiveWithTag() {
table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();

String tagSnapshotBName = "t2";
table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
table.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
long snapshotBId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag(tagSnapshotBName, snapshotBId).commit();
table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit();
table.newFastAppend().appendFile(FILE_D).appendFile(FILE_A2).commit();

/*
files:FILE_A files:FILE_B FILE_B files:FILE_C FILE_C
files:FILE_A files:FILE_B FILE_C files:FILE_D FILE_A2
---- snapshotAId(tag:t1) ---- snapshotMainB(tag:t2) ---- currentSnapshot
*/
IncrementalAppendScan scan = newScan().fromSnapshotExclusive(tagSnapshotAName);
Expand Down
7 changes: 7 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestFastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ protected static List<Object> parameters() {
return Arrays.asList(1, 2);
}

/** A fast append must reject a null data file with a NullPointerException and a clear message. */
@TestTemplate
public void appendNullFile() {
  AppendFiles fastAppend = table.newFastAppend();

  assertThatThrownBy(() -> fastAppend.appendFile(null).commit())
      .isInstanceOf(NullPointerException.class)
      .hasMessage("Invalid data file: null");
}

@TestTemplate
public void testEmptyTableAppend() {
assertThat(listManifestFiles()).isEmpty();
Expand Down
7 changes: 7 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestMergeAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ protected static List<Object> parameters() {
new Object[] {2, "testBranch"});
}

/** A merge append must reject a null data file with a NullPointerException and a clear message. */
@TestTemplate
public void appendNullFile() {
  AppendFiles mergeAppend = table.newAppend();

  assertThatThrownBy(() -> mergeAppend.appendFile(null).commit())
      .isInstanceOf(NullPointerException.class)
      .hasMessage("Invalid data file: null");
}

@TestTemplate
public void testEmptyTableAppend() {
assertThat(listManifestFiles()).isEmpty();
Expand Down
6 changes: 3 additions & 3 deletions core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ public void testDeleteWithDuplicateEntriesInManifest() {

validateManifestEntries(
pending.allManifests(table.io()).get(1),
ids(pendingId, pendingId, baseSnapshotId),
files(FILE_A, FILE_A, FILE_B),
statuses(DELETED, DELETED, EXISTING));
ids(pendingId, baseSnapshotId),
files(FILE_A, FILE_B),
statuses(DELETED, EXISTING));

// We should only get the 3 manifests that this test is expected to add.
assertThat(listManifestFiles()).hasSize(3);
Expand Down
Loading

0 comments on commit 23a578e

Please sign in to comment.