Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark 3.5: Preserve content offset and size during manifest rewrites #11469

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
private final int fileSpecIdPosition;
private final int equalityIdsPosition;
private final int referencedDataFilePosition;
private final int contentOffsetPosition;
private final int contentSizePosition;
private final Type lowerBoundsType;
private final Type upperBoundsType;
private final Type keyMetadataType;
Expand Down Expand Up @@ -105,6 +107,8 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
this.fileSpecIdPosition = positions.get(DataFile.SPEC_ID.name());
this.equalityIdsPosition = positions.get(DataFile.EQUALITY_IDS.name());
this.referencedDataFilePosition = positions.get(DataFile.REFERENCED_DATA_FILE.name());
this.contentOffsetPosition = positions.get(DataFile.CONTENT_OFFSET.name());
this.contentSizePosition = positions.get(DataFile.CONTENT_SIZE.name());
}

public F wrap(Row row) {
Expand Down Expand Up @@ -240,6 +244,20 @@ public String referencedDataFile() {
return wrapped.getString(referencedDataFilePosition);
}

/**
 * Reads the content offset from the wrapped row.
 *
 * @return the long stored at {@code contentOffsetPosition}, or {@code null} when the wrapped row
 *     holds a null at that position
 */
public Long contentOffset() {
  int pos = contentOffsetPosition;
  // null and a primitive long in a conditional yield a boxed Long, so no unboxing of null occurs
  return wrapped.isNullAt(pos) ? null : wrapped.getLong(pos);
}

/**
 * Reads the content size from the wrapped row.
 *
 * @return the long stored at {@code contentSizePosition}, or {@code null} when the wrapped row
 *     holds a null at that position
 */
public Long contentSizeInBytes() {
  int pos = contentSizePosition;
  // null and a primitive long in a conditional yield a boxed Long, so no unboxing of null occurs
  return wrapped.isNullAt(pos) ? null : wrapped.getLong(pos);
}

private int fieldPosition(String name, StructType sparkType) {
try {
return sparkType.fieldIndex(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ public static Object[] parameters() {
new Object[] {"true", "true", false, 1},
new Object[] {"false", "true", true, 1},
new Object[] {"true", "false", false, 2},
new Object[] {"false", "false", false, 2}
new Object[] {"false", "false", false, 2},
new Object[] {"false", "false", false, 3}
};
}

Expand Down Expand Up @@ -150,16 +151,16 @@ public void testRewriteManifestsPreservesOptionalFields() throws IOException {
.appendFile(dataFile3)
.commit();

DeleteFile deleteFile1 = newDeleteFileWithRef(table, dataFile1);
assertThat(deleteFile1.referencedDataFile()).isEqualTo(dataFile1.location());
DeleteFile deleteFile1 = newDeletes(table, dataFile1);
assertDeletes(dataFile1, deleteFile1);
table.newRowDelta().addDeletes(deleteFile1).commit();

DeleteFile deleteFile2 = newDeleteFileWithRef(table, dataFile2);
assertThat(deleteFile2.referencedDataFile()).isEqualTo(dataFile2.location());
DeleteFile deleteFile2 = newDeletes(table, dataFile2);
assertDeletes(dataFile2, deleteFile2);
table.newRowDelta().addDeletes(deleteFile2).commit();

DeleteFile deleteFile3 = newDeleteFileWithRef(table, dataFile3);
assertThat(deleteFile3.referencedDataFile()).isEqualTo(dataFile3.location());
DeleteFile deleteFile3 = newDeletes(table, dataFile3);
assertDeletes(dataFile3, deleteFile3);
table.newRowDelta().addDeletes(deleteFile3).commit();

SparkActions actions = SparkActions.get();
Expand All @@ -178,10 +179,13 @@ public void testRewriteManifestsPreservesOptionalFields() throws IOException {
DeleteFile deleteFile = Iterables.getOnlyElement(fileTask.deletes());
if (dataFile.location().equals(dataFile1.location())) {
assertThat(deleteFile.referencedDataFile()).isEqualTo(deleteFile1.referencedDataFile());
assertEqual(deleteFile, deleteFile1);
} else if (dataFile.location().equals(dataFile2.location())) {
assertThat(deleteFile.referencedDataFile()).isEqualTo(deleteFile2.referencedDataFile());
assertEqual(deleteFile, deleteFile2);
} else {
assertThat(deleteFile.referencedDataFile()).isEqualTo(deleteFile3.referencedDataFile());
assertEqual(deleteFile, deleteFile3);
}
}
}
Expand Down Expand Up @@ -1035,10 +1039,18 @@ private DataFiles.Builder newDataFileBuilder(Table table) {
.withRecordCount(1);
}

/**
 * Creates a delete file for {@code dataFile}, choosing the generator by table format version.
 *
 * <p>For {@code formatVersion >= 3} this delegates to {@link #newDV}; for lower versions it
 * delegates to {@link #newDeleteFileWithRef}.
 *
 * @param table the table passed through to the generator
 * @param dataFile the data file the generated delete file is created for
 * @return the generated delete file
 */
private DeleteFile newDeletes(Table table, DataFile dataFile) {
  if (formatVersion >= 3) {
    return newDV(table, dataFile);
  }

  return newDeleteFileWithRef(table, dataFile);
}

/**
 * Generates a position delete file for {@code dataFile} by delegating to {@code
 * FileGenerationUtil.generatePositionDeleteFileWithRef}.
 *
 * @param table the table passed through to the generator
 * @param dataFile the data file passed through to the generator
 * @return the delete file produced by the generator
 */
private DeleteFile newDeleteFileWithRef(Table table, DataFile dataFile) {
  DeleteFile positionDeletes =
      FileGenerationUtil.generatePositionDeleteFileWithRef(table, dataFile);
  return positionDeletes;
}

/**
 * Generates a DV delete file for {@code dataFile} by delegating to {@code
 * FileGenerationUtil.generateDV}.
 *
 * @param table the table passed through to the generator
 * @param dataFile the data file passed through to the generator
 * @return the delete file produced by the generator
 */
private DeleteFile newDV(Table table, DataFile dataFile) {
  DeleteFile dv = FileGenerationUtil.generateDV(table, dataFile);
  return dv;
}

private DeleteFile newDeleteFile(Table table, String partitionPath) {
return FileMetadata.deleteFileBuilder(table.spec())
.ofPositionDeletes()
Expand Down Expand Up @@ -1097,4 +1109,26 @@ private DeleteFile writeEqDeletes(Table table, StructLike partition, String key,
OutputFile outputFile = Files.localOutput(File.createTempFile("junit", null, temp.toFile()));
return FileHelpers.writeDeleteFile(table, outputFile, partition, deletes, deleteSchema);
}

/**
 * Verifies that a freshly generated delete file matches the expectations for the current format
 * version.
 *
 * <p>The delete file must always reference {@code dataFile}'s location. For {@code formatVersion
 * >= 3} the content offset and content size must both be set; for lower versions both must be
 * null.
 *
 * @param dataFile the data file the delete file was generated for
 * @param deleteFile the delete file to check
 */
private void assertDeletes(DataFile dataFile, DeleteFile deleteFile) {
  assertThat(deleteFile.referencedDataFile()).isEqualTo(dataFile.location());

  if (formatVersion < 3) {
    // below v3 the offset and size are expected to be absent
    assertThat(deleteFile.contentOffset()).isNull();
    assertThat(deleteFile.contentSizeInBytes()).isNull();
    return;
  }

  // v3 and above: both offset and size are expected to be populated
  assertThat(deleteFile.contentOffset()).isNotNull();
  assertThat(deleteFile.contentSizeInBytes()).isNotNull();
}

/**
 * Asserts that two delete files agree on location, content, spec ID, partition, format,
 * referenced data file, content offset, and content size.
 *
 * <p>Only the fields listed above are compared; any other metadata is not checked here.
 *
 * @param deleteFile1 the delete file under test
 * @param deleteFile2 the delete file whose values are used as the expected ones
 */
private void assertEqual(DeleteFile deleteFile1, DeleteFile deleteFile2) {
  DeleteFile actual = deleteFile1;
  DeleteFile expected = deleteFile2;

  // identity and layout of the file
  assertThat(actual.location()).isEqualTo(expected.location());
  assertThat(actual.content()).isEqualTo(expected.content());
  assertThat(actual.specId()).isEqualTo(expected.specId());
  assertThat(actual.partition()).isEqualTo(expected.partition());
  assertThat(actual.format()).isEqualTo(expected.format());

  // optional fields, compared as-is (including null on both sides)
  assertThat(actual.referencedDataFile()).isEqualTo(expected.referencedDataFile());
  assertThat(actual.contentOffset()).isEqualTo(expected.contentOffset());
  assertThat(actual.contentSizeInBytes()).isEqualTo(expected.contentSizeInBytes());
}
}