
[SPARK-27033][SQL]Add Optimize rule RewriteArithmeticFiltersOnIntegralColumn #23942


Closed
wants to merge 10 commits

Conversation

WangGuangxin
Contributor

@WangGuangxin WangGuangxin commented Mar 3, 2019

What changes were proposed in this pull request?

Currently, filters like select * from table where a + 1 = 3 cannot be pushed down. This rule rewrites the predicate to select * from table where a = 3 - 1, which other rules then fold into select * from table where a = 2, so that it can be pushed down to Parquet or other file formats.

The supported comparisons are = and !=. The supported operations are Add and Subtract. Only integral types (BYTE, SHORT, INT, and LONG) are supported; FLOAT and DOUBLE are excluded because of precision issues.
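
For illustration, a rough sketch of the intended end-to-end effect (a hedged example; the path, table, and column names below are made up):

// Build a small Parquet table with an INT column `a` (names and path are hypothetical).
spark.range(10).selectExpr("CAST(id AS INT) AS a").write.mode("overwrite").parquet("/tmp/t")
spark.read.parquet("/tmp/t").createOrReplaceTempView("t")

// Without this rule, the predicate `a + 1 = 3` is not pushed into the Parquet scan.
// With this rule (followed by ConstantFolding), it becomes `a = 2` and should appear in the
// scan's PushedFilters, letting Parquet skip row groups.
spark.sql("SELECT * FROM t WHERE a + 1 = 3").explain(true)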

How was this patch tested?

Unit test by RewriteArithmeticFiltersOnIntegralColumnSuite.

Member

@dongjoon-hyun dongjoon-hyun left a comment

Hi, @WangGuangxin. Thank you for your first contribution.

  • First of all, could you create another file for this optimizer?
  • Second, could you rename this optimizer to something more specific? TransformBinaryComparison sounds too broad to me because this optimizer only handles +/- on Int/Long; it cannot handle *, /, or many other data types.

@WangGuangxin WangGuangxin changed the title [SPARK-27033][SQL]Add Optimize rule TransformBinaryComparison [SPARK-27033][SQL]Add Optimize rule RewriteArithmeticFiltersOnIntOrLongColumn Mar 4, 2019
@WangGuangxin
Contributor Author

Hi, @WangGuangxin. Thank you for your first contribution.

  • First of all, could you create another file for this optimizer?
  • Second, could you rename this optimizer to something more specific? TransformBinaryComparison sounds too broad to me because this optimizer only handles +/- on Int/Long; it cannot handle *, /, or many other data types.

I renamed it to RewriteArithmeticFiltersOnIntOrLongColumn and moved it into its own file.

@@ -128,7 +128,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
       RemoveRedundantAliases,
       RemoveNoopOperators,
       SimplifyExtractValueOps,
-      CombineConcats) ++
+      CombineConcats,
+      RewriteArithmeticFiltersOnIntOrLongColumn) ++
Member

Would it be better to put this rule just before ConstantFolding?

Contributor Author

yes, I've put it before ConstantFolding
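
For reference, a rough sketch of the intended ordering inside Optimizer.scala (the surrounding rule list is abridged and the exact neighbors are an assumption):

  val operatorOptimizationRuleSet =
    Seq(
      // ... other operator-optimization rules ...
      RewriteArithmeticFiltersOnIntOrLongColumn, // rewrite i + 3 = 5 into i = 5 - 3 first,
      ConstantFolding,                           // so that 5 - 3 is then folded into 2
      // ... remaining rules ...
      CombineConcats)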

@maropu
Member

maropu commented Mar 4, 2019

ok to test

@SparkQA

SparkQA commented Mar 4, 2019

Test build #102980 has finished for PR 23942 at commit aaf2b8a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 4, 2019

Test build #102994 has finished for PR 23942 at commit 82ff2a1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@WangGuangxin
Contributor Author

Test build #102994 has finished for PR 23942 at commit 82ff2a1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

This fails the test org.apache.spark.sql.hive.OptimizeHiveMetadataOnlyQuerySuite "SPARK-23877: filter on projected expression" because it assumes that part + 1 < 5 will not be pushed down, which is exactly what this PR changes. cc @rdblue for confirmation.

@dongjoon-hyun
Member

Please update SPARK-23877: filter on projected expression to use other expressions, @WangGuangxin .

@SparkQA

SparkQA commented Mar 5, 2019

Test build #103016 has finished for PR 23942 at commit 1fe9a41.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 5, 2019

Test build #103024 has finished for PR 23942 at commit 597d6d7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

* SELECT * FROM table WHERE i = 2
* }}}
*/
object RewriteArithmeticFiltersOnIntOrLongColumn extends Rule[LogicalPlan] with PredicateHelper {
Member

Is this restricted to Filter only? It looks like it rewrites all qualifying expressions in any logical plan node.

Contributor Author

Yes, it is Filter only. I've changed it to work only on Filter.

@SparkQA

SparkQA commented Mar 5, 2019

Test build #103042 has finished for PR 23942 at commit 03c522e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@WangGuangxin
Contributor Author

Please update SPARK-23877: filter on projected expression to use other expressions, @WangGuangxin .

Done. I updated the two related unit tests.

assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 2) != 4")).collect().size == 2)
// SPARK-27033: Add Optimize rule RewriteArithmeticFiltersOnIntOrLongColumn
assert(checkPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0)
assert(checkPushdown(sql("SELECT * FROM foobar WHERE (THEID + 2) != 4")).collect().size == 2)
Contributor

Is "!=" also supported? The PR description only mentions "=, >=, <=, >, <".

Contributor Author

Yes, it is; I've updated the PR description.

* {{{
* SELECT * FROM table WHERE i = 2
* }}}
*/
Contributor

@SongYadong SongYadong Mar 6, 2019

It would be good to document the supported comparison operators here.

Contributor Author

Thanks for your advice. I have updated the comments here

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f: Filter =>
      f transformExpressionsUp {
        case e @ BinaryComparison(left: BinaryArithmetic, right: Literal)
Contributor

what about checking if it is foldable instead of a Literal?

Contributor Author

Yes, checking foldable is better since it accelerates convergence; I'll change it.
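
A rough sketch of what the relaxed match could look like (not the exact PR code; transformLeft and isDataTypeSafe are helpers from this PR whose exact signatures are assumed here):

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f: Filter =>
      f transformExpressionsUp {
        // Accept any foldable right-hand side instead of only a Literal.
        case e @ BinaryComparison(left: BinaryArithmetic, right: Expression)
            if right.foldable && isDataTypeSafe(left.dataType) =>
          transformLeft(e, left, right)
      }
  }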

@SparkQA

SparkQA commented Mar 6, 2019

Test build #103090 has finished for PR 23942 at commit 0f61953.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
}

private def isDataTypeSafe(dataType: DataType): Boolean = dataType match {
Contributor

Why are only integers and longs accepted?

Contributor Author

@WangGuangxin WangGuangxin Mar 7, 2019

Float and Double have precision issues. For example, a + 3.2 < 4.0 would be converted to a < 0.7999999999999998.

Member

How about the other integral types, e.g., short?

Contributor Author

Yes, the integral types (byte, short, int, long) are all OK. I'll add support for the byte and short types as well.
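
A minimal sketch of the widened type check (the helper name comes from the diff above; the exact body in the PR may differ):

  private def isDataTypeSafe(dataType: DataType): Boolean = dataType match {
    // All integral types are safe; FloatType and DoubleType are excluded for precision reasons.
    case ByteType | ShortType | IntegerType | LongType => true
    case _ => false
  }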

Contributor

I think the precision issue would be there anyway, when executed at runtime, am I wrong?

Contributor Author

I think the precision issue would be there anyway, when executed at runtime, am I wrong?

I ran a simple test on a table with a Double-type column a that has two records: 0.7999999999999998 and 0.8.

With a + 3.2 = 4.0, it returns both records. But if we optimize it to a = 0.7999999999999998, the result will be wrong.

Contributor

Here is an example:

scala> spark.sql("select float(1E-8) + float(1E+10) <= float(1E+10)").show()
+----------------------------------------------------------------------+
|((CAST(1E-8 AS FLOAT) + CAST(1E+10 AS FLOAT)) <= CAST(1E+10 AS FLOAT))|
+----------------------------------------------------------------------+
|                                                                  true|
+----------------------------------------------------------------------+


scala> spark.sql("select float(1E-8) <= float(1E+10) - float(1E+10)").show()
+----------------------------------------------------------------------+
|(CAST(1E-8 AS FLOAT) <= (CAST(1E+10 AS FLOAT) - CAST(1E+10 AS FLOAT)))|
+----------------------------------------------------------------------+
|                                                                 false|
+----------------------------------------------------------------------+

Although float(1E-8) + float(1E+10) <= float(1E+10) should return false, it evaluates to true, while the rewritten form evaluates to false. This may lead to inconsistency.

Contributor

thanks for the explanation

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f: Filter =>
      f transformExpressionsUp {
        case e @ BinaryComparison(left: BinaryArithmetic, right: Expression)
Contributor

is it safe to do it also for non-deterministic expressions?

Contributor Author

@WangGuangxin WangGuangxin Mar 7, 2019

Checking foldable is enough, because ConstantFolding will convert all foldable expressions to Literals. And in fact, non-deterministic expressions are not foldable.

Contributor

Yes, but what if the remaining part of left is non-deterministic?

Contributor Author

There is a check in transformRight and transformLeft to make sure the other part of BinaryArithmetic is foldable
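
For illustration, a hedged sketch of how such a guard could look inside transformLeft (the helper name comes from the comment above; the body and signature are assumptions, not the PR's exact code):

  private def transformLeft(e: BinaryComparison, left: BinaryArithmetic, right: Expression)
      : Expression = left match {
    // Rewrite only when the non-attribute operand is foldable, so a non-deterministic
    // sub-expression on the left side can never be moved to the right side.
    case Add(ar: AttributeReference, other) if other.foldable =>
      e.makeCopy(Array(ar, Subtract(right, other)))  // ar + c  cmp  r  ==>  ar  cmp  r - c
    case Subtract(ar: AttributeReference, other) if other.foldable =>
      e.makeCopy(Array(ar, Add(right, other)))       // ar - c  cmp  r  ==>  ar  cmp  r + c
    case _ => e
  }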

Contributor

yes, I was thinking about the AttributeReference, but it is always deterministic. So I think it is fine, thanks.

@@ -65,11 +65,11 @@ class OptimizeHiveMetadataOnlyQuerySuite extends QueryTest with TestHiveSingleton

     // verify the matching partitions
     val partitions = spark.internalCreateDataFrame(Distinct(Filter(($"x" < 5).expr,
-      Project(Seq(($"part" + 1).as("x").expr.asInstanceOf[NamedExpression]),
+      Project(Seq(($"part" * 1).as("x").expr.asInstanceOf[NamedExpression]),
Contributor

why do we need this?

Contributor Author

Because with the optimizer in this PR, part + 1 < 5 will be optimized to part < 4, where part is a partition column, so it only needs to fetch 4 partitions instead of 11, and the last assertion assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount - startCount == 11) will fail.
From the comment in this test, it wants to verify that the partition predicate was not pushed down to the metastore, so I changed it to part * 1, which will not be optimized.

Member

How about using the spark.sql.optimizer.excludedRules config instead of changing the existing tests?

Contributor Author

How about using the spark.sql.optimizer.excludedRules config instead of changing the existing tests?

Thanks for your advice. I've made the change accordingly.
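
For reference, a hedged sketch of what that could look like in the test (withSQLConf and spark.sql.optimizer.excludedRules are existing APIs; the rule's fully-qualified name and the surrounding assertions are assumed/abridged):

  withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
      "org.apache.spark.sql.catalyst.optimizer.RewriteArithmeticFiltersOnIntOrLongColumn") {
    // With the rule excluded, `part + 1 < 5` is left as-is and is not pushed to the metastore,
    // so the original assertions on HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED still hold.
  }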

@SparkQA

SparkQA commented Mar 6, 2019

Test build #103096 has finished for PR 23942 at commit 5d0a5e8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

/**
* Rewrite arithmetic filters on int or long column to its equivalent form,
* leaving attribute alone in one side, so that we can push it down to
* parquet or other file format.
Member

@maropu maropu Mar 7, 2019

nit: how about this?

/**
 * Rewrite arithmetic filters on an integral-type (e.g., int and long) column to its equivalent
 * form, leaving attribute alone in a left side, so that we can push it down to
 * datasources (e.g., Parquet and ORC).
 *
 * For example, this rule can optimize a query as follows:
 * {{{
 *   SELECT * FROM table WHERE i + 3 = 5
 *   ==> SELECT * FROM table WHERE i = 5 - 3
 * }}}
 *
 * Then, the [[ConstantFolding]] rule will further optimize it as follows:
 * {{{
 *   SELECT * FROM table WHERE i = 2
 * }}}
 *
 * Note:
 * 1. This rule supports `Add` and `Subtract` in arithmetic expressions.
 * 2. This rule supports `=`, `>=`, `<=`, `>`, `<`, and `!=` in comparators.
 * 3. This rule supports `INT` and `LONG` types only. It doesn't support `FLOAT` or `DOUBLE`
 *    because of precision issues.
 */

Contributor Author

That's clearer. I've updated it.

@WangGuangxin WangGuangxin changed the title [SPARK-27033][SQL]Add Optimize rule RewriteArithmeticFiltersOnIntOrLongColumn [SPARK-27033][SQL]Add Optimize rule RewriteArithmeticFiltersOnIntegralColumn Mar 8, 2019
@SparkQA

SparkQA commented Mar 8, 2019

Test build #103189 has finished for PR 23942 at commit 3927dec.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 8, 2019

Test build #103197 has finished for PR 23942 at commit 2c02777.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal
Contributor

retest this please

@maropu
Member

maropu commented Mar 8, 2019

How do you handle this behaviour change?

// v2.4.0
scala> Seq(0, Int.MaxValue).toDF("v").write.saveAsTable("t")
scala> sql("select * from t").show
+----------+
|         v|
+----------+
|         0|
|2147483647|
+----------+

scala> sql("select * from t where v + 1 > 0").show
+---+
|  v|
+---+
|  0|
+---+

// this pr
scala> sql("select * from t where v + 1 > 0").show
+----------+
|         v|
+----------+
|         0|
|2147483647|
+----------+

}
}

private def isAddSafe[T](left: Any, right: Any, minValue: T, maxValue: T)(
Contributor

I don't see the need for this one and the next. As of now, we are not handling overflows with integers (you can see #21599 is still open), so I think we can get rid of these checks. It may be worth, though, adding a comment (like a TODO) as a reminder that this issue can arise.

Contributor

In some cases, it doesn't necessarily cause overflow if we don't rewrite it, so there's potential inconsistency again.

Contributor Author

The check here is to make sure that, if overflow could occur after the rewrite, the expression will not be rewritten.
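
A hedged sketch of the intent behind such a guard (simplified to Long bounds; the PR's actual isAddSafe is generic over the integral types):

  // Skip the rewrite when folding the constants could overflow the column's type bounds.
  private def isAddSafe(left: Long, right: Long, minValue: Long, maxValue: Long): Boolean = {
    if (right > 0) left <= maxValue - right else left >= minValue - right
  }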

@SparkQA

SparkQA commented Mar 8, 2019

Test build #103201 has finished for PR 23942 at commit 2c02777.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@WangGuangxin
Contributor Author

How do you handle this behaviour change?

// v2.4.0
scala> Seq(0, Int.MaxValue).toDF("v").write.saveAsTable("t")
scala> sql("select * from t").show
+----------+
|         v|
+----------+
|         0|
|2147483647|
+----------+

scala> sql("select * from t where v + 1 > 0").show
+---+
|  v|
+---+
|  0|
+---+

// this pr
scala> sql("select * from t where v + 1 > 0").show
+----------+
|         v|
+----------+
|         0|
|2147483647|
+----------+

This is a bad case that I didn't think about before. I found there are four kinds of cases:

  • v + 1 > 0 => v > -1 and v <= Int.MAX - 1
  • v - 1 > 0 => v > 1 or (v < Int.MIN + 1 && v > 0 - 1 + Int.MIN - Int.MAX )
  • v + 1 < 0 => v < -1 or (v > Int.MAX -1 && v < 0 - 1 + Int.MAX - Int.MIN)
  • v - 1 < 0 => v < 1 and v >= Int.MIN + 1

For one inequality, the rewrite may require two or three inequalities, which makes the expressions much more complex. So I don't think it's worth converting inequalities. We may only handle = or != here. What do you think?

@maropu
Member

maropu commented Mar 11, 2019

I'm neutral on this, but I feel there are not many queries that this rule could optimize... WDYT? cc: @cloud-fan

@cloud-fan
Contributor

I think it's hard to rewrite the comparison expressions to match the overflow behavior. What's worse, the overflow behavior may change in the future to follow the SQL standard and throw an exception when overflow happens.

Only handling equal SGTM.
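
If the rule is narrowed that way, a hedged sketch of the equality-only match could look like this (helper names follow the earlier sketches; since Catalyst parses a != b as Not(EqualTo(a, b)), the same case also covers != under transformExpressionsUp):

        case e @ EqualTo(left: BinaryArithmetic, right: Expression)
            if right.foldable && isDataTypeSafe(left.dataType) =>
          transformLeft(e, left, right)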

@WangGuangxin
Contributor Author

retest it please

@SparkQA

SparkQA commented Mar 15, 2019

Test build #103542 has finished for PR 23942 at commit 43892ac.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

/**
* Rewrite arithmetic filters on an integral-type (e.g., byte, short, int and long)
* column to its equivalent form, leaving attribute alone in a left side, so that
* we can push it down to datasources (e.g., Parquet and ORC).
Member

cc @liancheng per #8165

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

github-actions bot commented Jan 1, 2020

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Jan 1, 2020
@github-actions github-actions bot closed this Jan 2, 2020