From 00b71e80a6a4bca999e2c0d7bd55aacddcb0bffa Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 18 Oct 2024 11:41:20 +0800 Subject: [PATCH] [GLUTEN-7359][VL] Enable partial project in RAS (#7574) --- .../gluten/backendsapi/velox/VeloxRuleApi.scala | 1 + .../execution/ColumnarPartialProjectExec.scala | 2 +- .../expression/UDFPartialProjectSuite.scala | 16 +++++++++++++++- .../gluten/expression/ExpressionUtils.scala | 2 +- .../columnar/validator/Validators.scala | 2 +- .../gluten/planner/cost/LegacyCostModel.scala | 6 +++++- 6 files changed, 24 insertions(+), 5 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index 65bf7844c6ae..84257ed3f7cd 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -110,6 +110,7 @@ private object VeloxRuleApi { // Gluten RAS: Post rules. injector.inject(_ => RemoveTransitions) + injector.inject(c => PartialProjectRule.apply(c.session)) injector.inject(_ => RemoveNativeWriteFilesSortAndProject()) injector.inject(c => RewriteTransformer.apply(c.session)) injector.inject(_ => PushDownFilterToScan) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala index d42b7eecf6b9..fc0fc041a45d 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala @@ -157,7 +157,7 @@ case class ColumnarPartialProjectExec(original: ProjectExec, child: SparkPlan)( return ValidationResult.failed("Contains expression not supported") } if ( - ExpressionUtils.isComplexExpression( + ExpressionUtils.hasComplexExpressions( original, GlutenConfig.getConf.fallbackExpressionsThreshold) ) { diff --git a/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala index cd3d0c531c21..757d4da1313f 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala @@ -24,7 +24,21 @@ import org.apache.spark.sql.functions.udf import java.io.File -class UDFPartialProjectSuite extends WholeStageTransformerSuite { +class UDFPartialProjectSuiteRasOff extends UDFPartialProjectSuite { + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.gluten.ras.enabled", "false") + } +} + +class UDFPartialProjectSuiteRasOn extends UDFPartialProjectSuite { + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.gluten.ras.enabled", "true") + } +} + +abstract class UDFPartialProjectSuite extends WholeStageTransformerSuite { disableFallbackCheck override protected val resourcePath: String = "/tpch-data-parquet-velox" override protected val fileFormat: String = "parquet" diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionUtils.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionUtils.scala index db129c734ca5..1182791c6e7f 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionUtils.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionUtils.scala @@ -33,7 +33,7 @@ object ExpressionUtils { } } - def isComplexExpression(plan: SparkPlan, threshold: Int): Boolean = { + def hasComplexExpressions(plan: SparkPlan, threshold: Int): Boolean = { plan.expressions.exists(e => ExpressionUtils.getExpressionTreeDepth(e) > threshold) } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index 404df5edad21..50201efc07d2 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -118,7 +118,7 @@ object Validators { private class FallbackComplexExpressions(threshold: Int) extends Validator { override def validate(plan: SparkPlan): Validator.OutCome = { - if (ExpressionUtils.isComplexExpression(plan, threshold)) { + if (ExpressionUtils.hasComplexExpressions(plan, threshold)) { return fail( s"Disabled because at least one present expression exceeded depth threshold: " + s"${plan.nodeName}") diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/LegacyCostModel.scala b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/LegacyCostModel.scala index 9810656197be..c6f4f1fc9f21 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/LegacyCostModel.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/LegacyCostModel.scala @@ -19,7 +19,7 @@ package org.apache.gluten.planner.cost import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, RowToColumnarLike} import org.apache.gluten.utils.PlanUtil -import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan} +import org.apache.spark.sql.execution.{ColumnarToRowExec, ProjectExec, RowToColumnarExec, SparkPlan} /** * A cost model that is supposed to drive RAS planner create the same query plan with legacy @@ -37,6 +37,10 @@ class LegacyCostModel extends LongCostModel { case ColumnarToRowLike(_) => 10L case RowToColumnarLike(_) => 10L case p if PlanUtil.isGlutenColumnarOp(p) => 10L + // 1. 100L << 1000L, to keep the pulled out non-offload-able projects if the main op + // turns into offload-able after pulling. + // 2. 100L >> 10L, to offload project op itself eagerly. + case ProjectExec(_, _) => 100L case p if PlanUtil.isVanillaColumnarOp(p) => 1000L // Other row ops. Usually a vanilla row op. case _ => 1000L