Skip to content

[SPARK-33092][SQL] Support subexpression elimination in ProjectExec #29975

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from

Conversation

viirya
Copy link
Member

@viirya viirya commented Oct 8, 2020

What changes were proposed in this pull request?

This patch proposes to add subexpression elimination support into ProjectExec. It can be controlled by spark.sql.subexpressionElimination.enabled config.

Before this change:

val df = spark.read.option("header", true).csv("/tmp/test.csv")
 df.withColumn("my_map", expr("str_to_map(foo, '&', '=')")).select(col("my_map")("foo"), col("my_map")("bar"), col("my_map")("baz")).debugCodegen

L27-40: first str_to_map.
L68:81: second str_to_map.
L109-122: third str_to_map.

/* 024 */   private void project_doConsume_0(InternalRow inputadapter_row_0, UTF8String project_expr_0_0, boolean project_exprIsNull_0_0) throws java.io.IOException {
/* 025 */     boolean project_isNull_0 = true;
/* 026 */     UTF8String project_value_0 = null;
/* 027 */     boolean project_isNull_1 = true;  
/* 028 */     MapData project_value_1 = null; 
/* 029 */                                                                             
/* 030 */     if (!project_exprIsNull_0_0) {
/* 031 */       project_isNull_1 = false; // resultCode could change nullability.
/* 032 */                                                                             
/* 033 */       UTF8String[] project_kvs_0 = project_expr_0_0.split(((UTF8String) references[1] /* literal */), -1);
/* 034 */       for(UTF8String kvEntry: project_kvs_0) {                                                                                                                     
/* 035 */         UTF8String[] kv = kvEntry.split(((UTF8String) references[2] /* literal */), 2);
/* 036 */         ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0] /* mapBuilder */).put(kv[0], kv.length == 2 ? kv[1] : null);
/* 037 */       }
/* 038 */       project_value_1 = ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0] /* mapBuilder */).build();
/* 039 */
/* 040 */     }
/* 041 */     if (!project_isNull_1) {
/* 042 */       project_isNull_0 = false; // resultCode could change nullability.
/* 043 */
/* 044 */       final int project_length_0 = project_value_1.numElements();
/* 045 */       final ArrayData project_keys_0 = project_value_1.keyArray();
/* 046 */       final ArrayData project_values_0 = project_value_1.valueArray();
/* 047 */
/* 048 */       int project_index_0 = 0;
/* 049 */       boolean project_found_0 = false;
/* 050 */       while (project_index_0 < project_length_0 && !project_found_0) {
/* 051 */         final UTF8String project_key_0 = project_keys_0.getUTF8String(project_index_0);
/* 052 */         if (project_key_0.equals(((UTF8String) references[3] /* literal */))) {
/* 053 */           project_found_0 = true; 
/* 054 */         } else {
/* 055 */           project_index_0++;
/* 056 */         }
/* 057 */       }
/* 058 */
/* 059 */       if (!project_found_0 || project_values_0.isNullAt(project_index_0)) {
/* 060 */         project_isNull_0 = true;
/* 061 */       } else {
/* 062 */         project_value_0 = project_values_0.getUTF8String(project_index_0);
/* 063 */       }
/* 064 */
/* 065 */     }
/* 066 */     boolean project_isNull_6 = true;
/* 067 */     UTF8String project_value_6 = null;
/* 068 */     boolean project_isNull_7 = true;
/* 069 */     MapData project_value_7 = null;
/* 070 */
/* 071 */     if (!project_exprIsNull_0_0) {
/* 072 */       project_isNull_7 = false; // resultCode could change nullability.
/* 073 */
/* 074 */       UTF8String[] project_kvs_1 = project_expr_0_0.split(((UTF8String) references[5] /* literal */), -1);
/* 075 */       for(UTF8String kvEntry: project_kvs_1) {
/* 076 */         UTF8String[] kv = kvEntry.split(((UTF8String) references[6] /* literal */), 2);
/* 077 */         ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[4] /* mapBuilder */).put(kv[0], kv.length == 2 ? kv[1] : null);
/* 078 */       }
/* 079 */       project_value_7 = ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[4] /* mapBuilder */).build();
/* 080 */
/* 081 */     }
/* 082 */     if (!project_isNull_7) {
/* 083 */       project_isNull_6 = false; // resultCode could change nullability.
/* 084 */
/* 085 */       final int project_length_1 = project_value_7.numElements();
/* 086 */       final ArrayData project_keys_1 = project_value_7.keyArray();
/* 087 */       final ArrayData project_values_1 = project_value_7.valueArray();
/* 088 */
/* 089 */       int project_index_1 = 0;
/* 090 */       boolean project_found_1 = false;
/* 091 */       while (project_index_1 < project_length_1 && !project_found_1) {
/* 092 */         final UTF8String project_key_1 = project_keys_1.getUTF8String(project_index_1);
/* 093 */         if (project_key_1.equals(((UTF8String) references[7] /* literal */))) {
/* 094 */           project_found_1 = true; 
/* 095 */         } else {
/* 096 */           project_index_1++;
/* 097 */         }
/* 098 */       }
/* 099 */
/* 100 */       if (!project_found_1 || project_values_1.isNullAt(project_index_1)) {
/* 101 */         project_isNull_6 = true;
/* 102 */       } else {
/* 103 */         project_value_6 = project_values_1.getUTF8String(project_index_1);
/* 104 */       }
/* 105 */
/* 106 */     }
/* 107 */     boolean project_isNull_12 = true;
/* 108 */     UTF8String project_value_12 = null;
/* 109 */     boolean project_isNull_13 = true;
/* 110 */     MapData project_value_13 = null;
/* 111 */
/* 112 */     if (!project_exprIsNull_0_0) {
/* 113 */       project_isNull_13 = false; // resultCode could change nullability.
/* 114 */
/* 115 */       UTF8String[] project_kvs_2 = project_expr_0_0.split(((UTF8String) references[9] /* literal */), -1);
/* 116 */       for(UTF8String kvEntry: project_kvs_2) {
/* 117 */         UTF8String[] kv = kvEntry.split(((UTF8String) references[10] /* literal */), 2);
/* 118 */         ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[8] /* mapBuilder */).put(kv[0], kv.length == 2 ? kv[1] : null);
/* 119 */       }
/* 120 */       project_value_13 = ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[8] /* mapBuilder */).build();
/* 121 */
/* 122 */     }
...

