Skip to content

SPARK-25299: Add Reader Benchmarks #508

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 80 commits into from
Mar 28, 2019

Conversation

yifeih
Copy link

@yifeih yifeih commented Mar 5, 2019

No description provided.

@yifeih yifeih changed the base branch from yh/add-benchmarks-and-ci to spark-25299 March 14, 2019 20:58
Copy link

@mccheah mccheah left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gives us pretty high fidelity into the shuffle read performance. Nice work. Left some feedback below.


private var tempDir: File = _

private val SHUFFLE_ID: Int = 0
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to declare types here? There's a lot of places in this class where type declaration isn't necessary.

dependency = dependency)

// We cannot mock the TaskContext because it taskMetrics() gets called at every next()
// call on the reader, and Mockito will try to log all calls to taskMetrics(), thus OOM-ing
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this even the case when you add a then behavior on the mock? If so then we're not given much of a choice here huh - I don't think this reads as well when inlined like this - perhaps make this a separate inner class entirely.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, i can make it a separate inner class

println("Generating test data with num records: " + size)
val random = new Random(123)
val dataOutput = new FileOutputStream(file)
val coutingOutput = new CountingOutputStream(dataOutput)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spelling

serializedOutput.writeValue(x)
}}
}
finally {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move finally up a line

aggregationBenchmark.run()


val sorter = Ordering.String
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think it's fine enough to inline this.

}

def runWithLargeDataset(): Unit = {
val size = TEST_DATA_SIZE
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be inlined


def runWithLargeDataset(): Unit = {
val size = TEST_DATA_SIZE
val tempDataFile: File = File.createTempFile("test-data", "", tempDir)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to declare type

assert(numRead == size * NUM_MAPS)
}
sortingBenchmark.addTimerCase("remote rpc fetch") { timer =>
val reader = setupReader(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the exercise we did for writers, let's try to factor out more common logic to helper methods. No need for anything fancy, just reducing code duplication.

def generateDataOnDisk(size: Int, file: File, recordOffset: Int): (Long, Long) = {
// scalastyle:off println
println("Generating test data with num records: " + size)
class ManualCloseFileOutputStream(file: File) extends FileOutputStream(file, true) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this out of the method def, a tiny bit easier to read. Bottom of the class, perhaps.

timer.startTiming()
val numRead = reader.read().length
timer.stopTiming()
if (assertSize.isDefined) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Just do assertSize.foreach

def addBenchmarkCase(
benchmark: Benchmark,
name: String,
func: () => BlockStoreShuffleReader[String, String],
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func can be shuffleReaderSupplier to indicate that this is just a factory.

What happens if we make this the following type?

def addBenchmarkCase(
    benchmark: Benchmark,
    name: String,
    shuffleReaderSupplier: => BlockStoreShuffleReader[String, String], // Notice there's no () here
    assertSize: Option[Int] = None): Unit = {... }

But don't know my Scala offhand to know if this works, or if that changes the way one invokes such a method.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hope here is that instead of passing in () => setupReader(...), you can instead pass in setupReader(), and we preserve the behavior of the reader being lazily instantiated.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah it worked! nice :)

benchmark: Benchmark,
name: String,
func: () => BlockStoreShuffleReader[String, String],
assertSize: Option[Int] = Option.empty): Unit = {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default to None instead - a little more concise.

@bulldozer-bot bulldozer-bot bot merged commit adc1918 into spark-25299 Mar 28, 2019
@bulldozer-bot bulldozer-bot bot deleted the yh/add-benchmarks-and-ci-reader branch March 28, 2019 00:57
Copy link

@svc-spark-25299 svc-spark-25299 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

================================================================================================
BlockStoreShuffleReader reader
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
no aggregation or sorting:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
local fetch                                       10614          10773         141          0.9        1061.4       1.0X
remote rpc fetch                                  10632          10686          38          0.9        1063.2       1.0X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
with aggregation:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
local fetch                                       29734          30320         440          0.1       14867.0       1.0X
remote rpc fetch                                  29787          30162         247          0.1       14893.3       1.0X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
with sorting:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
local fetch                                       31650          32065         563          0.1       15825.0       1.0X
remote rpc fetch                                  31713          32201         294          0.1       15856.7       1.0X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
with seek:                                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
seek to last record                                   1              1           1       3480.0           0.3       1.0X

================================================================================================
BypassMergeSortShuffleWriter write
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
BypassMergeSortShuffleWrite without spill:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
small dataset without disk spill                      2              3           2          0.5        2075.4       1.0X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
BypassMergeSortShuffleWrite with spill:   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
without transferTo                                 7460           7540          43          0.9        1111.6       1.0X
with transferTo                                    7475           7524          29          0.9        1113.9       1.0X

================================================================================================
SortShuffleWriter writer
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
SortShuffleWriter without spills:         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
small dataset without spills                         11             15           4          0.1       10576.8       1.0X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
SortShuffleWriter with spills:            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
no map side combine                               14029          14144          92          0.5        2090.4       1.0X
with map side aggregation                         13997          14157         182          0.5        2085.8       1.0X
with map side sort                                13988          14093          60          0.5        2084.4       1.0X

================================================================================================
UnsafeShuffleWriter write
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
UnsafeShuffleWriter without spills:       Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
small dataset without spills                         21             25           4          0.0       21124.1       1.0X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
UnsafeShuffleWriter with spills:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
without transferTo                                15837          15997         145          0.8        1180.0       1.0X
with transferTo                                   15852          15943          78          0.8        1181.1       1.0X



Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants