11 changes: 8 additions & 3 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -3148,17 +3148,22 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
       orderSpec: Seq[SortOrder],
       op: SparkPlan): Boolean = {
     if (partitionSpec.length != orderSpec.length) {
+      withInfo(op, "Partitioning and sorting specifications do not match")
       return false
     }
 
-    val partitionColumnNames = partitionSpec.collect { case a: AttributeReference =>
-      a.name
+    val partitionColumnNames = partitionSpec.collect {
+      case a: AttributeReference => a.name
+      case other =>
+        withInfo(op, s"Unsupported partition expression: ${other.getClass.getSimpleName}")
+        return false
     }
 
     val orderColumnNames = orderSpec.collect { case s: SortOrder =>
       s.child match {
         case a: AttributeReference => a.name
+        case other =>
+          withInfo(op, s"Unsupported sort expression: ${other.getClass.getSimpleName}")
+          return false
       }
     }
 
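A note on the pattern above: a `return` inside the partial function passed to `collect` is a Scala non-local return, so it exits the enclosing validation method (yielding `false`) rather than just the closure. That is what lets a single unsupported expression reject the whole window spec after recording the reason via `withInfo`. A minimal self-contained sketch of the same pattern, using hypothetical stand-in types (`Expr`, `supportedColumnNames`) rather than the real serde classes:

    // Sketch of the collect-with-early-return pattern (hypothetical types).
    object CollectEarlyReturnSketch {
      sealed trait Expr
      case class AttributeReference(name: String) extends Expr
      case class Literal(value: Int) extends Expr

      // Stops as soon as a non-attribute expression is seen; the `return`
      // inside the closure is a non-local return from this method itself.
      def supportedColumnNames(exprs: Seq[Expr]): Option[Seq[String]] = {
        val names = exprs.collect {
          case AttributeReference(name) => name
          case other =>
            println(s"Unsupported expression: ${other.getClass.getSimpleName}")
            return None
        }
        Some(names)
      }

      def main(args: Array[String]): Unit = {
        println(supportedColumnNames(Seq(AttributeReference("a")))) // Some(List(a))
        println(supportedColumnNames(Seq(Literal(1))))              // None
      }
    }

Run as-is, the second call prints the unsupported-expression message and returns None without building a partial result, mirroring how the serde rejects the operator.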
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
 import org.apache.spark.sql.catalyst.optimizer.EliminateSorts
 import org.apache.spark.sql.comet.CometHashAggregateExec
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.{count_distinct, sum}
 import org.apache.spark.sql.internal.SQLConf
@@ -89,6 +90,37 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
+  // based on Spark's SQLWindowFunctionSuite test of the same name
+  test("window function: partition and order expressions") {
+    for (shuffleMode <- Seq("auto", "native", "jvm")) {
+      withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
+        val df =
+          Seq((1, "a", 5), (2, "a", 6), (3, "b", 7), (4, "b", 8), (5, "c", 9), (6, "c", 10)).toDF(
+            "month",
+            "area",
+            "product")
+        df.createOrReplaceTempView("windowData")
+        val df2 = sql("""
+            |select month, area, product, sum(product + 1) over (partition by 1 order by 2)
+            |from windowData
+          """.stripMargin)
+        checkSparkAnswer(df2)
+        val cometShuffles = collect(df2.queryExecution.executedPlan) {
+          case _: CometShuffleExchangeExec => true
+        }
+        if (shuffleMode == "jvm") {
+          assert(cometShuffles.length == 1)
+        } else {
+          // we fall back to Spark for shuffle because we do not support
+          // native shuffle with a LocalTableScan input, and we do not fall
+          // back to Comet columnar shuffle due to
+          // https://github.com/apache/datafusion-comet/issues/1248
+          assert(cometShuffles.isEmpty)
+        }
+      }
+    }
+  }
+
   test("multiple column distinct count") {
     withSQLConf(
       CometConf.COMET_ENABLED.key -> "true",
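For context on why this query is interesting: `partition by 1 order by 2` uses literal expressions rather than column references, which presumably is what drives the serde into the new `case other` branches instead of the old non-exhaustive match. Semantically, all rows share one partition and are ordering peers, so under the default RANGE frame the windowed sum spans all six rows. A hedged standalone reproduction through the DataFrame API (an assumed-equivalent formulation, not part of this PR):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, lit, sum}

    object WindowLiteralSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
        import spark.implicits._

        val df = Seq((1, "a", 5), (2, "a", 6), (3, "b", 7), (4, "b", 8), (5, "c", 9), (6, "c", 10))
          .toDF("month", "area", "product")

        // Literal partition/order keys: one partition, all rows are peers, so
        // the default RANGE frame makes sum(product + 1) equal 51 on every row.
        val w = Window.partitionBy(lit(1)).orderBy(lit(2))
        df.withColumn("s", sum(col("product") + 1).over(w)).show()

        spark.stop()
      }
    }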