[SPARK-7041] Avoid writing empty files in BypassMergeSortShuffleWriter #5622

Closed
JoshRosen wants to merge 15 commits from the file-handle-optimizations branch

Conversation

JoshRosen
Contributor

In BypassMergeSortShuffleWriter, we may end up opening disk writers for empty partitions. This happens because we manually call open() after creating each writer, which creates the serialization and compression output streams; those streams may write headers to the underlying file, producing non-zero-length files for partitions that contain no records. The eager open() is unnecessary, since the disk object writer automatically opens itself when the first write is performed. Removing the eager open() call and rewriting the consumers to cope with the non-existence of empty files yields a large performance benefit for certain sparse workloads under sort-based shuffle. This matters for small-scale Spark SQL jobs in unit tests and in spark-shell.
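
To make the mechanism concrete, here is a toy Scala model of the lazy-open behavior described above. This is an illustrative sketch, not Spark's actual DiskBlockObjectWriter; LazyDiskWriter is a hypothetical name.

    import java.io.{File, FileOutputStream, OutputStream}

    // Toy model: the underlying file is created only when the first byte arrives,
    // so a writer that never receives a record never touches the disk.
    class LazyDiskWriter(file: File) {
      private var out: OutputStream = null
      def write(bytes: Array[Byte]): Unit = {
        if (out == null) {
          // Real code would also set up serialization/compression streams here,
          // which is what writes the headers mentioned above.
          out = new FileOutputStream(file)
        }
        out.write(bytes)
      }
      def close(): Unit = if (out != null) out.close()
    }

    val file = new File("partition-0.data")
    val writer = new LazyDiskWriter(file)
    writer.close() // no records were written, so `file` was never created
    assert(!file.exists())

Consumers of the per-partition files must then treat a missing file as a zero-length partition, which is exactly what the rest of this patch does.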

@JoshRosen JoshRosen force-pushed the file-handle-optimizations branch from b650ab2 to 3c9c944 on April 21, 2015 23:29
@JoshRosen JoshRosen changed the title from “[WIP] [SPARK-7041] Avoid writing empty files in ExternalSorter” to “[SPARK-7041] Avoid writing empty files in ExternalSorter” on Apr 21, 2015
val truncateStream = new FileOutputStream(file, true)
try {
  truncateStream.getChannel.truncate(initialPosition)
} finally {
  truncateStream.close()
}
Contributor Author

I moved this into the if (initialized) block because there should be no need to truncate unless we've written to the file.
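
For context, a self-contained sketch of the guarded revert after the move; initialized, file, and initialPosition are hypothetical stand-ins for the writer's actual state:

    import java.io.{File, FileOutputStream}

    // Stand-ins for the writer's state (names follow the diff above).
    val initialized: Boolean = true
    val file: File = new File("partition-0.data")
    val initialPosition: Long = 0L

    if (initialized) {
      // Truncate only when something was actually written; an unopened writer
      // has no partial output on disk to revert.
      val truncateStream = new FileOutputStream(file, true)
      try {
        truncateStream.getChannel.truncate(initialPosition)
      } finally {
        truncateStream.close()
      }
    }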

@JoshRosen
Contributor Author

Doh, looks like this legitimately failed tests in ExternalSorterSuite due to the empty files not being present. I'll investigate.

@SparkQA

SparkQA commented Apr 22, 2015

Test build #30709 has finished for PR 5622 at commit b650ab2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 22, 2015

Test build #30710 has finished for PR 5622 at commit 3c9c944.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Explode(child: Expression)
    • protected[spark] abstract class NativeType extends DataType
  • This patch does not change any dependencies.

if (!file.exists()) {
  lengths(i) = 0
} else {
  val out = new FileOutputStream(outputFile, true)
Contributor

This should be opened only once, not once per partition.

Contributor Author

Yep, good catch.
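
A sketch of the suggested structure, hoisting the output stream out of the per-partition loop. concatPartitionFiles and partitionFile are hypothetical names introduced to keep the snippet self-contained; Utils.copyStream and Utils.tryWithSafeFinally are the existing Spark helpers.

    import java.io.{File, FileInputStream, FileOutputStream}
    import org.apache.spark.util.Utils

    def concatPartitionFiles(
        outputFile: File,
        numPartitions: Int,
        partitionFile: Int => File): Array[Long] = {
      val lengths = new Array[Long](numPartitions)
      // Open the consolidated output file once, outside the loop.
      val out = new FileOutputStream(outputFile, true)
      Utils.tryWithSafeFinally {
        for (i <- 0 until numPartitions) {
          val file = partitionFile(i)
          if (!file.exists()) {
            lengths(i) = 0 // a missing file now just means an empty partition
          } else {
            val in = new FileInputStream(file)
            Utils.tryWithSafeFinally {
              lengths(i) = Utils.copyStream(in, out, closeStreams = false)
            } {
              in.close()
            }
          }
        }
      } {
        out.close()
      }
      lengths
    }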

@SparkQA

SparkQA commented Apr 22, 2015

Test build #30722 has finished for PR 5622 at commit ad13424.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Explode(child: Expression)
    • protected[spark] abstract class NativeType extends DataType
  • This patch does not change any dependencies.

  lengths(i) = 0
} else {
  val in = new FileInputStream(file)
  util.Utils.tryWithSafeFinally {
Contributor

Shouldn't we avoid using partial package names like this? Just import spark.util.Utils.

Contributor Author

This was a carryover from the old code, but I can fix this up.
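
For reference, a minimal sketch of the suggested style, importing the object once so that call sites avoid the partial package prefix (the input stream here is hypothetical):

    import java.io.FileInputStream
    import org.apache.spark.util.Utils

    val in = new FileInputStream("partition-0.data") // hypothetical input file
    Utils.tryWithSafeFinally {
      println(in.read()) // work that may throw
    } {
      in.close() // cleanup that always runs, even on failure
    }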

@JoshRosen
Contributor Author

I'm going to close this pull request for now, since I think we're going to need to spend more time thinking through a more general strategy for handling empty partitions / output files. In some local benchmarking, the changes in this patch made a large difference for out-of-the-box single-machine spark-shell SQL performance, but I think we'll want to re-assess performance benchmarking for that workload after the other 1.4 improvements land.

@JoshRosen JoshRosen closed this May 9, 2015
@JoshRosen JoshRosen reopened this Jun 5, 2015
@JoshRosen JoshRosen force-pushed the file-handle-optimizations branch from ad13424 to 5c777cf on June 5, 2015 21:39
@JoshRosen JoshRosen changed the title from “[SPARK-7041] Avoid writing empty files in ExternalSorter” to “[SPARK-7041] Avoid writing empty files in BypassMergeSortShuffleWriter” on Jun 5, 2015
@SparkQA

SparkQA commented Jun 5, 2015

Test build #34314 has finished for PR 5622 at commit 5c777cf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 5, 2015

Test build #34311 has finished for PR 5622 at commit ad13424.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.
  • This patch adds the following new dependencies:
    • activation-1.1.jar
    • compress-lzf-1.0.0.jar
    • jackson-core-asl-1.8.8.jar
    • jackson-jaxrs-1.8.8.jar
    • jackson-mapper-asl-1.8.8.jar
    • jackson-xc-1.8.8.jar
    • jaxb-api-2.2.2.jar
    • jaxb-impl-2.2.3-1.jar
    • lz4-1.2.0.jar
    • mesos-0.21.0-shaded-protobuf.jar
    • parquet-column-1.6.0rc3.jar
    • parquet-common-1.6.0rc3.jar
    • parquet-encoding-1.6.0rc3.jar
    • parquet-format-2.2.0-rc1.jar
    • parquet-generator-1.6.0rc3.jar
    • parquet-hadoop-1.6.0rc3.jar
    • parquet-jackson-1.6.0rc3.jar
    • protobuf-java-2.4.1.jar
    • pyrolite-2.0.1.jar
    • quasiquotes_2.10-2.0.1.jar
    • spark-bagel_2.10-1.4.0-SNAPSHOT.jar
    • spark-catalyst_2.10-1.4.0-SNAPSHOT.jar
    • spark-core_2.10-1.4.0-SNAPSHOT.jar
    • spark-graphx_2.10-1.4.0-SNAPSHOT.jar
    • spark-launcher_2.10-1.4.0-SNAPSHOT.jar
    • spark-mllib_2.10-1.4.0-SNAPSHOT.jar
    • spark-network-common_2.10-1.4.0-SNAPSHOT.jar
    • spark-network-shuffle_2.10-1.4.0-SNAPSHOT.jar
    • spark-repl_2.10-1.4.0-SNAPSHOT.jar
    • spark-sql_2.10-1.4.0-SNAPSHOT.jar
    • spark-streaming_2.10-1.4.0-SNAPSHOT.jar
    • tachyon-0.5.0.jar
    • tachyon-client-0.5.0.jar
  • This patch removes the following dependencies:
    • asm-3.1.jar
    • commons-compiler-2.7.8.jar
    • compress-lzf-1.0.3.jar
    • jackson-core-asl-1.9.13.jar
    • jackson-jaxrs-1.9.13.jar
    • jackson-mapper-asl-1.9.13.jar
    • jackson-xc-1.9.13.jar
    • janino-2.7.8.jar
    • javax.activation-1.1.0.v201105071233.jar
    • javax.mail.glassfish-1.4.1.v201005082020.jar
    • javax.transaction-1.1.1.v201105210645.jar
    • jaxb-api-2.2.7.jar
    • jaxb-core-2.2.7.jar
    • jaxb-impl-2.2.7.jar
    • jetty-continuation-8.1.14.v20131031.jar
    • jetty-http-8.1.14.v20131031.jar
    • jetty-io-8.1.14.v20131031.jar
    • jetty-jndi-8.1.14.v20131031.jar
    • jetty-plus-8.1.14.v20131031.jar
    • jetty-security-8.1.14.v20131031.jar
    • jetty-server-8.1.14.v20131031.jar
    • jetty-servlet-8.1.14.v20131031.jar
    • jetty-util-8.1.14.v20131031.jar
    • jetty-webapp-8.1.14.v20131031.jar
    • jetty-xml-8.1.14.v20131031.jar
    • lz4-1.3.0.jar
    • mesos-0.21.1-shaded-protobuf.jar
    • parquet-column-1.7.0.jar
    • parquet-common-1.7.0.jar
    • parquet-encoding-1.7.0.jar
    • parquet-format-2.3.0-incubating.jar
    • parquet-generator-1.7.0.jar
    • parquet-hadoop-1.7.0.jar
    • parquet-jackson-1.7.0.jar
    • pmml-agent-1.1.15.jar
    • pmml-model-1.1.15.jar
    • pmml-schema-1.1.15.jar
    • protobuf-java-2.5.0.jar
    • pyrolite-4.4.jar
    • quasiquotes_2.10-2.0.0-M8.jar
    • spark-bagel_2.10-1.5.0-SNAPSHOT.jar
    • spark-catalyst_2.10-1.5.0-SNAPSHOT.jar
    • spark-core_2.10-1.5.0-SNAPSHOT.jar
    • spark-graphx_2.10-1.5.0-SNAPSHOT.jar
    • spark-launcher_2.10-1.5.0-SNAPSHOT.jar
    • spark-mllib_2.10-1.5.0-SNAPSHOT.jar
    • spark-network-common_2.10-1.5.0-SNAPSHOT.jar
    • spark-network-shuffle_2.10-1.5.0-SNAPSHOT.jar
    • spark-repl_2.10-1.5.0-SNAPSHOT.jar
    • spark-sql_2.10-1.5.0-SNAPSHOT.jar
    • spark-streaming_2.10-1.5.0-SNAPSHOT.jar
    • spark-unsafe_2.10-1.5.0-SNAPSHOT.jar
    • tachyon-0.6.4.jar
    • tachyon-client-0.6.4.jar

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Jun 6, 2015

Test build #34340 has finished for PR 5622 at commit 5c777cf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Jun 8, 2015

Test build #34454 has finished for PR 5622 at commit 5c777cf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// around with Spark SQL queries in spark-shell on toy datasets: if you performed a query over
// an extremely small number of records then Spark SQL's default parallelism of 200 would
// result in slower out-of-the-box performance due to these constant-factor overheads. This
// optimization speeds up local microbenchmarking and SQL unit tests.
partitionWriters[i] =
    blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
Contributor

looks like you are still calling open() here :)

Contributor Author

Doh, I guess I mis-resolved one of those merge conflicts :)

@SparkQA

SparkQA commented Jun 9, 2015

Test build #34520 has finished for PR 5622 at commit aaa51bf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Jun 9, 2015

looks like a real failure

@JoshRosen
Contributor Author

Yeah, I know how to fix it and will do so in a bit.

@JoshRosen JoshRosen reopened this Nov 11, 2015
@SparkQA

SparkQA commented Nov 11, 2015

Test build #45642 has finished for PR 5622 at commit 966cc15.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 24, 2015

Test build #46561 has finished for PR 5622 at commit 618ac7e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 24, 2015

Test build #46571 has finished for PR 5622 at commit 5fdfcda.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

If anything, this seemed to make things slower. Closing for now.

@JoshRosen JoshRosen closed this Nov 24, 2015
@JoshRosen JoshRosen deleted the file-handle-optimizations branch January 13, 2016 22:13