@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.{FilterExec, RangeExec, SparkPlan, WholeSt
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -363,6 +364,41 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
    }
  }

+  test("SPARK-32629: ShuffledHashJoin(full outer) metrics") {
+    val uniqueLeftDf = Seq(("1", "1"), ("11", "11")).toDF("key", "value")
+    val nonUniqueLeftDf = Seq(("1", "1"), ("1", "2"), ("11", "11")).toDF("key", "value")
+    val rightDf = (1 to 10).map(i => (i.toString, i.toString)).toDF("key2", "value")
+    Seq(
+      // Test unique key on build side
+      (uniqueLeftDf, rightDf, 11, 134228048, 10, 134221824),
+      // Test non-unique key on build side
+      (nonUniqueLeftDf, rightDf, 12, 134228552, 11, 134221824)
+    ).foreach { case (leftDf, rightDf, fojRows, fojBuildSize, rojRows, rojBuildSize) =>
+      val fojDf = leftDf.hint("shuffle_hash").join(
+        rightDf, $"key" === $"key2", "full_outer")
+      fojDf.collect()
+      val fojPlan = fojDf.queryExecution.executedPlan.collectFirst {
+        case s: ShuffledHashJoinExec => s
+      }
+      assert(fojPlan.isDefined, "The query plan should have shuffled hash join")
+      testMetricsInSparkPlanOperator(fojPlan.get,
+        Map("numOutputRows" -> fojRows, "buildDataSize" -> fojBuildSize))
+
+      // Test right outer join as well to verify build data size to be different
+      // from full outer join. This makes sure we take extra BitSet/OpenHashSet
+      // for full outer join into account.
+      val rojDf = leftDf.hint("shuffle_hash").join(
+        rightDf, $"key" === $"key2", "right_outer")
+      rojDf.collect()
+      val rojPlan = rojDf.queryExecution.executedPlan.collectFirst {
+        case s: ShuffledHashJoinExec => s
+      }
+      assert(rojPlan.isDefined, "The query plan should have shuffled hash join")
+      testMetricsInSparkPlanOperator(rojPlan.get,
+        Map("numOutputRows" -> rojRows, "buildDataSize" -> rojBuildSize))
+    }
+  }
+
  test("BroadcastHashJoin(outer) metrics") {
    val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
    val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
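Note on the expected metrics in the new test above: for full outer join, the shuffled hash join keeps an extra structure on the build side, a BitSet when the build-side join keys are unique and an OpenHashSet otherwise, recording which build rows found a match so that unmatched build rows can be emitted after the probe side is exhausted. That is why fojBuildSize (134228048 / 134228552) exceeds rojBuildSize (134221824). A self-contained toy sketch of the idea follows; it uses plain Scala collections, not Spark internals, and the names build/probe are illustrative only:

// Toy model of a full outer shuffled hash join (NOT Spark's implementation):
// the extra BitSet tracks which build rows matched, which is exactly the
// additional build-side memory the test's fojBuildSize accounts for.
object FullOuterHashJoinSketch {
  def main(args: Array[String]): Unit = {
    val build = Seq("1" -> "1", "11" -> "11")                  // like uniqueLeftDf
    val probe = (1 to 10).map(i => i.toString -> i.toString)   // like rightDf
    val buildMap = build.zipWithIndex.groupBy { case ((k, _), _) => k }
    val matched = new java.util.BitSet(build.size)             // extra structure vs right outer

    // Probe phase: identical to a right outer join, except matches are recorded.
    val streamed = probe.flatMap { case (k, v) =>
      val hits = buildMap.getOrElse(k, Nil)
      if (hits.isEmpty) Seq((k, null, v))                      // unmatched probe row
      else hits.map { case ((_, bv), i) => matched.set(i); (k, bv, v) }
    }

    // Full outer only: additionally emit build rows that never matched.
    val unmatchedBuild = build.zipWithIndex.collect {
      case ((k, bv), i) if !matched.get(i) => (k, bv, null)
    }

    println((streamed ++ unmatchedBuild).size) // 11, matching numOutputRows
                                               // for the unique-key case
  }
}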
@@ -686,16 +722,6 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
  }

  test("SPARK-28332: SQLMetric merge should handle -1 properly") {
-    def checkSparkPlanMetrics(plan: SparkPlan, expected: Map[String, Long]): Unit = {
-      expected.foreach { case (metricName: String, metricValue: Long) =>
-        assert(plan.metrics.contains(metricName), s"The query plan should have metric $metricName")
-        val actualMetric = plan.metrics.get(metricName).get
-        assert(actualMetric.value == metricValue,
-          s"The query plan metric $metricName did not match, " +
-            s"expected: $metricValue, actual: ${actualMetric.value}")
-      }
-    }
-
    val df = testData.join(testData2.filter('b === 0), $"key" === $"a", "left_outer")
    df.collect()
    val plan = df.queryExecution.executedPlan
@@ -706,7 +732,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils

    assert(exchanges.size == 2, "The query plan should have two shuffle exchanges")

-    checkSparkPlanMetrics(exchanges(0), Map("dataSize" -> 3200, "shuffleRecordsWritten" -> 100))
-    checkSparkPlanMetrics(exchanges(1), Map("dataSize" -> 0, "shuffleRecordsWritten" -> 0))
+    testMetricsInSparkPlanOperator(exchanges.head,
+      Map("dataSize" -> 3200, "shuffleRecordsWritten" -> 100))
+    testMetricsInSparkPlanOperator(exchanges(1), Map("dataSize" -> 0, "shuffleRecordsWritten" -> 0))
  }
}
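For reference, both changed hunks now call testMetricsInSparkPlanOperator, whose definition is not part of this diff; it is presumably the removed local checkSparkPlanMetrics promoted into the shared SQLMetricsTestUtils trait. A minimal sketch of what it likely looks like, reconstructed from the deleted helper (placement, exact name resolution, and signature are assumptions; SparkPlan is already imported in this file):

// Assumed shape, mirroring the removed checkSparkPlanMetrics; the real
// helper lives elsewhere in the test utilities and is not shown here.
protected def testMetricsInSparkPlanOperator(
    plan: SparkPlan,
    expectedMetrics: Map[String, Long]): Unit = {
  expectedMetrics.foreach { case (metricName, expectedValue) =>
    assert(plan.metrics.contains(metricName),
      s"The query plan should have metric $metricName")
    val actualMetric = plan.metrics(metricName)
    assert(actualMetric.value == expectedValue,
      s"The query plan metric $metricName did not match, " +
        s"expected: $expectedValue, actual: ${actualMetric.value}")
  }
}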