Skip to content

Commit

Permalink
only apply to the whole plan
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Jun 20, 2024
1 parent fc3e5ff commit 449f2e0
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.apache.comet

import org.apache.spark.internal.Logging
import org.apache.spark.sql.comet.{CometExec, CometRowToColumnarExec}
import org.apache.spark.sql.comet.{CometExec, CometRowToColumnarExec, CometSinkPlaceHolder}
import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.{Cost, CostEvaluator, SimpleCost}

Expand Down Expand Up @@ -58,6 +58,7 @@ class CometCostEvaluator extends CostEvaluator with Logging {
case RowToColumnarExec(_) => DEFAULT_TRANSITION_COST
case ColumnarToRowExec(_) => DEFAULT_TRANSITION_COST
case CometRowToColumnarExec(_) => DEFAULT_TRANSITION_COST
case _: CometSinkPlaceHolder => 0
case _: CometExec => DEFAULT_COMET_OPERATOR_COST
case _ => DEFAULT_SPARK_OPERATOR_COST
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.sql.comet._
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec, CometShuffleManager}
import org.apache.spark.sql.comet.util.Utils
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
Expand Down Expand Up @@ -735,16 +735,27 @@ class CometSparkSessionExtensions
plan
}
} else {

var newPlan = transform(plan)

if (CometConf.COMET_CBO_ENABLED.get()) {
val costEvaluator = new CometCostEvaluator()
val sparkCost = costEvaluator.evaluateCost(plan)
val cometCost = costEvaluator.evaluateCost(newPlan)
if (cometCost > sparkCost) {
logWarning(s"Comet plan is more expensive than Spark plan ($cometCost > $sparkCost")
return plan
}
logWarning(s"SPARK: $plan\nCOMET: $newPlan")

// For now, only consider the cost of the entire plan before it starts executing. Once
// execution starts, we cannot simply fall back for individual query stages because
// it can lead to incompatibilities with exchanges.
plan match {
case _: AdaptiveSparkPlanExec =>
if (CometConf.COMET_CBO_ENABLED.get()) {
val costEvaluator = new CometCostEvaluator()
val sparkCost = costEvaluator.evaluateCost(plan)
val cometCost = costEvaluator.evaluateCost(newPlan)
if (cometCost > sparkCost) {
logWarning(
s"Comet plan is more expensive than Spark plan ($cometCost > $sparkCost")
return plan
}
}
case _ =>
}

// if the plan cannot be run fully natively then explain why (when appropriate
Expand Down

0 comments on commit 449f2e0

Please sign in to comment.