
[SPARK-24402] [SQL] Optimize In expression when only one element in the collection or collection is empty #21442


Closed
wants to merge 9 commits

Conversation

Member

@dbtsai dbtsai commented May 28, 2018

What changes were proposed in this pull request?

Two new rules are added to the logical plan optimizer.

  1. When there is only one element in the collection, the In expression
    will be rewritten into EqualTo in the physical plan, so predicate
    pushdown can be used.
    profileDF.filter($"profileID".isInCollection(Set(6))).explain(true)
    """
      |== Physical Plan ==
      |*(1) Project [profileID#0]
      |+- *(1) Filter (isnotnull(profileID#0) && (profileID#0 = 6))
      |   +- *(1) FileScan parquet [profileID#0] Batched: true, Format: Parquet,
      |     PartitionFilters: [],
      |     PushedFilters: [IsNotNull(profileID), EqualTo(profileID,6)],
      |     ReadSchema: struct<profileID:int>
    """.stripMargin
  2. When the collection is empty and the input is nullable, the
    logical plan will be simplified to
    profileDF.filter($"profileID".isInCollection(Set())).explain(true)
    """
      |== Optimized Logical Plan ==
      |Filter if (isnull(profileID#0)) null else false
      |+- Relation[profileID#0] parquet
    """.stripMargin

TODO:

  1. For lists with multiple elements whose count is below a certain
    threshold, we should still allow predicate pushdown.
  2. Optimize In using tableswitch or lookupswitch when the number of
    categories is low and the type is Int or Long.
  3. The default immutable hash trie set is slow to query, and we should
    benchmark different set implementations for faster lookups.
  4. filter(if (condition) null else false) can be optimized to false.

How was this patch tested?

A couple of new tests are added.

@dbtsai
Member Author

dbtsai commented May 28, 2018

+@cloud-fan

@SparkQA

SparkQA commented May 28, 2018

Test build #91218 has finished for PR 21442 at commit 449613a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -191,4 +206,21 @@ class OptimizeInSuite extends PlanTest {

    comparePlans(optimized, correctAnswer)
  }

  test("OptimizedIn test: In empty list gets transformed to " +
    "If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType)) when value is nullable") {
Member

nit: could you make the title simpler?

@maropu
Member

maropu commented May 28, 2018

Thanks for the work. One question: do we have actual performance numbers with/without this PR?

        // When v is not nullable, the following expression will be optimized
        // to FalseLiteral which is tested in OptimizeInSuite.scala
        If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
      case In(v, Seq(elem @ Literal(_, _))) =>
Member

This can be moved inside case expr @ In(v, list) if expr.inSetConvertible.

Member Author

As you suggested, I'll move it into case expr @ In(v, list) if expr.inSetConvertible.

Yes, I saw the test failure. The same code was passing the build in another PR of mine (1332406). I will debug it tomorrow.

Thanks,

@dbtsai
Member Author

dbtsai commented May 29, 2018

@maropu I didn't run a full performance benchmark. I believe the performance gain comes from predicate pushdown when there is only one element in the set, and it can be significant.

I forget which one, but either Impala or Drill allows multiple predicates to be pushed down, and I believe this can be a win in many cases.

@SparkQA

SparkQA commented May 29, 2018

Test build #91243 has finished for PR 21442 at commit 7a354fc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

      case expr @ In(v, list) if expr.inSetConvertible =>
        val newList = ExpressionSet(list).toSeq
-       if (newList.size > SQLConf.get.optimizerInSetConversionThreshold) {
+       if (newList.length == 1) {
Member

When list.length == 1, we don't need to create ExpressionSet.

Contributor

This is too minor; I'd like to keep the current code and not break the code flow.

Member

Sounds good.

@cloud-fan
Contributor

retest this please

@cloud-fan
Contributor

LGTM, pending jenkins

@SparkQA

SparkQA commented May 29, 2018

Test build #91258 has finished for PR 21442 at commit 7a354fc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@gengliangwang gengliangwang left a comment

LGTM

if (newList.size > SQLConf.get.optimizerInSetConversionThreshold) {
if (newList.length == 1) {
EqualTo(v, newList.head)
} else if (newList.size > SQLConf.get.optimizerInSetConversionThreshold) {
Member

nit: size => length
because we use length in the previous if

-       if (newList.size > SQLConf.get.optimizerInSetConversionThreshold) {
+       if (newList.length == 1) {
+         EqualTo(v, newList.head)
+       } else if (newList.size > SQLConf.get.optimizerInSetConversionThreshold) {
          val hSet = newList.map(e => e.eval(EmptyRow))
          InSet(v, HashSet() ++ hSet)
        } else if (newList.size < list.size) {
Member

nit: In line 235 the comment
// newList.length == list.length
can be updated as
// newList.length == list.length && newList.length > 1

      case expr @ In(v, list) if expr.inSetConvertible =>
        val newList = ExpressionSet(list).toSeq
-       if (newList.size > SQLConf.get.optimizerInSetConversionThreshold) {
+       if (newList.length == 1) {
+         EqualTo(v, newList.head)
Member

This will fail, since the schemas mismatch when the data type is a struct. The test cases were added a few days ago in #21425.
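
For context, the failure mode as I understand it from SPARK-24443 (a hedged reconstruction, not text from this patch): for a multi-column predicate such as (a, b) NOT IN ((2, 3.0)), the value side resolves to a struct with field names a and b, while the literal tuple resolves to a struct with default field names. In tolerates the differing field names, but EqualTo type-checks the two sides strictly, so the rewritten predicate fails analysis.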

@HyukjinKwon
Member

retest this please

@gatorsmile
Member

gatorsmile commented Jul 16, 2018

@HyukjinKwon The code has a bug. Please read the discussion before you trigger the test.

@HyukjinKwon
Member

@gatorsmile, can I just see if the build passes against the latest codebase?

@gatorsmile
Member

@HyukjinKwon All the involved reviewers will get a ping. It is annoying to see many pings within one hour, right? My suggestion is to read the comments before triggering the test.

@HyukjinKwon
Member

HyukjinKwon commented Jul 16, 2018

This should get updated, right? I would have to give a ping here anyway. I wonder why re-triggering tests matters so much for some PRs.

You are probably mostly talking about the "ok to test" comments I left on old PRs, which pinged people. In that case, I think the root cause is that we started to block the Jenkins tests for some old PRs, for a reason I don't know. That alone already generates a lot of pings. And if an author is willing to update their PR, having to ask for a Jenkins build becomes a blocker again.

@HyukjinKwon
Member

Also, look at the stale PRs and see how long we have delayed taking a look at them. Please consider that this is partly what we should have checked and reviewed earlier, and partly that authors haven't updated their PRs. Both cases need pings, right? This is what we should have been doing but couldn't. Most of these were inactive for more than a month.

@gatorsmile
Member

@HyukjinKwon Thank you for trying to trigger the tests, but it does not help if people get many pings within one hour. To save resources, we need to do more investigation before blindly triggering the tests.

@HyukjinKwon
Member

I am not blindly triggering tests. I only re-triggered tests for some PRs while skimming the stale ones.

For "ok to test", I also haven't blindly re-triggered. I did it where a committer had initially triggered the test, or where I saw some value while skimming stale PRs.

Getting pings is annoying, of course, but the root cause is that we haven't been checking the PRs diligently, or that authors have been inactive.

@gatorsmile
Member

gatorsmile commented Jul 16, 2018

If you want to help with this, we should ping the reviewers/committers based on the priority of these PRs. Also, we should not trigger too many pings within a short period. Could you reduce the batch size to tens per day?

We need to seriously consider which new features we really want to introduce in the upcoming release. We only have 15 days left before the code freeze of Spark 2.4.

@HyukjinKwon
Member

Of course, time and priority matter, but that is exactly why these pending PRs have been queued up so far.

Shall we check the PRs and see if there are important ones for Spark 2.4 before the branch is cut, since the release is close? Otherwise, most of them will probably be missed again for the same time-and-priority reasons.

I don't think I have done this kind of batch pinging and re-triggering in roughly the last year.

@HyukjinKwon
Member

HyukjinKwon commented Jul 16, 2018

Also, the root cause of getting a lot of pings is that we somehow started to block the Jenkins build for some old PRs. I probably missed some threads on the dev mailing list, but I still don't know who started this, or why.

To be honest, I have felt somewhat annoyed by this too, probably for a similar reason to the one you gave, even though I haven't expressed it so far for the reasons above.

@gatorsmile
Member

gatorsmile commented Jul 16, 2018

@HyukjinKwon Currently, in my opinion, the highest-priority PRs include Parquet nested column pruning (https://github.com/apache/spark/pull/21320/files), the new built-in Avro support, higher-order functions, the data correctness issue of Shuffle+Repartition on RDD, gang scheduling, the tasks for native K8S support, and so on. If you have the bandwidth, please also help review these PRs.

@cloud-fan
Contributor

@dbtsai I think this PR can be unblocked if we only turn In into EqualTo when the list is not a ListQuery. We can work around the type mismatch problem for now.
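
A minimal sketch of the suggested guard (my reading of the suggestion, not the exact change; it would sit in the single-element branch quoted above):

    // Rewrite only when the single element is not a subquery (ListQuery).
    case In(v, Seq(elem)) if !elem.isInstanceOf[ListQuery] =>
      EqualTo(v, elem)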

@SparkQA

SparkQA commented Jul 16, 2018

Test build #93076 has finished for PR 21442 at commit 7a354fc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dbtsai
Member Author

dbtsai commented Jul 16, 2018

@HyukjinKwon thanks for bringing this to my attention. @gatorsmile the bug was found by this PR, not introduced in it. This PR is blocked until SPARK-24443 is addressed. I'll unblock it by turning In into EqualTo only when the list is not a ListQuery, as suggested by @cloud-fan. Thanks all.

@SparkQA

SparkQA commented Jul 16, 2018

Test build #93130 has finished for PR 21442 at commit 5079833.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 16, 2018

Test build #93136 has finished for PR 21442 at commit 9174a30.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Jul 16, 2018

retest this please.

@gatorsmile
Member

LGTM. The test failure is not related to this PR.

Thanks! Merged to master.

@asfgit asfgit closed this in 0f0d186 Jul 16, 2018
@SparkQA

SparkQA commented Jul 17, 2018

Test build #93143 has finished for PR 21442 at commit 9174a30.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Jul 17, 2018

Seems like a related test failure? cc @dbtsai

@HyukjinKwon
Member

This hasn't gotten a single passing test run. The test is broken by this commit:

Before:

$ build/sbt "~sql/test-only *SQLQueryTestSuite -- -z not-in-unit-tests-multi-column-literal.sql"
...
[info] SQLQueryTestSuite:
[info] - subquery/in-subquery/not-in-unit-tests-multi-column-literal.sql (4 seconds, 602 milliseconds)
...
[info] All tests passed.

After:

$ build/sbt "~sql/test-only *SQLQueryTestSuite -- -z not-in-unit-tests-multi-column-literal.sql"
...
[info] SQLQueryTestSuite:
[info] - subquery/in-subquery/not-in-unit-tests-multi-column-literal.sql *** FAILED *** (4 seconds, 388 milliseconds)
[info]   Expected "struct<[a:int,b:decimal(2,1)]>", but got "struct<[]>" Schema did not match for query #1
[info]   -- Case 5
[info]     -- (one null column with no match -> row is returned)
[info]   SELECT *
[info]   FROM   m
[info]   WHERE  b = 1.0 -- Matches (null, 1.0)
[info]          AND (a, b) NOT IN ((2, 3.0)) (SQLQueryTestSuite.scala:246)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
[info]   at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
[info]   at org.scalatest.Assertions$class.assertResult(Assertions.scala:1003)
...
[error] Failed: Total 1, Failed 1, Errors 0, Passed 0
[error] Failed tests:
[error] 	org.apache.spark.sql.SQLQueryTestSuite

Reverting this, since it will probably break the build, and this PR hasn't had a successful build in any event.

@cloud-fan
Contributor

@dbtsai can you open a new PR? thanks!

@dbtsai
Member Author

dbtsai commented Jul 17, 2018

I opened a new PR at https://github.com/apache/spark/pull/21797/files and will work on the test issue there. Thanks.

dongjoon-hyun added a commit that referenced this pull request Mar 4, 2019
… dates

## What changes were proposed in this pull request?

This PR optimizes `InSet` expressions for byte, short, integer, date types. It is a follow-up on PR #21442 from dbtsai.

`In` expressions are compiled into a sequence of if-else statements, which results in O(n) time complexity. `InSet` is an optimized version of `In`, which is supposed to improve the performance if all values are literals and the number of elements is big enough. However, `InSet` actually worsens the performance in many cases due to various reasons.

The main idea of this PR is to use Java `switch` statements to significantly improve the performance of `InSet` expressions for bytes, shorts, ints, dates. All `switch` statements are compiled into `tableswitch` and `lookupswitch` bytecode instructions. We will have O(1) time complexity if our case values are compact and `tableswitch` can be used. Otherwise, `lookupswitch` will give us O(log n).
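
As a rough Scala analogue of the idea (a sketch only; the actual change emits a Java `switch` in generated code): a pattern match on `Int` literals compiles to the same `tableswitch`/`lookupswitch` instructions.

    import scala.annotation.switch

    // Membership test over Int literals; scalac compiles this match to a
    // tableswitch or lookupswitch, so the lookup is O(1) or O(log n)
    // instead of a chain of if-else comparisons.
    def inSet(v: Int): Boolean = (v: @switch) match {
      case 1 | 2 | 3 | 5 | 8 => true
      case _ => false
    }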

Locally, I tried Spark `OpenHashSet` and primitive collections from `fastutils` in order to solve the boxing issue in `InSet`. Both options significantly decreased the memory consumption and `fastutils` improved the time compared to `HashSet` from Scala. However, the switch-based approach was still more than two times faster even on 500+ non-compact elements.

I also noticed that applying the switch-based approach on less than 10 elements gives a relatively minor improvement compared to the if-else approach. Therefore, I placed the switch-based logic into `InSet` and added a new config to track when it is applied. Even if we migrate to primitive collections at some point, the switch logic will be still faster unless the number of elements is really big. Another option is to have a separate `InSwitch` expression. However, this would mean we need to modify other places (e.g., `DataSourceStrategy`).

See [here](https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-3.html#jvms-3.10) and [here](https://stackoverflow.com/questions/10287700/difference-between-jvms-lookupswitch-and-tableswitch) for more information.

This PR does not cover long values as Java `switch` statements cannot be used on them. However, we can have a follow-up PR with an approach similar to binary search.
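
A sketch of what such a follow-up could look like (an assumption on my part, not code from this PR): keep the `Long` literals in a sorted array and probe it with binary search for O(log n) membership.

    import java.util.Arrays

    // Hypothetical helper: `sortedValues` would be built once from the InSet literals.
    val sortedValues: Array[Long] = Array(7L, 42L, 1024L, 999999L).sorted
    def inLongSet(v: Long): Boolean = Arrays.binarySearch(sortedValues, v) >= 0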

## How was this patch tested?

There are new tests that verify the logic of the proposed optimization.

The performance was evaluated using existing benchmarks. This PR was also tested on an EC2 instance (OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 4.14.77-70.59.amzn1.x86_64, Intel(R) Xeon(R) CPU E5-2686 v4  2.30GHz).

## Notes

- [This link](http://hg.openjdk.java.net/jdk8/jdk8/langtools/file/30db5e0aaf83/src/share/classes/com/sun/tools/javac/jvm/Gen.java#l1153) contains source code that decides between `tableswitch` and `lookupswitch`. The logic was re-used in the benchmarks. See the `isLookupSwitch` method.

Closes #23171 from aokolnychyi/spark-26205.

Lead-authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>