After this change:

L27-40 evaluates the common map variable.

/* 024 */   private void project_doConsume_0(InternalRow inputadapter_row_0, UTF8String project_expr_0_0, boolean project_exprIsNull_0_0) throws java.io.IOException {
/* 025 */     // common sub-expressions
/* 026 */                               
/* 027 */     boolean project_isNull_0 = true;  
/* 028 */     MapData project_value_0 = null;                                                                      
/* 029 */                                                                                                          
/* 030 */     if (!project_exprIsNull_0_0) {                                                                       
/* 031 */       project_isNull_0 = false; // resultCode could change nullability.
/* 032 */                 
/* 033 */       UTF8String[] project_kvs_0 = project_expr_0_0.split(((UTF8String) references[1] /* literal */), -1);
/* 034 */       for(UTF8String kvEntry: project_kvs_0) {
/* 035 */         UTF8String[] kv = kvEntry.split(((UTF8String) references[2] /* literal */), 2);
/* 036 */         ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0] /* mapBuilder */).put(kv[0], kv.length == 2 ? kv[1] : null);
/* 037 */       }                                                                                                  
/* 038 */       project_value_0 = ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0] /* mapBuilder */).build();
/* 039 */               
/* 040 */     }                                                                                                    
/* 041 */        
/* 042 */     boolean project_isNull_4 = true;
/* 043 */     UTF8String project_value_4 = null;
/* 044 */                                     
/* 045 */     if (!project_isNull_0) {          
/* 046 */       project_isNull_4 = false; // resultCode could change nullability.
/* 047 */                             
/* 048 */       final int project_length_0 = project_value_0.numElements();      
/* 049 */       final ArrayData project_keys_0 = project_value_0.keyArray();
/* 050 */       final ArrayData project_values_0 = project_value_0.valueArray();
/* 051 */                                                                                                          
/* 052 */       int project_index_0 = 0;                                                                           
/* 053 */       boolean project_found_0 = false;
/* 054 */       while (project_index_0 < project_length_0 && !project_found_0) {
/* 055 */         final UTF8String project_key_0 = project_keys_0.getUTF8String(project_index_0);
/* 056 */         if (project_key_0.equals(((UTF8String) references[3] /* literal */))) {
/* 057 */           project_found_0 = true;                                                                        
/* 058 */         } else {                                                                                         
/* 059 */           project_index_0++;
/* 060 */         }
/* 061 */       }
/* 062 */
/* 063 */       if (!project_found_0 || project_values_0.isNullAt(project_index_0)) {
/* 064 */         project_isNull_4 = true;
/* 065 */       } else {
/* 066 */         project_value_4 = project_values_0.getUTF8String(project_index_0);
/* 067 */       }
/* 068 */
/* 069 */     }
/* 070 */     boolean project_isNull_6 = true;
/* 071 */     UTF8String project_value_6 = null;
/* 072 */
/* 073 */     if (!project_isNull_0) {
/* 074 */       project_isNull_6 = false; // resultCode could change nullability.
/* 075 */
/* 076 */       final int project_length_1 = project_value_0.numElements();
/* 077 */       final ArrayData project_keys_1 = project_value_0.keyArray();
/* 078 */       final ArrayData project_values_1 = project_value_0.valueArray();
/* 079 */
/* 080 */       int project_index_1 = 0;
/* 081 */       boolean project_found_1 = false;
/* 082 */       while (project_index_1 < project_length_1 && !project_found_1) {
/* 083 */         final UTF8String project_key_1 = project_keys_1.getUTF8String(project_index_1);
/* 084 */         if (project_key_1.equals(((UTF8String) references[4] /* literal */))) {
/* 085 */           project_found_1 = true;
/* 086 */         } else {
/* 087 */           project_index_1++;
/* 088 */         }
/* 089 */       }
/* 090 */
/* 091 */       if (!project_found_1 || project_values_1.isNullAt(project_index_1)) {
/* 092 */         project_isNull_6 = true;
/* 093 */       } else {
/* 094 */         project_value_6 = project_values_1.getUTF8String(project_index_1);
/* 095 */       }
/* 096 */
/* 097 */     }
/* 098 */     boolean project_isNull_8 = true;
/* 099 */     UTF8String project_value_8 = null;
/* 100 */
...

