Spark: Change delete file granularity to file in Spark 3.5
amogh-jahagirdar committed Dec 12, 2024
1 parent d402f83 commit d4e9ca5
Showing 6 changed files with 25 additions and 11 deletions.
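In short: this commit flips the default delete granularity for merge-on-read writes in Spark 3.5 from partition to file, and pins the tests that count delete files to the old partition granularity so their assertions stay meaningful. A minimal sketch of how a table can opt back into the previous behavior, assuming an already-loaded Table handle (the wrapper class and method are illustrative, not part of this commit):

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;

class RestorePartitionGranularity {
  // Pin a table back to partition-scoped delete files; subsequent
  // merge-on-read writes pick the property up over the new FILE default.
  static void restore(Table table) {
    table
        .updateProperties()
        .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.PARTITION.toString())
        .commit();
  }
}

The same property appears as 'write.delete.granularity' in the SQL TBLPROPERTIES changes below.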
File 1 of 6

@@ -61,6 +61,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -149,12 +150,15 @@ public void testCoalesceDelete() throws Exception {

// set the open file cost large enough to produce a separate scan task per file
// use range distribution to trigger a shuffle
+// set partition-scoped deletes so that one delete file is written as part of the output task
Map<String, String> tableProps =
ImmutableMap.of(
SPLIT_OPEN_FILE_COST,
String.valueOf(Integer.MAX_VALUE),
DELETE_DISTRIBUTION_MODE,
-DistributionMode.RANGE.modeName());
+DistributionMode.RANGE.modeName(),
+TableProperties.DELETE_GRANULARITY,
+DeleteGranularity.PARTITION.toString());
sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));

createBranchIfNeeded();
@@ -1301,7 +1305,7 @@ public void testDeleteWithMultipleSpecs() {
} else if (mode(table) == MERGE_ON_READ && formatVersion >= 3) {
validateMergeOnRead(currentSnapshot, "3", "4", null);
} else {
validateMergeOnRead(currentSnapshot, "3", "3", null);
validateMergeOnRead(currentSnapshot, "3", "4", null);
}

assertEquals(
File 2 of 6

@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.extensions;

import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
import static org.apache.iceberg.TableProperties.MERGE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.MERGE_MODE;
@@ -54,6 +55,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -231,7 +233,6 @@ public void testMergeWithVectorizedReads() {

@TestTemplate
public void testCoalesceMerge() {
-assumeThat(formatVersion).isLessThan(3);
createAndInitTable("id INT, salary INT, dep STRING");

String[] records = new String[100];
@@ -250,7 +251,9 @@ public void testCoalesceMerge() {
SPLIT_OPEN_FILE_COST,
String.valueOf(Integer.MAX_VALUE),
MERGE_DISTRIBUTION_MODE,
-DistributionMode.NONE.modeName());
+DistributionMode.NONE.modeName(),
+TableProperties.DELETE_GRANULARITY,
+DeleteGranularity.PARTITION.toString());
sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));

createBranchIfNeeded();
@@ -293,6 +296,9 @@ public void testCoalesceMerge() {
// AQE detects that all shuffle blocks are small and processes them in 1 task
// otherwise, there would be 200 tasks writing to the table
validateProperty(currentSnapshot, SnapshotSummary.ADDED_FILES_PROP, "1");
+} else if (mode(table) == MERGE_ON_READ && formatVersion >= 3) {
+validateProperty(currentSnapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
+validateProperty(currentSnapshot, SnapshotSummary.ADDED_DVS_PROP, "4");
} else {
// MoR MERGE would perform a join on `id`
// every task has data for each of 200 reducers
File 3 of 6

@@ -49,7 +49,7 @@ private void createTable(boolean partitioned) throws Exception {
String partitionStmt = partitioned ? "PARTITIONED BY (id)" : "";
sql(
"CREATE TABLE %s (id bigint, data string) USING iceberg %s TBLPROPERTIES"
+ "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+ "('format-version'='2', 'write.delete.mode'='merge-on-read', 'write.delete.granularity'='partition')",
tableName, partitionStmt);

List<SimpleRecord> records =
File 4 of 6

@@ -58,6 +58,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -131,12 +132,15 @@ public void testCoalesceUpdate() {

// set the open file cost large enough to produce a separate scan task per file
// use range distribution to trigger a shuffle
+// set partition-scoped deletes so that one delete file is written as part of the output task
Map<String, String> tableProps =
ImmutableMap.of(
SPLIT_OPEN_FILE_COST,
String.valueOf(Integer.MAX_VALUE),
UPDATE_DISTRIBUTION_MODE,
-DistributionMode.RANGE.modeName());
+DistributionMode.RANGE.modeName(),
+TableProperties.DELETE_GRANULARITY,
+DeleteGranularity.PARTITION.toString());
sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));

createBranchIfNeeded();
@@ -443,7 +447,7 @@ public void testUpdateWithoutCondition() {
} else if (mode(table) == MERGE_ON_READ && formatVersion >= 3) {
validateMergeOnRead(currentSnapshot, "2", "3", "2");
} else {
validateMergeOnRead(currentSnapshot, "2", "2", "2");
validateMergeOnRead(currentSnapshot, "2", "3", "2");
}

assertEquals(
File 5 of 6

@@ -719,7 +719,7 @@ public DeleteGranularity deleteGranularity() {
.stringConf()
.option(SparkWriteOptions.DELETE_GRANULARITY)
.tableProperty(TableProperties.DELETE_GRANULARITY)
-.defaultValue(TableProperties.DELETE_GRANULARITY_DEFAULT)
+.defaultValue(DeleteGranularity.FILE.toString())
.parse();
return DeleteGranularity.fromString(valueAsString);
}
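The parser above resolves the effective granularity from three sources. A minimal sketch of the precedence it encodes, assuming plain maps in place of the real SparkConfParser plumbing (the class and method names are illustrative):

import java.util.Map;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.spark.SparkWriteOptions;

class GranularityPrecedence {
  // A per-write option beats the table property, which beats the default.
  static DeleteGranularity resolve(Map<String, String> writeOptions, Map<String, String> tableProps) {
    String value =
        writeOptions.getOrDefault(
            SparkWriteOptions.DELETE_GRANULARITY,
            tableProps.getOrDefault(
                TableProperties.DELETE_GRANULARITY, DeleteGranularity.FILE.toString()));
    return DeleteGranularity.fromString(value);
  }
}

Note the default no longer routes through TableProperties.DELETE_GRANULARITY_DEFAULT: Spark 3.5 now pins FILE regardless of the core constant, which is the behavioral core of this commit.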
File 6 of 6

@@ -142,7 +142,7 @@ public void testDeleteGranularityDefault() {
SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());

DeleteGranularity value = writeConf.deleteGranularity();
-assertThat(value).isEqualTo(DeleteGranularity.PARTITION);
+assertThat(value).isEqualTo(DeleteGranularity.FILE);
}

@TestTemplate
@@ -151,13 +151,13 @@ public void testDeleteGranularityTableProperty() {

table
.updateProperties()
-.set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.FILE.toString())
+.set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.PARTITION.toString())
.commit();

SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());

DeleteGranularity value = writeConf.deleteGranularity();
-assertThat(value).isEqualTo(DeleteGranularity.FILE);
+assertThat(value).isEqualTo(DeleteGranularity.PARTITION);
}

@TestTemplate
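As the updated TestSparkWriteConf cases above assert, an unconfigured table now resolves to FILE while an explicit table property still wins. A short usage sketch for checking the effective value outside the test suite, assuming an active SparkSession and a loaded Table (the wrapper class is illustrative):

import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.spark.sql.SparkSession;

class EffectiveGranularity {
  // With no write options and no table property set, this returns
  // DeleteGranularity.FILE on Spark 3.5 after this commit.
  static DeleteGranularity effective(SparkSession spark, Table table) {
    SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
    return writeConf.deleteGranularity();
  }
}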
