[SPARK-21783][SQL] Turn on ORC filter push-down by default #20265
Conversation
cc @cloud-fan, @gatorsmile.
Test build #86116 has finished for PR 20265 at commit
@@ -483,6 +484,64 @@ object OrcReadBenchmark {
    }
  }

  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
Filter push-down depends on various properties of data and predicates. This is just an example of filter push down performance in order to show some benefits.
Have you seen any workload where predicate pushdown could be slower?
Theoretically, useless predicates (selectivity 100%) only add additional computation for both Parquet/ORC.
Could you add a test case for useless predicates too?
Ur, do you expect there will be much difference in some cases? In general, for the most common cases, it will be slightly slower, as we can easily expect.
withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
  import spark.implicits._
  val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
  val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
Why not use some simple predicate, like `col > 5`?
Is it important for this config PR?
Let’s focus on the original purpose of this PR.
This is kind of the best case for PPD, as the data is sorted. I'm fine with it, but let's add some more cases, at least `==` and `>`. We should follow the other benchmarks in this file to make it complete.
Ur, @cloud-fan and @gatorsmile .
The best case for PPD is when Spark needs to do lots of processing on the returned rows but the ORC reader only returns one stripe with minimal CPU cost.
So, I designed this benchmark to show the difference clearly.
- The pushed-down predicate is only `uniqueID = 0` (minimal). We can change that into `uniqueID ==` or `uniqueID >`.
- The `LIKE` predicate is chosen because it's not pushed down and makes Spark do more processing. It's just one example of that kind of operation. You can ignore those predicates. We can choose some UDFs instead.
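The stripe-skipping idea behind this best case can be illustrated with a small sketch. This is plain Python, not the actual Spark/ORC reader API: a hypothetical `stripes_to_read` helper mimics how a reader compares a pushed-down equality predicate against per-stripe min/max statistics and skips every stripe whose range cannot match.

```python
def stripes_to_read(stripe_stats, target):
    """Return indices of stripes whose [min, max] range could contain `target`.

    `stripe_stats` is a list of (min, max) pairs, one per stripe, standing in
    for the column statistics an ORC reader consults before reading a stripe.
    """
    return [i for i, (lo, hi) in enumerate(stripe_stats) if lo <= target <= hi]


# Sorted data: each stripe covers a disjoint id range, so a predicate like
# `uniqueID = 0` touches exactly one stripe and the reader skips the rest.
stats = [(0, 9999), (10000, 19999), (20000, 29999)]
print(stripes_to_read(stats, 0))  # only stripe 0 survives
```

With unsorted data the same value could fall inside every stripe's [min, max] range, which is why the benefit of PPD depends so heavily on data layout.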
I mean `LIKE '%not%exist%'` will not be optimized by `LikeSimplification`.
Oh sorry, I missed the `uniqueID` part. So the `LIKE` operation is just to make the difference larger? We don't need to do this; just a simple predicate like `col = 1` or `col < 1`, to show how much PPD normally improves performance.
The goal of this benchmark is not to show the best case of PPD. We just want to see the perf difference of the most common cases.
I see, @cloud-fan and @gatorsmile .
For the most common cases, I have wondered the same about Parquet, too.
Filter Pushdown: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Native ORC MR (Pushdown=false) 16169 / 16193 0.3 3084.0 1.0X
Let's focus on PPD for this benchmark and not disable the vectorized reader, e.g.
col < 3
col < 3 (Pushdown)
col = 3
col = 3 (Pushdown)
...
Yep. I see. Focusing on PPD on the best reader.
Hi, @cloud-fan and @gatorsmile .
Filter Pushdown (id > -1): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 8346 / 8516 0.1 7959.8 1.0X
Parquet Vectorized (Pushdown) 8611 / 8630 0.1 8212.4 1.0X
@gatorsmile . This shows the case you asked about. It happens here for Parquet and in Line 169 for ORC.
Thanks for your work! The benchmark suite is pretty useful.
def main(args: Array[String]): Unit = {
  // Positive cases: Select one or no rows
  Seq("id = 0", "id == 0", "id <= 0", "id < 1", "id IS NULL").foreach { expr =>
"id == 0", "id <= 0", "id < 1"
-> "id <= 1024 * 500", "id < 1024 * 500", "id > 1024 * 499 and id < 1024 * 500"
maybe we can use a 10% selectivity predicate?
I'll try to embrace both requests.
def main(args: Array[String]): Unit = {
  // Positive cases: Select one or no rows
  Seq("id = 0", "id == 0", "id <= 0", "id < 1", "id IS NULL").foreach { expr =>
    filterPushDownBenchmark(1024 * 1024 * 1, 20, expr)
`1024 * 1024 * 1` -> `1024 * 1024`
Yep.
}
def main(args: Array[String]): Unit = {
  // Positive cases: Select one or no rows
Split the positive case into multiple cases, as suggested above. We need to see the perf for different predicate types.
df.createOrReplaceTempView("t1")
prepareTable(dir, spark.sql("SELECT * FROM t1"))

Seq(false, true).foreach { value =>
`value` -> `pushDownEnabled`
Done.
// Set default configs. Individual cases will change them if necessary.
spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true")
Do we need it?
I think it's fine, then we don't need to care about what the default value is.
Ah, we don't need it; we always set them in the benchmark cases.
    filterPushDownBenchmark(1024 * 1024 * 1, 20, expr)
  }
// Negative cases: Select all rows which means the predicate is always true. |
This is not a negative case, conceptually.
Maybe `good cases` vs `bad cases`?
Filter Pushdown (id != -1): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 8088 / 8297 0.1 7713.2 1.0X
Parquet Vectorized (Pushdown) 7110 / 8674 0.1 6780.8 1.1X
The difference between the best and the avg is big. We need to increase `minNumIters`.
I'll increase from 2 to 5.
Parquet Vectorized 2267 / 2287 0.5 2162.0 1.0X
Parquet Vectorized (Pushdown) 735 / 803 1.4 701.1 3.1X
Native ORC Vectorized 1708 / 1718 0.6 1629.1 1.3X
Native ORC Vectorized (Pushdown) 83 / 88 12.7 79.0 27.4X
This is amazing; any idea why ORC is so much faster than Parquet in this case?
Native ORC Vectorized 1708 / 1718 0.6 1629.1 1.3X
Native ORC Vectorized (Pushdown) 83 / 88 12.7 79.0 27.4X

Filter Pushdown (id == 0): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
What's the difference between `id = 0` and `id == 0`? Do you want `id <=> 0`?
Oops. Yes, `<=>`.
I think we need to make sure the Parquet row group size and the ORC stripe size are the same, to make this benchmark fair.
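A hedged sketch of how the two sizes could be aligned when writing the benchmark tables. This is an untested config fragment, not code from this PR: it assumes a PySpark `DataFrame` named `df`, and the writer option names `parquet.block.size` and `orc.stripe.size` are taken from the Parquet and ORC writers rather than from this benchmark.

```python
# Config fragment (assumes an existing DataFrame `df`; option names are
# assumptions, not verified against this PR). Align Parquet row-group size
# and ORC stripe size, e.g. both at 64 MiB, for a fairer comparison.
size = str(64 * 1024 * 1024)
df.write.option("parquet.block.size", size).parquet("/tmp/parquet_t")
df.write.option("orc.stripe.size", size).orc("/tmp/orc_t")
```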
Test build #86143 has finished for PR 20265 at commit
I'll update the PR tomorrow.
I updated the PR (except the RowGroupSize/OrcStripeSize part).
Test build #86200 has finished for PR 20265 at commit
def main(args: Array[String]): Unit = {
  val numRows = 1024 * 1024
  val width = 20
I think `20` is not a common table width; how about `5`?
No problem. I'll set it to 5.
LGTM except for one comment. Let's worry about row group/stripe size later; since both Parquet and ORC use default settings, I think it's still fair.
}
def main(args: Array[String]): Unit = {
  val numRows = 1024 * 1024
shall we increase the number of rows?
I'm afraid the resulting parquet/orc file is too small to benchmark PPD.
Yep. I'll increase it to `1024 * 1024 * 15`.
  filterPushDownBenchmark(numRows, title, whereExpr)
}

val selectExpr = (1 to width).map(i => s"LENGTH(c$i)").mkString("SUM(", "+", ")")
Since the data size is increased, I used `SUM(LENGTH(c1)+...)` instead of `*` for the following cases.
Maybe simply `max(c1), max(c2), ...`?
I see. I'll update and rerun.
ORC performs even better when the number of columns is small. Maybe also add test cases back to show this observation?
@gatorsmile . The number of rows was also changed. Why do you think so?
There might be many questions about ORC (or Parquet) performance benchmarks. We can do that later; we cannot enumerate all cases at one time. Also, users can do that for their own workloads. In fact, Apache Spark didn't show this kind of benchmark when it turned on PPD for Parquet. If there were a benchmark for Parquet, this PR would be a piece of cake. I think this PR is enough to show the benefit of ORC PPD for enabling the config.
Test build #86251 has finished for PR 20265 at commit
Test build #86257 has finished for PR 20265 at commit
Thanks, merging to master/2.3!
## What changes were proposed in this pull request?

ORC filter push-down has been disabled by default from the beginning, [SPARK-2883](aa31e43#diff-41ef65b9ef5b518f77e2a03559893f4dR149). Now, Apache Spark starts to depend on Apache ORC 1.4.1. For Apache Spark 2.3, this PR turns on ORC filter push-down by default like Parquet ([SPARK-9207](https://issues.apache.org/jira/browse/SPARK-9207)) as a part of [SPARK-20901](https://issues.apache.org/jira/browse/SPARK-20901), "Feature parity for ORC with Parquet".

## How was this patch tested?

Pass the existing tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20265 from dongjoon-hyun/SPARK-21783.

(cherry picked from commit 0f8a286)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
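For context, the user-visible effect of this PR is only the default value of one SQL config. A minimal sketch of the switch (an untested config fragment assuming a running PySpark session named `spark`; the key `spark.sql.orc.filterPushdown` is the SQL-conf name behind `SQLConf.ORC_FILTER_PUSHDOWN_ENABLED` used in the benchmark code above):

```python
# Config fragment (requires an existing SparkSession `spark`; not
# self-contained). Before this PR, ORC PPD had to be enabled manually:
spark.conf.set("spark.sql.orc.filterPushdown", "true")

# After this PR it defaults to true, and can still be turned off:
spark.conf.set("spark.sql.orc.filterPushdown", "false")
```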
Thank you so much, @cloud-fan and @gatorsmile!