Skip to content

Commit

Permalink
chore: Remove ObjectHashAggregate from BaseAggregateExec pattern as i…
Browse files Browse the repository at this point in the history
…t is not supported
  • Loading branch information
viirya committed Oct 1, 2024
1 parent 84cccf7 commit 5b59c9c
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, Comet
import org.apache.spark.sql.comet.util.Utils
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec}
import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
Expand Down Expand Up @@ -437,9 +437,7 @@ class CometSparkSessionExtensions
op
}

case op: BaseAggregateExec
if op.isInstanceOf[HashAggregateExec] ||
op.isInstanceOf[ObjectHashAggregateExec] =>
case op: BaseAggregateExec if op.isInstanceOf[HashAggregateExec] =>
val groupingExprs = op.groupingExpressions
val aggExprs = op.aggregateExpressions
val resultExpressions = op.resultExpressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.execution
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec}
import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.window.WindowExec
Expand Down Expand Up @@ -2813,8 +2813,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
}

case aggregate: BaseAggregateExec
if (aggregate.isInstanceOf[HashAggregateExec] ||
aggregate.isInstanceOf[ObjectHashAggregateExec]) &&
if aggregate.isInstanceOf[HashAggregateExec] &&
CometConf.COMET_EXEC_AGGREGATE_ENABLED.get(conf) =>
val groupingExpressions = aggregate.groupingExpressions
val aggregateExpressions = aggregate.aggregateExpressions
Expand Down

0 comments on commit 5b59c9c

Please sign in to comment.