SPARK-25299: Add Reader Benchmarks #508
This gives us pretty high fidelity into the shuffle read performance. Nice work. Left some feedback below.
private var tempDir: File = _

private val SHUFFLE_ID: Int = 0
Is it necessary to declare types here? There are a lot of places in this class where the type declaration isn't necessary.
dependency = dependency)

// We cannot mock the TaskContext because it taskMetrics() gets called at every next()
// call on the reader, and Mockito will try to log all calls to taskMetrics(), thus OOM-ing
Is this even the case when you add a then behavior on the mock? If so, then we're not given much of a choice here, huh. I don't think this reads as well when inlined like this; perhaps make this a separate inner class entirely.
Yeah, I can make it a separate inner class.
println("Generating test data with num records: " + size)
val random = new Random(123)
val dataOutput = new FileOutputStream(file)
val coutingOutput = new CountingOutputStream(dataOutput)
Spelling: coutingOutput should be countingOutput.
serializedOutput.writeValue(x)
}}
}
finally {
Move finally up a line.
aggregationBenchmark.run()

val sorter = Ordering.String
Think it's fine enough to inline this.
}

def runWithLargeDataset(): Unit = {
  val size = TEST_DATA_SIZE
This can be inlined
def runWithLargeDataset(): Unit = {
  val size = TEST_DATA_SIZE
  val tempDataFile: File = File.createTempFile("test-data", "", tempDir)
No need to declare type
assert(numRead == size * NUM_MAPS)
}
sortingBenchmark.addTimerCase("remote rpc fetch") { timer =>
  val reader = setupReader(
Similar to the exercise we did for writers, let's try to factor out more common logic to helper methods. No need for anything fancy, just reducing code duplication.
core/src/test/scala/org/apache/spark/shuffle/sort/BlockStoreShuffleReaderBenchmark.scala
def generateDataOnDisk(size: Int, file: File, recordOffset: Int): (Long, Long) = {
  // scalastyle:off println
  println("Generating test data with num records: " + size)
  class ManualCloseFileOutputStream(file: File) extends FileOutputStream(file, true) {
Let's move this out of the method def, a tiny bit easier to read. Bottom of the class, perhaps.
timer.startTiming()
val numRead = reader.read().length
timer.stopTiming()
if (assertSize.isDefined) {
Nit: Just do assertSize.foreach
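For reference, a minimal sketch of that suggestion. The body of the if block isn't visible in the diff context above, so the assertion inside it is an assumption, and checkSize is a hypothetical helper name used only for illustration:

```scala
object AssertSizeExample {
  // Hypothetical helper: checks the record count only when an expected size was supplied.
  def checkSize(numRead: Int, assertSize: Option[Int]): Unit = {
    // Verbose form (assumed shape of the original):
    //   if (assertSize.isDefined) { assert(numRead == assertSize.get) }
    // Equivalent form without the explicit isDefined/get pair:
    assertSize.foreach(size => assert(numRead == size))
  }
}
```

With None the function body never runs, so the "no expected size" case needs no special handling.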
def addBenchmarkCase(
    benchmark: Benchmark,
    name: String,
    func: () => BlockStoreShuffleReader[String, String],
func can be shuffleReaderSupplier to indicate that this is just a factory.
What happens if we make this the following type?
def addBenchmarkCase(
benchmark: Benchmark,
name: String,
shuffleReaderSupplier: => BlockStoreShuffleReader[String, String], // Notice there's no () here
assertSize: Option[Int] = None): Unit = {... }
But I don't know my Scala well enough offhand to say whether this works, or whether it changes the way one invokes such a method.
The hope here is that instead of passing in () => setupReader(...), you can instead pass in setupReader(), and we preserve the behavior of the reader being lazily instantiated.
ah it worked! nice :)
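A minimal sketch of why the by-name version behaves this way. String stands in for BlockStoreShuffleReader[String, String]; ByNameExample, the counter, and the simplified signatures are hypothetical, and only the parameter name shuffleReaderSupplier comes from the thread:

```scala
object ByNameExample {
  // Counts how many times the stand-in reader factory has actually run.
  var readersCreated = 0

  // Stand-in for setupReader(...); the real method builds a shuffle reader.
  def setupReader(): String = {
    readersCreated += 1
    s"reader-$readersCreated"
  }

  // `=> String` is a by-name parameter: the argument expression is not evaluated
  // at the call site, only when shuffleReaderSupplier is referenced in the body.
  def addBenchmarkCase(name: String, shuffleReaderSupplier: => String): () => String = {
    // Return a thunk so the caller decides when the benchmark case runs.
    () => shuffleReaderSupplier
  }
}
```

Calling ByNameExample.addBenchmarkCase("local fetch", ByNameExample.setupReader()) does not create a reader until the returned function is invoked. One thing to keep in mind: a by-name argument is re-evaluated on every reference, so referencing the supplier twice builds two readers.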
benchmark: Benchmark,
name: String,
func: () => BlockStoreShuffleReader[String, String],
assertSize: Option[Int] = Option.empty): Unit = {
Default to None instead - a little more concise.
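A small sketch showing that the two defaults are interchangeable once the parameter type is declared as Option[Int]. The method names and the simplified signatures are hypothetical; only the assertSize parameter mirrors the diff:

```scala
object DefaultNoneExample {
  // Default written as in the diff context.
  def withOptionEmpty(name: String, assertSize: Option[Int] = Option.empty): String =
    describe(name, assertSize)

  // Default written as suggested; the declared type Option[Int] makes None fit directly.
  def withNone(name: String, assertSize: Option[Int] = None): String =
    describe(name, assertSize)

  private def describe(name: String, assertSize: Option[Int]): String =
    assertSize.fold(s"$name: no size check")(n => s"$name: expect $n records")
}
```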
================================================================================================
BlockStoreShuffleReader reader
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
no aggregation or sorting:                  Best Time(ms)  Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
local fetch                                         10614         10773         141         0.9        1061.4       1.0X
remote rpc fetch                                    10632         10686          38         0.9        1063.2       1.0X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
with aggregation:                           Best Time(ms)  Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
local fetch                                         29734         30320         440         0.1       14867.0       1.0X
remote rpc fetch                                    29787         30162         247         0.1       14893.3       1.0X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
with sorting:                               Best Time(ms)  Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
local fetch                                         31650         32065         563         0.1       15825.0       1.0X
remote rpc fetch                                    31713         32201         294         0.1       15856.7       1.0X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
with seek:                                  Best Time(ms)  Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
seek to last record                                     1             1           1      3480.0           0.3       1.0X
================================================================================================
BypassMergeSortShuffleWriter write
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
BypassMergeSortShuffleWrite without spill:  Best Time(ms)  Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
small dataset without disk spill                        2             3           2         0.5        2075.4       1.0X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
BypassMergeSortShuffleWrite with spill:     Best Time(ms)  Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
without transferTo                                   7460          7540          43         0.9        1111.6       1.0X
with transferTo                                      7475          7524          29         0.9        1113.9       1.0X
================================================================================================
SortShuffleWriter writer
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
SortShuffleWriter without spills:           Best Time(ms)  Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
small dataset without spills                           11            15           4         0.1       10576.8       1.0X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
SortShuffleWriter with spills:              Best Time(ms)  Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
no map side combine                                 14029         14144          92         0.5        2090.4       1.0X
with map side aggregation                           13997         14157         182         0.5        2085.8       1.0X
with map side sort                                  13988         14093          60         0.5        2084.4       1.0X
================================================================================================
UnsafeShuffleWriter write
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
UnsafeShuffleWriter without spills:         Best Time(ms)  Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
small dataset without spills                           21            25           4         0.0       21124.1       1.0X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
UnsafeShuffleWriter with spills:            Best Time(ms)  Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
without transferTo                                  15837         15997         145         0.8        1180.0       1.0X
with transferTo                                     15852         15943          78         0.8        1181.1       1.0X
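A rough way to sanity-check the tables above, assuming the columns relate the way the numbers suggest: Per Row(ns) is Best Time divided by the row count, Rate(M/s) is the row count divided by Best Time, and Relative is the first case's Best Time divided by this case's Best Time. These relationships are inferred from the output, not stated in the PR, and BenchmarkColumns is a hypothetical helper:

```scala
object BenchmarkColumns {
  // Row count implied by a table row: best time (ms -> ns) divided by ns per row.
  def rowsProcessed(bestTimeMs: Double, perRowNs: Double): Double =
    bestTimeMs * 1e6 / perRowNs

  // Throughput in millions of rows per second over the best run.
  def rateMillionsPerSec(rows: Double, bestTimeMs: Double): Double =
    rows / (bestTimeMs / 1000.0) / 1e6

  // Speed relative to the first (baseline) case in the same table.
  def relative(baselineBestMs: Double, bestMs: Double): Double =
    baselineBestMs / bestMs
}
```

Under these assumptions, the "local fetch" row of the first table (10614 ms, 1061.4 ns per row) implies about 10 million records and a rate of roughly 0.94 M/s, which matches the printed 0.9 after rounding.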