[SPARK-25484][SQL][TEST] Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark #22617
Changes from all commits
benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt (new file):

================================================================================================
WITHOUT SPILL
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Array with 100000 rows:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                   6378 / 6550         16.1          62.3       1.0X
ExternalAppendOnlyUnsafeRowArray              6196 / 6242         16.5          60.5       1.0X

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Array with 1000 rows:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                 11988 / 12027         21.9          45.7       1.0X
ExternalAppendOnlyUnsafeRowArray            37480 / 37574          7.0         143.0       0.3X

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Array with 30000 rows:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                 23536 / 23538         20.9          47.9       1.0X
ExternalAppendOnlyUnsafeRowArray            31275 / 31277         15.7          63.6       0.8X


================================================================================================
WITH SPILL
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Spilling with 1000 rows:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter                        29241 / 29279          9.0         111.5       1.0X
ExternalAppendOnlyUnsafeRowArray            14309 / 14313         18.3          54.6       2.0X

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Spilling with 10000 rows:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter                            11 /   11         14.8          67.4       1.0X
ExternalAppendOnlyUnsafeRowArray                 9 /    9         17.6          56.8       1.2X
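A quick way to read these tables: Rate(M/s) and Per Row(ns) follow from each case's total row count, which is iterations * numRows as passed to the Benchmark constructor in the diff below, divided into the best time. A small Scala sanity check for the 100000-row case, assuming exactly that interpretation of the columns (the val names here are purely illustrative):

    // Sanity-checking the "Array with 100000 rows" line: this case runs
    // 1 << 10 iterations over 100000 rows (see testAgainstRawArrayBuffer
    // in the diff below) with a best time of 6378 ms.
    val totalRows  = (1 << 10) * 100000L                  // 102,400,000 rows
    val bestTimeMs = 6378.0
    val rateMPerS  = totalRows / 1e6 / (bestTimeMs / 1e3) // ~16.1, matches Rate(M/s)
    val perRowNs   = bestTimeMs * 1e6 / totalRows         // ~62.3, matches Per Row(ns)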
ExternalAppendOnlyUnsafeRowArrayBenchmark.scala:

@@ -20,24 +20,57 @@ package org.apache.spark.sql.execution
 import scala.collection.mutable.ArrayBuffer

 import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskContext}
-import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
 import org.apache.spark.internal.config
 import org.apache.spark.memory.MemoryTestingUtils
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

-object ExternalAppendOnlyUnsafeRowArrayBenchmark {
+/**
+ * Benchmark ExternalAppendOnlyUnsafeRowArray.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> --jars <spark core test jar> <spark sql test jar>
+ *   2. build/sbt ";project sql;set javaOptions
+ *        in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt ";project sql;set javaOptions
+ *        in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this class>"
+ *      Results will be written to
+ *      "benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt".
+ * }}}
+ */
+object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {

-  def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = {
+  private val conf = new SparkConf(false)
+    // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
+    // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
+    .set("spark.serializer.objectStreamReset", "1")
+    .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+
+  private def withFakeTaskContext(f: => Unit): Unit = {
+    val sc = new SparkContext("local", "test", conf)
+    val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
+    TaskContext.setTaskContext(taskContext)
+    f
+    sc.stop()
+  }
+
+  private def testRows(numRows: Int): Seq[UnsafeRow] = {
     val random = new java.util.Random()
-    val rows = (1 to numRows).map(_ => {
+    (1 to numRows).map(_ => {
       val row = new UnsafeRow(1)
       row.pointTo(new Array[Byte](64), 16)
       row.setLong(0, random.nextLong())
       row
     })
+  }

-    val benchmark = new Benchmark(s"Array with $numRows rows", iterations * numRows)
+  def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = {
+    val rows = testRows(numRows)
+
+    val benchmark = new Benchmark(s"Array with $numRows rows", iterations * numRows,
+      output = output)

     // Internally, `ExternalAppendOnlyUnsafeRowArray` will create an
     // in-memory buffer of size `numSpillThreshold`. This will mimic that
@@ -82,33 +115,19 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
     }
   }

-    val conf = new SparkConf(false)
-    // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
-    // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
-    conf.set("spark.serializer.objectStreamReset", "1")
-    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
-
-    val sc = new SparkContext("local", "test", conf)
-    val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
-    TaskContext.setTaskContext(taskContext)
-    benchmark.run()
-    sc.stop()
+    withFakeTaskContext {
+      benchmark.run()
+    }
   }

   def testAgainstRawUnsafeExternalSorter(
       numSpillThreshold: Int,
       numRows: Int,
       iterations: Int): Unit = {
+    val rows = testRows(numRows)

-    val random = new java.util.Random()
-    val rows = (1 to numRows).map(_ => {
-      val row = new UnsafeRow(1)
-      row.pointTo(new Array[Byte](64), 16)
-      row.setLong(0, random.nextLong())
-      row
-    })
-
-    val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * numRows)
+    val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * numRows,
+      output = output)

     benchmark.addCase("UnsafeExternalSorter") { _: Int =>
       var sum = 0L
@@ -158,80 +177,23 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
     }
   }

-    val conf = new SparkConf(false)
-    // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
-    // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
-    conf.set("spark.serializer.objectStreamReset", "1")
-    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
-
-    val sc = new SparkContext("local", "test", conf)
-    val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
-    TaskContext.setTaskContext(taskContext)
-    benchmark.run()
-    sc.stop()
+    withFakeTaskContext {
+      benchmark.run()
+    }
   }

-  def main(args: Array[String]): Unit = {
-
-    // ========================================================================================= //
-    // WITHOUT SPILL
-    // ========================================================================================= //
-
-    val spillThreshold = 100 * 1000
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Array with 1000 rows:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    ArrayBuffer                                   7821 / 7941         33.5          29.8       1.0X
-    ExternalAppendOnlyUnsafeRowArray              8798 / 8819         29.8          33.6       0.9X
-    */
-    testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18)
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Array with 30000 rows:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    ArrayBuffer                                  19200 / 19206         25.6          39.1       1.0X
-    ExternalAppendOnlyUnsafeRowArray             19558 / 19562         25.1          39.8       1.0X
-    */
-    testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14)
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Array with 100000 rows:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    ArrayBuffer                                   5949 / 6028         17.2          58.1       1.0X
-    ExternalAppendOnlyUnsafeRowArray              6078 / 6138         16.8          59.4       1.0X
-    */
-    testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10)
-
-    // ========================================================================================= //
-    // WITH SPILL
-    // ========================================================================================= //
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Spilling with 1000 rows:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    UnsafeExternalSorter                          9239 / 9470         28.4          35.2       1.0X
-    ExternalAppendOnlyUnsafeRowArray              8857 / 8909         29.6          33.8       1.0X
-    */
-    testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18)
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Spilling with 10000 rows:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    UnsafeExternalSorter                             4 /    5         39.3          25.5       1.0X
-    ExternalAppendOnlyUnsafeRowArray                 5 /    6         29.8          33.5       0.8X
-    */
-    testAgainstRawUnsafeExternalSorter(
-      config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 1000, 1 << 4)
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runBenchmark("WITHOUT SPILL") {
+      val spillThreshold = 100 * 1000
+      testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10)
+      testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18)
+      testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14)
+    }
+
+    runBenchmark("WITH SPILL") {
+      testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18)
+      testAgainstRawUnsafeExternalSorter(
+        config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 1000, 1 << 4)
+    }
+  }
 }
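The heart of the refactoring is dropping the hand-written main() (with hardware-specific results pasted into code comments) in favor of Spark's BenchmarkBase harness, which owns the output stream and section banners. A rough sketch of that pattern, where the names output, runBenchmark, runBenchmarkSuite, and SPARK_GENERATE_BENCHMARK_FILES come from the diff above but the bodies are illustrative approximations, not the real org.apache.spark.benchmark.BenchmarkBase:

    import java.io.{FileOutputStream, OutputStream}

    // Illustrative stand-in for org.apache.spark.benchmark.BenchmarkBase;
    // method names match the diff above, bodies are approximations.
    trait BenchmarkBaseSketch {
      // Benchmarks pass this as `new Benchmark(..., output = output)`; when
      // set, results go to a file instead of stdout.
      var output: Option[OutputStream] = None

      // Wraps a group of cases under one banner, e.g. "WITHOUT SPILL".
      def runBenchmark(benchmarkName: String)(func: => Unit): Unit = {
        val banner = "=" * 96
        println(s"$banner\n$benchmarkName\n$banner\n")
        func
      }

      // Subclasses implement this instead of defining their own main().
      def runBenchmarkSuite(mainArgs: Array[String]): Unit

      def main(args: Array[String]): Unit = {
        // SPARK_GENERATE_BENCHMARK_FILES=1 redirects results into benchmarks/.
        if (sys.env.get("SPARK_GENERATE_BENCHMARK_FILES").contains("1")) {
          val name = getClass.getSimpleName.stripSuffix("$")
          output = Some(new FileOutputStream(s"benchmarks/$name-results.txt"))
        }
        runBenchmarkSuite(args)
        output.foreach(_.close())
      }
    }

With this split, the benchmark object only declares its cases, and the results file checked in above can be regenerated on any machine instead of living in comments.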
I ran the original master branch and got the following. Since the trend is the same, this refactoring PR looks safe.