[SPARK-7041] Avoid writing empty files in BypassMergeSortShuffleWriter #5622
Conversation
Force-pushed from b650ab2 to 3c9c944 (Compare)
    val truncateStream = new FileOutputStream(file, true)
    try {
      truncateStream.getChannel.truncate(initialPosition)
    } finally {
      truncateStream.close()
    }
I moved this into the if (initialized) block because there should be no need to truncate unless we've written to the file.
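A sketch of the guarded version being described, reconstructed from the diff above (the enclosing revert method and its fields are assumed):

    if (initialized) {
      // Only truncate if the writer actually opened the file and wrote to it;
      // an untouched writer has nothing to revert.
      val truncateStream = new FileOutputStream(file, true)
      try {
        truncateStream.getChannel.truncate(initialPosition)
      } finally {
        truncateStream.close()
      }
    }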
Doh, looks like this legitimately failed tests in ExternalSorterSuite due to the empty files not being present. I'll investigate.
Test build #30709 has finished for PR 5622 at commit
Test build #30710 has finished for PR 5622 at commit
    if (!file.exists()) {
      lengths(i) = 0
    } else {
      val out = new FileOutputStream(outputFile, true)
This should be opened only once, not once per partition.
Yep, good catch.
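A hedged sketch of that fix: hoist the single FileOutputStream for the merged output above the per-partition loop. Here outputFile, partitionFiles, numPartitions, and lengths are assumed names, and Utils refers to Spark's org.apache.spark.util.Utils helpers:

    import java.io.{FileInputStream, FileOutputStream}
    import org.apache.spark.util.Utils

    // Open the consolidated output file once, then append each partition's bytes.
    val out = new FileOutputStream(outputFile, true)
    Utils.tryWithSafeFinally {
      for (i <- 0 until numPartitions) {
        val file = partitionFiles(i)  // hypothetical per-partition spill file
        if (!file.exists()) {
          lengths(i) = 0              // empty partition: the file was never created
        } else {
          val in = new FileInputStream(file)
          Utils.tryWithSafeFinally {
            lengths(i) = Utils.copyStream(in, out, closeStreams = false)
          } {
            in.close()
          }
        }
      }
    } {
      out.close()
    }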
Test build #30722 has finished for PR 5622 at commit
      lengths(i) = 0
    } else {
      val in = new FileInputStream(file)
      util.Utils.tryWithSafeFinally {
Shouldn't we avoid using partial package names like this? Just import spark.util.Utils.
This was a carryover from the old code, but I can fix this up.
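For reference, a minimal sketch of the cleaned-up call site with a full import rather than the partial package prefix (assuming the class lives at org.apache.spark.util.Utils):

    import org.apache.spark.util.Utils

    val in = new FileInputStream(file)
    Utils.tryWithSafeFinally {
      // ... copy this partition's bytes into the consolidated output ...
    } {
      in.close()
    }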
I'm going to close this pull request for now, since I think that we're going to need to spend more time thinking through a more general strategy of how to handle empty partitions / output files. In some local benchmarking, the changes in this patch made a large difference for out-of-the-box single-machine spark-shell SQL performance, but I think that we'll want to re-assess perf. benchmarking for that workload after the other 1.4 improvements land.
Force-pushed from ad13424 to 5c777cf (Compare)
Test build #34314 has finished for PR 5622 at commit
Test build #34311 has finished for PR 5622 at commit
Jenkins, retest this please.
Test build #34340 has finished for PR 5622 at commit
Jenkins, retest this please.
Test build #34454 has finished for PR 5622 at commit
    // around with Spark SQL queries in spark-shell on toy datasets: if you performed a query over
    // an extremely small number of records then Spark SQL's default parallelism of 200 would
    // result in slower out-of-the-box performance due to these constant-factor overheads. This
    // optimization speeds up local microbenchmarking and SQL unit tests.
    partitionWriters[i] =
        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
Looks like you are still calling open() here :)
Doh, I guess I mis-resolved one of those merge conflicts :)
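For clarity, the intended fix is simply the same assignment without the eager open(), sketched here in Scala-style syntax for consistency with the other snippets (the actual class is Java):

    // Create the writer but let DiskBlockObjectWriter open itself on the first
    // write, so partitions that receive no records never create a file on disk.
    partitionWriters(i) =
      blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics)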
Test build #34520 has finished for PR 5622 at commit
Looks like a real failure.
Yeah, I know how to fix it and will do so in a bit.
Test build #45642 has finished for PR 5622 at commit
Test build #46561 has finished for PR 5622 at commit
Test build #46571 has finished for PR 5622 at commit
If anything, this seemed to make things slower. Closing for now.
In BypassMergeSortShuffleWriter, we may end up opening disk writer files for empty partitions. This occurs because we manually call open() after creating the writer, causing serialization and compression streams to be created; these streams may write headers to the output stream, resulting in non-zero-length files for partitions that contain no records. This is unnecessary, though, since the disk object writer will automatically open itself when the first write is performed. Removing this eager open() call and rewriting the consumers to cope with the non-existence of empty files yields a large performance benefit for certain sparse workloads when using sort-based shuffle. This has an impact for small-scale Spark SQL jobs in unit tests and spark-shell.
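To illustrate the lazy-open behavior this description relies on, here is a minimal, self-contained sketch; this is a hypothetical stand-in, not Spark's actual DiskBlockObjectWriter:

    import java.io.{File, FileOutputStream}

    // A writer that defers creating its file until the first write.
    class LazyFileWriter(file: File) {
      private var out: FileOutputStream = _

      def write(bytes: Array[Byte]): Unit = {
        if (out == null) {
          out = new FileOutputStream(file) // file is created only on first write
        }
        out.write(bytes)
      }

      def close(): Unit = {
        if (out != null) out.close() // a writer that never wrote leaves no file behind
      }
    }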