From b58b47eb563b231f55fbc496c1652db517ce2108 Mon Sep 17 00:00:00 2001 From: Tengfei Huang Date: Wed, 16 Oct 2024 13:18:18 +0800 Subject: [PATCH] fix ch tests --- .../clickhouse/CHSparkPlanExecApi.scala | 2 - .../tpch-q2-wholestage-11-metrics.json | 52 +++++++++++++++++++ .../execution/GlutenClickHouseTPCHSuite.scala | 4 +- .../GlutenClickHouseTPCDSMetricsSuite.scala | 4 +- .../GlutenClickHouseTPCHMetricsSuite.scala | 4 +- .../GlutenClickHouseTPCDSParquetSuite.scala | 10 +++- .../columnar/OffloadSingleNode.scala | 10 ++-- 7 files changed, 69 insertions(+), 17 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 903523791a1b..04aca712fcdd 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -521,8 +521,6 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { wrapChild(r2c) case union: ColumnarUnionExec => wrapChild(union) - case ordered: TakeOrderedAndProjectExecTransformer => - wrapChild(ordered) case other => throw new GlutenNotSupportException( s"Not supported operator ${other.nodeName} for BroadcastRelation") diff --git a/backends-clickhouse/src/test/resources/metrics-json/tpch-q2-wholestage-11-metrics.json b/backends-clickhouse/src/test/resources/metrics-json/tpch-q2-wholestage-11-metrics.json index 86be6f434e2d..d7b1c89a774b 100644 --- a/backends-clickhouse/src/test/resources/metrics-json/tpch-q2-wholestage-11-metrics.json +++ b/backends-clickhouse/src/test/resources/metrics-json/tpch-q2-wholestage-11-metrics.json @@ -1,4 +1,56 @@ [{ + "id": 20, + "name": "kFetch", + "time": 0, + "input_wait_time": 3786, + "output_wait_time": 3786, + "steps": [{ + "name": "Limit", + "description": "LIMIT", + "processors": [{ + "name": "Limit", + "time": 0, + "output_rows": 44, + "output_bytes": 9483, + "input_rows": 44, + "input_bytes": 9483 + }] + }] +}, { + "id": 19, + "name": "kSort", + "time": 788, + "input_wait_time": 3622, + "output_wait_time": 3622, + "steps": [{ + "name": "Sorting", + "description": "Sorting step", + "processors": [ + { + "name": "PartialSortingTransform", + "time": 772, + "output_rows": 44, + "output_bytes": 9483, + "input_rows": 44, + "input_bytes": 9483 + }, { + "name": "LimitsCheckingTransform", + "time": 0, + "output_rows": 44, + "output_bytes": 9483, + "input_rows": 44, + "input_bytes": 9483 + }, { + "name": "MergeSortingTransform", + "time": 16, + "output_rows": 44, + "output_bytes": 9483, + "input_rows": 44, + "input_bytes": 9483 + } + ] + }] +}, { "id": 18, "name": "kProject", "time": 1, diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala index a56f45d1ba3d..b0244ded3807 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala @@ -368,9 +368,9 @@ class GlutenClickHouseTPCHSuite extends GlutenClickHouseTPCHAbstractSuite { |""".stripMargin) { df => val sortExec = df.queryExecution.executedPlan.collect { - case sortExec: TakeOrderedAndProjectExecTransformer => sortExec + case sortLimit @ LimitTransformer(_: SortExecTransformer, _, _) => sortLimit } - assert(sortExec.size == 1) + assert(sortExec.size == 2) val result = df.collect() val expectedResult = Seq(Row(0), Row(1), Row(2), Row(3), Row(4)) TestUtils.compareAnswers(result, expectedResult) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala index cbf1caf44e7f..68431a436913 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala @@ -77,9 +77,9 @@ class GlutenClickHouseTPCDSMetricsSuite extends GlutenClickHouseTPCDSAbstractSui val allWholeStageTransformers = df.queryExecution.executedPlan.collect { case wholeStage: WholeStageTransformer => wholeStage } - assert(allWholeStageTransformers.size == 9) + assert(allWholeStageTransformers.size == 10) - val wholeStageTransformer = allWholeStageTransformers(1) + val wholeStageTransformer = allWholeStageTransformers(2) GlutenClickHouseMetricsUTUtils.executeMetricsUpdater( wholeStageTransformer, diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala index b900366759cc..1bd9bd459f12 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala @@ -295,9 +295,9 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite case g: GlutenPlan if !g.isInstanceOf[InputIteratorTransformer] => g } - assert(allGlutenPlans.size == 58) + assert(allGlutenPlans.size == 60) - val shjPlan = allGlutenPlans(8) + val shjPlan = allGlutenPlans(10) assert(shjPlan.metrics("totalTime").value == 6) assert(shjPlan.metrics("inputWaitTime").value == 5) assert(shjPlan.metrics("outputWaitTime").value == 0) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala index 08f4522d9ce4..f28c60b83d11 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala @@ -336,7 +336,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui compareResultsAgainstVanillaSpark(sql5, compareResult = true, _ => {}) } - test("TakeOrderedAndProjectExecTransformer in broadcastRelation") { + test("TakeOrderedAndProjectExec in broadcastRelation") { val q = """ | with dd as ( @@ -350,7 +350,13 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui | from store_sales ss, dd | where ss_sold_date_sk=dd.d_date_sk+1 |""".stripMargin - runQueryAndCompare(q)(checkGlutenOperatorMatch[TakeOrderedAndProjectExecTransformer]) + runQueryAndCompare(q) { + df => + val sortLimit = df.queryExecution.executedPlan.collect { + case sortLimit @ LimitTransformer(_: SortExecTransformer, _, _) => sortLimit + } + assert(sortLimit.size == 2) + } } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala index 30d69e846d65..07b9c3ae4b76 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala @@ -27,7 +27,6 @@ import org.apache.gluten.utils.PlanUtil import org.apache.spark.api.python.EvalPythonExecTransformer import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} import org.apache.spark.sql.catalyst.plans.logical.Join import org.apache.spark.sql.catalyst.plans.physical.SinglePartition @@ -430,12 +429,9 @@ object OffloadOthers { val sortOrder = plan.sortOrder val projectList = plan.projectList - val orderingSatisfies = SortOrder.orderingSatisfies(child.outputOrdering, sortOrder) - val localSorted = if (orderingSatisfies) { - child - } else { - SortExecTransformer(sortOrder, global = false, child) - } + // Always add localSort here since child's ordering may change after applying offload rules. + // The localSort will be removed by rule [[EliminateLocalSort]] if not needed. + val localSorted = SortExecTransformer(sortOrder, global = false, child) val singlePartition = child.outputPartitioning.numPartitions == 1 val finalLimitPlan = if (singlePartition) {