Skip to content

Commit 2be1564

Browse files
wakun authored and GitHub Enterprise committed
[CARMEL-6752] Support more subexpression elimination cases (apache#266)
1 parent b510cf9 commit 2be1564

File tree

19 files changed

+372
-675
lines changed

19 files changed

+372
-675
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala

Lines changed: 27 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,9 @@ import java.util.Objects
2121

2222
import scala.collection.mutable
2323

24+
import org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.supportedExpression
2425
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
26+
import org.apache.spark.sql.catalyst.expressions.codegen.ExprValue
2527
import org.apache.spark.sql.catalyst.expressions.objects.LambdaVariable
2628
import org.apache.spark.sql.internal.SQLConf
2729
import org.apache.spark.util.Utils
@@ -151,32 +153,9 @@ class EquivalentExpressions(
151153
// 2. ConditionalExpression: use its children that will always be evaluated.
152154
private def childrenToRecurse(expr: Expression): Seq[Expression] = expr match {
153155
case _: CodegenFallback => Nil
154-
case c: ConditionalExpression => c.alwaysEvaluatedInputs.map(skipForShortcut)
155156
case other => skipForShortcut(other).children
156157
}
157158

158-
// For some special expressions we cannot just recurse into all of its children, but we can
159-
// recursively add the common expressions shared between all of its children.
160-
private def commonChildrenToRecurse(expr: Expression): Seq[Seq[Expression]] = expr match {
161-
case _: CodegenFallback => Nil
162-
case c: ConditionalExpression => c.branchGroups
163-
case _ => Nil
164-
}
165-
166-
private def supportedExpression(e: Expression) = {
167-
!e.exists {
168-
// `LambdaVariable` is usually used as a loop variable, which can't be evaluated ahead of the
169-
// loop. So we can't evaluate sub-expressions containing `LambdaVariable` at the beginning.
170-
case _: LambdaVariable => true
171-
172-
// `PlanExpression` wraps query plan. To compare query plans of `PlanExpression` on executor,
173-
// can cause error like NPE.
174-
case _: PlanExpression[_] => Utils.isInRunningSparkTask
175-
176-
case _ => false
177-
}
178-
}
179-
180159
/**
181160
* Adds the expression to this data structure recursively. Stops if a matching expression
182161
* is found. That is, if `expr` has already been added, its children are not added.
@@ -198,7 +177,6 @@ class EquivalentExpressions(
198177
if (!skip && !updateExprInMap(expr, map, useCount)) {
199178
val uc = useCount.signum
200179
childrenToRecurse(expr).foreach(updateExprTree(_, map, uc))
201-
commonChildrenToRecurse(expr).filter(_.nonEmpty).foreach(updateCommonExprs(_, map, uc))
202180
}
203181
}
204182

@@ -240,6 +218,23 @@ class EquivalentExpressions(
240218
}
241219
}
242220

221+
object EquivalentExpressions {
  /**
   * Returns `true` when the tree rooted at `e` contains no node that this check rejects.
   *
   * A tree is rejected when it contains any of:
   *  - a `LambdaVariable` (usually used as a loop variable) or a `NamedLambdaVariable`
   *    (used in higher-order functions); neither can be evaluated ahead of the execution.
   *  - a `PlanExpression` while `Utils.isInRunningSparkTask` is true; it wraps a query plan,
   *    and comparing such plans on an executor can cause errors like an NPE.
   *
   * @param e root of the expression tree to inspect
   * @return `false` if any node in the tree matches one of the cases above, `true` otherwise
   */
  def supportedExpression(e: Expression): Boolean = {
    val hasBlockingNode = e.exists {
      case _: LambdaVariable | _: NamedLambdaVariable => true
      case _: PlanExpression[_] => Utils.isInRunningSparkTask
      case _ => false
    }
    !hasBlockingNode
  }
}
237+
243238
/**
244239
* Wrapper around an Expression that provides semantic equality.
245240
*/
@@ -267,4 +262,11 @@ case class ExpressionEquals(e: Expression) {
267262
* Instead of appending to a mutable list/buffer of Expressions, just update the "flattened"
268263
* useCount in this wrapper in-place.
269264
*/
270-
case class ExpressionStats(expr: Expression)(var useCount: Int)
265+
// Only `expr`, the sole member of the first parameter list, is a case-class element;
// the members of the second parameter list below are mutable state attached to it.
case class ExpressionStats(expr: Expression)(
266+
// Use counter; declared `var` so it can be updated in place.
var useCount: Int,
267+
// NOTE(review): the remaining fields all default to "unset" (None / false); they
// presumably cache code-generation state for this expression — confirm against the
// code that populates them, which is not visible here.
var initialized: Option[String] = None,
268+
var isNull: Option[ExprValue] = None,
269+
var value: Option[ExprValue] = None,
270+
var funcName: Option[String] = None,
271+
var params: Option[Seq[Class[_]]] = None,
272+
var addedFunction: Boolean = false)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala

Lines changed: 34 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -196,9 +196,40 @@ abstract class Expression extends TreeNode[Expression] {
196196
}.getOrElse {
197197
val isNull = ctx.freshName("isNull")
198198
val value = ctx.freshName("value")
199-
val eval = doGenCode(ctx, ExprCode(
200-
JavaCode.isNullVariable(isNull),
201-
JavaCode.variable(value, dataType)))
199+
val eval =
200+
if (EquivalentExpressions.supportedExpression(this)) {
201+
ctx.commonExpressions.get(ExpressionEquals(this)) match {
202+
case Some(stats) =>
203+
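// Detect bound references whose `currentVars` entry still carries non-empty code. When any
// exist, the generated code is returned as-is instead of going through `genReusedCode`.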
204+
val nonEmptyRefs = this.exists {
205+
case BoundReference(ordinal, _, _) =>
206+
ctx.currentVars != null && ctx.currentVars(ordinal) != null &&
207+
ctx.currentVars(ordinal).code != EmptyBlock
208+
case _ => false
209+
}
210+
val eval = doGenCode(ctx, ExprCode(
211+
JavaCode.isNullVariable(isNull),
212+
JavaCode.variable(value, dataType)))
213+
if (eval.code != EmptyBlock && !nonEmptyRefs) {
214+
ctx.genReusedCode(stats, eval)
215+
} else {
216+
eval
217+
}
218+
219+
case None =>
220+
doGenCode(ctx, ExprCode(
221+
JavaCode.isNullVariable(isNull),
222+
JavaCode.variable(value, dataType)))
223+
}
224+
} else {
225+
doGenCode(ctx, ExprCode(
226+
JavaCode.isNullVariable(isNull),
227+
JavaCode.variable(value, dataType)))
228+
}
229+
230+
231+
232+
202233
reduceCodeSize(ctx, eval)
203234
if (eval.code.toString.nonEmpty) {
204235
// Add `this` in the comment.

0 commit comments

Comments
 (0)