-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-32945][SQL] Avoid collapsing projects if reaching max allowed common exprs #29950
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f418714
98843dd
1b567e7
76509b3
43eb50d
4bf4dc2
9bfafc7
c2c01e4
4990375
e8f18f8
58e71d8
bbaae3e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -214,7 +214,8 @@ abstract class Optimizer(catalogManager: CatalogManager) | |
// The following batch should be executed after batch "Join Reorder" and "LocalRelation". | ||
Batch("Check Cartesian Products", Once, | ||
CheckCartesianProducts) :+ | ||
Batch("RewriteSubquery", Once, | ||
// `CollapseProject` cannot collapse all projects in one pass. So we need `fixedPoint` here. | ||
Batch("RewriteSubquery", fixedPoint, | ||
RewritePredicateSubquery, | ||
ColumnPruning, | ||
CollapseProject, | ||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
@@ -724,20 +725,17 @@ object ColumnPruning extends Rule[LogicalPlan] { | |
/** | ||
* Combines two [[Project]] operators into one and performs alias substitution, | ||
* merging the expressions into one single expression for the following cases. | ||
* 1. When two [[Project]] operators are adjacent. | ||
* 1. When two [[Project]] operators are adjacent, if the number of common expressions in the | ||
* combined [[Project]] is not more than `spark.sql.optimizer.maxCommonExprsInCollapseProject`. | ||
* 2. When two [[Project]] operators have LocalLimit/Sample/Repartition operator between them | ||
* and the upper project consists of the same number of columns which is equal or aliasing. | ||
* `GlobalLimit(LocalLimit)` pattern is also considered. | ||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
*/ | ||
object CollapseProject extends Rule[LogicalPlan] with AliasHelper { | ||
|
||
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { | ||
case p1 @ Project(_, p2: Project) => | ||
if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList)) { | ||
p1 | ||
} else { | ||
p2.copy(projectList = buildCleanedProjectList(p1.projectList, p2.projectList)) | ||
} | ||
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason to change from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I found the previous comment about supporting |
||
case p @ Project(_, _: Project) => | ||
collapseProjects(p) | ||
case p @ Project(_, agg: Aggregate) => | ||
if (haveCommonNonDeterministicOutput(p.projectList, agg.aggregateExpressions)) { | ||
p | ||
|
@@ -758,6 +756,42 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper { | |
s.copy(child = p2.copy(projectList = buildCleanedProjectList(l1, p2.projectList))) | ||
} | ||
|
||
private def collapseProjects(plan: LogicalPlan): LogicalPlan = plan match { | ||
case p1 @ Project(_, p2: Project) => | ||
if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) || | ||
moreThanMaxAllowedCommonOutput(p1.projectList, p2.projectList)) { | ||
p1 | ||
} else { | ||
collapseProjects( | ||
p2.copy(projectList = buildCleanedProjectList(p1.projectList, p2.projectList))) | ||
} | ||
case _ => plan | ||
} | ||
|
||
private def collectAliases(projectList: Seq[NamedExpression]): AttributeMap[Alias] = { | ||
AttributeMap(projectList.collect { | ||
case a: Alias => a.toAttribute -> a | ||
}) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could extend to other cases like |
||
|
||
// Whether the maximum number of times any common output from the lower operator is | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
 | ||
 ||
// used in the upper operators exceeds the allowed limit. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
private def moreThanMaxAllowedCommonOutput( | ||
upper: Seq[NamedExpression], lower: Seq[NamedExpression]): Boolean = { | ||
val aliases = collectAliases(lower) | ||
val exprMap = mutable.HashMap.empty[Attribute, Int] | ||
|
||
upper.foreach(_.collect { | ||
case a: Attribute if aliases.contains(a) => exprMap.update(a, exprMap.getOrElse(a, 0) + 1) | ||
}) | ||
|
||
if (exprMap.nonEmpty) { | ||
exprMap.maxBy(_._2)._2 > SQLConf.get.maxCommonExprsInCollapseProject | ||
} else { | ||
false | ||
} | ||
} | ||
|
||
private def haveCommonNonDeterministicOutput( | ||
upper: Seq[NamedExpression], lower: Seq[NamedExpression]): Boolean = { | ||
val aliases = getAliasMap(lower) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1963,6 +1963,27 @@ object SQLConf { | |
.booleanConf | ||
.createWithDefault(true) | ||
|
||
val MAX_COMMON_EXPRS_IN_COLLAPSE_PROJECT = | ||
buildConf("spark.sql.optimizer.maxCommonExprsInCollapseProject") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we set this value to 1, all the existing tests can pass? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess not. We might have at least a few common expressions in collapsed projection. If set to 1, any duplicated expression is not allowed. |
||
.doc("An integer that indicates the maximum allowed number of common input expressions " + | ||
"from lower Project when being collapsed into upper Project by optimizer rule " + | ||
"`CollapseProject`. Normally `CollapseProject` will collapse adjacent Project " + | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Just a comment) Even if we set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, but currently if we exclude There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hm I see. Yea, updating the doc sounds nice to me. |
||
"and merge expressions. But in some edge cases, expensive expressions might be " + | ||
"duplicated many times in merged Project by this optimization. This config sets " + | ||
"a maximum number. Once an expression is duplicated more than this number " + | ||
"when merging two Projects, Spark SQL will skip the merging. Note that normally " + | ||
"in whole-stage codegen Project operator will de-duplicate expressions internally, " + | ||
"but in edge cases Spark cannot do whole-stage codegen and fallback to interpreted " + | ||
"mode. In such cases, users can use this config to avoid duplicate expressions. " + | ||
"Note that even if users exclude the `CollapseProject` rule using " + | ||
"`spark.sql.optimizer.excludedRules`, at physical planning phase Spark will still " + | ||
"collapse projections. This config is also effective on collapsing projections in " + | ||
"the physical planning.") | ||
.version("3.1.0") | ||
.intConf | ||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.checkValue(_ > 0, "The value of maxCommonExprsInCollapseProject must be larger than zero.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
.createWithDefault(Int.MaxValue) | ||
|
||
val DECIMAL_OPERATIONS_ALLOW_PREC_LOSS = | ||
buildConf("spark.sql.decimalOperations.allowPrecisionLoss") | ||
.internal() | ||
|
@@ -3405,6 +3426,8 @@ class SQLConf extends Serializable with Logging { | |
|
||
def replaceExceptWithFilter: Boolean = getConf(REPLACE_EXCEPT_WITH_FILTER) | ||
|
||
def maxCommonExprsInCollapseProject: Int = getConf(MAX_COMMON_EXPRS_IN_COLLAPSE_PROJECT) | ||
|
||
def decimalOperationsAllowPrecisionLoss: Boolean = getConf(DECIMAL_OPERATIONS_ALLOW_PREC_LOSS) | ||
|
||
def literalPickMinimumPrecision: Boolean = getConf(LITERAL_PICK_MINIMUM_PRECISION) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,11 +20,12 @@ package org.apache.spark.sql.catalyst.optimizer | |
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases | ||
import org.apache.spark.sql.catalyst.dsl.expressions._ | ||
import org.apache.spark.sql.catalyst.dsl.plans._ | ||
import org.apache.spark.sql.catalyst.expressions.{Alias, Rand} | ||
import org.apache.spark.sql.catalyst.expressions._ | ||
import org.apache.spark.sql.catalyst.plans.PlanTest | ||
import org.apache.spark.sql.catalyst.plans.logical._ | ||
import org.apache.spark.sql.catalyst.rules.RuleExecutor | ||
import org.apache.spark.sql.types.MetadataBuilder | ||
import org.apache.spark.sql.internal.SQLConf | ||
import org.apache.spark.sql.types.{MetadataBuilder, StructType} | ||
|
||
class CollapseProjectSuite extends PlanTest { | ||
object Optimize extends RuleExecutor[LogicalPlan] { | ||
|
@@ -170,4 +171,59 @@ class CollapseProjectSuite extends PlanTest { | |
val expected = Sample(0.0, 0.6, false, 11L, relation.select('a as 'c)).analyze | ||
comparePlans(optimized, expected) | ||
} | ||
|
||
test("SPARK-32945: avoid collapsing projects if reaching max allowed common exprs") { | ||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
val options = Map.empty[String, String] | ||
val schema = StructType.fromDDL("a int, b int, c string, d long") | ||
|
||
Seq("1", "2", "3", "4").foreach { maxCommonExprs => | ||
withSQLConf(SQLConf.MAX_COMMON_EXPRS_IN_COLLAPSE_PROJECT.key -> maxCommonExprs) { | ||
// If we collapse two Projects, `JsonToStructs` will be repeated three times. | ||
val relation = LocalRelation('json.string) | ||
val query1 = relation.select( | ||
JsonToStructs(schema, options, 'json).as("struct")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation? Maybe, the following is better? - val query1 = relation.select(
- JsonToStructs(schema, options, 'json).as("struct"))
- .select(
+ val query1 = relation.select(JsonToStructs(schema, options, 'json).as("struct"))
+ .select( |
||
.select( | ||
GetStructField('struct, 0).as("a"), | ||
GetStructField('struct, 1).as("b"), | ||
GetStructField('struct, 2).as("c"), | ||
GetStructField('struct, 3).as("d")).analyze | ||
val optimized1 = Optimize.execute(query1) | ||
|
||
val query2 = relation | ||
.select('json, JsonToStructs(schema, options, 'json).as("struct")) | ||
.select('json, 'struct, GetStructField('struct, 0).as("a")) | ||
.select('json, 'struct, 'a, GetStructField('struct, 1).as("b")) | ||
.select('json, 'struct, 'a, 'b, GetStructField('struct, 2).as("c")) | ||
.analyze | ||
val optimized2 = Optimize.execute(query2) | ||
|
||
if (maxCommonExprs.toInt < 4) { | ||
val expected1 = query1 | ||
comparePlans(optimized1, expected1) | ||
|
||
val expected2 = relation | ||
.select('json, JsonToStructs(schema, options, 'json).as("struct")) | ||
.select('json, 'struct, | ||
GetStructField('struct, 0).as("a"), | ||
GetStructField('struct, 1).as("b"), | ||
GetStructField('struct, 2).as("c")) | ||
.analyze | ||
comparePlans(optimized2, expected2) | ||
} else { | ||
val expected1 = relation.select( | ||
GetStructField(JsonToStructs(schema, options, 'json), 0).as("a"), | ||
GetStructField(JsonToStructs(schema, options, 'json), 1).as("b"), | ||
GetStructField(JsonToStructs(schema, options, 'json), 2).as("c"), | ||
GetStructField(JsonToStructs(schema, options, 'json), 3).as("d")).analyze | ||
comparePlans(optimized1, expected1) | ||
|
||
val expected2 = relation.select('json, JsonToStructs(schema, options, 'json).as("struct"), | ||
GetStructField(JsonToStructs(schema, options, 'json), 0).as("a"), | ||
GetStructField(JsonToStructs(schema, options, 'json), 1).as("b"), | ||
GetStructField(JsonToStructs(schema, options, 'json), 2).as("c")).analyze | ||
comparePlans(optimized2, expected2) | ||
} | ||
} | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.