[SPARK-21783][SQL] Turn on ORC filter push-down by default #20265

Closed · wants to merge 4 commits

Conversation

@dongjoon-hyun (Member) commented Jan 14, 2018

What changes were proposed in this pull request?

ORC filter push-down has been disabled by default from the beginning (SPARK-2883).

Now that Apache Spark depends on Apache ORC 1.4.1, this PR turns on ORC filter push-down by default for Apache Spark 2.3, matching Parquet (SPARK-9207), as part of SPARK-20901, "Feature parity for ORC with Parquet".
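For reference, the flag being flipped is spark.sql.orc.filterPushdown (SQLConf.ORC_FILTER_PUSHDOWN_ENABLED). A minimal sketch of toggling it per session; the path and predicate are illustrative only, not from this PR:

// With this PR, the default is already "true"; set explicitly here for clarity.
spark.conf.set("spark.sql.orc.filterPushdown", "true")
spark.read.orc("/tmp/orc_table").filter("id < 1").count()  // reader can skip stripes via min/max stats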

How was this patch tested?

Pass the existing tests.

@dongjoon-hyun (Member, Author):

cc @cloud-fan, @gatorsmile.

@SparkQA commented Jan 14, 2018

Test build #86116 has finished for PR 20265 at commit dda5bdf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -483,6 +484,64 @@ object OrcReadBenchmark {
}
}

def filterPushDownBenchmark(values: Int, width: Int): Unit = {
@dongjoon-hyun (Member, Author) · Jan 14, 2018:

Filter push-down depends on various properties of the data and the predicates. This is just one example of filter push-down performance, to show some of the benefits.

Member:

Have you seen any workload where predicate push-down could be slower?

Member (Author):

Theoretically, useless predicates (100% selectivity) only add extra computation for both Parquet and ORC.
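To illustrate with a hypothetical example (not from this PR): given a non-negative id column, the predicate below matches every row, so the reader still evaluates per-stripe (ORC) or per-row-group (Parquet) min/max statistics but never skips any data.

// 100% selectivity: the pushed-down filter prunes nothing and only adds stat checks.
spark.read.orc("/tmp/t").filter("id > -1").count()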

Member:

Could you add a test case for useless predicates too?

Member (Author):

Do you expect there will be much difference in some cases?
In the most common cases, it will be slightly slower, as we would expect.

withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
import spark.implicits._
val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
Member:

Why not use a simple predicate, like col > 5?

Member (Author):

Is it important for this config PR?
Let’s focus on the original purpose of this PR.

Contributor:

This is kind of the best case for PPD, since the data is sorted. I'm fine with it, but let's add some more cases, at least == and >. We should follow the other benchmarks in this file to make it complete.

Member (Author):

@cloud-fan and @gatorsmile:

The best case for PPD is when Spark needs to do lots of processing on the returned rows while the ORC reader returns only one stripe with minimal CPU cost.

So, I designed this benchmark in order to show the difference clearly.

  1. The pushed-down predicate is only uniqueID = 0 (minimal). We can change that into uniqueID == or uniqueID >.
  2. The LIKE predicate is chosen because it is not pushed down and makes Spark do more processing. It's just one example of that kind of operation; you can ignore those predicates. We could choose some UDFs instead.

Member (Author):

I mean LIKE '%not%exist%' will not be optimized by LikeSimplification.
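For context, a sketch of how Catalyst's LikeSimplification rule treats patterns (the data here is illustrative):

// 'abc%' -> StartsWith, '%abc' -> EndsWith, '%abc%' -> Contains, 'abc' -> EqualTo.
// '%not%exist%' has interior wildcards, so it is left as a full LIKE evaluation.
spark.range(10).selectExpr("CAST(id AS STRING) AS c1")
  .filter("c1 LIKE '%not%exist%'")
  .explain(true)  // the optimized plan still contains a Like predicate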

Contributor:

Oh sorry, I missed the uniqueID part. So the LIKE operation is just there to make the difference larger? We don't need that; a simple predicate like col = 1 or col < 1 is enough to show how much PPD normally improves performance.

Member:

The goal of this benchmark is not to show the best case of PPD. We just want to see the perf difference in the most common cases.

Member (Author):

I see, @cloud-fan and @gatorsmile.
For the most common cases, I have wondered the same about Parquet, too.


Filter Pushdown: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Native ORC MR (Pushdown=false) 16169 / 16193 0.3 3084.0 1.0X
@cloud-fan (Contributor) · Jan 15, 2018:

Let's focus on PPD for this benchmark and not disable the vectorized reader, e.g.:

col < 3
col < 3 (Pushdown)
col = 3
col = 3 (Pushdown)
...

Member (Author):

Yep, I see. Focusing on PPD with the best reader.

@dongjoon-hyun (Member, Author):
Hi, @cloud-fan and @gatorsmile.
Your questions are valid for all PPD cases. Based on the comments, I added the following expressions (positive and negative) for both ORC and Parquet.

+    // Positive cases: Select one or no rows
+    Seq("id = 0", "id == 0", "id <= 0", "id < 1", "id IS NULL").foreach { expr =>
+      filterPushDownBenchmark(1024 * 1024 * 1, 20, expr)
+    }
+
+    // Negative cases: Select all rows which means the predicate is always true.
+    Seq("id > -1", "id != -1", "id IS NOT NULL").foreach { expr =>
+      filterPushDownBenchmark(1024 * 1024 * 1, 20, expr)
+    }

Filter Pushdown (id > -1): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 8346 / 8516 0.1 7959.8 1.0X
Parquet Vectorized (Pushdown) 8611 / 8630 0.1 8212.4 1.0X
Member (Author):

@gatorsmile, this shows the case you asked about. It happens here for Parquet, and at line 169 for ORC.

Member:

Thanks for your work! The benchmark suite is pretty useful.


def main(args: Array[String]): Unit = {
// Positive cases: Select one or no rows
Seq("id = 0", "id == 0", "id <= 0", "id < 1", "id IS NULL").foreach { expr =>
Member:

"id == 0", "id <= 0", "id < 1" -> "id <= 1024 * 500", "id < 1024 * 500", "id > 1024 * 499 and id < 1024 * 500"

Contributor:

maybe we can use a 10% selectivity predicate?

Member (Author):

I'll try to embrace both requests.
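For illustration, assuming a sequential id column over [0, numRows), a ~10% selectivity predicate could be built like this (a hypothetical sketch, not the final code):

// Roughly 10% of rows satisfy the filter.
val whereExpr = s"id < ${numRows / 10}"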

def main(args: Array[String]): Unit = {
// Positive cases: Select one or no rows
Seq("id = 0", "id == 0", "id <= 0", "id < 1", "id IS NULL").foreach { expr =>
filterPushDownBenchmark(1024 * 1024 * 1, 20, expr)
Member:

1024 * 1024 * 1 -> 1024 * 1024

Member (Author):

Yep.

}

def main(args: Array[String]): Unit = {
// Positive cases: Select one or no rows
@gatorsmile (Member) · Jan 16, 2018:

Split the positive case into multiple cases, as suggested above. We need to see the perf for different predicate types.

df.createOrReplaceTempView("t1")
prepareTable(dir, spark.sql("SELECT * FROM t1"))

Seq(false, true).foreach { value =>
Member:

value -> pushDownEnabled

Member (Author):

Done.


// Set default configs. Individual cases will change them if necessary.
spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true")
Member:

Do we need it?

Contributor:

I think it's fine, then we don't need to care about what the default value is.

Contributor:

Ah, we don't need it; we always set them in the benchmark cases.

filterPushDownBenchmark(1024 * 1024 * 1, 20, expr)
}

// Negative cases: Select all rows which means the predicate is always true.
Member:

This is not a negative case, conceptually.

Contributor:

maybe good cases vs bad cases?

Filter Pushdown (id != -1): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet Vectorized 8088 / 8297 0.1 7713.2 1.0X
Parquet Vectorized (Pushdown) 7110 / 8674 0.1 6780.8 1.1X
Member:

The difference between the best and the average is big. We need to increase minNumIters.

Member (Author):

I'll increase from 2 to 5.
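For reference, a sketch of the knob being discussed, assuming Spark's org.apache.spark.util.Benchmark utility (other constructor arguments omitted):

// Run each case at least 5 times so the best and average times converge.
val benchmark = new Benchmark(s"Filter Pushdown ($whereExpr)", numRows, minNumIters = 5)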

Parquet Vectorized 2267 / 2287 0.5 2162.0 1.0X
Parquet Vectorized (Pushdown) 735 / 803 1.4 701.1 3.1X
Native ORC Vectorized 1708 / 1718 0.6 1629.1 1.3X
Native ORC Vectorized (Pushdown) 83 / 88 12.7 79.0 27.4X
Contributor:

This is amazing. Any idea why ORC is so much faster than Parquet in this case?

Native ORC Vectorized 1708 / 1718 0.6 1629.1 1.3X
Native ORC Vectorized (Pushdown) 83 / 88 12.7 79.0 27.4X

Filter Pushdown (id == 0): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
Contributor:

what's the difference between id = 0 and id == 0? Do you want id <=> 0?

Member (Author):

Oops. Yes. <=>.
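For readers unfamiliar with the operator: <=> is Spark SQL's null-safe equality. A quick illustration:

spark.sql("SELECT NULL = 0").show()    // NULL: ordinary equality propagates nulls
spark.sql("SELECT NULL <=> 0").show()  // false: null-safe equality never returns NULL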

@cloud-fan (Contributor):
I think we need to make sure the Parquet row group size and the ORC stripe size are the same, to make this benchmark fair.
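A hedged sketch of one way to align them; the keys below are the writers' standard Hadoop configuration settings, not something this PR changes:

// Use 128 MB for both the Parquet row group size and the ORC stripe size.
val bytes = 128L * 1024 * 1024
spark.sparkContext.hadoopConfiguration.setLong("parquet.block.size", bytes)
spark.sparkContext.hadoopConfiguration.setLong("orc.stripe.size", bytes)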

@SparkQA commented Jan 16, 2018

Test build #86143 has finished for PR 20265 at commit 440f76b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member, Author):
I'll update the PR tomorrow.

@dongjoon-hyun (Member, Author):
I updated the PR (except one RowGroupSize/OrcStripeSize part).

@SparkQA commented Jan 17, 2018

Test build #86200 has finished for PR 20265 at commit 87af693.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


def main(args: Array[String]): Unit = {
val numRows = 1024 * 1024
val width = 20
Contributor:

I think 20 is not a common table width; how about 5?

Member (Author):

No problem. I'll set it to 5.

@cloud-fan (Contributor):
LGTM except one comment. Let's worry about row group/stripe size later; since both Parquet and ORC use default settings, I think it's still fair.

}

def main(args: Array[String]): Unit = {
val numRows = 1024 * 1024
Contributor:

shall we increase the number of rows?

Contributor:

I'm afraid the resulting parquet/orc file is too small to benchmark PPD.

Member (Author):

Yep. I'll increase to 1024 * 1024 * 15.

filterPushDownBenchmark(numRows, title, whereExpr)
}

val selectExpr = (1 to width).map(i => s"LENGTH(c$i)").mkString("SUM(", "+", ")")
@dongjoon-hyun (Member, Author) · Jan 17, 2018:

Since the data size increased, I used SUM(LENGTH(c1)+...) instead of * for the following cases.

Contributor:

maybe simply max(c1), max(c2), ...?
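The suggested alternative, sketched with the benchmark's width variable (one MAX per column still forces every column to be read while keeping the result tiny):

val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString(", ")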

Member (Author):

I see. I'll update and rerun.

@gatorsmile (Member):
ORC performs even better when the number of columns is small. Maybe also add test cases to show this observation?

@dongjoon-hyun (Member, Author):
@gatorsmile, the number of rows also changed. Why do you think so?

@dongjoon-hyun (Member, Author) commented Jan 17, 2018:

There might be many more questions about ORC (or Parquet) performance benchmarks; we can address those later. We cannot enumerate every case at once, and users can run benchmarks for their own workloads. In fact, Apache Spark didn't show this kind of benchmark when it turned on PPD for Parquet. If a Parquet benchmark had existed, this PR would have been a piece of cake.

I think this PR is enough to show the benefit of ORC PPD and to justify enabling the config.

@SparkQA commented Jan 17, 2018

Test build #86251 has finished for PR 20265 at commit a556169.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 17, 2018

Test build #86257 has finished for PR 20265 at commit eb7035d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):
Thanks, merging to master/2.3!

asfgit pushed a commit that referenced this pull request Jan 17, 2018
## What changes were proposed in this pull request?

ORC filter push-down has been disabled by default from the beginning ([SPARK-2883](aa31e43#diff-41ef65b9ef5b518f77e2a03559893f4dR149)).

Now that Apache Spark depends on Apache ORC 1.4.1, this PR turns on ORC filter push-down by default for Apache Spark 2.3, matching Parquet ([SPARK-9207](https://issues.apache.org/jira/browse/SPARK-9207)), as part of [SPARK-20901](https://issues.apache.org/jira/browse/SPARK-20901), "Feature parity for ORC with Parquet".

## How was this patch tested?

Pass the existing tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20265 from dongjoon-hyun/SPARK-21783.

(cherry picked from commit 0f8a286)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@asfgit closed this in 0f8a286 on Jan 17, 2018.
@dongjoon-hyun (Member, Author):
Thank you so much, @cloud-fan and @gatorsmile !

@dongjoon-hyun deleted the SPARK-21783 branch on January 17, 2018 at 16:11.