[SPARK-25784][SQL] Infer filters from constraints after rewriting predicate subquery #22778
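In brief, this change reorders the final optimizer batch so that InferFiltersFromConstraints and predicate pushdown run after RewritePredicateSubquery has turned IN/EXISTS predicates into joins. A minimal, self-contained sketch of the intended effect, assuming a local Spark 3.x session; the table names and data here are made up for illustration and are not part of the patch:

  import org.apache.spark.sql.SparkSession

  object Spark25784Demo extends App {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-25784 demo")
      .getOrCreate()
    import spark.implicits._

    // Illustrative tables mirroring the relations used in RewriteSubquerySuite.
    Seq((1, 2), (3, 4)).toDF("a", "b").createOrReplaceTempView("t1")
    Seq((1, 10, 100)).toDF("x", "y", "z").createOrReplaceTempView("t2")

    // RewritePredicateSubquery turns the IN predicate into a LEFT SEMI join;
    // with this patch, InferFiltersFromConstraints then runs on the rewritten
    // plan, so the optimized plan should show isnotnull(a) / isnotnull(x)
    // filters on both join sides.
    spark.sql("SELECT a FROM t1 WHERE a IN (SELECT x FROM t2)").explain(true)
  }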

Status: Closed (wants to merge 6 commits)
@@ -52,6 +52,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
override protected val excludedOnceBatches: Set[String] =
Set(
"PartitionPruning",
"Rewrite Subquery",
"Extract Python UDFs")

protected def fixedPoint =
@@ -210,10 +211,14 @@ abstract class Optimizer(catalogManager: CatalogManager)
// The following batch should be executed after batch "Join Reorder" and "LocalRelation".
Batch("Check Cartesian Products", Once,
CheckCartesianProducts) :+
Batch("RewriteSubquery", Once,
Batch("Rewrite Subquery", Once,
gatorsmile (Member) commented on Oct 30, 2018:
I do not have a good answer for this PR. Ideally, we should run the whole operatorOptimizationBatch here. However, running the whole batch could be very time-consuming. I would suggest adding a new parameter that introduces a time-bound limit for each batch.

cc @maryannxue WDYT?

dilipbiswal (Contributor) commented:
@gatorsmile Do you think it's a good time to revisit Natt's PR to convert subquery expressions to Joins early in the optimization process? Perhaps then we can take advantage of all the subsequent rules firing after the subquery rewrite.

maryannxue (Contributor) commented on Oct 30, 2018:
@gatorsmile I think @dilipbiswal's suggestion is the right way to go. If you think of this subquery rewriting as another kind of de-correlation, it should be a pre-optimization rule.

gatorsmile (Member) replied:
Sure, that also sounds good to me. @dilipbiswal Could you take over PR #17520?

dilipbiswal (Contributor) replied:
@gatorsmile Sure, Sean. Let me give it a try.

RewritePredicateSubquery,
ColumnPruning,
InferFiltersFromConstraints,
PushDownPredicates,
CollapseProject,
CombineFilters,
PruneFilters,
RemoveNoopOperators) :+
// This batch must be executed after the `RewriteSubquery` batch, which creates joins.
Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers) :+
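The effect of the reordered batch is easiest to see with the Catalyst DSL used by the test suite below. A minimal sketch that mirrors the expected-answer plans in RewriteSubquerySuite (nothing here is new API; the tests additionally call .analyze before comparing plans):

  import org.apache.spark.sql.catalyst.dsl.expressions._
  import org.apache.spark.sql.catalyst.dsl.plans._
  import org.apache.spark.sql.catalyst.expressions.{IsNotNull, ListQuery}
  import org.apache.spark.sql.catalyst.plans.LeftSemi
  import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

  val relation = LocalRelation('a.int, 'b.int)
  val relInSubquery = LocalRelation('x.int, 'y.int, 'z.int)

  // a IN (SELECT x FROM relInSubquery) is first rewritten into a LEFT SEMI join ...
  val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)

  // ... and with constraint propagation enabled, InferFiltersFromConstraints plus
  // PushDownPredicates leave an IsNotNull filter on each side of that join:
  val expected = relation
    .where(IsNotNull('a)).select('a)
    .join(relInSubquery.where(IsNotNull('x)).select('x), LeftSemi, Some('a === 'x))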
@@ -19,10 +19,11 @@ package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{IsNull, ListQuery, Not}
import org.apache.spark.sql.catalyst.expressions.{IsNotNull, IsNull, ListQuery, Not}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, LeftSemi, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.internal.SQLConf


class RewriteSubquerySuite extends PlanTest {
@@ -33,23 +34,52 @@ class RewriteSubquerySuite extends PlanTest
Batch("Rewrite Subquery", FixedPoint(1),
RewritePredicateSubquery,
ColumnPruning,
InferFiltersFromConstraints,
PushDownPredicates,
CollapseProject,
CombineFilters,
RemoveNoopOperators) :: Nil
}

val relation = LocalRelation('a.int, 'b.int)
val relInSubquery = LocalRelation('x.int, 'y.int, 'z.int)

test("Column pruning after rewriting predicate subquery") {
val relation = LocalRelation('a.int, 'b.int)
val relInSubquery = LocalRelation('x.int, 'y.int, 'z.int)
withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "false") {
A reviewer (Member) commented:
Do we need to modify this existing test?

The PR author (Member) replied:
Yes: spark.sql.constraintPropagation.enabled=false tests ColumnPruning alone, while spark.sql.constraintPropagation.enabled=true tests ColumnPruning, InferFiltersFromConstraints, and PushDownPredicate together.

The reviewer (Member) replied:
Ah, I see. Thanks.

val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)
val optimized = Optimize.execute(query.analyze)

val correctAnswer = relation
.select('a)
.join(relInSubquery.select('x), LeftSemi, Some('a === 'x))
.analyze

comparePlans(optimized, correctAnswer)
}
}

test("Infer filters and push down predicate after rewriting predicate subquery") {
A reviewer (Member) commented:
Does the test title need to mention column pruning?

Another reviewer (Member) commented:
How about keeping the test title simple, and leaving comments here that clearly describe what is tested?

The PR author (Member) replied:
How about refactoring these tests to:

  val relation = LocalRelation('a.int, 'b.int)
  val relInSubquery = LocalRelation('x.int, 'y.int, 'z.int)

  test("Column pruning") {
    withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "false") {
      val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)

      val optimized = Optimize.execute(query.analyze)
      val correctAnswer = relation
        .select('a)
        .join(relInSubquery.select('x), LeftSemi, Some('a === 'x))
        .analyze

      comparePlans(optimized, correctAnswer)
    }
  }

  test("Column pruning, infer filters and push down predicate") {
    withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "true") {
      val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)

      val optimized = Optimize.execute(query.analyze)
      val correctAnswer = relation
        .where(IsNotNull('a)).select('a)
        .join(relInSubquery.where(IsNotNull('x)).select('x), LeftSemi, Some('a === 'x))
        .analyze

      comparePlans(optimized, correctAnswer)
    }
  }

withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "true") {
val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)
val optimized = Optimize.execute(query.analyze)

val correctAnswer = relation
.where(IsNotNull('a)).select('a)
.join(relInSubquery.where(IsNotNull('x)).select('x), LeftSemi, Some('a === 'x))
.analyze

comparePlans(optimized, correctAnswer)
}
}

test("combine filters after rewriting predicate subquery") {
val query = relation.where('a.in(ListQuery(relInSubquery.select('x).where('y > 1)))).select('a)
val optimized = Optimize.execute(query.analyze)
val correctAnswer = relation
.select('a)
.join(relInSubquery.select('x), LeftSemi, Some('a === 'x))
.analyze
val correctAnswer = relation
.where(IsNotNull('a)).select('a)
.join(relInSubquery.where(IsNotNull('x) && IsNotNull('y) && 'y > 1).select('x),
LeftSemi, Some('a === 'x))
.analyze

comparePlans(optimized, correctAnswer)
}

test("NOT-IN subquery nested inside OR") {
@@ -63,15 +63,15 @@ TakeOrderedAndProject (58)
Output [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]
Batched: true
Location [not included in comparison]/{warehouse_dir}/customer]
PushedFilters: [IsNotNull(c_current_addr_sk), IsNotNull(c_current_cdemo_sk)]
PushedFilters: [IsNotNull(c_current_addr_sk), IsNotNull(c_current_cdemo_sk), IsNotNull(c_customer_sk)]
ReadSchema: struct<c_customer_sk:int,c_current_cdemo_sk:int,c_current_addr_sk:int>

(2) ColumnarToRow [codegen id : 1]
Input [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]

(3) Filter [codegen id : 1]
Input [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]
Condition : (isnotnull(c_current_addr_sk#5) AND isnotnull(c_current_cdemo_sk#4))
Condition : ((isnotnull(c_current_addr_sk#5) AND isnotnull(c_current_cdemo_sk#4)) AND isnotnull(c_customer_sk#3))

(4) Exchange
Input [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]
@@ -85,29 +85,29 @@ Arguments: [c_customer_sk#3 ASC NULLS FIRST], false, 0
Output [2]: [ss_sold_date_sk#7, ss_customer_sk#8]
Batched: true
Location [not included in comparison]/{warehouse_dir}/store_sales]
PushedFilters: [IsNotNull(ss_sold_date_sk)]
PushedFilters: [IsNotNull(ss_sold_date_sk), IsNotNull(ss_customer_sk)]
ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int>

(7) ColumnarToRow [codegen id : 4]
Input [2]: [ss_sold_date_sk#7, ss_customer_sk#8]

(8) Filter [codegen id : 4]
Input [2]: [ss_sold_date_sk#7, ss_customer_sk#8]
Condition : isnotnull(ss_sold_date_sk#7)
Condition : (isnotnull(ss_sold_date_sk#7) AND isnotnull(ss_customer_sk#8))

(9) Scan parquet default.date_dim
Output [3]: [d_date_sk#9, d_year#10, d_moy#11]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_year), IsNotNull(d_moy), EqualTo(d_year,2002), GreaterThanOrEqual(d_moy,1), LessThanOrEqual(d_moy,4), IsNotNull(d_date_sk)]
PushedFilters: [IsNotNull(d_moy), IsNotNull(d_year), EqualTo(d_year,2002), GreaterThanOrEqual(d_moy,1), LessThanOrEqual(d_moy,4), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_year:int,d_moy:int>

(10) ColumnarToRow [codegen id : 3]
Input [3]: [d_date_sk#9, d_year#10, d_moy#11]

(11) Filter [codegen id : 3]
Input [3]: [d_date_sk#9, d_year#10, d_moy#11]
Condition : (((((isnotnull(d_year#10) AND isnotnull(d_moy#11)) AND (d_year#10 = 2002)) AND (d_moy#11 >= 1)) AND (d_moy#11 <= 4)) AND isnotnull(d_date_sk#9))
Condition : (((((isnotnull(d_moy#11) AND isnotnull(d_year#10)) AND (d_year#10 = 2002)) AND (d_moy#11 >= 1)) AND (d_moy#11 <= 4)) AND isnotnull(d_date_sk#9))

(12) Project [codegen id : 3]
Output [1]: [d_date_sk#9]
@@ -26,7 +26,7 @@ TakeOrderedAndProject [cd_credit_rating,cd_dep_college_count,cd_dep_count,cd_dep
InputAdapter
Exchange [c_customer_sk] #3
WholeStageCodegen (1)
Filter [c_current_addr_sk,c_current_cdemo_sk]
Filter [c_current_addr_sk,c_current_cdemo_sk,c_customer_sk]
ColumnarToRow
InputAdapter
Scan parquet default.customer [c_current_addr_sk,c_current_cdemo_sk,c_customer_sk]
Expand All @@ -37,7 +37,7 @@ TakeOrderedAndProject [cd_credit_rating,cd_dep_college_count,cd_dep_count,cd_dep
WholeStageCodegen (4)
Project [ss_customer_sk]
BroadcastHashJoin [d_date_sk,ss_sold_date_sk]
Filter [ss_sold_date_sk]
Filter [ss_customer_sk,ss_sold_date_sk]
ColumnarToRow
InputAdapter
Scan parquet default.store_sales [ss_customer_sk,ss_sold_date_sk]
@@ -55,29 +55,29 @@ TakeOrderedAndProject (50)
Output [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]
Batched: true
Location [not included in comparison]/{warehouse_dir}/customer]
PushedFilters: [IsNotNull(c_current_addr_sk), IsNotNull(c_current_cdemo_sk)]
PushedFilters: [IsNotNull(c_current_addr_sk), IsNotNull(c_current_cdemo_sk), IsNotNull(c_customer_sk)]
ReadSchema: struct<c_customer_sk:int,c_current_cdemo_sk:int,c_current_addr_sk:int>

(2) ColumnarToRow [codegen id : 9]
Input [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]

(3) Filter [codegen id : 9]
Input [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]
Condition : (isnotnull(c_current_addr_sk#5) AND isnotnull(c_current_cdemo_sk#4))
Condition : ((isnotnull(c_current_addr_sk#5) AND isnotnull(c_current_cdemo_sk#4)) AND isnotnull(c_customer_sk#3))

(4) Scan parquet default.store_sales
Output [2]: [ss_sold_date_sk#6, ss_customer_sk#7]
Batched: true
Location [not included in comparison]/{warehouse_dir}/store_sales]
PushedFilters: [IsNotNull(ss_sold_date_sk)]
PushedFilters: [IsNotNull(ss_sold_date_sk), IsNotNull(ss_customer_sk)]
ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int>

(5) ColumnarToRow [codegen id : 2]
Input [2]: [ss_sold_date_sk#6, ss_customer_sk#7]

(6) Filter [codegen id : 2]
Input [2]: [ss_sold_date_sk#6, ss_customer_sk#7]
Condition : isnotnull(ss_sold_date_sk#6)
Condition : (isnotnull(ss_sold_date_sk#6) AND isnotnull(ss_customer_sk#7))

(7) Scan parquet default.date_dim
Output [3]: [d_date_sk#8, d_year#9, d_moy#10]
@@ -14,7 +14,7 @@ TakeOrderedAndProject [cd_credit_rating,cd_dep_college_count,cd_dep_count,cd_dep
BroadcastHashJoin [c_customer_sk,cs_ship_customer_sk]
BroadcastHashJoin [c_customer_sk,ws_bill_customer_sk]
BroadcastHashJoin [c_customer_sk,ss_customer_sk]
Filter [c_current_addr_sk,c_current_cdemo_sk]
Filter [c_current_addr_sk,c_current_cdemo_sk,c_customer_sk]
ColumnarToRow
InputAdapter
Scan parquet default.customer [c_current_addr_sk,c_current_cdemo_sk,c_customer_sk]
@@ -23,7 +23,7 @@ TakeOrderedAndProject [cd_credit_rating,cd_dep_college_count,cd_dep_count,cd_dep
WholeStageCodegen (2)
Project [ss_customer_sk]
BroadcastHashJoin [d_date_sk,ss_sold_date_sk]
Filter [ss_sold_date_sk]
Filter [ss_customer_sk,ss_sold_date_sk]
ColumnarToRow
InputAdapter
Scan parquet default.store_sales [ss_customer_sk,ss_sold_date_sk]
@@ -158,15 +158,15 @@ Arguments: [ss_item_sk#2 ASC NULLS FIRST], false, 0
Output [4]: [i_item_sk#6, i_brand_id#7, i_class_id#8, i_category_id#9]
Batched: true
Location [not included in comparison]/{warehouse_dir}/item]
PushedFilters: [IsNotNull(i_class_id), IsNotNull(i_brand_id), IsNotNull(i_category_id)]
PushedFilters: [IsNotNull(i_brand_id), IsNotNull(i_class_id), IsNotNull(i_category_id), IsNotNull(i_item_sk)]
ReadSchema: struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>

(7) ColumnarToRow [codegen id : 17]
Input [4]: [i_item_sk#6, i_brand_id#7, i_class_id#8, i_category_id#9]

(8) Filter [codegen id : 17]
Input [4]: [i_item_sk#6, i_brand_id#7, i_class_id#8, i_category_id#9]
Condition : ((isnotnull(i_class_id#8) AND isnotnull(i_brand_id#7)) AND isnotnull(i_category_id#9))
Condition : (((isnotnull(i_brand_id#7) AND isnotnull(i_class_id#8)) AND isnotnull(i_category_id#9)) AND isnotnull(i_item_sk#6))

(9) Scan parquet default.store_sales
Output [2]: [ss_sold_date_sk#1, ss_item_sk#2]
@@ -217,15 +217,15 @@ Input [3]: [ss_sold_date_sk#1, ss_item_sk#2, d_date_sk#10]
Output [4]: [i_item_sk#6, i_brand_id#7, i_class_id#8, i_category_id#9]
Batched: true
Location [not included in comparison]/{warehouse_dir}/item]
PushedFilters: [IsNotNull(i_item_sk), IsNotNull(i_category_id), IsNotNull(i_brand_id), IsNotNull(i_class_id)]
PushedFilters: [IsNotNull(i_item_sk), IsNotNull(i_brand_id), IsNotNull(i_class_id), IsNotNull(i_category_id)]
ReadSchema: struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>

(20) ColumnarToRow [codegen id : 4]
Input [4]: [i_item_sk#6, i_brand_id#7, i_class_id#8, i_category_id#9]

(21) Filter [codegen id : 4]
Input [4]: [i_item_sk#6, i_brand_id#7, i_class_id#8, i_category_id#9]
Condition : (((isnotnull(i_item_sk#6) AND isnotnull(i_category_id#9)) AND isnotnull(i_brand_id#7)) AND isnotnull(i_class_id#8))
Condition : (((isnotnull(i_item_sk#6) AND isnotnull(i_brand_id#7)) AND isnotnull(i_class_id#8)) AND isnotnull(i_category_id#9))

(22) BroadcastExchange
Input [4]: [i_item_sk#6, i_brand_id#7, i_class_id#8, i_category_id#9]
@@ -80,7 +80,7 @@ TakeOrderedAndProject [channel,i_brand_id,i_category_id,i_class_id,sum(number_sa
WholeStageCodegen (17)
Project [i_item_sk]
BroadcastHashJoin [brand_id,category_id,class_id,i_brand_id,i_category_id,i_class_id]
Filter [i_brand_id,i_category_id,i_class_id]
Filter [i_brand_id,i_category_id,i_class_id,i_item_sk]
ColumnarToRow
InputAdapter
Scan parquet default.item [i_brand_id,i_category_id,i_class_id,i_item_sk]
@@ -134,15 +134,15 @@ Condition : (isnotnull(ss_item_sk#2) AND isnotnull(ss_sold_date_sk#1))
Output [4]: [i_item_sk#5, i_brand_id#6, i_class_id#7, i_category_id#8]
Batched: true
Location [not included in comparison]/{warehouse_dir}/item]
PushedFilters: [IsNotNull(i_class_id), IsNotNull(i_brand_id), IsNotNull(i_category_id)]
PushedFilters: [IsNotNull(i_class_id), IsNotNull(i_brand_id), IsNotNull(i_category_id), IsNotNull(i_item_sk)]
ReadSchema: struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>

(5) ColumnarToRow [codegen id : 11]
Input [4]: [i_item_sk#5, i_brand_id#6, i_class_id#7, i_category_id#8]

(6) Filter [codegen id : 11]
Input [4]: [i_item_sk#5, i_brand_id#6, i_class_id#7, i_category_id#8]
Condition : ((isnotnull(i_class_id#7) AND isnotnull(i_brand_id#6)) AND isnotnull(i_category_id#8))
Condition : (((isnotnull(i_class_id#7) AND isnotnull(i_brand_id#6)) AND isnotnull(i_category_id#8)) AND isnotnull(i_item_sk#5))

(7) Scan parquet default.store_sales
Output [2]: [ss_sold_date_sk#1, ss_item_sk#2]
@@ -72,7 +72,7 @@ TakeOrderedAndProject [channel,i_brand_id,i_category_id,i_class_id,sum(number_sa
WholeStageCodegen (11)
Project [i_item_sk]
BroadcastHashJoin [brand_id,category_id,class_id,i_brand_id,i_category_id,i_class_id]
Filter [i_brand_id,i_category_id,i_class_id]
Filter [i_brand_id,i_category_id,i_class_id,i_item_sk]
ColumnarToRow
InputAdapter
Scan parquet default.item [i_brand_id,i_category_id,i_class_id,i_item_sk]