fix ch tests
ivoson committed Oct 17, 2024
1 parent 1a9d4ec commit b58b47e
Showing 7 changed files with 69 additions and 17 deletions.
@@ -521,8 +521,6 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
         wrapChild(r2c)
       case union: ColumnarUnionExec =>
         wrapChild(union)
-      case ordered: TakeOrderedAndProjectExecTransformer =>
-        wrapChild(ordered)
       case other =>
         throw new GlutenNotSupportException(
           s"Not supported operator ${other.nodeName} for BroadcastRelation")
@@ -1,4 +1,56 @@
 [{
+  "id": 20,
+  "name": "kFetch",
+  "time": 0,
+  "input_wait_time": 3786,
+  "output_wait_time": 3786,
+  "steps": [{
+    "name": "Limit",
+    "description": "LIMIT",
+    "processors": [{
+      "name": "Limit",
+      "time": 0,
+      "output_rows": 44,
+      "output_bytes": 9483,
+      "input_rows": 44,
+      "input_bytes": 9483
+    }]
+  }]
+}, {
+  "id": 19,
+  "name": "kSort",
+  "time": 788,
+  "input_wait_time": 3622,
+  "output_wait_time": 3622,
+  "steps": [{
+    "name": "Sorting",
+    "description": "Sorting step",
+    "processors": [
+      {
+        "name": "PartialSortingTransform",
+        "time": 772,
+        "output_rows": 44,
+        "output_bytes": 9483,
+        "input_rows": 44,
+        "input_bytes": 9483
+      }, {
+        "name": "LimitsCheckingTransform",
+        "time": 0,
+        "output_rows": 44,
+        "output_bytes": 9483,
+        "input_rows": 44,
+        "input_bytes": 9483
+      }, {
+        "name": "MergeSortingTransform",
+        "time": 16,
+        "output_rows": 44,
+        "output_bytes": 9483,
+        "input_rows": 44,
+        "input_bytes": 9483
+      }
+    ]
+  }]
+}, {
   "id": 18,
   "name": "kProject",
   "time": 1,
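The golden metrics file above now records the kFetch and kSort pipelines that the offloaded sort-plus-limit plan emits. As a purely illustrative sketch of how such per-processor counters roll up, with case classes that are stand-ins for the JSON shape rather than Gluten or ClickHouse types:

    // Stand-in model of the metrics fixture above; not the real Gluten/ClickHouse classes.
    case class Processor(name: String, time: Long, inputRows: Long, outputRows: Long)
    case class Step(name: String, processors: Seq[Processor])
    case class Pipeline(id: Int, name: String, time: Long, steps: Seq[Step])

    object MetricsSketch {
      // Sum processor self-times within a pipeline, as a metrics updater might.
      def totalProcessorTime(p: Pipeline): Long =
        p.steps.flatMap(_.processors).map(_.time).sum

      def main(args: Array[String]): Unit = {
        val kSort = Pipeline(19, "kSort", 788, Seq(Step("Sorting", Seq(
          Processor("PartialSortingTransform", 772, 44, 44),
          Processor("LimitsCheckingTransform", 0, 44, 44),
          Processor("MergeSortingTransform", 16, 44, 44)))))
        println(totalProcessorTime(kSort)) // 788, matching the kSort pipeline's "time" field
      }
    }

Note that 772 + 0 + 16 = 788 in the fixture itself, so the per-processor times account for the pipeline's total.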
@@ -368,9 +368,9 @@ class GlutenClickHouseTPCHSuite extends GlutenClickHouseTPCHAbstractSuite {
         |""".stripMargin) {
       df =>
         val sortExec = df.queryExecution.executedPlan.collect {
-          case sortExec: TakeOrderedAndProjectExecTransformer => sortExec
+          case sortLimit @ LimitTransformer(_: SortExecTransformer, _, _) => sortLimit
         }
-        assert(sortExec.size == 1)
+        assert(sortExec.size == 2)
         val result = df.collect()
         val expectedResult = Seq(Row(0), Row(1), Row(2), Row(3), Row(4))
         TestUtils.compareAnswers(result, expectedResult)
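The new matcher binds the whole node with `@` while requiring its child to be a SortExecTransformer. A self-contained sketch of that collect idiom on a toy plan tree, where Sort, Limit, and Scan are hypothetical stand-ins rather than the Gluten operators:

    // Toy plan nodes; only the shape of the pattern match mirrors the test above.
    sealed trait Plan { def children: Seq[Plan] }
    case class Sort(child: Plan) extends Plan { def children = Seq(child) }
    case class Limit(child: Plan, offset: Long, count: Long) extends Plan { def children = Seq(child) }
    case class Scan() extends Plan { def children = Nil }

    object Matcher {
      // Collect every Limit whose direct child is a Sort, binding the whole node via `@`.
      def collectSortLimits(p: Plan): Seq[Plan] = {
        val here = p match {
          case sl @ Limit(_: Sort, _, _) => Seq(sl)
          case _ => Nil
        }
        here ++ p.children.flatMap(collectSortLimits)
      }

      def main(args: Array[String]): Unit = {
        // Two sort-backed limits nested in one plan, like the assertion's expected count.
        val plan = Limit(Sort(Limit(Sort(Scan()), 0, 5)), 0, 5)
        println(collectSortLimits(plan).size) // 2
      }
    }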
@@ -77,9 +77,9 @@ class GlutenClickHouseTPCDSMetricsSuite extends GlutenClickHouseTPCDSAbstractSuite {
       val allWholeStageTransformers = df.queryExecution.executedPlan.collect {
         case wholeStage: WholeStageTransformer => wholeStage
       }
-      assert(allWholeStageTransformers.size == 9)
+      assert(allWholeStageTransformers.size == 10)

-      val wholeStageTransformer = allWholeStageTransformers(1)
+      val wholeStageTransformer = allWholeStageTransformers(2)

       GlutenClickHouseMetricsUTUtils.executeMetricsUpdater(
         wholeStageTransformer,
@@ -295,9 +295,9 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite {
         case g: GlutenPlan if !g.isInstanceOf[InputIteratorTransformer] => g
       }

-      assert(allGlutenPlans.size == 58)
+      assert(allGlutenPlans.size == 60)

-      val shjPlan = allGlutenPlans(8)
+      val shjPlan = allGlutenPlans(10)
       assert(shjPlan.metrics("totalTime").value == 6)
       assert(shjPlan.metrics("inputWaitTime").value == 5)
       assert(shjPlan.metrics("outputWaitTime").value == 0)
@@ -336,7 +336,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSuite {
     compareResultsAgainstVanillaSpark(sql5, compareResult = true, _ => {})
   }

-  test("TakeOrderedAndProjectExecTransformer in broadcastRelation") {
+  test("TakeOrderedAndProjectExec in broadcastRelation") {
     val q =
       """
         | with dd as (
@@ -350,7 +350,13 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSuite {
         | from store_sales ss, dd
         | where ss_sold_date_sk=dd.d_date_sk+1
         |""".stripMargin
-    runQueryAndCompare(q)(checkGlutenOperatorMatch[TakeOrderedAndProjectExecTransformer])
+    runQueryAndCompare(q) {
+      df =>
+        val sortLimit = df.queryExecution.executedPlan.collect {
+          case sortLimit @ LimitTransformer(_: SortExecTransformer, _, _) => sortLimit
+        }
+        assert(sortLimit.size == 2)
+    }
   }

 }
@@ -27,7 +27,6 @@ import org.apache.gluten.utils.PlanUtil

 import org.apache.spark.api.python.EvalPythonExecTransformer
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
 import org.apache.spark.sql.catalyst.plans.logical.Join
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
@@ -430,12 +429,9 @@ object OffloadOthers {
       val sortOrder = plan.sortOrder
       val projectList = plan.projectList

-      val orderingSatisfies = SortOrder.orderingSatisfies(child.outputOrdering, sortOrder)
-      val localSorted = if (orderingSatisfies) {
-        child
-      } else {
-        SortExecTransformer(sortOrder, global = false, child)
-      }
+      // Always add localSort here since child's ordering may change after applying offload rules.
+      // The localSort will be removed by rule [[EliminateLocalSort]] if not needed.
+      val localSorted = SortExecTransformer(sortOrder, global = false, child)

       val singlePartition = child.outputPartitioning.numPartitions == 1
       val finalLimitPlan = if (singlePartition) {
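The hunk above now always inserts the local sort and leaves cleanup to the EliminateLocalSort rule; the resulting plan performs a per-partition sort-and-limit followed by a final one, which is why the tests count two sort-backed LimitTransformer nodes. A runnable toy illustration of that two-phase top-K pattern, using plain collections rather than the Gluten operators:

    // Two-phase top-K: sort + truncate per partition, then merge + truncate once more.
    object TopKSketch {
      def localSortLimit(partition: Seq[Int], k: Int): Seq[Int] =
        partition.sorted.take(k) // per-partition sort + local limit

      def finalSortLimit(partials: Seq[Seq[Int]], k: Int): Seq[Int] =
        partials.flatten.sorted.take(k) // single-partition merge + final limit

      def main(args: Array[String]): Unit = {
        val partitions = Seq(Seq(5, 1, 9), Seq(3, 7), Seq(2, 8, 4))
        val k = 3
        val partials = partitions.map(localSortLimit(_, k))
        println(finalSortLimit(partials, k)) // List(1, 2, 3)
      }
    }

The local phase can only shrink each partition, never change the global answer, so the final sort-and-limit returns exactly what a single global sort would.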
