Commit a6575de

Browse files
committed
Benchmark column index
1 parent 40be254 commit a6575de

2 files changed: +366 -1

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java

Lines changed: 1 addition & 1 deletion
@@ -298,7 +298,7 @@ private void initializeInternal() throws IOException, UnsupportedOperationException
 
   private void checkEndOfRowGroup() throws IOException {
     if (rowsReturned != totalCountLoadedSoFar) return;
-    PageReadStore pages = reader.readNextRowGroup();
+    PageReadStore pages = reader.readNextFilteredRowGroup();
     if (pages == null) {
       throw new IOException("expecting more rows but reached last block. Read "
           + rowsReturned + " out of " + totalRowCount);
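
The functional core of the commit is this one-line switch from readNextRowGroup() to parquet-mr's readNextFilteredRowGroup(), which consults the row group's column index and offset index so that pages that cannot match the pushed-down filter are skipped rather than decompressed and decoded. As a minimal sketch of how this surfaces on the Spark SQL side (the path and predicate below are illustrative, not part of the commit):

// Sketch only: /tmp/example and the `value` predicate are hypothetical.
// With pushdown enabled, the predicate is translated to a parquet-mr
// FilterPredicate, and readNextFilteredRowGroup() can use the column index
// to skip non-matching pages within each row group it returns.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")  // SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED
spark.read.parquet("/tmp/example")
  .where("value = 42")
  .collect()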
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetFilterPushdownBenchmark.scala

Lines changed: 365 additions & 0 deletions (new file)
@@ -0,0 +1,365 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.benchmark

import java.io.File

import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.internal.config.UI._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
import org.apache.spark.sql.types.{ByteType, Decimal, DecimalType, TimestampType}

/**
 * Benchmark to measure read performance with filter pushdown.
 * To run this benchmark:
 * {{{
 *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
 *   2. build/sbt "sql/test:runMain <this class>"
 *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
 *      Results will be written to "benchmarks/ParquetFilterPushdownBenchmark-results.txt".
 * }}}
 */
object ParquetFilterPushdownBenchmark extends SqlBasedBenchmark {

  override def getSparkSession: SparkSession = {
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      // Since `spark.master` always exists, override its value.
      .set("spark.master", "local[1]")
      .setIfMissing("spark.driver.memory", "3g")
      .setIfMissing("spark.executor.memory", "3g")
      .setIfMissing(UI_ENABLED, false)
      .setIfMissing("orc.compression", "snappy")
      .setIfMissing("spark.sql.parquet.compression.codec", "snappy")

    SparkSession.builder().config(conf).getOrCreate()
  }

  private val numRows = 1024 * 1024 * 15
  private val width = 5
  private val mid = numRows / 2
  // For Parquet/ORC, we will use the same value for block size and compression size.
  private val blockSize = org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE

  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
    try f finally tableNames.foreach(spark.catalog.dropTempView)
  }

  private def prepareTable(
      dir: File, numRows: Int, width: Int, useStringForValue: Boolean): Unit = {
    import spark.implicits._
    val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
    val valueCol = if (useStringForValue) {
      monotonically_increasing_id().cast("string")
    } else {
      monotonically_increasing_id()
    }
    val df = spark.range(numRows).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
      .withColumn("value", valueCol)
      .sort("value")

    saveAsTable(df, dir)
  }

  private def prepareStringDictTable(
      dir: File, numRows: Int, numDistinctValues: Int, width: Int): Unit = {
    val selectExpr = (0 to width).map {
      case 0 => s"CAST(id % $numDistinctValues AS STRING) AS value"
      case i => s"CAST(rand() AS STRING) c$i"
    }
    val df = spark.range(numRows).selectExpr(selectExpr: _*).sort("value")

    saveAsTable(df, dir, true)
  }

  private def saveAsTable(df: DataFrame, dir: File, useDictionary: Boolean = false): Unit = {
    // Note: `useDictionary` is unused on the Parquet path; parquet-mr enables
    // dictionary encoding by default.
    val parquetPath = dir.getCanonicalPath + "/parquet"
    df.write.mode("overwrite")
      .option("parquet.block.size", blockSize).parquet(parquetPath)
    spark.read.parquet(parquetPath).createOrReplaceTempView("parquetTable")
  }

  // Measures the same scan query twice: once with Parquet filter pushdown
  // disabled and once with it enabled.
  def filterPushDownBenchmark(
      values: Int,
      title: String,
      whereExpr: String,
      selectExpr: String = "*"): Unit = {
    val benchmark = new Benchmark(title, values, minNumIters = 5, output = output)

    Seq(false, true).foreach { pushDownEnabled =>
      val name = s"Parquet Vectorized ${if (pushDownEnabled) "(Pushdown)" else ""}"
      benchmark.addCase(name) { _ =>
        withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> s"$pushDownEnabled") {
          spark.sql(s"SELECT $selectExpr FROM parquetTable WHERE $whereExpr").collect()
        }
      }
    }
    benchmark.run()
  }

  private def runIntBenchmark(numRows: Int, width: Int, mid: Int): Unit = {
    Seq("value IS NULL", s"$mid < value AND value < $mid").foreach { whereExpr =>
      val title = s"Select 0 int row ($whereExpr)".replace("value AND value", "value")
      filterPushDownBenchmark(numRows, title, whereExpr)
    }

    Seq(
      s"value = $mid",
      s"value <=> $mid",
      s"$mid <= value AND value <= $mid",
      s"${mid - 1} < value AND value < ${mid + 1}"
    ).foreach { whereExpr =>
      val title = s"Select 1 int row ($whereExpr)".replace("value AND value", "value")
      filterPushDownBenchmark(numRows, title, whereExpr)
    }

    val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")

    Seq(10, 50, 90).foreach { percent =>
      filterPushDownBenchmark(
        numRows,
        s"Select $percent% int rows (value < ${numRows * percent / 100})",
        s"value < ${numRows * percent / 100}",
        selectExpr
      )
    }

    Seq("value IS NOT NULL", "value > -1", "value != -1").foreach { whereExpr =>
      filterPushDownBenchmark(
        numRows,
        s"Select all int rows ($whereExpr)",
        whereExpr,
        selectExpr)
    }
  }

  private def runStringBenchmark(
      numRows: Int, width: Int, searchValue: Int, colType: String): Unit = {
    Seq("value IS NULL", s"'$searchValue' < value AND value < '$searchValue'")
      .foreach { whereExpr =>
        val title = s"Select 0 $colType row ($whereExpr)".replace("value AND value", "value")
        filterPushDownBenchmark(numRows, title, whereExpr)
      }

    Seq(
      s"value = '$searchValue'",
      s"value <=> '$searchValue'",
      s"'$searchValue' <= value AND value <= '$searchValue'"
    ).foreach { whereExpr =>
      val title = s"Select 1 $colType row ($whereExpr)".replace("value AND value", "value")
      filterPushDownBenchmark(numRows, title, whereExpr)
    }

    val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")

    Seq("value IS NOT NULL").foreach { whereExpr =>
      filterPushDownBenchmark(
        numRows,
        s"Select all $colType rows ($whereExpr)",
        whereExpr,
        selectExpr)
    }
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("Pushdown for many distinct value case") {
      withTempPath { dir =>
        withTempTable("parquetTable") {
          Seq(true, false).foreach { useStringForValue =>
            prepareTable(dir, numRows, width, useStringForValue)
            if (useStringForValue) {
              runStringBenchmark(numRows, width, mid, "string")
            } else {
              runIntBenchmark(numRows, width, mid)
            }
          }
        }
      }
    }

    runBenchmark("Pushdown for few distinct value case (use dictionary encoding)") {
      withTempPath { dir =>
        val numDistinctValues = 200

        withTempTable("parquetTable") {
          prepareStringDictTable(dir, numRows, numDistinctValues, width)
          runStringBenchmark(numRows, width, numDistinctValues / 2, "distinct string")
        }
      }
    }

    runBenchmark("Pushdown benchmark for StringStartsWith") {
      withTempPath { dir =>
        withTempTable("parquetTable") {
          prepareTable(dir, numRows, width, true)
          Seq(
            "value like '10%'",
            "value like '1000%'",
            s"value like '${mid.toString.substring(0, mid.toString.length - 1)}%'"
          ).foreach { whereExpr =>
            val title = s"StringStartsWith filter: ($whereExpr)"
            filterPushDownBenchmark(numRows, title, whereExpr)
          }
        }
      }
    }

    runBenchmark(s"Pushdown benchmark for ${DecimalType.simpleString}") {
      withTempPath { dir =>
        Seq(
          s"decimal(${Decimal.MAX_INT_DIGITS}, 2)",
          s"decimal(${Decimal.MAX_LONG_DIGITS}, 2)",
          s"decimal(${DecimalType.MAX_PRECISION}, 2)"
        ).foreach { dt =>
          val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
          val valueCol = if (dt.equalsIgnoreCase(s"decimal(${Decimal.MAX_INT_DIGITS}, 2)")) {
            monotonically_increasing_id() % 9999999
          } else {
            monotonically_increasing_id()
          }
          val df = spark.range(numRows)
            .selectExpr(columns: _*).withColumn("value", valueCol.cast(dt))
          withTempTable("parquetTable") {
            saveAsTable(df, dir)

            Seq(s"value = $mid").foreach { whereExpr =>
              val title = s"Select 1 $dt row ($whereExpr)".replace("value AND value", "value")
              filterPushDownBenchmark(numRows, title, whereExpr)
            }

            val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")
            Seq(10, 50, 90).foreach { percent =>
              filterPushDownBenchmark(
                numRows,
                s"Select $percent% $dt rows (value < ${numRows * percent / 100})",
                s"value < ${numRows * percent / 100}",
                selectExpr
              )
            }
          }
        }
      }
    }

    runBenchmark("Pushdown benchmark for InSet -> InFilters") {
      withTempPath { dir =>
        withTempTable("parquetTable") {
          prepareTable(dir, numRows, width, false)
          Seq(5, 10, 50, 100).foreach { count =>
            Seq(10, 50, 90).foreach { distribution =>
              val filter =
                Range(0, count).map(_ => Random.nextInt(numRows * distribution / 100))
              val whereExpr = s"value in(${filter.mkString(",")})"
              val title = s"InSet -> InFilters (values count: $count, distribution: $distribution)"
              filterPushDownBenchmark(numRows, title, whereExpr)
            }
          }
        }
      }
    }

    runBenchmark(s"Pushdown benchmark for ${ByteType.simpleString}") {
      withTempPath { dir =>
        val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
        val df = spark.range(numRows).selectExpr(columns: _*)
          .withColumn("value", (monotonically_increasing_id() % Byte.MaxValue).cast(ByteType))
          .orderBy("value")
        withTempTable("parquetTable") {
          saveAsTable(df, dir)

          Seq(s"value = CAST(${Byte.MaxValue / 2} AS ${ByteType.simpleString})")
            .foreach { whereExpr =>
              val title = s"Select 1 ${ByteType.simpleString} row ($whereExpr)"
                .replace("value AND value", "value")
              filterPushDownBenchmark(numRows, title, whereExpr)
            }

          val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")
          Seq(10, 50, 90).foreach { percent =>
            filterPushDownBenchmark(
              numRows,
              s"Select $percent% ${ByteType.simpleString} rows " +
                s"(value < CAST(${Byte.MaxValue * percent / 100} AS ${ByteType.simpleString}))",
              s"value < CAST(${Byte.MaxValue * percent / 100} AS ${ByteType.simpleString})",
              selectExpr
            )
          }
        }
      }
    }

    runBenchmark("Pushdown benchmark for Timestamp") {
      withTempPath { dir =>
        withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key -> true.toString) {
          ParquetOutputTimestampType.values.toSeq.map(_.toString).foreach { fileType =>
            withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> fileType) {
              val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
              val df = spark.range(numRows).selectExpr(columns: _*)
                .withColumn("value", monotonically_increasing_id().cast(TimestampType))
              withTempTable("parquetTable") {
                saveAsTable(df, dir)

                Seq(s"value = CAST($mid AS timestamp)").foreach { whereExpr =>
                  val title = s"Select 1 timestamp stored as $fileType row ($whereExpr)"
                    .replace("value AND value", "value")
                  filterPushDownBenchmark(numRows, title, whereExpr)
                }

                val selectExpr = (1 to width)
                  .map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)")
                Seq(10, 50, 90).foreach { percent =>
                  filterPushDownBenchmark(
                    numRows,
                    s"Select $percent% timestamp stored as $fileType rows " +
                      s"(value < CAST(${numRows * percent / 100} AS timestamp))",
                    s"value < CAST(${numRows * percent / 100} AS timestamp)",
                    selectExpr
                  )
                }
              }
            }
          }
        }
      }
    }

    runBenchmark("Pushdown benchmark with many filters") {
      val numRows = 1
      val width = 500

      withTempPath { dir =>
        val columns = (1 to width).map(i => s"id c$i")
        val df = spark.range(1).selectExpr(columns: _*)
        withTempTable("parquetTable") {
          saveAsTable(df, dir)
          Seq(1, 250, 500).foreach { numFilter =>
            val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" and ")
            // Note: InferFiltersFromConstraints will add more filters on top of the given filters.
            filterPushDownBenchmark(numRows, s"Select 1 row with $numFilter filters", whereExpr)
          }
        }
      }
    }
  }
}
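
For a concrete run, the <this class> placeholder in the scaladoc resolves to the fully qualified object name, so the sbt forms from the comment become (assuming a standard Spark source checkout):

build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.ParquetFilterPushdownBenchmark"
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.ParquetFilterPushdownBenchmark"

The second form regenerates benchmarks/ParquetFilterPushdownBenchmark-results.txt, as noted in the scaladoc.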
