-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-25784][SQL] Infer filters from constraints after rewriting predicate subquery #22778
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c8d1b91
6596327
db519c3
80bf621
d41df58
6b2a2da
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,10 +19,11 @@ package org.apache.spark.sql.catalyst.optimizer | |
|
||
import org.apache.spark.sql.catalyst.dsl.expressions._ | ||
import org.apache.spark.sql.catalyst.dsl.plans._ | ||
import org.apache.spark.sql.catalyst.expressions.{IsNull, ListQuery, Not} | ||
import org.apache.spark.sql.catalyst.expressions.{IsNotNull, IsNull, ListQuery, Not} | ||
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, LeftSemi, PlanTest} | ||
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} | ||
import org.apache.spark.sql.catalyst.rules.RuleExecutor | ||
import org.apache.spark.sql.internal.SQLConf | ||
|
||
|
||
class RewriteSubquerySuite extends PlanTest { | ||
|
@@ -33,23 +34,52 @@ class RewriteSubquerySuite extends PlanTest { | |
Batch("Rewrite Subquery", FixedPoint(1), | ||
RewritePredicateSubquery, | ||
ColumnPruning, | ||
InferFiltersFromConstraints, | ||
PushDownPredicates, | ||
CollapseProject, | ||
CombineFilters, | ||
RemoveNoopOperators) :: Nil | ||
} | ||
|
||
val relation = LocalRelation('a.int, 'b.int) | ||
val relInSubquery = LocalRelation('x.int, 'y.int, 'z.int) | ||
|
||
test("Column pruning after rewriting predicate subquery") { | ||
val relation = LocalRelation('a.int, 'b.int) | ||
val relInSubquery = LocalRelation('x.int, 'y.int, 'z.int) | ||
withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "false") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to modify this existing test? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, I see. Thanks. |
||
val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a) | ||
val optimized = Optimize.execute(query.analyze) | ||
|
||
val correctAnswer = relation | ||
.select('a) | ||
.join(relInSubquery.select('x), LeftSemi, Some('a === 'x)) | ||
.analyze | ||
|
||
comparePlans(optimized, correctAnswer) | ||
} | ||
} | ||
|
||
test("Infer filters and push down predicate after rewriting predicate subquery") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need the column pruning in the test title? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about making the test title simple, then leaving comments about what's tested clearly here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about refactor these test to: val relation = LocalRelation('a.int, 'b.int)
val relInSubquery = LocalRelation('x.int, 'y.int, 'z.int)
test("Column pruning") {
withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "false") {
val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)
val optimized = Optimize.execute(query.analyze)
val correctAnswer = relation
.select('a)
.join(relInSubquery.select('x), LeftSemi, Some('a === 'x))
.analyze
comparePlans(optimized, correctAnswer)
}
}
test("Column pruning, infer filters and push down predicate") {
withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "true") {
val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)
val optimized = Optimize.execute(query.analyze)
val correctAnswer = relation
.where(IsNotNull('a)).select('a)
.join(relInSubquery.where(IsNotNull('x)).select('x), LeftSemi, Some('a === 'x))
.analyze
comparePlans(optimized, correctAnswer)
}
} |
||
withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "true") { | ||
val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a) | ||
val optimized = Optimize.execute(query.analyze) | ||
|
||
val correctAnswer = relation | ||
.where(IsNotNull('a)).select('a) | ||
.join(relInSubquery.where(IsNotNull('x)).select('x), LeftSemi, Some('a === 'x)) | ||
.analyze | ||
|
||
val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a) | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
} | ||
|
||
test("combine filters after rewriting predicate subquery") { | ||
val query = relation.where('a.in(ListQuery(relInSubquery.select('x).where('y > 1)))).select('a) | ||
val optimized = Optimize.execute(query.analyze) | ||
val correctAnswer = relation | ||
.select('a) | ||
.join(relInSubquery.select('x), LeftSemi, Some('a === 'x)) | ||
.analyze | ||
|
||
comparePlans(optimized, correctAnswer) | ||
val correctAnswer = relation | ||
.where(IsNotNull('a)).select('a) | ||
.join(relInSubquery.where(IsNotNull('x) && IsNotNull('y) && 'y > 1).select('x), | ||
LeftSemi, Some('a === 'x)) | ||
} | ||
|
||
test("NOT-IN subquery nested inside OR") { | ||
|
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not have a good answer for this PR. Ideally, we should run the whole batch
operatorOptimizationBatch
. However, running the whole batch could be very time consuming. I would suggest to add a new parameter for introducing the time bound limit for each batch.cc @maryannxue WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile Do you think its a good time to revisit Natt's PR to convert subquery expressions to Joins early in the optimization process ? Perhaps then we can take advantage of all the subsequent rules firing after the subquery rewrite ?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile I think @dilipbiswal's suggestion is the right way to go. If you think of this subquery rewriting as another kind of de-correlation, it should be a pre-optimization rule.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. That sounds also good to me. @dilipbiswal Could you take the PR #17520 over?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile Sure Sean.. Let me give it a try.