From 3a9516f629b548373f453c013424847cb0da4af2 Mon Sep 17 00:00:00 2001
From: Rakesh Das <16017562+rakesh-das08@users.noreply.github.com>
Date: Sat, 8 Jul 2023 04:09:44 +0530
Subject: [PATCH] Spark 3.4: WAP branch not propagated when using DELETE
 without WHERE (#7900)

---
 .../iceberg/spark/extensions/TestDelete.java | 38 +++++++++++++++++++
 .../apache/iceberg/spark/SparkTableUtil.java | 38 +++++++++++++++++++
 .../iceberg/spark/source/SparkTable.java     |  5 +++
 3 files changed, 81 insertions(+)

diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index f9888cad0a8b..128828866e0f 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -27,6 +27,7 @@
 import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
 import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
 import static org.apache.spark.sql.functions.lit;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -48,6 +49,7 @@
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -1267,6 +1269,42 @@ public void testDeleteToWapBranchWithTableBranchIdentifier() throws NoSuchTableE
             branch)));
   }
 
+  @Test
+  public void testDeleteToCustomWapBranchWithoutWhereClause() throws NoSuchTableException {
+    assumeThat(branch)
+        .as("Run only if custom WAP branch is not main")
+        .isNotNull()
+        .isNotEqualTo(SnapshotRef.MAIN_BRANCH);
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+    createBranchIfNeeded();
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, branch),
+        () -> {
+          sql("DELETE FROM %s t WHERE id=1", tableName);
+          Assertions.assertThat(spark.table(tableName).count()).isEqualTo(2L);
+          Assertions.assertThat(spark.table(tableName + ".branch_" + branch).count()).isEqualTo(2L);
+          Assertions.assertThat(spark.table(tableName + ".branch_main").count())
+              .as("Should not modify main branch")
+              .isEqualTo(3L);
+        });
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, branch),
+        () -> {
+          sql("DELETE FROM %s t", tableName);
+          Assertions.assertThat(spark.table(tableName).count()).isEqualTo(0L);
+          Assertions.assertThat(spark.table(tableName + ".branch_" + branch).count()).isEqualTo(0L);
+          Assertions.assertThat(spark.table(tableName + ".branch_main").count())
+              .as("Should not modify main branch")
+              .isEqualTo(3L);
+        });
+  }
+
   // TODO: multiple stripes for ORC
 
   protected void createAndInitPartitionedTable() {
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index b8c00b713a23..6fe40a245c5a 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -44,6 +44,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.TableMigrationUtil;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.SerializableConfiguration;
 import org.apache.iceberg.hadoop.Util;
@@ -666,6 +667,43 @@ public static Dataset<Row> loadMetadataTable(
         spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty(), options));
   }
 
+  /**
+   * Determine the write branch.
+   *
+   * <p>Validate wap config and determine the write branch.
+   *
+   * @param spark a Spark Session
+   * @param branch write branch if there is no WAP branch configured
+   * @return branch for write operation
+   */
+  public static String determineWriteBranch(SparkSession spark, String branch) {
+    String wapId = spark.conf().get(SparkSQLProperties.WAP_ID, null);
+    String wapBranch = spark.conf().get(SparkSQLProperties.WAP_BRANCH, null);
+    ValidationException.check(
+        wapId == null || wapBranch == null,
+        "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
+        wapId,
+        wapBranch);
+
+    if (wapBranch != null) {
+      ValidationException.check(
+          branch == null,
+          "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [%s]",
+          branch,
+          wapBranch);
+
+      return wapBranch;
+    }
+    return branch;
+  }
+
+  public static boolean wapEnabled(Table table) {
+    return PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.WRITE_AUDIT_PUBLISH_ENABLED,
+        Boolean.getBoolean(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT));
+  }
+
   /** Class representing a table partition. */
   public static class SparkPartition implements Serializable {
     private final Map<String, String> values;
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 240a9df0a88c..0ad33aaadc45 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -57,6 +57,7 @@
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
@@ -372,6 +373,10 @@ public void deleteWhere(Filter[] filters) {
             .set("spark.app.id", sparkSession().sparkContext().applicationId())
             .deleteFromRowFilter(deleteExpr);
 
+    if (SparkTableUtil.wapEnabled(table())) {
+      branch = SparkTableUtil.determineWriteBranch(sparkSession(), branch);
+    }
+
     if (branch != null) {
       deleteFiles.toBranch(branch);
     }
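
Usage sketch (illustrative, not part of the patch): the table name db.tbl and branch name audit_branch below are assumptions. With write-audit-publish enabled on the table and spark.wap.branch set on the session, a DELETE without a WHERE clause is planned through SparkTable#deleteWhere; with this change it is committed to the WAP branch instead of main, matching the behavior of a row-level DELETE with a WHERE clause.

    import org.apache.spark.sql.SparkSession;

    public class WapDeleteWithoutWhereExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();

        // Enable write-audit-publish on the table
        // (TableProperties.WRITE_AUDIT_PUBLISH_ENABLED = "write.wap.enabled").
        spark.sql("ALTER TABLE db.tbl SET TBLPROPERTIES ('write.wap.enabled' = 'true')");

        // Stage this session's writes on an audit branch
        // (SparkSQLProperties.WAP_BRANCH = "spark.wap.branch").
        spark.conf().set("spark.wap.branch", "audit_branch");

        // Before this patch, an unfiltered DELETE was committed to main even though a
        // WAP branch was configured; with the patch it lands on "audit_branch".
        spark.sql("DELETE FROM db.tbl");

        // Main stays unchanged until the audit branch is published.
        spark.sql("SELECT count(*) FROM db.tbl.branch_main").show();
        spark.sql("SELECT count(*) FROM db.tbl.branch_audit_branch").show();
      }
    }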