@@ -20,24 +20,57 @@ package org.apache.spark.sql.execution
2020import scala .collection .mutable .ArrayBuffer
2121
2222import org .apache .spark .{SparkConf , SparkContext , SparkEnv , TaskContext }
23- import org .apache .spark .benchmark .Benchmark
23+ import org .apache .spark .benchmark .{ Benchmark , BenchmarkBase }
2424import org .apache .spark .internal .config
2525import org .apache .spark .memory .MemoryTestingUtils
2626import org .apache .spark .sql .catalyst .expressions .UnsafeRow
2727import org .apache .spark .util .collection .unsafe .sort .UnsafeExternalSorter
2828
29- object ExternalAppendOnlyUnsafeRowArrayBenchmark {
29+ /**
30+ * Benchmark ExternalAppendOnlyUnsafeRowArray.
31+ * To run this benchmark:
32+ * {{{
33+ * 1. without sbt:
34+ * bin/spark-submit --class <this class> --jars <spark core test jar> <spark sql test jar>
35+ * 2. build/sbt build/sbt ";project sql;set javaOptions
36+ * in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this class>"
37+ * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt ";project sql;set javaOptions
38+ * in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this class>"
39+ * Results will be written to
40+ * "benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt".
41+ * }}}
42+ */
43+ object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {
3044
31- def testAgainstRawArrayBuffer (numSpillThreshold : Int , numRows : Int , iterations : Int ): Unit = {
45+ private val conf = new SparkConf (false )
46+ // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
47+ // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
48+ .set(" spark.serializer.objectStreamReset" , " 1" )
49+ .set(" spark.serializer" , " org.apache.spark.serializer.JavaSerializer" )
50+
51+ private def withFakeTaskContext (f : => Unit ): Unit = {
52+ val sc = new SparkContext (" local" , " test" , conf)
53+ val taskContext = MemoryTestingUtils .fakeTaskContext(SparkEnv .get)
54+ TaskContext .setTaskContext(taskContext)
55+ f
56+ sc.stop()
57+ }
58+
59+ private def testRows (numRows : Int ): Seq [UnsafeRow ] = {
3260 val random = new java.util.Random ()
33- val rows = (1 to numRows).map(_ => {
61+ (1 to numRows).map(_ => {
3462 val row = new UnsafeRow (1 )
3563 row.pointTo(new Array [Byte ](64 ), 16 )
3664 row.setLong(0 , random.nextLong())
3765 row
3866 })
67+ }
3968
40- val benchmark = new Benchmark (s " Array with $numRows rows " , iterations * numRows)
69+ def testAgainstRawArrayBuffer (numSpillThreshold : Int , numRows : Int , iterations : Int ): Unit = {
70+ val rows = testRows(numRows)
71+
72+ val benchmark = new Benchmark (s " Array with $numRows rows " , iterations * numRows,
73+ output = output)
4174
4275 // Internally, `ExternalAppendOnlyUnsafeRowArray` will create an
4376 // in-memory buffer of size `numSpillThreshold`. This will mimic that
@@ -82,33 +115,19 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
82115 }
83116 }
84117
85- val conf = new SparkConf (false )
86- // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
87- // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
88- conf.set(" spark.serializer.objectStreamReset" , " 1" )
89- conf.set(" spark.serializer" , " org.apache.spark.serializer.JavaSerializer" )
90-
91- val sc = new SparkContext (" local" , " test" , conf)
92- val taskContext = MemoryTestingUtils .fakeTaskContext(SparkEnv .get)
93- TaskContext .setTaskContext(taskContext)
94- benchmark.run()
95- sc.stop()
118+ withFakeTaskContext {
119+ benchmark.run()
120+ }
96121 }
97122
98123 def testAgainstRawUnsafeExternalSorter (
99124 numSpillThreshold : Int ,
100125 numRows : Int ,
101126 iterations : Int ): Unit = {
127+ val rows = testRows(numRows)
102128
103- val random = new java.util.Random ()
104- val rows = (1 to numRows).map(_ => {
105- val row = new UnsafeRow (1 )
106- row.pointTo(new Array [Byte ](64 ), 16 )
107- row.setLong(0 , random.nextLong())
108- row
109- })
110-
111- val benchmark = new Benchmark (s " Spilling with $numRows rows " , iterations * numRows)
129+ val benchmark = new Benchmark (s " Spilling with $numRows rows " , iterations * numRows,
130+ output = output)
112131
113132 benchmark.addCase(" UnsafeExternalSorter" ) { _ : Int =>
114133 var sum = 0L
@@ -158,80 +177,23 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
158177 }
159178 }
160179
161- val conf = new SparkConf (false )
162- // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
163- // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
164- conf.set(" spark.serializer.objectStreamReset" , " 1" )
165- conf.set(" spark.serializer" , " org.apache.spark.serializer.JavaSerializer" )
166-
167- val sc = new SparkContext (" local" , " test" , conf)
168- val taskContext = MemoryTestingUtils .fakeTaskContext(SparkEnv .get)
169- TaskContext .setTaskContext(taskContext)
170- benchmark.run()
171- sc.stop()
180+ withFakeTaskContext {
181+ benchmark.run()
182+ }
172183 }
173184
174- def main (args : Array [String ]): Unit = {
175-
176- // ========================================================================================= //
177- // WITHOUT SPILL
178- // ========================================================================================= //
179-
180- val spillThreshold = 100 * 1000
181-
182- /*
183- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
184-
185- Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
186- ------------------------------------------------------------------------------------------------
187- ArrayBuffer 7821 / 7941 33.5 29.8 1.0X
188- ExternalAppendOnlyUnsafeRowArray 8798 / 8819 29.8 33.6 0.9X
189- */
190- testAgainstRawArrayBuffer(spillThreshold, 1000 , 1 << 18 )
191-
192- /*
193- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
194-
195- Array with 30000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
196- ------------------------------------------------------------------------------------------------
197- ArrayBuffer 19200 / 19206 25.6 39.1 1.0X
198- ExternalAppendOnlyUnsafeRowArray 19558 / 19562 25.1 39.8 1.0X
199- */
200- testAgainstRawArrayBuffer(spillThreshold, 30 * 1000 , 1 << 14 )
201-
202- /*
203- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
204-
205- Array with 100000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
206- ------------------------------------------------------------------------------------------------
207- ArrayBuffer 5949 / 6028 17.2 58.1 1.0X
208- ExternalAppendOnlyUnsafeRowArray 6078 / 6138 16.8 59.4 1.0X
209- */
210- testAgainstRawArrayBuffer(spillThreshold, 100 * 1000 , 1 << 10 )
211-
212- // ========================================================================================= //
213- // WITH SPILL
214- // ========================================================================================= //
215-
216- /*
217- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
218-
219- Spilling with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
220- ------------------------------------------------------------------------------------------------
221- UnsafeExternalSorter 9239 / 9470 28.4 35.2 1.0X
222- ExternalAppendOnlyUnsafeRowArray 8857 / 8909 29.6 33.8 1.0X
223- */
224- testAgainstRawUnsafeExternalSorter(100 * 1000 , 1000 , 1 << 18 )
225-
226- /*
227- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
228-
229- Spilling with 10000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
230- ------------------------------------------------------------------------------------------------
231- UnsafeExternalSorter 4 / 5 39.3 25.5 1.0X
232- ExternalAppendOnlyUnsafeRowArray 5 / 6 29.8 33.5 0.8X
233- */
234- testAgainstRawUnsafeExternalSorter(
235- config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD .defaultValue.get, 10 * 1000 , 1 << 4 )
185+ override def runBenchmarkSuite (mainArgs : Array [String ]): Unit = {
186+ runBenchmark(" WITHOUT SPILL" ) {
187+ val spillThreshold = 100 * 1000
188+ testAgainstRawArrayBuffer(spillThreshold, 100 * 1000 , 1 << 10 )
189+ testAgainstRawArrayBuffer(spillThreshold, 1000 , 1 << 18 )
190+ testAgainstRawArrayBuffer(spillThreshold, 30 * 1000 , 1 << 14 )
191+ }
192+
193+ runBenchmark(" WITH SPILL" ) {
194+ testAgainstRawUnsafeExternalSorter(100 * 1000 , 1000 , 1 << 18 )
195+ testAgainstRawUnsafeExternalSorter(
196+ config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD .defaultValue.get, 10 * 1000 , 1 << 4 )
197+ }
236198 }
237199}
0 commit comments