[SPARK-12639][SQL] Improve Explain for Datasources with Handled Predicates #10655


Closed
wants to merge 2 commits

Conversation

RussellSpitzer
Member

SPARK-11661 makes all predicates get pushed down to the underlying data sources regardless of whether the source can handle them or not. This makes the explain command slightly confusing, as it always lists all filters whether or not the underlying source can actually use them. Instead, we should now only list those filters which are expressly handled by the underlying source.

All predicates are pushed down, so there really isn't any value in listing them.

In the following example, plate_number is a predicate which can be pushed to the underlying source and handled at the source level; mileage and acc cannot be handled in any way by the underlying source.
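For context, here is a minimal sketch of how a relation can declare which pushed filters it fully handles, using the BaseRelation.unhandledFilters hook available to data sources since Spark 1.6. The ExampleRelation and its match on plate_number are hypothetical, not code from this PR:

import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter, PrunedFilteredScan}

// Hypothetical relation: it can evaluate equality on plate_number at the
// source, but cannot evaluate predicates on mileage or acc.
abstract class ExampleRelation extends BaseRelation with PrunedFilteredScan {
  // Filters returned here are NOT fully handled by the source, so Spark
  // must still re-evaluate them after the scan.
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot {
      case EqualTo("plate_number", _) => true
      case _ => false
    }
}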

Previously

scala> df.filter("""plate_number = "OUTTATIME" and mileage > 100""").explain
16/01/08 09:20:05 INFO CassandraSourceRelation: filters: EqualTo(plate_number,OUTTATIME), GreaterThan(mileage,100.0)
16/01/08 09:20:05 INFO CassandraSourceRelation: pushdown filters: ArrayBuffer(EqualTo(plate_number,OUTTATIME))
== Physical Plan ==
Filter (mileage#5 > 100.0)
+- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@7b1d3a05[plate_number#0,time#1,acc#2,lat#3,lng#4,mileage#5]  PushedFilters: [EqualTo(plate_number,OUTTATIME), GreaterThan(mileage,100.0)]

scala> df.filter("""mileage > 100 and acc = 1000""").explain
16/01/08 09:23:39 INFO CassandraSourceRelation: filters: GreaterThan(mileage,100.0)
16/01/08 09:23:39 INFO CassandraSourceRelation: pushdown filters: ArrayBuffer()
== Physical Plan ==
Filter (if (isnull(acc#2)) null else CASE 1000 WHEN 1 THEN acc#2 WHEN 0 THEN NOT acc#2 ELSE false && (mileage#5 > 100.0))
+- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@7b1d3a05[plate_number#0,time#1,acc#2,lat#3,lng#4,mileage#5] PushedFilters: [GreaterThan(mileage,100.0)]

With this patch

scala> df.filter("""plate_number = "OUTTATIME" and mileage > 100""").explain
16/01/08 09:20:05 INFO CassandraSourceRelation: filters: EqualTo(plate_number,OUTTATIME), GreaterThan(mileage,100.0)
16/01/08 09:20:05 INFO CassandraSourceRelation: pushdown filters: ArrayBuffer(EqualTo(plate_number,OUTTATIME))
== Physical Plan ==
Filter (mileage#5 > 100.0)
+- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@7b1d3a05[plate_number#0,time#1,acc#2,lat#3,lng#4,mileage#5] HandledFilters: [(plate_number#0 = OUTTATIME)]

scala> df.filter("""mileage > 100 and acc = 1000""").explain
16/01/08 09:23:39 INFO CassandraSourceRelation: filters: GreaterThan(mileage,100.0)
16/01/08 09:23:39 INFO CassandraSourceRelation: pushdown filters: ArrayBuffer()
== Physical Plan ==
Filter (if (isnull(acc#2)) null else CASE 1000 WHEN 1 THEN acc#2 WHEN 0 THEN NOT acc#2 ELSE false && (mileage#5 > 100.0))
+- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@7b1d3a05[plate_number#0,time#1,acc#2,lat#3,lng#4,mileage#5]

TL;DR
CassandraSource

OLD
org.apache.spark.sql.cassandra.CassandraSourceRelation@7b1d3a05[plate_number#0,time#1,acc#2,lat#3,lng#4,mileage#5]  PushedFilters: [EqualTo(plate_number,OUTTATIME), GreaterThan(mileage,100.0)]

NEW
org.apache.spark.sql.cassandra.CassandraSourceRelation@7b1d3a05[plate_number#0,time#1,acc#2,lat#3,lng#4,mileage#5] HandledFilters: [(plate_number#0 = OUTTATIME)]

Parquet

val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.filter("age > 5").explain
OLD
== Physical Plan ==
Filter (age#0L > 5)
+- Scan JSONRelation[age#0L,name#1] InputPaths: people.json, PushedFilters: [GreaterThan(age,5)]

NEW
== Physical Plan ==
Filter (age#6L > 5)
+- Scan JSONRelation[age#6L,name#7] InputPaths: people.json

Orc

case class Contact(name: String, phone: String)
case class Person(name: String, age: Int, contacts: Seq[Contact])
val records = (1 to 100).map { i =>
   Person(s"name_$i", i, (0 to 1).map { m => Contact(s"contact_$m", s"phone_$m") })
}
sc.parallelize(records).toDF().write.format("orc").save("people")
val people = sqlContext.read.format("orc").load("people")
people.filter("age > 10").explain
OLD
== Physical Plan ==
Filter (age#10 > 10)
+- Scan OrcRelation[name#9,age#10,contacts#11] InputPaths: people, PushedFilters: [GreaterThan(age,10)]

NEW
== Physical Plan ==
Filter (age#7 > 10)
+- Scan OrcRelation[name#6,age#7,contacts#8] InputPaths: people

-    if (pushedFilters.nonEmpty) {
-      pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
+    if (handledPredicates.nonEmpty) {
+      pairs += (HANDLED_FILTERS -> handledPredicates.mkString("[", ", ", "]"))
Contributor

Should we also keep pushed filters? For some data sources, like ORC, a pushed filter will be evaluated at a coarse-grained level instead of on every row. I think it is better to keep that information.

Member Author

I thought 11663 meant all filters are pushed down regardless, so I wondered if that was redundant? It's also still a bit confusing: although it says the filters are "pushed", there is no guarantee that the underlying source will do anything with them at all.

Contributor

Ah, sorry. I think I understand the change now: handledPredicates contains all filters that are pushed to the data source except those returned by the unhandledFilters method. I think the change is good, and HandledFilters is an appropriate name.
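Under that reading, a minimal sketch of the split (the helper name splitHandled and its signature are illustrative, not the actual DataSourceStrategy code):

import org.apache.spark.sql.sources.{BaseRelation, Filter}

// Split the filters offered to a relation into the ones it fully handles
// (what this PR reports as HandledFilters) and the ones Spark must still
// re-evaluate after the scan.
def splitHandled(relation: BaseRelation, pushed: Seq[Filter]): (Seq[Filter], Seq[Filter]) = {
  val unhandled = relation.unhandledFilters(pushed.toArray).toSet
  (pushed.filterNot(unhandled.contains), pushed.filter(unhandled.contains))
}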

@rxin
Contributor

rxin commented Jan 8, 2016

Can you include a before/after comparison in the pull request description?

@RussellSpitzer
Member Author

@rxin Added. Basically, I think the current "PushedFilters" list isn't very valuable if everything is listed there, so instead we should just list those filters which the source can actually do something with. If there is a possibility that a source might do something (bloom filtering), we should have a third category, but currently there is no way of knowing (I think).

@rxin
Contributor

rxin commented Jan 9, 2016

Thanks @RussellSpitzer.

I will let @yhuai review and merge this. One question: do you know why the filter is "if (isnull(acc#2)) null else CASE 1000 WHEN 1 THEN acc#2 WHEN 0 THEN NOT acc#2 ELSE false"? It seems so complicated for "acc = 1000".

@rxin
Contributor

rxin commented Jan 9, 2016

OK, I think I figured out why: "acc" is a boolean column.

@yhuai
Contributor

yhuai commented Jan 11, 2016

@RussellSpitzer Thank you for the PR! The change looks good. Can you also try ORC and Parquet tables and attach the before/after output to the PR description?

@yhuai
Contributor

yhuai commented Jan 11, 2016

ok to test

@SparkQA

SparkQA commented Jan 11, 2016

Test build #49155 has finished for PR 10655 at commit 42cc0e7.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@RussellSpitzer
Member Author

@yhuai I removed the PushedFilters and added the other examples. We could re-add "PushedFilters" if you like; I wasn't sure whether you still wanted that. I'm still not sure it's very valuable info, since everything is pushed, if I understood correctly.

@SparkQA

SparkQA commented Jan 16, 2016

Test build #49502 has finished for PR 10655 at commit 5a0daf6.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented Jan 16, 2016

@RussellSpitzer Thanks for the change. I have thought about it again. My only concern is that unhandledPredicates actually contains filters that can still be handled by the data source. For example, filters in ORC are applied at a coarser-grained level. Also, any data source that does not override the unhandledFilters method will cause HandledFilters to show nothing, which can cause confusion.

So, how about we still show PUSHED_FILTERS, but add a special character (maybe *) to filters that belong to unhandledPredicates to indicate that these filters may not be applied to every row?
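A minimal sketch of that suggestion (the helper name formatPushedFilters and the exact formatting are assumptions, not a final implementation):

import org.apache.spark.sql.sources.Filter

// Render every pushed filter; prefix the ones the source cannot fully
// handle with '*' so the explain output still shows they were pushed down.
def formatPushedFilters(pushed: Seq[Filter], unhandled: Set[Filter]): String =
  pushed.map(f => if (unhandled.contains(f)) s"*$f" else f.toString)
    .mkString("[", ", ", "]")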

@RussellSpitzer
Member Author

I personally think the ambiguous PUSHED_FILTERS is more confusing. When we see a predicate there, we have no idea whether it is a valid filter for the source at all. In the C* case, for instance, this could contain clauses which have no way of actually being pushed down to the source.

In my mind there are three categories of predicates:

  • Those which cannot be pushed to the source at all
  • Those which can be pushed to the source but may have false positives
  • Those which can be pushed to the source and filter completely

Currently we can only tell whether a predicate is in one of the first two categories or in the third. This leaves us awkwardly stating that the source has had a predicate pushed to it even when that is impossible. I like just stating the third category because that's the only thing we can truly be sure of given the current code. It would be better if the underlying source were able to qualify all filters into the above categories.

So to me it is more confusing to say something is pushed when it technically can't be than to say something is not pushed when it might be, but YMMV.

If you want to just go with asterisks, that's fine with me too; I just wanted to make my argument :D

@yhuai
Contributor

yhuai commented Jan 17, 2016

@RussellSpitzer Thank you for the comment. I totally agree with you.

As mentioned before, my only concern is that for ORC/Parquet, we will not be able to see pushed filters in the explain output after the current change in this PR. As a user of Parquet/ORC, I do want to see that a filter has been pushed down even if this filter will not be applied to every row.

How about we go with asterisks for now? You may need to keep the expression that maps to a data source filter (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L536) to generate the string.

@RussellSpitzer
Member Author

Haven't forgotten about this; it will have a new PR soon :)

@yhuai
Contributor

yhuai commented Jan 25, 2016

no problem! Thank you :)

@HyukjinKwon
Member

Maybe we should correct the title to match the others, [SPARK-XXXX][SQL] (as described in https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark).

@HyukjinKwon
Member

ping @RussellSpitzer

@RussellSpitzer
Member Author

Sorry, I forgot about this. I'll clean this up tomorrow and get it ready.

@yhuai
Contributor

yhuai commented Sep 12, 2016

test this please

@RussellSpitzer RussellSpitzer changed the title SPARK-12639 SQL Improve Explain for Datasources with Handled Predicates [SPARK-12639] [SQL] Improve Explain for Datasources with Handled Predicates Sep 12, 2016
@RussellSpitzer RussellSpitzer changed the title [SPARK-12639] [SQL] Improve Explain for Datasources with Handled Predicates [SPARK-12639][SQL] Improve Explain for Datasources with Handled Predicates Sep 12, 2016
@SparkQA

SparkQA commented Sep 12, 2016

Test build #65267 has finished for PR 10655 at commit 5a0daf6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@RussellSpitzer
Member Author

We fixed this in a different PR, #11317.

@RussellSpitzer RussellSpitzer deleted the SPARK-12639 branch September 15, 2016 15:25