@@ -20,24 +20,57 @@ package org.apache.spark.sql.execution
20
20
import scala .collection .mutable .ArrayBuffer
21
21
22
22
import org .apache .spark .{SparkConf , SparkContext , SparkEnv , TaskContext }
23
- import org .apache .spark .benchmark .Benchmark
23
+ import org .apache .spark .benchmark .{ Benchmark , BenchmarkBase }
24
24
import org .apache .spark .internal .config
25
25
import org .apache .spark .memory .MemoryTestingUtils
26
26
import org .apache .spark .sql .catalyst .expressions .UnsafeRow
27
27
import org .apache .spark .util .collection .unsafe .sort .UnsafeExternalSorter
28
28
29
- object ExternalAppendOnlyUnsafeRowArrayBenchmark {
29
+ /**
30
+ * Benchmark ExternalAppendOnlyUnsafeRowArray.
31
+ * To run this benchmark:
32
+ * {{{
33
+ * 1. without sbt:
34
+ * bin/spark-submit --class <this class> --jars <spark core test jar> <spark sql test jar>
35
+ * 2. build/sbt ";project sql;set javaOptions
36
+ * in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this class>"
37
+ * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt ";project sql;set javaOptions
38
+ * in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this class>"
39
+ * Results will be written to
40
+ * "benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt".
41
+ * }}}
42
+ */
43
+ object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {
30
44
31
- def testAgainstRawArrayBuffer (numSpillThreshold : Int , numRows : Int , iterations : Int ): Unit = {
45
/**
 * Spark configuration shared by every benchmark run in this object. It is built without
 * loading defaults and selects the Java serializer.
 */
private val conf = {
  val sparkConf = new SparkConf(loadDefaults = false)
  // Have the Java serializer emit a reset instruction (TC_RESET) after every object; this
  // setting targets a past bug where bytes were written past the last object in a batch
  // (SPARK-2792).
  sparkConf.set("spark.serializer.objectStreamReset", "1")
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  sparkConf
}
50
+
51
/**
 * Runs `f` while a local SparkContext (built from `conf`) is active and a fake TaskContext
 * is installed on the calling thread.
 *
 * The SparkContext is stopped and the thread-local TaskContext is cleared even when `f`
 * throws, so a failing benchmark case does not leave a live context behind for the next one.
 *
 * @param f the code to execute; evaluated exactly once, after the task context is set.
 */
private def withFakeTaskContext(f: => Unit): Unit = {
  val sc = new SparkContext("local", "test", conf)
  try {
    // SparkEnv.get is only meaningful once the SparkContext above has been created.
    val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
    TaskContext.setTaskContext(taskContext)
    try {
      f
    } finally {
      // Do not let the fake context outlive the SparkContext it was created against.
      TaskContext.unset()
    }
  } finally {
    // Always stop the context, including when `f` or the setup above fails.
    sc.stop()
  }
}
58
+
59
/**
 * Builds the input rows for a benchmark run.
 *
 * Each row is a single-field `UnsafeRow` pointed at its own freshly allocated 64-byte
 * array (with a size of 16 passed to `pointTo`), whose field 0 holds a random long.
 *
 * @param numRows number of rows to generate.
 * @return the generated rows as an indexed sequence.
 */
private def testRows(numRows: Int): Seq[UnsafeRow] = {
  val rng = new java.util.Random()

  // Creates one row with its own backing buffer and a random value in field 0.
  def newRow(): UnsafeRow = {
    val unsafeRow = new UnsafeRow(1)
    unsafeRow.pointTo(new Array[Byte](64), 16)
    unsafeRow.setLong(0, rng.nextLong())
    unsafeRow
  }

  // Vector keeps indexed access cheap for callers that address rows by position.
  Vector.fill(numRows)(newRow())
}
39
68
40
- val benchmark = new Benchmark (s " Array with $numRows rows " , iterations * numRows)
69
+ def testAgainstRawArrayBuffer (numSpillThreshold : Int , numRows : Int , iterations : Int ): Unit = {
70
+ val rows = testRows(numRows)
71
+
72
+ val benchmark = new Benchmark (s " Array with $numRows rows " , iterations * numRows,
73
+ output = output)
41
74
42
75
// Internally, `ExternalAppendOnlyUnsafeRowArray` will create an
43
76
// in-memory buffer of size `numSpillThreshold`. This will mimic that
@@ -82,33 +115,19 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
82
115
}
83
116
}
84
117
85
- val conf = new SparkConf (false )
86
- // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
87
- // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
88
- conf.set(" spark.serializer.objectStreamReset" , " 1" )
89
- conf.set(" spark.serializer" , " org.apache.spark.serializer.JavaSerializer" )
90
-
91
- val sc = new SparkContext (" local" , " test" , conf)
92
- val taskContext = MemoryTestingUtils .fakeTaskContext(SparkEnv .get)
93
- TaskContext .setTaskContext(taskContext)
94
- benchmark.run()
95
- sc.stop()
118
+ withFakeTaskContext {
119
+ benchmark.run()
120
+ }
96
121
}
97
122
98
123
def testAgainstRawUnsafeExternalSorter (
99
124
numSpillThreshold : Int ,
100
125
numRows : Int ,
101
126
iterations : Int ): Unit = {
127
+ val rows = testRows(numRows)
102
128
103
- val random = new java.util.Random ()
104
- val rows = (1 to numRows).map(_ => {
105
- val row = new UnsafeRow (1 )
106
- row.pointTo(new Array [Byte ](64 ), 16 )
107
- row.setLong(0 , random.nextLong())
108
- row
109
- })
110
-
111
- val benchmark = new Benchmark (s " Spilling with $numRows rows " , iterations * numRows)
129
+ val benchmark = new Benchmark (s " Spilling with $numRows rows " , iterations * numRows,
130
+ output = output)
112
131
113
132
benchmark.addCase(" UnsafeExternalSorter" ) { _ : Int =>
114
133
var sum = 0L
@@ -158,80 +177,23 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
158
177
}
159
178
}
160
179
161
- val conf = new SparkConf (false )
162
- // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
163
- // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
164
- conf.set(" spark.serializer.objectStreamReset" , " 1" )
165
- conf.set(" spark.serializer" , " org.apache.spark.serializer.JavaSerializer" )
166
-
167
- val sc = new SparkContext (" local" , " test" , conf)
168
- val taskContext = MemoryTestingUtils .fakeTaskContext(SparkEnv .get)
169
- TaskContext .setTaskContext(taskContext)
170
- benchmark.run()
171
- sc.stop()
180
+ withFakeTaskContext {
181
+ benchmark.run()
182
+ }
172
183
}
173
184
174
- def main (args : Array [String ]): Unit = {
175
-
176
- // ========================================================================================= //
177
- // WITHOUT SPILL
178
- // ========================================================================================= //
179
-
180
- val spillThreshold = 100 * 1000
181
-
182
- /*
183
- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
184
-
185
- Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
186
- ------------------------------------------------------------------------------------------------
187
- ArrayBuffer 7821 / 7941 33.5 29.8 1.0X
188
- ExternalAppendOnlyUnsafeRowArray 8798 / 8819 29.8 33.6 0.9X
189
- */
190
- testAgainstRawArrayBuffer(spillThreshold, 1000 , 1 << 18 )
191
-
192
- /*
193
- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
194
-
195
- Array with 30000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
196
- ------------------------------------------------------------------------------------------------
197
- ArrayBuffer 19200 / 19206 25.6 39.1 1.0X
198
- ExternalAppendOnlyUnsafeRowArray 19558 / 19562 25.1 39.8 1.0X
199
- */
200
- testAgainstRawArrayBuffer(spillThreshold, 30 * 1000 , 1 << 14 )
201
-
202
- /*
203
- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
204
-
205
- Array with 100000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
206
- ------------------------------------------------------------------------------------------------
207
- ArrayBuffer 5949 / 6028 17.2 58.1 1.0X
208
- ExternalAppendOnlyUnsafeRowArray 6078 / 6138 16.8 59.4 1.0X
209
- */
210
- testAgainstRawArrayBuffer(spillThreshold, 100 * 1000 , 1 << 10 )
211
-
212
- // ========================================================================================= //
213
- // WITH SPILL
214
- // ========================================================================================= //
215
-
216
- /*
217
- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
218
-
219
- Spilling with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
220
- ------------------------------------------------------------------------------------------------
221
- UnsafeExternalSorter 9239 / 9470 28.4 35.2 1.0X
222
- ExternalAppendOnlyUnsafeRowArray 8857 / 8909 29.6 33.8 1.0X
223
- */
224
- testAgainstRawUnsafeExternalSorter(100 * 1000 , 1000 , 1 << 18 )
225
-
226
- /*
227
- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
228
-
229
- Spilling with 10000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
230
- ------------------------------------------------------------------------------------------------
231
- UnsafeExternalSorter 4 / 5 39.3 25.5 1.0X
232
- ExternalAppendOnlyUnsafeRowArray 5 / 6 29.8 33.5 0.8X
233
- */
234
- testAgainstRawUnsafeExternalSorter(
235
- config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD .defaultValue.get, 10 * 1000 , 1 << 4 )
185
/**
 * Runs the two benchmark groups of this suite, each wrapped in `runBenchmark` with its
 * label: the array-buffer comparison ("WITHOUT SPILL") and the external-sorter comparison
 * ("WITH SPILL").
 *
 * @param mainArgs command-line arguments; not used by this suite.
 */
override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
  runBenchmark("WITHOUT SPILL") {
    val spillThreshold = 100 * 1000
    // (numRows, iterations) pairs, executed in exactly this order.
    val arrayCases = Seq(
      (100 * 1000, 1 << 10),
      (1000, 1 << 18),
      (30 * 1000, 1 << 14))
    arrayCases.foreach { case (rowCount, iterationCount) =>
      testAgainstRawArrayBuffer(spillThreshold, rowCount, iterationCount)
    }
  }

  runBenchmark("WITH SPILL") {
    testAgainstRawUnsafeExternalSorter(
      numSpillThreshold = 100 * 1000,
      numRows = 1000,
      iterations = 1 << 18)

    // Second case uses the configured default of the force-spill threshold.
    val forceSpillThreshold =
      config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get
    testAgainstRawUnsafeExternalSorter(forceSpillThreshold, 10 * 1000, 1 << 4)
  }
}
237
199
}
0 commit comments