Data, Flink, Spark: Test deletes with format-version=3 #11538

Merged · 1 commit · Nov 17, 2024
Data, Flink, MR, Spark: Test deletes with format-version=3
nastra committed Nov 14, 2024
commit bccbc14f56c23a981a1eecfcbd75c80f21cde5db
@@ -396,8 +396,8 @@ public void testPartitionSpecEvolutionRemovalV2() {
.withPartitionPath("id=11")
.build();

-    DeleteFile delete10 = posDelete(table, data10);
-    DeleteFile delete11 = posDelete(table, data11);
+    DeleteFile delete10 = newDeletes(data10);
+    DeleteFile delete11 = newDeletes(data11);

table.newFastAppend().appendFile(data10).commit();
table.newFastAppend().appendFile(data11).commit();
@@ -441,12 +441,6 @@ public void testPartitionSpecEvolutionRemovalV2() {
assertThat(tasks).hasSize(expectedScanTaskCount(3));
}

-  private DeleteFile posDelete(Table table, DataFile dataFile) {
-    return formatVersion >= 3
-        ? FileGenerationUtil.generateDV(table, dataFile)
-        : FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
-  }

Review comment from nastra (Contributor, Author): this addresses #11485 (comment)
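The call sites above now pass only the data file, so the version dispatch has presumably moved into a shared helper on the test base class. A minimal sketch of that helper, assuming `table` and `formatVersion` are fields of the enclosing test class (the name newDeletes comes from the call sites; its exact location is an assumption):

  // Dispatch on format version: v3 tables get a deletion vector (DV),
  // v2 tables get a classic position delete file.
  private DeleteFile newDeletes(DataFile dataFile) {
    return formatVersion >= 3
        ? FileGenerationUtil.generateDV(table, dataFile)
        : FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
  }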

@TestTemplate
public void testPartitionSpecEvolutionAdditiveV1() {
assumeThat(formatVersion).isEqualTo(1);
@@ -537,8 +531,8 @@ public void testPartitionSpecEvolutionAdditiveV2AndAbove() {
.withPartitionPath("data_bucket=1/id=11")
.build();

-    DeleteFile delete10 = posDelete(table, data10);
-    DeleteFile delete11 = posDelete(table, data11);
+    DeleteFile delete10 = newDeletes(data10);
+    DeleteFile delete11 = newDeletes(data11);

table.newFastAppend().appendFile(data10).commit();
table.newFastAppend().appendFile(data11).commit();
32 changes: 21 additions & 11 deletions data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.data;

import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.File;
import java.io.IOException;
@@ -82,12 +83,16 @@ public abstract class DeleteReadTests {

@Parameter protected FileFormat format;

+  @Parameter(index = 1)
+  protected int formatVersion;

@Parameters(name = "fileFormat = {0}")
public static Object[][] parameters() {
return new Object[][] {
-      new Object[] {FileFormat.PARQUET},
-      new Object[] {FileFormat.AVRO},
-      new Object[] {FileFormat.ORC}
+      new Object[] {FileFormat.PARQUET, 2},
+      new Object[] {FileFormat.AVRO, 2},
+      new Object[] {FileFormat.ORC, 2},
+      new Object[] {FileFormat.PARQUET, 3},
};
}

@@ -384,7 +389,8 @@ public void testPositionDeletes() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

table
.newRowDelta()
@@ -401,6 +407,10 @@

@TestTemplate
public void testMultiplePosDeleteFiles() throws IOException {
+    assumeThat(formatVersion)
+        .as("Can't write multiple delete files with formatVersion >= 3")
+        .isEqualTo(2);
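    // Note: format version 3 stores position deletes as deletion vectors, with at most
    // one DV per data file, so writing multiple position delete files applies only to v2.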

List<Pair<CharSequence, Long>> deletes =
Lists.newArrayList(
Pair.of(dataFile.path(), 0L), // id = 29
@@ -412,25 +422,24 @@ public void testMultiplePosDeleteFiles() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

table
.newRowDelta()
.addDeletes(posDeletes.first())
.validateDataFilesExist(posDeletes.second())
.commit();

-    deletes =
-        Lists.newArrayList(
-            Pair.of(dataFile.path(), 6L) // id = 122
-            );
+    deletes = Lists.newArrayList(Pair.of(dataFile.path(), 6L)); // id = 122

posDeletes =
FileHelpers.writeDeleteFile(
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

table
.newRowDelta()
@@ -475,7 +484,8 @@ public void testMixedPositionAndEqualityDeletes() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

table
.newRowDelta()
@@ -37,7 +37,7 @@ public class TestGenericReaderDeletes extends DeleteReadTests {

@Override
protected Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException {
-    return TestTables.create(tableDir, name, schema, spec, 2);
+    return TestTables.create(tableDir, name, schema, spec, formatVersion);
}

@Override
@@ -73,7 +73,7 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) {
Table table = catalog.createTable(TableIdentifier.of(databaseName, name), schema, spec, props);
TableOperations ops = ((BaseTable) table).operations();
TableMetadata meta = ops.current();
-    ops.commit(meta, meta.upgradeToFormatVersion(2));
+    ops.commit(meta, meta.upgradeToFormatVersion(formatVersion));

return table;
}
@@ -73,7 +73,7 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) {
Table table = catalog.createTable(TableIdentifier.of(databaseName, name), schema, spec, props);
TableOperations ops = ((BaseTable) table).operations();
TableMetadata meta = ops.current();
-    ops.commit(meta, meta.upgradeToFormatVersion(2));
+    ops.commit(meta, meta.upgradeToFormatVersion(formatVersion));

return table;
}
@@ -73,7 +73,7 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) {
Table table = catalog.createTable(TableIdentifier.of(databaseName, name), schema, spec, props);
TableOperations ops = ((BaseTable) table).operations();
TableMetadata meta = ops.current();
-    ops.commit(meta, meta.upgradeToFormatVersion(2));
+    ops.commit(meta, meta.upgradeToFormatVersion(formatVersion));

return table;
}
@@ -49,18 +49,20 @@ public class TestInputFormatReaderDeletes extends DeleteReadTests {
private final HadoopTables tables = new HadoopTables(conf);
private TestHelper helper;

-  @Parameter(index = 1)
+  @Parameter(index = 2)
private String inputFormat;

@Parameters(name = "fileFormat = {0}, inputFormat = {1}")
@Parameters(name = "fileFormat = {0}, formatVersion = {1}, inputFormat = {2}")
public static Object[][] parameters() {
return new Object[][] {
{FileFormat.PARQUET, "IcebergInputFormat"},
{FileFormat.AVRO, "IcebergInputFormat"},
{FileFormat.ORC, "IcebergInputFormat"},
{FileFormat.PARQUET, "MapredIcebergInputFormat"},
{FileFormat.AVRO, "MapredIcebergInputFormat"},
{FileFormat.ORC, "MapredIcebergInputFormat"},
{FileFormat.PARQUET, 2, "IcebergInputFormat"},
{FileFormat.AVRO, 2, "IcebergInputFormat"},
{FileFormat.ORC, 2, "IcebergInputFormat"},
{FileFormat.PARQUET, 2, "MapredIcebergInputFormat"},
{FileFormat.AVRO, 2, "MapredIcebergInputFormat"},
{FileFormat.ORC, 2, "MapredIcebergInputFormat"},
nastra marked this conversation as resolved.
Show resolved Hide resolved
{FileFormat.PARQUET, 3, "IcebergInputFormat"},
{FileFormat.PARQUET, 3, "MapredIcebergInputFormat"},
};
}

@@ -82,7 +84,7 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) thro

TableOperations ops = ((BaseTable) table).operations();
TableMetadata meta = ops.current();
-    ops.commit(meta, meta.upgradeToFormatVersion(2));
+    ops.commit(meta, meta.upgradeToFormatVersion(formatVersion));

return table;
}
@@ -44,6 +44,7 @@
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
@@ -99,16 +100,21 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
protected static SparkSession spark = null;
protected static HiveCatalog catalog = null;

-  @Parameter(index = 1)
+  @Parameter(index = 2)
private boolean vectorized;

@Parameters(name = "format = {0}, vectorized = {1}")
@Parameter(index = 3)
private PlanningMode planningMode;

@Parameters(name = "fileFormat = {0}, formatVersion = {1}, vectorized = {2}, planningMode = {3}")
public static Object[][] parameters() {
return new Object[][] {
-      new Object[] {FileFormat.PARQUET, false},
-      new Object[] {FileFormat.PARQUET, true},
-      new Object[] {FileFormat.ORC, false},
-      new Object[] {FileFormat.AVRO, false}
+      new Object[] {FileFormat.PARQUET, 2, false, PlanningMode.DISTRIBUTED},
+      new Object[] {FileFormat.PARQUET, 2, true, PlanningMode.LOCAL},
+      new Object[] {FileFormat.ORC, 2, false, PlanningMode.DISTRIBUTED},
+      new Object[] {FileFormat.AVRO, 2, false, PlanningMode.LOCAL},
+      new Object[] {FileFormat.PARQUET, 3, false, PlanningMode.DISTRIBUTED},
+      new Object[] {FileFormat.PARQUET, 3, true, PlanningMode.LOCAL},
};
}

@@ -162,7 +168,13 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) {
TableOperations ops = ((BaseTable) table).operations();
TableMetadata meta = ops.current();
ops.commit(meta, meta.upgradeToFormatVersion(2));
-    table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format.name()).commit();
+    table
+        .updateProperties()
+        .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
+        .set(TableProperties.DATA_PLANNING_MODE, planningMode.modeName())
+        .set(TableProperties.DELETE_PLANNING_MODE, planningMode.modeName())
+        .set(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion))
+        .commit();
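        // Note (assumption): FORMAT_VERSION maps to Iceberg's reserved "format-version"
        // table property, so committing it above is what bumps the parameterized tables
        // to v3; the earlier upgradeToFormatVersion(2) call only sets the v2 baseline.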
if (format.equals(FileFormat.PARQUET) || format.equals(FileFormat.ORC)) {
String vectorizationEnabled =
format.equals(FileFormat.PARQUET)
@@ -342,7 +354,8 @@ public void testPosDeletesAllRowsInBatch() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

table
.newRowDelta()
@@ -374,7 +387,8 @@ public void testPosDeletesWithDeletedColumn() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

table
.newRowDelta()
@@ -450,7 +464,8 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

table
.newRowDelta()
@@ -482,7 +497,8 @@ public void testFilterOnDeletedMetadataColumn() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

table
.newRowDelta()
@@ -604,7 +620,10 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio
Pair.of(dataFile.path(), 109L));
Pair<DeleteFile, CharSequenceSet> posDeletes =
FileHelpers.writeDeleteFile(
-            table, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), deletes);
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            deletes,
+            formatVersion);
tbl.newRowDelta()
.addDeletes(posDeletes.first())
.validateDataFilesExist(posDeletes.second())
@@ -100,19 +100,21 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
protected static SparkSession spark = null;
protected static HiveCatalog catalog = null;

-  @Parameter(index = 1)
+  @Parameter(index = 2)
private boolean vectorized;

-  @Parameter(index = 2)
+  @Parameter(index = 3)
private PlanningMode planningMode;

@Parameters(name = "format = {0}, vectorized = {1}, planningMode = {2}")
@Parameters(name = "fileFormat = {0}, formatVersion = {1}, vectorized = {2}, planningMode = {3}")
public static Object[][] parameters() {
return new Object[][] {
-      new Object[] {FileFormat.PARQUET, false, PlanningMode.DISTRIBUTED},
-      new Object[] {FileFormat.PARQUET, true, PlanningMode.LOCAL},
-      new Object[] {FileFormat.ORC, false, PlanningMode.DISTRIBUTED},
-      new Object[] {FileFormat.AVRO, false, PlanningMode.LOCAL}
+      new Object[] {FileFormat.PARQUET, 2, false, PlanningMode.DISTRIBUTED},
+      new Object[] {FileFormat.PARQUET, 2, true, PlanningMode.LOCAL},
+      new Object[] {FileFormat.ORC, 2, false, PlanningMode.DISTRIBUTED},
+      new Object[] {FileFormat.AVRO, 2, false, PlanningMode.LOCAL},
+      new Object[] {FileFormat.PARQUET, 3, false, PlanningMode.DISTRIBUTED},
+      new Object[] {FileFormat.PARQUET, 3, true, PlanningMode.LOCAL},
};
}

@@ -171,6 +173,7 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) {
.set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
.set(TableProperties.DATA_PLANNING_MODE, planningMode.modeName())
.set(TableProperties.DELETE_PLANNING_MODE, planningMode.modeName())
+        .set(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion))
.commit();
if (format.equals(FileFormat.PARQUET) || format.equals(FileFormat.ORC)) {
String vectorizationEnabled =
@@ -351,7 +354,8 @@ public void testPosDeletesAllRowsInBatch() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

table
.newRowDelta()
@@ -383,7 +387,8 @@ public void testPosDeletesWithDeletedColumn() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

table
.newRowDelta()
@@ -459,7 +464,8 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

table
.newRowDelta()
@@ -491,7 +497,8 @@ public void testFilterOnDeletedMetadataColumn() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

table
.newRowDelta()
@@ -613,7 +620,10 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio
Pair.of(dataFile.path(), 109L));
Pair<DeleteFile, CharSequenceSet> posDeletes =
FileHelpers.writeDeleteFile(
-            table, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), deletes);
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            deletes,
+            formatVersion);
tbl.newRowDelta()
.addDeletes(posDeletes.first())
.validateDataFilesExist(posDeletes.second())