Data, MR, Spark: Test deletes with format-version=3
nastra committed Nov 14, 2024
1 parent e06b069 commit bbd0810
Showing 8 changed files with 125 additions and 78 deletions.
@@ -396,8 +396,8 @@ public void testPartitionSpecEvolutionRemovalV2() {
.withPartitionPath("id=11")
.build();

- DeleteFile delete10 = posDelete(table, data10);
- DeleteFile delete11 = posDelete(table, data11);
+ DeleteFile delete10 = newDeletes(data10);
+ DeleteFile delete11 = newDeletes(data11);

table.newFastAppend().appendFile(data10).commit();
table.newFastAppend().appendFile(data11).commit();
@@ -441,12 +441,6 @@ public void testPartitionSpecEvolutionRemovalV2() {
assertThat(tasks).hasSize(expectedScanTaskCount(3));
}

- private DeleteFile posDelete(Table table, DataFile dataFile) {
-   return formatVersion >= 3
-       ? FileGenerationUtil.generateDV(table, dataFile)
-       : FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
- }

@TestTemplate
public void testPartitionSpecEvolutionAdditiveV1() {
assumeThat(formatVersion).isEqualTo(1);
@@ -537,8 +531,8 @@ public void testPartitionSpecEvolutionAdditiveV2AndAbove() {
.withPartitionPath("data_bucket=1/id=11")
.build();

- DeleteFile delete10 = posDelete(table, data10);
- DeleteFile delete11 = posDelete(table, data11);
+ DeleteFile delete10 = newDeletes(data10);
+ DeleteFile delete11 = newDeletes(data11);

table.newFastAppend().appendFile(data10).commit();
table.newFastAppend().appendFile(data11).commit();
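Note: the removed posDelete helper above implies the format-version branch now lives in a shared newDeletes helper, judging by the new call sites. A minimal sketch of what that helper presumably looks like, reusing the FileGenerationUtil calls from the deleted method (its exact home and signature are assumptions):

    private DeleteFile newDeletes(DataFile dataFile) {
      // Format version 3 and above encode position deletes as deletion vectors (DVs);
      // earlier versions keep using position delete files.
      return formatVersion >= 3
          ? FileGenerationUtil.generateDV(table, dataFile)
          : FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
    }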
40 changes: 25 additions & 15 deletions data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.data;

import static org.assertj.core.api.Assertions.assertThat;
+ import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.File;
import java.io.IOException;
@@ -82,19 +83,23 @@ public abstract class DeleteReadTests {

@Parameter protected FileFormat format;

+ @Parameter(index = 1)
+ protected int formatVersion;

@Parameters(name = "fileFormat = {0}")
public static Object[][] parameters() {
return new Object[][] {
- new Object[] {FileFormat.PARQUET},
- new Object[] {FileFormat.AVRO},
- new Object[] {FileFormat.ORC}
+ new Object[] {FileFormat.PARQUET, 2},
+ new Object[] {FileFormat.AVRO, 2},
+ new Object[] {FileFormat.ORC, 2},
+ new Object[] {FileFormat.PARQUET, 3},
};
}

@BeforeEach
public void writeTestDataFile() throws IOException {
this.tableName = "test";
- this.table = createTable(tableName, SCHEMA, SPEC);
+ this.table = createTable(tableName, SCHEMA, SPEC, formatVersion);
this.records = Lists.newArrayList();

// records all use IDs that are in bucket id_bucket=0
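Note: each Object[] row in the parameter matrix above feeds the @Parameter fields by index, and format version 3 is exercised with Parquet only here. How a row maps onto the fields:

    // new Object[] {FileFormat.PARQUET, 3}
    //   index 0 -> @Parameter FileFormat format            (FileFormat.PARQUET)
    //   index 1 -> @Parameter(index = 1) int formatVersion (3)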
@@ -126,7 +131,7 @@ public void cleanup() throws IOException {
private void initDateTable() throws IOException {
dropTable("test2");
this.dateTableName = "test2";
- this.dateTable = createTable(dateTableName, DATE_SCHEMA, DATE_SPEC);
+ this.dateTable = createTable(dateTableName, DATE_SCHEMA, DATE_SPEC, formatVersion);

GenericRecord record = GenericRecord.create(dateTable.schema());

@@ -179,8 +184,8 @@ private void initDateTable() throws IOException {
.commit();
}

- protected abstract Table createTable(String name, Schema schema, PartitionSpec spec)
-     throws IOException;
+ protected abstract Table createTable(
+     String name, Schema schema, PartitionSpec spec, int formatVersion) throws IOException;

protected abstract void dropTable(String name) throws IOException;

@@ -384,7 +389,8 @@ public void testPositionDeletes() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
Row.of(0),
- deletes);
+ deletes,
+ formatVersion);

table
.newRowDelta()
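Note: every FileHelpers.writeDeleteFile call in this commit gains a trailing formatVersion argument. A sketch of what the new overload presumably does; only the signature is confirmed by the call sites here, and the DV branch plus the writeDV helper are assumptions for illustration:

    public static Pair<DeleteFile, CharSequenceSet> writeDeleteFile(
        Table table,
        OutputFile out,
        StructLike partition,
        List<Pair<CharSequence, Long>> deletes,
        int formatVersion)
        throws IOException {
      if (formatVersion >= 3) {
        // Assumption: v3 writes the positions as a deletion vector (DV).
        return writeDV(table, partition, deletes); // hypothetical helper
      }
      // Pre-existing path: write a position delete file (v2).
      return writeDeleteFile(table, out, partition, deletes);
    }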
@@ -401,6 +407,10 @@

@TestTemplate
public void testMultiplePosDeleteFiles() throws IOException {
+ assumeThat(formatVersion)
+     .as("Can't write multiple delete files with formatVersion >= 3")
+     .isEqualTo(2);

List<Pair<CharSequence, Long>> deletes =
Lists.newArrayList(
Pair.of(dataFile.path(), 0L), // id = 29
@@ -412,25 +422,24 @@ public void testMultiplePosDeleteFiles() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
Row.of(0),
- deletes);
+ deletes,
+ formatVersion);

table
.newRowDelta()
.addDeletes(posDeletes.first())
.validateDataFilesExist(posDeletes.second())
.commit();

- deletes =
-     Lists.newArrayList(
-         Pair.of(dataFile.path(), 6L) // id = 122
-     );
+ deletes = Lists.newArrayList(Pair.of(dataFile.path(), 6L)); // id = 122

posDeletes =
FileHelpers.writeDeleteFile(
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
Row.of(0),
- deletes);
+ deletes,
+ formatVersion);

table
.newRowDelta()
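Note: the assumeThat guard added above pins testMultiplePosDeleteFiles to v2. In format version 3 a data file carries at most one deletion vector, so committing two separate position delete files against the same data file no longer applies. A hypothetical v3-style variant would merge both rounds of positions and commit them once:

    // Hypothetical (not part of this commit): combine both rounds of deletes
    // and write a single delete file/DV for the data file.
    List<Pair<CharSequence, Long>> allDeletes =
        Lists.newArrayList(
            Pair.of(dataFile.path(), 0L), // id = 29
            Pair.of(dataFile.path(), 6L)); // id = 122
    Pair<DeleteFile, CharSequenceSet> posDeletes =
        FileHelpers.writeDeleteFile(
            table,
            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
            Row.of(0),
            allDeletes,
            formatVersion);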
@@ -475,7 +484,8 @@ public void testMixedPositionAndEqualityDeletes() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
Row.of(0),
- deletes);
+ deletes,
+ formatVersion);

table
.newRowDelta()
@@ -36,8 +36,9 @@ public class TestGenericReaderDeletes extends DeleteReadTests {
@TempDir private File tableDir;

@Override
- protected Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException {
-   return TestTables.create(tableDir, name, schema, spec, 2);
+ protected Table createTable(String name, Schema schema, PartitionSpec spec, int formatVersion)
+     throws IOException {
+   return TestTables.create(tableDir, name, schema, spec, formatVersion);
}

@Override
@@ -66,14 +66,14 @@ public static void stopMetastore() throws Exception {
}

@Override
- protected Table createTable(String name, Schema schema, PartitionSpec spec) {
+ protected Table createTable(String name, Schema schema, PartitionSpec spec, int formatVersion) {
Map<String, String> props = Maps.newHashMap();
props.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());

Table table = catalog.createTable(TableIdentifier.of(databaseName, name), schema, spec, props);
TableOperations ops = ((BaseTable) table).operations();
TableMetadata meta = ops.current();
- ops.commit(meta, meta.upgradeToFormatVersion(2));
+ ops.commit(meta, meta.upgradeToFormatVersion(formatVersion));

return table;
}
@@ -49,18 +49,20 @@ public class TestInputFormatReaderDeletes extends DeleteReadTests {
private final HadoopTables tables = new HadoopTables(conf);
private TestHelper helper;

- @Parameter(index = 1)
+ @Parameter(index = 2)
private String inputFormat;

@Parameters(name = "fileFormat = {0}, inputFormat = {1}")
@Parameters(name = "fileFormat = {0}, formatVersion = {1}, inputFormat = {2}")
public static Object[][] parameters() {
return new Object[][] {
{FileFormat.PARQUET, "IcebergInputFormat"},
{FileFormat.AVRO, "IcebergInputFormat"},
{FileFormat.ORC, "IcebergInputFormat"},
{FileFormat.PARQUET, "MapredIcebergInputFormat"},
{FileFormat.AVRO, "MapredIcebergInputFormat"},
{FileFormat.ORC, "MapredIcebergInputFormat"},
{FileFormat.PARQUET, 2, "IcebergInputFormat"},
{FileFormat.AVRO, 2, "IcebergInputFormat"},
{FileFormat.ORC, 2, "IcebergInputFormat"},
{FileFormat.PARQUET, 2, "MapredIcebergInputFormat"},
{FileFormat.AVRO, 2, "MapredIcebergInputFormat"},
{FileFormat.ORC, 2, "MapredIcebergInputFormat"},
{FileFormat.PARQUET, 3, "IcebergInputFormat"},
{FileFormat.PARQUET, 3, "MapredIcebergInputFormat"},
};
}
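Note: because the base class now claims parameter indices 0 and 1, each subclass's extra parameters shift up one slot. The mapping after this change, as read from the diffs:

    // DeleteReadTests (base):        index 0 -> FileFormat format
    //                                index 1 -> int formatVersion
    // TestInputFormatReaderDeletes:  index 2 -> String inputFormat
    // TestSparkReaderDeletes:        index 2 -> boolean vectorized
    //                                index 3 -> PlanningMode planningMode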

@@ -72,7 +74,8 @@ public void writeTestDataFile() throws IOException {
}

@Override
- protected Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException {
+ protected Table createTable(String name, Schema schema, PartitionSpec spec, int formatVersion)
+     throws IOException {
Table table;

File location = temp.resolve(inputFormat).resolve(format.name()).toFile();
@@ -82,7 +85,7 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException {

TableOperations ops = ((BaseTable) table).operations();
TableMetadata meta = ops.current();
- ops.commit(meta, meta.upgradeToFormatVersion(2));
+ ops.commit(meta, meta.upgradeToFormatVersion(formatVersion));

return table;
}
@@ -44,6 +44,7 @@
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
+ import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
@@ -99,16 +100,21 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
protected static SparkSession spark = null;
protected static HiveCatalog catalog = null;

- @Parameter(index = 1)
+ @Parameter(index = 2)
private boolean vectorized;

@Parameters(name = "format = {0}, vectorized = {1}")
@Parameter(index = 3)
private PlanningMode planningMode;

@Parameters(name = "fileFormat = {0}, formatVersion = {1}, vectorized = {2}, planningMode = {3}")
public static Object[][] parameters() {
return new Object[][] {
- new Object[] {FileFormat.PARQUET, false},
- new Object[] {FileFormat.PARQUET, true},
- new Object[] {FileFormat.ORC, false},
- new Object[] {FileFormat.AVRO, false}
+ new Object[] {FileFormat.PARQUET, 2, false, PlanningMode.DISTRIBUTED},
+ new Object[] {FileFormat.PARQUET, 2, true, PlanningMode.LOCAL},
+ new Object[] {FileFormat.ORC, 2, false, PlanningMode.DISTRIBUTED},
+ new Object[] {FileFormat.AVRO, 2, false, PlanningMode.LOCAL},
+ new Object[] {FileFormat.PARQUET, 3, false, PlanningMode.DISTRIBUTED},
+ new Object[] {FileFormat.PARQUET, 3, true, PlanningMode.LOCAL},
};
}

@@ -162,7 +168,13 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) {
TableOperations ops = ((BaseTable) table).operations();
TableMetadata meta = ops.current();
ops.commit(meta, meta.upgradeToFormatVersion(2));
- table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format.name()).commit();
+ table
+     .updateProperties()
+     .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
+     .set(TableProperties.DATA_PLANNING_MODE, planningMode.modeName())
+     .set(TableProperties.DELETE_PLANNING_MODE, planningMode.modeName())
+     .set(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion))
+     .commit();
if (format.equals(FileFormat.PARQUET) || format.equals(FileFormat.ORC)) {
String vectorizationEnabled =
format.equals(FileFormat.PARQUET)
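Note: unlike the other suites, which upgrade via TableMetadata.upgradeToFormatVersion, the Spark suite sets TableProperties.FORMAT_VERSION ("format-version") through updateProperties(), as the added chain above shows. The same property-based upgrade on an existing table, as a minimal sketch with an assumed table handle:

    // Upgrade an existing table to format version 3 via the table property;
    // mirrors the .set(TableProperties.FORMAT_VERSION, ...) call added above.
    table
        .updateProperties()
        .set(TableProperties.FORMAT_VERSION, "3")
        .commit();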
Expand Down Expand Up @@ -342,7 +354,8 @@ public void testPosDeletesAllRowsInBatch() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
TestHelpers.Row.of(0),
- deletes);
+ deletes,
+ formatVersion);

table
.newRowDelta()
@@ -374,7 +387,8 @@ public void testPosDeletesWithDeletedColumn() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
TestHelpers.Row.of(0),
- deletes);
+ deletes,
+ formatVersion);

table
.newRowDelta()
@@ -450,7 +464,8 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
TestHelpers.Row.of(0),
- deletes);
+ deletes,
+ formatVersion);

table
.newRowDelta()
@@ -482,7 +497,8 @@ public void testFilterOnDeletedMetadataColumn() throws IOException {
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
TestHelpers.Row.of(0),
- deletes);
+ deletes,
+ formatVersion);

table
.newRowDelta()
@@ -604,7 +620,10 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOException {
Pair.of(dataFile.path(), 109L));
Pair<DeleteFile, CharSequenceSet> posDeletes =
FileHelpers.writeDeleteFile(
- table, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), deletes);
+ table,
+     Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+     deletes,
+     formatVersion);
tbl.newRowDelta()
.addDeletes(posDeletes.first())
.validateDataFilesExist(posDeletes.second())