When the code is split into separated method:

/* 026 */   private void project_doConsume_0(InternalRow inputadapter_row_0, UTF8String project_expr_0_0, boolean project_exprIsNull_0_0) throws java.io.IOException {       
/* 027 */     // common sub-expressions                                                                                                                                      
/* 028 */                                                                             
/* 029 */     MapData project_subExprValue_0 = project_subExpr_0(project_exprIsNull_0_0, project_expr_0_0);                                                                  
/* 030 */
...
/* 140 */   private MapData project_subExpr_0(boolean project_exprIsNull_0_0, org.apache.spark.unsafe.types.UTF8String project_expr_0_0) {                                   
/* 141 */     boolean project_isNull_0 = true;                                                                                                                               
/* 142 */     MapData project_value_0 = null;                                         
/* 143 */                                                                             
/* 144 */     if (!project_exprIsNull_0_0) {   
/* 145 */       project_isNull_0 = false; // resultCode could change nullability.
/* 146 */
/* 147 */       UTF8String[] project_kvs_0 = project_expr_0_0.split(((UTF8String) references[1] /* literal */), -1);
/* 148 */       for(UTF8String kvEntry: project_kvs_0) {
/* 149 */         UTF8String[] kv = kvEntry.split(((UTF8String) references[2] /* literal */), 2);
/* 150 */         ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0] /* mapBuilder */).put(kv[0], kv.length == 2 ? kv[1] : null);
/* 151 */       }
/* 152 */       project_value_0 = ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0] /* mapBuilder */).build();
/* 153 */
/* 154 */     }
/* 155 */     project_subExprIsNull_0 = project_isNull_0;
/* 156 */     return project_value_0;
/* 157 */   }                                                                                                                 

Why are the changes needed?

Users occasionally write repeated expression in projection. It is also possibly that query optimizer optimizes a query to evaluate same expression many times in a Project. Currently in ProjectExec, we don't support subexpression elimination in Whole-stage codegen. We can support it to reduce redundant evaluation.

Does this PR introduce any user-facing change?

No

How was this patch tested?

spark.sql.subexpressionElimination.enabled is enabled by default. So that's said we should pass all tests with this change.

@viirya
Copy link
Member Author

viirya commented Oct 8, 2020

cc @cloud-fan @maropu @dongjoon-hyun

@@ -66,10 +66,22 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)

