From 7fe82f20310f5a5f1ace0bb7ad9a0fac6e2abe5e Mon Sep 17 00:00:00 2001
From: Chao Sun
Date: Thu, 26 Oct 2023 08:52:10 -0700
Subject: [PATCH] [SPARK-45652][SQL] SPJ: Handle empty input partitions after dynamic filtering

### What changes were proposed in this pull request?

Handle the case where the input partitions become empty after V2 dynamic filtering, when SPJ is enabled.

### Why are the changes needed?

Currently, when all input partitions are filtered out via dynamic filtering, SPJ doesn't work and instead fails with:

```
java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.filteredPartitions$lzycompute(BatchScanExec.scala:108)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.filteredPartitions(BatchScanExec.scala:65)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD$lzycompute(BatchScanExec.scala:136)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD(BatchScanExec.scala:135)
	at org.apache.spark.sql.boson.BosonBatchScanExec.inputRDD$lzycompute(BosonBatchScanExec.scala:28)
```

This is because the `groupPartitions` method returns `None` in this scenario. We should handle this case.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a test case for this.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43531 from sunchao/SPARK-45652.

Authored-by: Chao Sun
Signed-off-by: Chao Sun
---
 .../datasources/v2/BatchScanExec.scala        |  5 ++-
 .../KeyGroupedPartitioningSuite.scala         | 42 +++++++++++++++++++
 2 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index 02821d10d5081..55f4a712410d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -105,7 +105,7 @@ case class BatchScanExec(
             "partition values that are not present in the original partitioning.")
         }
 
-        groupPartitions(newPartitions).get.map(_._2)
+        groupPartitions(newPartitions).getOrElse(Seq.empty).map(_._2)
 
       case _ =>
         // no validation is needed as the data source did not report any specific partitioning
@@ -148,7 +148,8 @@ case class BatchScanExec(
               s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
                 "is enabled")
 
-          val groupedPartitions = groupPartitions(finalPartitions.map(_.head), true).get
+          val groupedPartitions = groupPartitions(finalPartitions.map(_.head), true)
+            .getOrElse(Seq.empty)
 
           // This means the input partitions are not grouped by partition values. We'll need to
           // check `groupByPartitionValues` and decide whether to group and replicate splits
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index 0c3a1930dc9fa..cf76f6ca32cad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -1093,4 +1093,46 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
       }
     }
   }
+
+  test("SPARK-45652: SPJ should handle empty partition after dynamic filtering") {
+    withSQLConf(
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+        SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+        SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+        SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "10") {
+      val items_partitions = Array(identity("id"))
+      createTable(items, items_schema, items_partitions)
+      sql(s"INSERT INTO testcat.ns.$items VALUES " +
+          s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+          s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
+          s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+          s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
+          s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+      val purchases_partitions = Array(identity("item_id"))
+      createTable(purchases, purchases_schema, purchases_partitions)
+      sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+          s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
+          s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
+          s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
+          s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
+          s"(3, 19.5, cast('2020-02-01' as timestamp))")
+
+      Seq(true, false).foreach { pushDownValues =>
+        Seq(true, false).foreach { partiallyClustered => {
+          withSQLConf(
+              SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
+                  partiallyClustered.toString,
+              SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
+            // The dynamic filtering effectively filtered out all the partitions
+            val df = sql(s"SELECT p.price from testcat.ns.$items i, testcat.ns.$purchases p " +
+                "WHERE i.id = p.item_id AND i.price > 50.0")
+            checkAnswer(df, Seq.empty)
+          }
+        }
+        }
+      }
+    }
+  }
 }
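For context on the `None.get` failure fixed above, below is a minimal, self-contained Scala sketch of the failure mode and of the `getOrElse(Seq.empty)` pattern the patch applies. It is a simplified stand-in rather than Spark's actual `BatchScanExec` code; `InputPartition`, `groupByKey`, and the `planScan*` helpers are hypothetical names used only for illustration.

```scala
// Sketch only: illustrates the Option handling pattern, not Spark internals.
object EmptyPartitionSketch {
  // A hypothetical input split keyed by its partition value.
  case class InputPartition(keyValue: Int, data: Seq[String])

  // Stand-in for groupPartitions: None when there is nothing to group,
  // otherwise the splits grouped (and sorted) by partition key.
  def groupByKey(partitions: Seq[InputPartition]): Option[Seq[(Int, Seq[InputPartition])]] =
    if (partitions.isEmpty) None
    else Some(partitions.groupBy(_.keyValue).toSeq.sortBy(_._1))

  // Before the fix: `.get` throws java.util.NoSuchElementException: None.get
  // once runtime (dynamic) filtering has removed every partition.
  def planScanUnsafe(filtered: Seq[InputPartition]): Seq[Seq[InputPartition]] =
    groupByKey(filtered).get.map(_._2)

  // After the fix: fall back to an empty grouping so the scan yields
  // zero partitions instead of failing.
  def planScanSafe(filtered: Seq[InputPartition]): Seq[Seq[InputPartition]] =
    groupByKey(filtered).getOrElse(Seq.empty).map(_._2)

  def main(args: Array[String]): Unit = {
    val afterDynamicFiltering = Seq.empty[InputPartition] // everything was pruned
    println(planScanSafe(afterDynamicFiltering))          // List()
    // planScanUnsafe(afterDynamicFiltering)              // would throw None.get
  }
}
```

With all partitions pruned, `planScanSafe` returns an empty grouping and the scan simply produces no rows, which is what the new test asserts via `checkAnswer(df, Seq.empty)`.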