[SPARK-25484][SQL][TEST] Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark #22617

Closed
wants to merge 9 commits
benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt (new file)
@@ -0,0 +1,45 @@
================================================================================================
WITHOUT SPILL
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Array with 100000 rows:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                   6378 / 6550         16.1          62.3       1.0X
ExternalAppendOnlyUnsafeRowArray              6196 / 6242         16.5          60.5       1.0X

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Array with 1000 rows:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                 11988 / 12027         21.9          45.7       1.0X
ExternalAppendOnlyUnsafeRowArray            37480 / 37574          7.0         143.0       0.3X

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Array with 30000 rows:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                 23536 / 23538         20.9          47.9       1.0X
ExternalAppendOnlyUnsafeRowArray            31275 / 31277         15.7          63.6       0.8X


================================================================================================
WITH SPILL
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Spilling with 1000 rows:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter                        29241 / 29279          9.0         111.5       1.0X
ExternalAppendOnlyUnsafeRowArray            14309 / 14313         18.3          54.6       2.0X

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Spilling with 10000 rows:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter                            11 /   11         14.8          67.4       1.0X
ExternalAppendOnlyUnsafeRowArray                 9 /    9         17.6          56.8       1.2X

Member:
I ran the original master branch and got the following. Since the trend is the same, this refactoring PR looks safe.

$ bin/spark-submit --class org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArrayBenchmark --jars core/target/scala-2.12/spark-core_2.12-3.0.0-SNAPSHOT-tests.jar sql/core/target/scala-2.12/spark-sql_2.12-3.0.0-SNAPSHOT-tests.jar
...
Array with 1000 rows:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                   9556 / 9633         27.4          36.5       1.0X
ExternalAppendOnlyUnsafeRowArray            18514 / 18700         14.2          70.6       0.5X

Array with 30000 rows:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                 22180 / 22195         22.2          45.1       1.0X
ExternalAppendOnlyUnsafeRowArray            24254 / 24331         20.3          49.3       0.9X

Array with 100000 rows:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ArrayBuffer                                   4998 / 5052         20.5          48.8       1.0X
ExternalAppendOnlyUnsafeRowArray              4778 / 4821         21.4          46.7       1.0X

Spilling with 1000 rows:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter                        17536 / 17596         14.9          66.9       1.0X
ExternalAppendOnlyUnsafeRowArray            10380 / 10451         25.3          39.6       1.7X

Spilling with 10000 rows:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter                             6 /    7         25.3          39.5       1.0X
ExternalAppendOnlyUnsafeRowArray                 6 /    7         26.3          38.0       1.0X
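
As a reading aid: each column is derived from the case's total row count (numRows * iterations) and the measured time. A minimal sketch in Scala, using the 1000-row numbers above; the arithmetic mirrors what org.apache.spark.benchmark.Benchmark reports:

val totalRows = 1000L * (1 << 18)         // rows per case = numRows * iterations = 262,144,000
val bestMs    = 9556.0                    // ArrayBuffer best time above, in ms
val rateMps   = totalRows / bestMs / 1e6  // ~27.4 M rows/s          -> Rate(M/s)
val perRowNs  = bestMs * 1e6 / totalRows  // ~36.5 ns                -> Per Row(ns)
val relative  = bestMs / 18514.0          // first case best / this case best: ~0.5X -> Relative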


ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -20,24 +20,57 @@ package org.apache.spark.sql.execution
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskContext}
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.internal.config
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

object ExternalAppendOnlyUnsafeRowArrayBenchmark {
/**
* Benchmark ExternalAppendOnlyUnsafeRowArray.
* To run this benchmark:
* {{{
* 1. without sbt:
* bin/spark-submit --class <this class> --jars <spark core test jar> <spark sql test jar>
* 2. build/sbt ";project sql;set javaOptions
* in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this class>"
* 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt ";project sql;set javaOptions
* in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this class>"
Member:
Got it. I was confused with -=.

Contributor Author:
Sorry, I did it in a bit of a confusing way, but it's updated now to += ...=false in a new commit.

Member:
Ur, in the PR description, runMain is repeated twice: test:runMain test:runMain.

Contributor Author:
fixed

* Results will be written to
* "benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt".
* }}}
*/
object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {

def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = {
private val conf = new SparkConf(false)
// Make the Java serializer write a reset instruction (TC_RESET) after each object to test
// for a bug we had with bytes written past the last object in a batch (SPARK-2792)
.set("spark.serializer.objectStreamReset", "1")
.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

private def withFakeTaskContext(f: => Unit): Unit = {
val sc = new SparkContext("local", "test", conf)
val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
TaskContext.setTaskContext(taskContext)
f
sc.stop()
}
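
withFakeTaskContext is a loan pattern: it builds a local SparkContext, installs a fake TaskContext so memory-manager code paths work without a real task, runs the measured block, and stops the context. A minimal exception-safe variant, assuming the same conf above (the name withFakeTaskContextSafe is hypothetical; the PR's helper skips try/finally since benchmark code is not expected to throw):

private def withFakeTaskContextSafe(f: => Unit): Unit = {
  val sc = new SparkContext("local", "test", conf)
  try {
    // Fake TaskContext borrowed from Spark's test utilities, as above.
    TaskContext.setTaskContext(MemoryTestingUtils.fakeTaskContext(SparkEnv.get))
    f
  } finally {
    sc.stop() // tear down even if the measured block fails
  }
}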

private def testRows(numRows: Int): Seq[UnsafeRow] = {
val random = new java.util.Random()
val rows = (1 to numRows).map(_ => {
(1 to numRows).map(_ => {
val row = new UnsafeRow(1)
row.pointTo(new Array[Byte](64), 16)
row.setLong(0, random.nextLong())
row
})
}
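
The sizes passed to pointTo follow UnsafeRow's fixed-width layout: a row with one field carries one 8-byte null-bitset word plus 8 bytes for the long value, 16 bytes in total, so pointTo is told 16 bytes while the 64-byte backing array simply leaves headroom. A self-contained sketch with illustrative values:

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

val row = new UnsafeRow(1)               // one column
row.pointTo(new Array[Byte](64), 16)     // 64-byte buffer, 16 bytes actually used
row.setLong(0, 42L)                      // long lives in the 8 bytes after the null bitset
assert(row.getLong(0) == 42L)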

val benchmark = new Benchmark(s"Array with $numRows rows", iterations * numRows)
def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = {
val rows = testRows(numRows)

val benchmark = new Benchmark(s"Array with $numRows rows", iterations * numRows,
output = output)

// Internally, `ExternalAppendOnlyUnsafeRowArray` will create an
// in-memory buffer of size `numSpillThreshold`. This will mimic that
@@ -82,33 +115,19 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
}
}

val conf = new SparkConf(false)
// Make the Java serializer write a reset instruction (TC_RESET) after each object to test
// for a bug we had with bytes written past the last object in a batch (SPARK-2792)
conf.set("spark.serializer.objectStreamReset", "1")
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

val sc = new SparkContext("local", "test", conf)
val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
TaskContext.setTaskContext(taskContext)
benchmark.run()
sc.stop()
withFakeTaskContext {
benchmark.run()
}
}

def testAgainstRawUnsafeExternalSorter(
numSpillThreshold: Int,
numRows: Int,
iterations: Int): Unit = {
val rows = testRows(numRows)

val random = new java.util.Random()
val rows = (1 to numRows).map(_ => {
val row = new UnsafeRow(1)
row.pointTo(new Array[Byte](64), 16)
row.setLong(0, random.nextLong())
row
})

val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * numRows)
val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * numRows,
output = output)

benchmark.addCase("UnsafeExternalSorter") { _: Int =>
var sum = 0L
@@ -158,80 +177,23 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
}
}

val conf = new SparkConf(false)
// Make the Java serializer write a reset instruction (TC_RESET) after each object to test
// for a bug we had with bytes written past the last object in a batch (SPARK-2792)
conf.set("spark.serializer.objectStreamReset", "1")
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

val sc = new SparkContext("local", "test", conf)
val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
TaskContext.setTaskContext(taskContext)
benchmark.run()
sc.stop()
withFakeTaskContext {
benchmark.run()
}
}

def main(args: Array[String]): Unit = {

// ========================================================================================= //
// WITHOUT SPILL
// ========================================================================================= //

val spillThreshold = 100 * 1000

/*
Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz

Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
ArrayBuffer 7821 / 7941 33.5 29.8 1.0X
ExternalAppendOnlyUnsafeRowArray 8798 / 8819 29.8 33.6 0.9X
*/
testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18)

/*
Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz

Array with 30000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
ArrayBuffer 19200 / 19206 25.6 39.1 1.0X
ExternalAppendOnlyUnsafeRowArray 19558 / 19562 25.1 39.8 1.0X
*/
testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14)

/*
Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz

Array with 100000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
ArrayBuffer 5949 / 6028 17.2 58.1 1.0X
ExternalAppendOnlyUnsafeRowArray 6078 / 6138 16.8 59.4 1.0X
*/
testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10)

// ========================================================================================= //
// WITH SPILL
// ========================================================================================= //

/*
Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz

Spilling with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter 9239 / 9470 28.4 35.2 1.0X
ExternalAppendOnlyUnsafeRowArray 8857 / 8909 29.6 33.8 1.0X
*/
testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18)

/*
Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz

Spilling with 10000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
UnsafeExternalSorter 4 / 5 39.3 25.5 1.0X
ExternalAppendOnlyUnsafeRowArray 5 / 6 29.8 33.5 0.8X
*/
testAgainstRawUnsafeExternalSorter(
config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 1000, 1 << 4)
override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
runBenchmark("WITHOUT SPILL") {
val spillThreshold = 100 * 1000
testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10)
testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18)
testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14)
dongjoon-hyun (Member), Jan 9, 2019:
Let's keep the original sequence; 1000 -> 30 * 1000 -> 100 * 1000. Increasing order is more intuitive.
Ah, I got it. The cases are ordered by total row count, per the calculation below. Please forget the above comment.

>>> 1000 * (1<<18)
262144000
>>> 30 * 1000 * (1<<14)
491520000
>>> 100 * 1000 * (1<<10)
102400000

}

runBenchmark("WITH SPILL") {
testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18)
testAgainstRawUnsafeExternalSorter(
config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 1000, 1 << 4)
}
}
}
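
For context, the BenchmarkBase contract adopted here reduces to a miniature like the following; the object name MyBenchmark and its case are hypothetical. BenchmarkBase.main invokes runBenchmarkSuite, runBenchmark prints the "====" group banner seen in the results file, and output points at benchmarks/<Class>-results.txt when SPARK_GENERATE_BENCHMARK_FILES=1 is set, which is how the results file at the top of this diff gets generated:

import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}

object MyBenchmark extends BenchmarkBase {
  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("EXAMPLE GROUP") { // writes the banner, then runs the enclosed cases
      val benchmark = new Benchmark("example case", valuesPerIteration = 1000, output = output)
      benchmark.addCase("sum") { _ =>
        val s = (1L to 1000L).sum   // trivial measured work
        require(s == 500500L)
      }
      benchmark.run()
    }
  }
}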