Skip to content

Commit

Permalink
[GLUTEN-7359][VL] Enable partial project in RAS (#7574)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer authored Oct 18, 2024
1 parent cdd45fe commit 00b71e8
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ private object VeloxRuleApi {

// Gluten RAS: Post rules.
injector.inject(_ => RemoveTransitions)
injector.inject(c => PartialProjectRule.apply(c.session))
injector.inject(_ => RemoveNativeWriteFilesSortAndProject())
injector.inject(c => RewriteTransformer.apply(c.session))
injector.inject(_ => PushDownFilterToScan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ case class ColumnarPartialProjectExec(original: ProjectExec, child: SparkPlan)(
return ValidationResult.failed("Contains expression not supported")
}
if (
ExpressionUtils.isComplexExpression(
ExpressionUtils.hasComplexExpressions(
original,
GlutenConfig.getConf.fallbackExpressionsThreshold)
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,21 @@ import org.apache.spark.sql.functions.udf

import java.io.File

class UDFPartialProjectSuite extends WholeStageTransformerSuite {
/**
 * Concrete variant of [[UDFPartialProjectSuite]] whose Spark configuration has
 * `spark.gluten.ras.enabled` set to "false".
 */
class UDFPartialProjectSuiteRasOff extends UDFPartialProjectSuite {
  override protected def sparkConf: SparkConf = {
    // Start from the parent suite's configuration and override only the RAS switch.
    val baseConf = super.sparkConf
    baseConf.set("spark.gluten.ras.enabled", "false")
  }
}

/**
 * Concrete variant of [[UDFPartialProjectSuite]] whose Spark configuration has
 * `spark.gluten.ras.enabled` set to "true".
 */
class UDFPartialProjectSuiteRasOn extends UDFPartialProjectSuite {
  override protected def sparkConf: SparkConf = {
    // Start from the parent suite's configuration and override only the RAS switch.
    val baseConf = super.sparkConf
    baseConf.set("spark.gluten.ras.enabled", "true")
  }
}

abstract class UDFPartialProjectSuite extends WholeStageTransformerSuite {
disableFallbackCheck
override protected val resourcePath: String = "/tpch-data-parquet-velox"
override protected val fileFormat: String = "parquet"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object ExpressionUtils {
}
}

def isComplexExpression(plan: SparkPlan, threshold: Int): Boolean = {
/**
 * Tells whether the given plan carries at least one expression whose tree depth, as reported
 * by `ExpressionUtils.getExpressionTreeDepth`, is strictly greater than `threshold`.
 *
 * @param plan
 *   the plan whose `expressions` are inspected
 * @param threshold
 *   the maximum depth an expression may have without being counted as complex
 * @return
 *   `true` if any expression of the plan exceeds the threshold, `false` otherwise (including
 *   when the plan has no expressions)
 */
def hasComplexExpressions(plan: SparkPlan, threshold: Int): Boolean = {
  // "Some expression is too deep" is the negation of "every expression is within the limit".
  val allWithinThreshold = plan.expressions.forall {
    expr => ExpressionUtils.getExpressionTreeDepth(expr) <= threshold
  }
  !allWithinThreshold
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ object Validators {

private class FallbackComplexExpressions(threshold: Int) extends Validator {
override def validate(plan: SparkPlan): Validator.OutCome = {
if (ExpressionUtils.isComplexExpression(plan, threshold)) {
if (ExpressionUtils.hasComplexExpressions(plan, threshold)) {
return fail(
s"Disabled because at least one present expression exceeded depth threshold: " +
s"${plan.nodeName}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.gluten.planner.cost
import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, RowToColumnarLike}
import org.apache.gluten.utils.PlanUtil

import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
import org.apache.spark.sql.execution.{ColumnarToRowExec, ProjectExec, RowToColumnarExec, SparkPlan}

/**
* A cost model that is supposed to drive RAS planner create the same query plan with legacy
Expand All @@ -37,6 +37,10 @@ class LegacyCostModel extends LongCostModel {
case ColumnarToRowLike(_) => 10L
case RowToColumnarLike(_) => 10L
case p if PlanUtil.isGlutenColumnarOp(p) => 10L
// 1. 100L << 1000L, so that a pulled-out, non-offload-able project is kept when the main op
// becomes offload-able after the pull-out.
// 2. 100L >> 10L, so that the project op itself is offloaded eagerly.
case ProjectExec(_, _) => 100L
case p if PlanUtil.isVanillaColumnarOp(p) => 1000L
// Other row ops. Usually a vanilla row op.
case _ => 1000L
Expand Down

0 comments on commit 00b71e8

Please sign in to comment.