override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
val exprs = bindReferences[Expression](projectList, child.output)
val resultVars = exprs.map(_.genCode(ctx))
val (subExprsCode, resultVars) = if (SQLConf.get.subexpressionEliminationEnabled) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can access conf here conf.subexpressionEliminationEnabled

@viirya viirya changed the title [WIP][SPARK-33092][SQL] Support subexpression elimination in ProjectExec [SPARK-33092][SQL] Support subexpression elimination in ProjectExec Oct 8, 2020
} else {
("", exprs.map(_.genCode(ctx)))
}

// Evaluation of non-deterministic expressions can't be deferred.
val nonDeterministicAttrs = projectList.filterNot(_.deterministic).map(_.toAttribute)
s"""
Copy link
Contributor

@cloud-fan cloud-fan Oct 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you give an example of this part of generated code for the query in https://issues.apache.org/jira/browse/SPARK-32989 ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I will update to the PR description.

Copy link
Contributor

@LuciferYang LuciferYang Oct 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

manual testing examples like SPARK-32989 , the performance is much better than before :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for testing! It is late in my timezone, I will update the generated code tomorrow.

@cloud-fan
Copy link
Contributor

cloud-fan commented Oct 8, 2020

sorry for my bad memory, is it an existing issue that subexpression elimination can cause perf regression because of the eager execution? e.g. an expression is inside a condition branch (e.g. CASE WHEN) and may not be evaluated.

cc @rednaxelafx

@SparkQA
Copy link

SparkQA commented Oct 8, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34150/

@SparkQA
Copy link

SparkQA commented Oct 8, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34151/

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA
Copy link

SparkQA commented Oct 8, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34150/

@SparkQA
Copy link

SparkQA commented Oct 8, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34151/

@maropu
Copy link
Member

maropu commented Oct 8, 2020

retest this please

@maropu
Copy link
Member

maropu commented Oct 8, 2020

(To check the running time of Jenkins jobs...)

@SparkQA
Copy link

SparkQA commented Oct 8, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34155/

@SparkQA
Copy link

SparkQA commented Oct 8, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34155/

@SparkQA

This comment has been minimized.

@rednaxelafx
Copy link
Contributor

#29975 (comment)

sorry for my bad memory, is it an existing issue that subexpression elimination can cause perf regression because of the eager execution? e.g. an expression is inside a condition branch (e.g. CASE WHEN) and may not be evaluated.

It's possible to cause performance regression, but I don't think conditional expressions are going to be an issue. From the latest Spark master:

// There are some special expressions that we should not recurse into all of its children.
// 1. CodegenFallback: it's children will not be used to generate code (call eval() instead)
// 2. If: common subexpressions will always be evaluated at the beginning, but the true and
// false expressions in `If` may not get accessed, according to the predicate
// expression. We should only recurse into the predicate expression.
// 3. CaseWhen: like `If`, the children of `CaseWhen` only get accessed in a certain
// condition. We should only recurse into the first condition expression as it
// will always get accessed.
// 4. Coalesce: it's also a conditional expression, we should only recurse into the first
// children, because others may not get accessed.
def childrenToRecurse: Seq[Expression] = expr match {
case _: CodegenFallback => Nil
case i: If => i.predicate :: Nil
case c: CaseWhen => c.children.head :: Nil
case c: Coalesce => c.children.head :: Nil
case other => other.children
}

You can see that CSE excludes the conditional branches from conditional expressions, so you won't have expressions accidentally hoisted from those branches.

@rednaxelafx
Copy link
Contributor

This PR looks good to me in general, thanks for making this change!

@SparkQA
Copy link

SparkQA commented Oct 9, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34173/

@SparkQA
Copy link

SparkQA commented Oct 9, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34173/

@SparkQA

This comment has been minimized.

@@ -268,7 +268,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
}
}
// this input data will fail to read middle way.
val input = spark.range(10).select(failingUdf('id).as('i)).select('i, -'i as 'j)
val input = spark.range(15).select(failingUdf('id).as('i)).select('i, -'i as 'j)
Copy link
Member Author

@viirya viirya Oct 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

failingUdf is evaluated twice for each row previously. Now it is only once. So we need to increase range to make it throw exception as before.

@SparkQA
Copy link

SparkQA commented Oct 9, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34182/

@SparkQA
Copy link

SparkQA commented Oct 9, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34182/

@SparkQA

This comment has been minimized.

@maropu
Copy link
Member

maropu commented Oct 9, 2020

retest this please

case class SubExprCodes(
codes: Seq[String],
states: Map[Expression, SubExprEliminationState],
exprCodesNeedEvaluate: Seq[ExprCode])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change needed to support subexpr elimination in ProjectExec? What I'm interested in is that why we didn't need this change when supporting it in HashAggregateExec.

Copy link
Member Author

@viirya viirya Oct 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is needed. ProjectExec doesn't require all its child's outputs to be evaluated in advance. Instead it only early evaluates the outputs used more than twice (deferring evaluation). So we need to extract these variables used by subexpressions and evaluate them before subexpressions. In HashAggregateExec we don't need to consider that. The simplest way is to evaluate all child's outputs, of course.

@SparkQA
Copy link

SparkQA commented Oct 9, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34189/

@SparkQA
Copy link

SparkQA commented Oct 9, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34189/

@SparkQA
Copy link

SparkQA commented Oct 9, 2020

Test build #129584 has finished for PR 29975 at commit fe92bfe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val inputVarsForAllFuncs = commonExprs.map { expr =>
getLocalInputVariableValues(this, expr.head).toSeq
}
val (inputVarsForAllFuncs, exprCodesNeedEvaluate) = commonExprs.map { expr =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProjectExec doesn't require all its child's outputs to be evaluated in advance. Instead it only early evaluates the outputs used more than twice (deferring evaluation). So we need to extract these variables used by subexpressions and evaluate them before subexpressions

Could you leave some comments here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

@@ -1739,8 +1747,11 @@ object CodeGenerator extends Logging {
def getLocalInputVariableValues(
ctx: CodegenContext,
expr: Expression,
subExprs: Map[Expression, SubExprEliminationState] = Map.empty): Set[VariableValue] = {
subExprs: Map[Expression, SubExprEliminationState] = Map.empty):
(Set[VariableValue], Set[ExprCode]) = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

      subExprs: Map[Expression, SubExprEliminationState] = Map.empty)
    : (Set[VariableValue], Set[ExprCode]) = {
    val argSet = mutable.Set[VariableValue]()

@@ -90,8 +90,13 @@ case class SubExprEliminationState(isNull: ExprValue, value: ExprValue)
* @param codes Strings representing the codes that evaluate common subexpressions.
* @param states Foreach expression that is participating in subexpression elimination,
* the state to use.
* @param exprCodesNeedEvaluate Some expression codes that need to be evaluate before
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: evaluate -> evaluated

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Thanks.

@@ -1739,8 +1747,11 @@ object CodeGenerator extends Logging {
def getLocalInputVariableValues(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you describe what's the second value of the returned value in the code comment above?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.


case e =>
stack.pushAll(e.children)
}
}

argSet.toSet
(argSet.toSet, exprCodesNeedEvaluate.toSet)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set instead of Seq here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are already Set. You mean using Seq?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, my bad. Yes.

case class SubExprCodes(
codes: Seq[String],
states: Map[Expression, SubExprEliminationState],
exprCodesNeedEvaluate: Seq[ExprCode])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a suggestion: exprCodesNeedEvaluate -> exprCodesForEarlyEvals?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exprCodesForEarlyEvals sounds confusing. They are not for early evaluating something but needed for evaluating subexpressions.

val exprCode = ctx.currentVars(ref.ordinal)
// If the referred variable is not evaluated yet.
if (exprCode.code != EmptyBlock) {
exprCodesNeedEvaluate += exprCode.copy()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this copy? A unnecessary copy can happen if exprCodesNeedEvaluate already has the same entry?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copying exprCode because we need the unevaluated code in exprCode, but we also need to empty code of exprCode. We need copied code so we can evaluate them before evaluating subexpressions. We need to empty code of exprCode so we don't re-evaluate the code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to empty code of exprCode so we don't re-evaluate the code.

AFAIK when we empty the code, we also do a copy, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exprCode.code = EmptyBlock, do you mean this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I was wrong, the copy here is necessary.

@SparkQA
Copy link

SparkQA commented Oct 12, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34254/

@SparkQA
Copy link

SparkQA commented Oct 12, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34254/

@SparkQA
Copy link

SparkQA commented Oct 12, 2020

Test build #129650 has finished for PR 29975 at commit 2414bb0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu maropu closed this in 78c0967 Oct 12, 2020
@maropu
Copy link
Member

maropu commented Oct 12, 2020

Thanks! Merged to master.

@maropu
Copy link
Member

maropu commented Oct 12, 2020

NOTE: I also resolved SPARK-32989 with Fix Version/s=3.1.0, too.

@viirya
Copy link
Member Author

viirya commented Oct 12, 2020

Thanks all!

holdenk pushed a commit to holdenk/spark that referenced this pull request Oct 27, 2020
### What changes were proposed in this pull request?

This patch proposes to add subexpression elimination support into `ProjectExec`. It can be controlled by `spark.sql.subexpressionElimination.enabled` config.

Before this change:

```scala
val df = spark.read.option("header", true).csv("/tmp/test.csv")
 df.withColumn("my_map", expr("str_to_map(foo, '&', '=')")).select(col("my_map")("foo"), col("my_map")("bar"), col("my_map")("baz")).debugCodegen
```

L27-40: first `str_to_map`.
L68:81: second `str_to_map`.
L109-122: third `str_to_map`.

```
/* 024 */   private void project_doConsume_0(InternalRow inputadapter_row_0, UTF8String project_expr_0_0, boolean project_exprIsNull_0_0) throws java.io.IOException {
/* 025 */     boolean project_isNull_0 = true;
/* 026 */     UTF8String project_value_0 = null;
/* 027 */     boolean project_isNull_1 = true;
/* 028 */     MapData project_value_1 = null;
/* 029 */
/* 030 */     if (!project_exprIsNull_0_0) {
/* 031 */       project_isNull_1 = false; // resultCode could change nullability.
/* 032 */
/* 033 */       UTF8String[] project_kvs_0 = project_expr_0_0.split(((UTF8String) references[1] /* literal */), -1);
/* 034 */       for(UTF8String kvEntry: project_kvs_0) {
/* 035 */         UTF8String[] kv = kvEntry.split(((UTF8String) references[2] /* literal */), 2);
/* 036 */         ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0] /* mapBuilder */).put(kv[0], kv.length == 2 ? kv[1] : null);
/* 037 */       }
/* 038 */       project_value_1 = ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0] /* mapBuilder */).build();
/* 039 */
/* 040 */     }
/* 041 */     if (!project_isNull_1) {
/* 042 */       project_isNull_0 = false; // resultCode could change nullability.
/* 043 */
/* 044 */       final int project_length_0 = project_value_1.numElements();
/* 045 */       final ArrayData project_keys_0 = project_value_1.keyArray();
/* 046 */       final ArrayData project_values_0 = project_value_1.valueArray();
/* 047 */
/* 048 */       int project_index_0 = 0;
/* 049 */       boolean project_found_0 = false;
/* 050 */       while (project_index_0 < project_length_0 && !project_found_0) {
/* 051 */         final UTF8String project_key_0 = project_keys_0.getUTF8String(project_index_0);
/* 052 */         if (project_key_0.equals(((UTF8String) references[3] /* literal */))) {
/* 053 */           project_found_0 = true;
/* 054 */         } else {
/* 055 */           project_index_0++;
/* 056 */         }
/* 057 */       }
/* 058 */
/* 059 */       if (!project_found_0 || project_values_0.isNullAt(project_index_0)) {
/* 060 */         project_isNull_0 = true;
/* 061 */       } else {
/* 062 */         project_value_0 = project_values_0.getUTF8String(project_index_0);
/* 063 */       }
/* 064 */
/* 065 */     }
/* 066 */     boolean project_isNull_6 = true;
/* 067 */     UTF8String project_value_6 = null;
/* 068 */     boolean project_isNull_7 = true;
/* 069 */     MapData project_value_7 = null;
/* 070 */
/* 071 */     if (!project_exprIsNull_0_0) {
/* 072 */       project_isNull_7 = false; // resultCode could change nullability.
/* 073 */
/* 074 */       UTF8String[] project_kvs_1 = project_expr_0_0.split(((UTF8String) references[5] /* literal */), -1);
/* 075 */       for(UTF8String kvEntry: project_kvs_1) {
/* 076 */         UTF8String[] kv = kvEntry.split(((UTF8String) references[6] /* literal */), 2);
/* 077 */         ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[4] /* mapBuilder */).put(kv[0], kv.length == 2 ? kv[1] : null);
/* 078 */       }
/* 079 */       project_value_7 = ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[4] /* mapBuilder */).build();
/* 080 */
/* 081 */     }
/* 082 */     if (!project_isNull_7) {
/* 083 */       project_isNull_6 = false; // resultCode could change nullability.
/* 084 */
/* 085 */       final int project_length_1 = project_value_7.numElements();
/* 086 */       final ArrayData project_keys_1 = project_value_7.keyArray();
/* 087 */       final ArrayData project_values_1 = project_value_7.valueArray();
/* 088 */
/* 089 */       int project_index_1 = 0;
/* 090 */       boolean project_found_1 = false;
/* 091 */       while (project_index_1 < project_length_1 && !project_found_1) {
/* 092 */         final UTF8String project_key_1 = project_keys_1.getUTF8String(project_index_1);
/* 093 */         if (project_key_1.equals(((UTF8String) references[7] /* literal */))) {
/* 094 */           project_found_1 = true;
/* 095 */         } else {
/* 096 */           project_index_1++;
/* 097 */         }
/* 098 */       }
/* 099 */
/* 100 */       if (!project_found_1 || project_values_1.isNullAt(project_index_1)) {
/* 101 */         project_isNull_6 = true;
/* 102 */       } else {
/* 103 */         project_value_6 = project_values_1.getUTF8String(project_index_1);
/* 104 */       }
/* 105 */
/* 106 */     }
/* 107 */     boolean project_isNull_12 = true;
/* 108 */     UTF8String project_value_12 = null;
/* 109 */     boolean project_isNull_13 = true;
/* 110 */     MapData project_value_13 = null;
/* 111 */
/* 112 */     if (!project_exprIsNull_0_0) {
/* 113 */       project_isNull_13 = false; // resultCode could change nullability.
/* 114 */
/* 115 */       UTF8String[] project_kvs_2 = project_expr_0_0.split(((UTF8String) references[9] /* literal */), -1);
/* 116 */       for(UTF8String kvEntry: project_kvs_2) {
/* 117 */         UTF8String[] kv = kvEntry.split(((UTF8String) references[10] /* literal */), 2);
/* 118 */         ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[8] /* mapBuilder */).put(kv[0], kv.length == 2 ? kv[1] : null);
/* 119 */       }
/* 120 */       project_value_13 = ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[8] /* mapBuilder */).build();
/* 121 */
/* 122 */     }
...
```
After this change:

L27-40 evaluates the common map variable.

```
/* 024 */   private void project_doConsume_0(InternalRow inputadapter_row_0, UTF8String project_expr_0_0, boolean project_exprIsNull_0_0) throws java.io.IOException {
/* 025 */     // common sub-expressions
/* 026 */
/* 027 */     boolean project_isNull_0 = true;
/* 028 */     MapData project_value_0 = null;
/* 029 */
/* 030 */     if (!project_exprIsNull_0_0) {
/* 031 */       project_isNull_0 = false; // resultCode could change nullability.
/* 032 */
/* 033 */       UTF8String[] project_kvs_0 = project_expr_0_0.split(((UTF8String) references[1] /* literal */), -1);
/* 034 */       for(UTF8String kvEntry: project_kvs_0) {
/* 035 */         UTF8String[] kv = kvEntry.split(((UTF8String) references[2] /* literal */), 2);
/* 036 */         ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0] /* mapBuilder */).put(kv[0], kv.length == 2 ? kv[1] : null);
/* 037 */       }
/* 038 */       project_value_0 = ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0] /* mapBuilder */).build();
/* 039 */
/* 040 */     }
/* 041 */
/* 042 */     boolean project_isNull_4 = true;
/* 043 */     UTF8String project_value_4 = null;
/* 044 */
/* 045 */     if (!project_isNull_0) {
/* 046 */       project_isNull_4 = false; // resultCode could change nullability.
/* 047 */
/* 048 */       final int project_length_0 = project_value_0.numElements();
/* 049 */       final ArrayData project_keys_0 = project_value_0.keyArray();
/* 050 */       final ArrayData project_values_0 = project_value_0.valueArray();
/* 051 */
/* 052 */       int project_index_0 = 0;
/* 053 */       boolean project_found_0 = false;
/* 054 */       while (project_index_0 < project_length_0 && !project_found_0) {
/* 055 */         final UTF8String project_key_0 = project_keys_0.getUTF8String(project_index_0);
/* 056 */         if (project_key_0.equals(((UTF8String) references[3] /* literal */))) {
/* 057 */           project_found_0 = true;
/* 058 */         } else {
/* 059 */           project_index_0++;
/* 060 */         }
/* 061 */       }
/* 062 */
/* 063 */       if (!project_found_0 || project_values_0.isNullAt(project_index_0)) {
/* 064 */         project_isNull_4 = true;
/* 065 */       } else {
/* 066 */         project_value_4 = project_values_0.getUTF8String(project_index_0);
/* 067 */       }
/* 068 */
/* 069 */     }
/* 070 */     boolean project_isNull_6 = true;
/* 071 */     UTF8String project_value_6 = null;
/* 072 */
/* 073 */     if (!project_isNull_0) {
/* 074 */       project_isNull_6 = false; // resultCode could change nullability.
/* 075 */
/* 076 */       final int project_length_1 = project_value_0.numElements();
/* 077 */       final ArrayData project_keys_1 = project_value_0.keyArray();
/* 078 */       final ArrayData project_values_1 = project_value_0.valueArray();
/* 079 */
/* 080 */       int project_index_1 = 0;
/* 081 */       boolean project_found_1 = false;
/* 082 */       while (project_index_1 < project_length_1 && !project_found_1) {
/* 083 */         final UTF8String project_key_1 = project_keys_1.getUTF8String(project_index_1);
/* 084 */         if (project_key_1.equals(((UTF8String) references[4] /* literal */))) {
/* 085 */           project_found_1 = true;
/* 086 */         } else {
/* 087 */           project_index_1++;
/* 088 */         }
/* 089 */       }
/* 090 */
/* 091 */       if (!project_found_1 || project_values_1.isNullAt(project_index_1)) {
/* 092 */         project_isNull_6 = true;
/* 093 */       } else {
/* 094 */         project_value_6 = project_values_1.getUTF8String(project_index_1);
/* 095 */       }
/* 096 */
/* 097 */     }
/* 098 */     boolean project_isNull_8 = true;
/* 099 */     UTF8String project_value_8 = null;
/* 100 */
...
```

When the code is split into separated method:

```
/* 026 */   private void project_doConsume_0(InternalRow inputadapter_row_0, UTF8String project_expr_0_0, boolean project_exprIsNull_0_0) throws java.io.IOException {
/* 027 */     // common sub-expressions
/* 028 */
/* 029 */     MapData project_subExprValue_0 = project_subExpr_0(project_exprIsNull_0_0, project_expr_0_0);
/* 030 */
...
/* 140 */   private MapData project_subExpr_0(boolean project_exprIsNull_0_0, org.apache.spark.unsafe.types.UTF8String project_expr_0_0) {
/* 141 */     boolean project_isNull_0 = true;
/* 142 */     MapData project_value_0 = null;
/* 143 */
/* 144 */     if (!project_exprIsNull_0_0) {
/* 145 */       project_isNull_0 = false; // resultCode could change nullability.
/* 146 */
/* 147 */       UTF8String[] project_kvs_0 = project_expr_0_0.split(((UTF8String) references[1] /* literal */), -1);
/* 148 */       for(UTF8String kvEntry: project_kvs_0) {
/* 149 */         UTF8String[] kv = kvEntry.split(((UTF8String) references[2] /* literal */), 2);
/* 150 */         ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0] /* mapBuilder */).put(kv[0], kv.length == 2 ? kv[1] : null);
/* 151 */       }
/* 152 */       project_value_0 = ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[0] /* mapBuilder */).build();
/* 153 */
/* 154 */     }
/* 155 */     project_subExprIsNull_0 = project_isNull_0;
/* 156 */     return project_value_0;
/* 157 */   }
```

### Why are the changes needed?

Users occasionally write repeated expression in projection. It is also possibly that query optimizer optimizes a query to evaluate same expression many times in a Project. Currently in ProjectExec, we don't support subexpression elimination in Whole-stage codegen. We can support it to reduce redundant evaluation.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

`spark.sql.subexpressionElimination.enabled` is enabled by default. So that's said we should pass all tests with this change.

Closes apache#29975 from viirya/SPARK-33092.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
@viirya viirya deleted the SPARK-33092 branch December 27, 2023 18:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants