
Commit a28a399

cloud-fan authored and cmonkey committed
[SPARK-18990][SQL] make DatasetBenchmark fairer for Dataset
## What changes were proposed in this pull request?

Currently `DatasetBenchmark` uses `case class Data(l: Long, s: String)` as the record type for both `RDD` and `Dataset`, which introduces serialization overhead only for `Dataset` and is therefore unfair. This PR uses `Long` as the record type, to be fairer to `Dataset`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#16391 from cloud-fan/benchmark.
1 parent 09ca5b8 commit a28a399
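
For context, a minimal sketch (not part of the patch; the session setup, object name, and row count are illustrative) of why the old record type penalized only the Dataset path: each typed `map` must decode the row into a `Data` object and re-encode the result, while `Dataset[Long]` uses the primitive `Long` encoder.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical illustration, not code from this patch.
case class Data(l: Long, s: String)

object EncodingOverheadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    // Old shape: each typed map decodes (l, s) into a Data object and
    // re-encodes it, even though the benchmark only exercises `l`.
    val oldStyle = spark.range(0, 1000).map(i => Data(i, i.toString)).map(d => Data(d.l + 1, d.s))

    // New shape: Dataset[Long] uses the primitive Long encoder, so the
    // per-record conversion cost is comparable to the RDD path.
    val newStyle = spark.range(0, 1000).as[Long].map(_ + 1)

    oldStyle.foreach(_ => ())
    newStyle.foreach(_ => ())
    spark.stop()
  }
}
```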


sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala

Lines changed: 42 additions & 33 deletions
```diff
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
@@ -34,11 +33,13 @@ object DatasetBenchmark {
   def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
     import spark.implicits._
 
-    val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
+    val rdd = spark.sparkContext.range(0, numRows)
+    val ds = spark.range(0, numRows)
+    val df = ds.toDF("l")
+    val func = (l: Long) => l + 1
+
     val benchmark = new Benchmark("back-to-back map", numRows)
-    val func = (d: Data) => Data(d.l + 1, d.s)
 
-    val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
     benchmark.addCase("RDD") { iter =>
       var res = rdd
       var i = 0
@@ -53,14 +54,14 @@ object DatasetBenchmark {
       var res = df
       var i = 0
       while (i < numChains) {
-        res = res.select($"l" + 1 as "l", $"s")
+        res = res.select($"l" + 1 as "l")
         i += 1
       }
       res.queryExecution.toRdd.foreach(_ => Unit)
     }
 
     benchmark.addCase("Dataset") { iter =>
-      var res = df.as[Data]
+      var res = ds.as[Long]
       var i = 0
       while (i < numChains) {
         res = res.map(func)
```
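
A condensed, hypothetical sketch of the chained-map pattern this benchmark times after the change (the row count, chain count, and object name are illustrative, not the benchmark's actual settings):

```scala
import org.apache.spark.sql.SparkSession

// Condensed sketch of the back-to-back map pattern; illustrative only.
object BackToBackMapSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    val numRows = 1000000L
    val numChains = 10
    val func = (l: Long) => l + 1

    // Typed path: every .map(func) round-trips through the Long encoder.
    var res = spark.range(0, numRows).as[Long]
    var i = 0
    while (i < numChains) {
      res = res.map(func)
      i += 1
    }
    // Materialize the plan without collecting rows to the driver.
    res.queryExecution.toRdd.foreach(_ => ())
    spark.stop()
  }
}
```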
```diff
@@ -75,14 +76,14 @@ object DatasetBenchmark {
   def backToBackFilter(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
     import spark.implicits._
 
-    val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
+    val rdd = spark.sparkContext.range(0, numRows)
+    val ds = spark.range(0, numRows)
+    val df = ds.toDF("l")
+    val func = (l: Long, i: Int) => l % (100L + i) == 0L
+    val funcs = 0.until(numChains).map { i => (l: Long) => func(l, i) }
+
     val benchmark = new Benchmark("back-to-back filter", numRows)
-    val func = (d: Data, i: Int) => d.l % (100L + i) == 0L
-    val funcs = 0.until(numChains).map { i =>
-      (d: Data) => func(d, i)
-    }
 
-    val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
     benchmark.addCase("RDD") { iter =>
       var res = rdd
       var i = 0
@@ -104,7 +105,7 @@ object DatasetBenchmark {
     }
 
     benchmark.addCase("Dataset") { iter =>
-      var res = df.as[Data]
+      var res = ds.as[Long]
       var i = 0
       while (i < numChains) {
         res = res.filter(funcs(i))
```
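
The filter benchmark precomputes one closure per chain step, so closure construction stays outside the timed region. A small Spark-free sketch of that pattern (sizes and the object name are illustrative):

```scala
// Spark-free sketch of the closure-per-chain-step pattern; illustrative only.
object FilterChainSketch {
  def main(args: Array[String]): Unit = {
    val numChains = 10
    val func = (l: Long, i: Int) => l % (100L + i) == 0L
    // One closure per step, built once, before any timing starts.
    val funcs: IndexedSeq[Long => Boolean] = 0.until(numChains).map(i => (l: Long) => func(l, i))

    // Applying the chain to plain Longs mirrors `res = res.filter(funcs(i))`.
    var res = (0L until 100000L).toVector
    var i = 0
    while (i < numChains) {
      res = res.filter(funcs(i))
      i += 1
    }
    println(s"${res.size} rows survive the ${numChains}-step filter chain")
  }
}
```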
```diff
@@ -133,24 +134,29 @@ object DatasetBenchmark {
   def aggregate(spark: SparkSession, numRows: Long): Benchmark = {
     import spark.implicits._
 
-    val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
+    val rdd = spark.sparkContext.range(0, numRows)
+    val ds = spark.range(0, numRows)
+    val df = ds.toDF("l")
+
     val benchmark = new Benchmark("aggregate", numRows)
 
-    val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
     benchmark.addCase("RDD sum") { iter =>
-      rdd.aggregate(0L)(_ + _.l, _ + _)
+      rdd.map(l => (l % 10, l)).reduceByKey(_ + _).foreach(_ => Unit)
     }
 
     benchmark.addCase("DataFrame sum") { iter =>
-      df.select(sum($"l")).queryExecution.toRdd.foreach(_ => Unit)
+      df.groupBy($"l" % 10).agg(sum($"l")).queryExecution.toRdd.foreach(_ => Unit)
     }
 
     benchmark.addCase("Dataset sum using Aggregator") { iter =>
-      df.as[Data].select(typed.sumLong((d: Data) => d.l)).queryExecution.toRdd.foreach(_ => Unit)
+      val result = ds.as[Long].groupByKey(_ % 10).agg(typed.sumLong[Long](identity))
+      result.queryExecution.toRdd.foreach(_ => Unit)
     }
 
+    val complexDs = df.select($"l", $"l".cast(StringType).as("s")).as[Data]
     benchmark.addCase("Dataset complex Aggregator") { iter =>
-      df.as[Data].select(ComplexAggregator.toColumn).queryExecution.toRdd.foreach(_ => Unit)
+      val result = complexDs.groupByKey(_.l % 10).agg(ComplexAggregator.toColumn)
+      result.queryExecution.toRdd.foreach(_ => Unit)
     }
 
     benchmark
```
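
With this change the three `aggregate` cases compute like-for-like grouped sums. A sketch of the three equivalent formulations (the session setup, object name, and row count are illustrative, not from the patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.scalalang.typed
import org.apache.spark.sql.functions.sum

// Sketch of the three equivalent grouped sums the benchmark now compares.
object GroupedSumSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    val numRows = 1000000L
    val rdd = spark.sparkContext.range(0, numRows)
    val ds = spark.range(0, numRows)
    val df = ds.toDF("l")

    // RDD: shuffle-based grouped sum over (key, value) pairs.
    rdd.map(l => (l % 10, l)).reduceByKey(_ + _).collect()

    // DataFrame: Catalyst plans and code-generates the same aggregation.
    df.groupBy($"l" % 10).agg(sum($"l")).collect()

    // Dataset: typed groupByKey plus a typed Aggregator column.
    ds.as[Long].groupByKey(_ % 10).agg(typed.sumLong[Long](identity)).collect()

    spark.stop()
  }
}
```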
```diff
@@ -170,36 +176,39 @@ object DatasetBenchmark {
     val benchmark3 = aggregate(spark, numRows)
 
     /*
-    OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
-    Intel Xeon E3-12xx v2 (Ivy Bridge)
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
     back-to-back map:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     ------------------------------------------------------------------------------------------------
-    RDD                                       3448 / 3646         29.0          34.5       1.0X
-    DataFrame                                 2647 / 3116         37.8          26.5       1.3X
-    Dataset                                   4781 / 5155         20.9          47.8       0.7X
+    RDD                                       3963 / 3976         25.2          39.6       1.0X
+    DataFrame                                  826 /  834        121.1           8.3       4.8X
+    Dataset                                   5178 / 5198         19.3          51.8       0.8X
     */
     benchmark.run()
 
     /*
-    OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
-    Intel Xeon E3-12xx v2 (Ivy Bridge)
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
     back-to-back filter:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     ------------------------------------------------------------------------------------------------
-    RDD                                       1346 / 1618         74.3          13.5       1.0X
-    DataFrame                                   59 /   72       1695.4           0.6      22.8X
-    Dataset                                   2777 / 2805         36.0          27.8       0.5X
+    RDD                                        533 /  587        187.6           5.3       1.0X
+    DataFrame                                   79 /   91       1269.0           0.8       6.8X
+    Dataset                                    550 /  559        181.7           5.5       1.0X
     */
     benchmark2.run()
 
     /*
     Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
     Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+
     aggregate:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     ------------------------------------------------------------------------------------------------
-    RDD sum                                   1913 / 1942         52.3          19.1       1.0X
-    DataFrame sum                               46 /   61       2157.7           0.5      41.3X
-    Dataset sum using Aggregator              4656 / 4758         21.5          46.6       0.4X
-    Dataset complex Aggregator                6636 / 7039         15.1          66.4       0.3X
+    RDD sum                                   2297 / 2440         43.5          23.0       1.0X
+    DataFrame sum                              630 /  637        158.7           6.3       3.6X
+    Dataset sum using Aggregator              3129 / 3247         32.0          31.3       0.7X
+    Dataset complex Aggregator               12109 / 12142         8.3         121.1       0.2X
     */
     benchmark3.run()
   }
```
