From bbd0810109e9fde1aea0fd330e78b9efb1b42453 Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Tue, 5 Nov 2024 12:04:59 +0100
Subject: [PATCH] Data, MR, Spark: Test deletes with format-version=3

---
 .../iceberg/TestMetadataTableFilters.java     | 14 ++----
 .../apache/iceberg/data/DeleteReadTests.java  | 40 ++++++++++-------
 .../data/TestGenericReaderDeletes.java        |  5 ++-
 .../source/TestFlinkReaderDeletesBase.java    |  4 +-
 .../mr/TestInputFormatReaderDeletes.java      | 23 +++++-----
 .../spark/source/TestSparkReaderDeletes.java  | 43 +++++++++++++------
 .../spark/source/TestSparkReaderDeletes.java  | 34 +++++++++------
 .../spark/source/TestSparkReaderDeletes.java  | 40 ++++++++++-------
 8 files changed, 125 insertions(+), 78 deletions(-)

diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java
index 7c5a860db15f..0762d3b2dca4 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java
@@ -396,8 +396,8 @@ public void testPartitionSpecEvolutionRemovalV2() {
             .withPartitionPath("id=11")
             .build();

-    DeleteFile delete10 = posDelete(table, data10);
-    DeleteFile delete11 = posDelete(table, data11);
+    DeleteFile delete10 = newDeletes(data10);
+    DeleteFile delete11 = newDeletes(data11);

     table.newFastAppend().appendFile(data10).commit();
     table.newFastAppend().appendFile(data11).commit();
@@ -441,12 +441,6 @@ public void testPartitionSpecEvolutionRemovalV2() {
     assertThat(tasks).hasSize(expectedScanTaskCount(3));
   }

-  private DeleteFile posDelete(Table table, DataFile dataFile) {
-    return formatVersion >= 3
-        ? FileGenerationUtil.generateDV(table, dataFile)
-        : FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
-  }
-
   @TestTemplate
   public void testPartitionSpecEvolutionAdditiveV1() {
     assumeThat(formatVersion).isEqualTo(1);
@@ -537,8 +531,8 @@ public void testPartitionSpecEvolutionAdditiveV2AndAbove() {
             .withPartitionPath("data_bucket=1/id=11")
             .build();

-    DeleteFile delete10 = posDelete(table, data10);
-    DeleteFile delete11 = posDelete(table, data11);
+    DeleteFile delete10 = newDeletes(data10);
+    DeleteFile delete11 = newDeletes(data11);

     table.newFastAppend().appendFile(data10).commit();
     table.newFastAppend().appendFile(data11).commit();
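Note on the core change above: the per-class posDelete(Table, DataFile) helper is deleted in favor of newDeletes(DataFile), which this patch expects to come from the shared test superclass. Judging from the removed body, the shared helper presumably keeps the same dispatch, producing a deletion vector (DV) for format version 3 and up and a classic position-delete file otherwise; a sketch, not part of this diff:

    // Hypothetical shape of the shared newDeletes helper assumed by the calls above.
    private DeleteFile newDeletes(DataFile dataFile) {
      return formatVersion >= 3
          ? FileGenerationUtil.generateDV(table, dataFile) // v3: deletion vector
          : FileGenerationUtil.generatePositionDeleteFile(table, dataFile); // v2: pos-delete file
    }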
diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
index 9d16da124062..8a73e1d9ef2e 100644
--- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
+++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.data;

 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;

 import java.io.File;
 import java.io.IOException;
@@ -82,19 +83,23 @@ public abstract class DeleteReadTests {

   @Parameter protected FileFormat format;

+  @Parameter(index = 1)
+  protected int formatVersion;
+
   @Parameters(name = "fileFormat = {0}")
   public static Object[][] parameters() {
     return new Object[][] {
-      new Object[] {FileFormat.PARQUET},
-      new Object[] {FileFormat.AVRO},
-      new Object[] {FileFormat.ORC}
+      new Object[] {FileFormat.PARQUET, 2},
+      new Object[] {FileFormat.AVRO, 2},
+      new Object[] {FileFormat.ORC, 2},
+      new Object[] {FileFormat.PARQUET, 3},
     };
   }

   @BeforeEach
   public void writeTestDataFile() throws IOException {
     this.tableName = "test";
-    this.table = createTable(tableName, SCHEMA, SPEC);
+    this.table = createTable(tableName, SCHEMA, SPEC, formatVersion);
     this.records = Lists.newArrayList();

     // records all use IDs that are in bucket id_bucket=0
@@ -126,7 +131,7 @@ public void cleanup() throws IOException {
   private void initDateTable() throws IOException {
     dropTable("test2");
     this.dateTableName = "test2";
-    this.dateTable = createTable(dateTableName, DATE_SCHEMA, DATE_SPEC);
+    this.dateTable = createTable(dateTableName, DATE_SCHEMA, DATE_SPEC, formatVersion);

     GenericRecord record = GenericRecord.create(dateTable.schema());

@@ -179,8 +184,8 @@ private void initDateTable() throws IOException {
         .commit();
   }

-  protected abstract Table createTable(String name, Schema schema, PartitionSpec spec)
-      throws IOException;
+  protected abstract Table createTable(
+      String name, Schema schema, PartitionSpec spec, int formatVersion) throws IOException;

   protected abstract void dropTable(String name) throws IOException;

@@ -384,7 +389,8 @@ public void testPositionDeletes() throws IOException {
             table,
             Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
             Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

     table
         .newRowDelta()
@@ -401,6 +407,10 @@ public void testPositionDeletes() throws IOException {

   @TestTemplate
   public void testMultiplePosDeleteFiles() throws IOException {
+    assumeThat(formatVersion)
+        .as("Can't write multiple delete files with formatVersion >= 3")
+        .isEqualTo(2);
+
     List<Pair<CharSequence, Long>> deletes =
         Lists.newArrayList(
             Pair.of(dataFile.path(), 0L), // id = 29
@@ -412,25 +422,24 @@ public void testMultiplePosDeleteFiles() throws IOException {
             table,
             Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
             Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

     table
         .newRowDelta()
         .addDeletes(posDeletes.first())
         .validateDataFilesExist(posDeletes.second())
         .commit();

-    deletes =
-        Lists.newArrayList(
-            Pair.of(dataFile.path(), 6L) // id = 122
-            );
+    deletes = Lists.newArrayList(Pair.of(dataFile.path(), 6L)); // id = 122

     posDeletes =
         FileHelpers.writeDeleteFile(
             table,
             Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
             Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

     table
         .newRowDelta()
@@ -475,7 +484,8 @@ public void testMixedPositionAndEqualityDeletes() throws IOException {
             table,
             Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
             Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

     table
         .newRowDelta()
diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
index d7c70919015d..bd3f17e806f3 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
@@ -36,8 +36,9 @@ public class TestGenericReaderDeletes extends DeleteReadTests {
   @TempDir private File tableDir;

   @Override
-  protected Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException {
-    return TestTables.create(tableDir, name, schema, spec, 2);
+  protected Table createTable(String name, Schema schema, PartitionSpec spec, int formatVersion)
+      throws IOException {
+    return TestTables.create(tableDir, name, schema, spec, formatVersion);
   }

   @Override
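The writeDeleteFile calls above now pass the table's format version so FileHelpers can pick the physical representation of the position deletes. The helper itself is not part of this diff; a minimal sketch of what the new overload presumably does, with writePosDeletesAsDV standing in as a hypothetical name for the v3 path:

    // Sketch only: dispatch on format version when writing position deletes.
    public static Pair<DeleteFile, CharSequenceSet> writeDeleteFile(
        Table table,
        OutputFile out,
        StructLike partition,
        List<Pair<CharSequence, Long>> deletes,
        int formatVersion)
        throws IOException {
      if (formatVersion >= 3) {
        return writePosDeletesAsDV(table, out, partition, deletes); // hypothetical v3 path
      }
      return writeDeleteFile(table, out, partition, deletes); // existing v2 path
    }

This is also why testMultiplePosDeleteFiles now assumes formatVersion == 2: in v3 a data file carries at most one deletion vector, so committing a second position-delete file against the same data file is not a valid v3 write.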
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java
index 0b5a8011ad3f..f136465b40eb 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkReaderDeletesBase.java
@@ -66,14 +66,14 @@ public static void stopMetastore() throws Exception {
   }

   @Override
-  protected Table createTable(String name, Schema schema, PartitionSpec spec) {
+  protected Table createTable(String name, Schema schema, PartitionSpec spec, int formatVersion) {
     Map<String, String> props = Maps.newHashMap();
     props.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());

     Table table = catalog.createTable(TableIdentifier.of(databaseName, name), schema, spec, props);
     TableOperations ops = ((BaseTable) table).operations();
     TableMetadata meta = ops.current();
-    ops.commit(meta, meta.upgradeToFormatVersion(2));
+    ops.commit(meta, meta.upgradeToFormatVersion(formatVersion));

     return table;
   }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
index 2cb41f11295c..b2f767e4a7f0 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
@@ -49,18 +49,20 @@ public class TestInputFormatReaderDeletes extends DeleteReadTests {
   private final HadoopTables tables = new HadoopTables(conf);
   private TestHelper helper;

-  @Parameter(index = 1)
+  @Parameter(index = 2)
   private String inputFormat;

-  @Parameters(name = "fileFormat = {0}, inputFormat = {1}")
+  @Parameters(name = "fileFormat = {0}, formatVersion = {1}, inputFormat = {2}")
   public static Object[][] parameters() {
     return new Object[][] {
-      {FileFormat.PARQUET, "IcebergInputFormat"},
-      {FileFormat.AVRO, "IcebergInputFormat"},
-      {FileFormat.ORC, "IcebergInputFormat"},
-      {FileFormat.PARQUET, "MapredIcebergInputFormat"},
-      {FileFormat.AVRO, "MapredIcebergInputFormat"},
-      {FileFormat.ORC, "MapredIcebergInputFormat"},
+      {FileFormat.PARQUET, 2, "IcebergInputFormat"},
+      {FileFormat.AVRO, 2, "IcebergInputFormat"},
+      {FileFormat.ORC, 2, "IcebergInputFormat"},
+      {FileFormat.PARQUET, 2, "MapredIcebergInputFormat"},
+      {FileFormat.AVRO, 2, "MapredIcebergInputFormat"},
+      {FileFormat.ORC, 2, "MapredIcebergInputFormat"},
+      {FileFormat.PARQUET, 3, "IcebergInputFormat"},
+      {FileFormat.PARQUET, 3, "MapredIcebergInputFormat"},
     };
   }

@@ -72,7 +74,8 @@ public void writeTestDataFile() throws IOException {
   }

   @Override
-  protected Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException {
+  protected Table createTable(String name, Schema schema, PartitionSpec spec, int formatVersion)
+      throws IOException {
     Table table;

     File location = temp.resolve(inputFormat).resolve(format.name()).toFile();
@@ -82,7 +85,7 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) thro

     TableOperations ops = ((BaseTable) table).operations();
     TableMetadata meta = ops.current();
-    ops.commit(meta, meta.upgradeToFormatVersion(2));
+    ops.commit(meta, meta.upgradeToFormatVersion(formatVersion));

     return table;
   }
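A pattern worth calling out before the Spark changes, which repeat it three times: because the base class claimed @Parameter(index = 1) for formatVersion, every subclass parameter shifts one slot to the right (inputFormat moves from index 1 to 2 here; vectorized and planningMode shift the same way below), and each row of the parameters() matrix must stay positionally aligned with those indexes:

    // Row layout after this patch: {0} fileFormat, {1} formatVersion, {2} inputFormat
    {FileFormat.PARQUET, 3, "IcebergInputFormat"},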
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index bde87778ad62..6c6bbf54ec16 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -44,6 +44,7 @@
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
@@ -99,16 +100,21 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
   protected static SparkSession spark = null;
   protected static HiveCatalog catalog = null;

-  @Parameter(index = 1)
+  @Parameter(index = 2)
   private boolean vectorized;

-  @Parameters(name = "format = {0}, vectorized = {1}")
+  @Parameter(index = 3)
+  private PlanningMode planningMode;
+
+  @Parameters(name = "fileFormat = {0}, formatVersion = {1}, vectorized = {2}, planningMode = {3}")
   public static Object[][] parameters() {
     return new Object[][] {
-      new Object[] {FileFormat.PARQUET, false},
-      new Object[] {FileFormat.PARQUET, true},
-      new Object[] {FileFormat.ORC, false},
-      new Object[] {FileFormat.AVRO, false}
+      new Object[] {FileFormat.PARQUET, 2, false, PlanningMode.DISTRIBUTED},
+      new Object[] {FileFormat.PARQUET, 2, true, PlanningMode.LOCAL},
+      new Object[] {FileFormat.ORC, 2, false, PlanningMode.DISTRIBUTED},
+      new Object[] {FileFormat.AVRO, 2, false, PlanningMode.LOCAL},
+      new Object[] {FileFormat.PARQUET, 3, false, PlanningMode.DISTRIBUTED},
+      new Object[] {FileFormat.PARQUET, 3, true, PlanningMode.LOCAL},
     };
   }

@@ -162,7 +168,13 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) {
     TableOperations ops = ((BaseTable) table).operations();
     TableMetadata meta = ops.current();
     ops.commit(meta, meta.upgradeToFormatVersion(2));
-    table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format.name()).commit();
+    table
+        .updateProperties()
+        .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
+        .set(TableProperties.DATA_PLANNING_MODE, planningMode.modeName())
+        .set(TableProperties.DELETE_PLANNING_MODE, planningMode.modeName())
+        .set(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion))
+        .commit();
     if (format.equals(FileFormat.PARQUET) || format.equals(FileFormat.ORC)) {
       String vectorizationEnabled =
           format.equals(FileFormat.PARQUET)
@@ -342,7 +354,8 @@ public void testPosDeletesAllRowsInBatch() throws IOException {
             table,
             Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
             TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

     table
         .newRowDelta()
@@ -374,7 +387,8 @@ public void testPosDeletesWithDeletedColumn() throws IOException {
             table,
             Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
             TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

     table
         .newRowDelta()
@@ -450,7 +464,8 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
             table,
             Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
             TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

     table
         .newRowDelta()
@@ -482,7 +497,8 @@ public void testFilterOnDeletedMetadataColumn() throws IOException {
             table,
             Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
             TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

     table
         .newRowDelta()
@@ -604,7 +620,10 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio
             Pair.of(dataFile.path(), 109L));
     Pair<DeleteFile, CharSequenceSet> posDeletes =
         FileHelpers.writeDeleteFile(
-            table, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), deletes);
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            deletes,
+            formatVersion);
     tbl.newRowDelta()
         .addDeletes(posDeletes.first())
         .validateDataFilesExist(posDeletes.second())
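One subtlety in the 3.3 variant above: createTable still commits upgradeToFormatVersion(2) through TableOperations and then sets TableProperties.FORMAT_VERSION from the test parameter, leaving the property commit to raise the version to 3 where needed, whereas the 3.5 variant below passes formatVersion to upgradeToFormatVersion directly. Either way the table metadata should end up on the requested version, which a test could confirm along these lines (a sketch, not in the patch):

    // Read the effective format version back from the table metadata.
    int version = ((BaseTable) table).operations().current().formatVersion();
    assertThat(version).isEqualTo(formatVersion);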
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 29c2d4b39a1e..6c6bbf54ec16 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -100,19 +100,21 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
   protected static SparkSession spark = null;
   protected static HiveCatalog catalog = null;

-  @Parameter(index = 1)
+  @Parameter(index = 2)
   private boolean vectorized;

-  @Parameter(index = 2)
+  @Parameter(index = 3)
   private PlanningMode planningMode;

-  @Parameters(name = "format = {0}, vectorized = {1}, planningMode = {2}")
+  @Parameters(name = "fileFormat = {0}, formatVersion = {1}, vectorized = {2}, planningMode = {3}")
   public static Object[][] parameters() {
     return new Object[][] {
-      new Object[] {FileFormat.PARQUET, false, PlanningMode.DISTRIBUTED},
-      new Object[] {FileFormat.PARQUET, true, PlanningMode.LOCAL},
-      new Object[] {FileFormat.ORC, false, PlanningMode.DISTRIBUTED},
-      new Object[] {FileFormat.AVRO, false, PlanningMode.LOCAL}
+      new Object[] {FileFormat.PARQUET, 2, false, PlanningMode.DISTRIBUTED},
+      new Object[] {FileFormat.PARQUET, 2, true, PlanningMode.LOCAL},
+      new Object[] {FileFormat.ORC, 2, false, PlanningMode.DISTRIBUTED},
+      new Object[] {FileFormat.AVRO, 2, false, PlanningMode.LOCAL},
+      new Object[] {FileFormat.PARQUET, 3, false, PlanningMode.DISTRIBUTED},
+      new Object[] {FileFormat.PARQUET, 3, true, PlanningMode.LOCAL},
     };
   }

@@ -171,6 +173,7 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) {
         .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
         .set(TableProperties.DATA_PLANNING_MODE, planningMode.modeName())
         .set(TableProperties.DELETE_PLANNING_MODE, planningMode.modeName())
+        .set(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion))
         .commit();
     if (format.equals(FileFormat.PARQUET) || format.equals(FileFormat.ORC)) {
       String vectorizationEnabled =
@@ -351,7 +354,8 @@ public void testPosDeletesAllRowsInBatch() throws IOException {
             table,
             Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
             TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

     table
         .newRowDelta()
@@ -383,7 +387,8 @@ public void testPosDeletesWithDeletedColumn() throws IOException {
             table,
             Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
             TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

     table
         .newRowDelta()
@@ -459,7 +464,8 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
             table,
             Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
             TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

     table
         .newRowDelta()
@@ -491,7 +497,8 @@ public void testFilterOnDeletedMetadataColumn() throws IOException {
             table,
             Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
             TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

     table
         .newRowDelta()
@@ -613,7 +620,10 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio
             Pair.of(dataFile.path(), 109L));
     Pair<DeleteFile, CharSequenceSet> posDeletes =
         FileHelpers.writeDeleteFile(
-            table, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), deletes);
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            deletes,
+            formatVersion);
     tbl.newRowDelta()
         .addDeletes(posDeletes.first())
         .validateDataFilesExist(posDeletes.second())
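The 3.4 module already had planningMode wired through, so only formatVersion is threaded in, plus the two new v3 rows. The v3 rows reuse PARQUET only, presumably because the delete representation under test is independent of the data file format: in v3 the position deletes land in a Puffin-based deletion vector rather than a Parquet/ORC/Avro delete file. A sketch of what that difference looks like at the file level, using the generateDV helper referenced earlier in this patch:

    // Sketch: a DV for a v3 table is a Puffin file that still reports
    // position-delete content for the data file it applies to.
    DeleteFile dv = FileGenerationUtil.generateDV(table, dataFile);
    assertThat(dv.format()).isEqualTo(FileFormat.PUFFIN);
    assertThat(dv.content()).isEqualTo(FileContent.POSITION_DELETES);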
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 29c2d4b39a1e..72fd307f115b 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -100,19 +100,21 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
   protected static SparkSession spark = null;
   protected static HiveCatalog catalog = null;

-  @Parameter(index = 1)
+  @Parameter(index = 2)
   private boolean vectorized;

-  @Parameter(index = 2)
+  @Parameter(index = 3)
   private PlanningMode planningMode;

-  @Parameters(name = "format = {0}, vectorized = {1}, planningMode = {2}")
+  @Parameters(name = "fileFormat = {0}, formatVersion = {1}, vectorized = {2}, planningMode = {3}")
   public static Object[][] parameters() {
     return new Object[][] {
-      new Object[] {FileFormat.PARQUET, false, PlanningMode.DISTRIBUTED},
-      new Object[] {FileFormat.PARQUET, true, PlanningMode.LOCAL},
-      new Object[] {FileFormat.ORC, false, PlanningMode.DISTRIBUTED},
-      new Object[] {FileFormat.AVRO, false, PlanningMode.LOCAL}
+      new Object[] {FileFormat.PARQUET, 2, false, PlanningMode.DISTRIBUTED},
+      new Object[] {FileFormat.PARQUET, 2, true, PlanningMode.LOCAL},
+      new Object[] {FileFormat.ORC, 2, false, PlanningMode.DISTRIBUTED},
+      new Object[] {FileFormat.AVRO, 2, false, PlanningMode.LOCAL},
+      new Object[] {FileFormat.PARQUET, 3, false, PlanningMode.DISTRIBUTED},
+      new Object[] {FileFormat.PARQUET, 3, true, PlanningMode.LOCAL},
     };
   }

@@ -161,16 +163,17 @@ public void cleanup() throws IOException {
   }

   @Override
-  protected Table createTable(String name, Schema schema, PartitionSpec spec) {
+  protected Table createTable(String name, Schema schema, PartitionSpec spec, int formatVersion) {
     Table table = catalog.createTable(TableIdentifier.of("default", name), schema);
     TableOperations ops = ((BaseTable) table).operations();
     TableMetadata meta = ops.current();
-    ops.commit(meta, meta.upgradeToFormatVersion(2));
+    ops.commit(meta, meta.upgradeToFormatVersion(formatVersion));
     table
         .updateProperties()
         .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
         .set(TableProperties.DATA_PLANNING_MODE, planningMode.modeName())
         .set(TableProperties.DELETE_PLANNING_MODE, planningMode.modeName())
+        .set(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion))
         .commit();
     if (format.equals(FileFormat.PARQUET) || format.equals(FileFormat.ORC)) {
       String vectorizationEnabled =
@@ -351,7 +354,8 @@ public void testPosDeletesAllRowsInBatch() throws IOException {
             table,
             Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
             TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

     table
         .newRowDelta()
@@ -383,7 +387,8 @@ public void testPosDeletesWithDeletedColumn() throws IOException {
             table,
             Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
             TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

     table
         .newRowDelta()
@@ -459,7 +464,8 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
             table,
             Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
             TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

     table
         .newRowDelta()
@@ -491,7 +497,8 @@ public void testFilterOnDeletedMetadataColumn() throws IOException {
             table,
             Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
             TestHelpers.Row.of(0),
-            deletes);
+            deletes,
+            formatVersion);

     table
         .newRowDelta()
@@ -557,7 +564,7 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio
     assumeThat(format).isEqualTo("parquet");

     String tblName = "test3";
-    Table tbl = createTable(tblName, SCHEMA, PartitionSpec.unpartitioned());
+    Table tbl = createTable(tblName, SCHEMA, PartitionSpec.unpartitioned(), formatVersion);

     List<Path> fileSplits = Lists.newArrayList();
     StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
@@ -613,7 +620,10 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio
             Pair.of(dataFile.path(), 109L));
     Pair<DeleteFile, CharSequenceSet> posDeletes =
         FileHelpers.writeDeleteFile(
-            table, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), deletes);
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            deletes,
+            formatVersion);
     tbl.newRowDelta()
         .addDeletes(posDeletes.first())
         .validateDataFilesExist(posDeletes.second())
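Note the pre-existing context line assumeThat(format).isEqualTo("parquet") in the 3.5 hunk above: format is a FileFormat in this class, so that assumption can never hold and the test would be skipped on every run. The likely intent is the enum comparison (a sketch; not part of this patch):

    // Likely intended assumption for the multi-row-group Parquet test.
    assumeThat(format).isEqualTo(FileFormat.PARQUET);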