Skip to content

[SPARK-7873] Allow KryoSerializerInstance to create multiple streams at the same time #6415

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from

Conversation

JoshRosen
Copy link
Contributor

This is a somewhat obscure bug, but I think that it will seriously impact KryoSerializer users who use custom registrators which disabled auto-reset. When auto-reset is disabled, then this breaks things in some of our shuffle paths which actually end up creating multiple OutputStreams from the same shared SerializerInstance (which is unsafe).

This was introduced by a patch (SPARK-3386) which enables serializer re-use in some of the shuffle paths, since constructing new serializer instances is actually pretty costly for KryoSerializer. We had already fixed another corner-case (SPARK-7766) bug related to this, but missed this one.

I think that the root problem here is that KryoSerializerInstance can be used in a way which is unsafe even within a single thread, e.g. by creating multiple open OutputStreams from the same instance or by interleaving deserialize and deserializeStream calls. I considered a smaller patch which adds assertions to guard against this type of "misuse" but abandoned that approach after I realized how convoluted the Scaladoc became.

This patch fixes this bug by making it legal to create multiple streams from the same KryoSerializerInstance. Internally, KryoSerializerInstance now implements a borrowKryo() / releaseKryo() API that's backed by a "pool" of capacity 1. Each call to a KryoSerializerInstance method will borrow the Kryo, do its work, then release the serializer instance back to the pool. If the pool is empty and we need an instance, it will allocate a new Kryo on-demand. This makes it safe for multiple OutputStreams to be opened from the same serializer. If we try to release a Kryo back to the pool but the pool already contains a Kryo, then we'll just discard the new Kryo. I don't think there's a clear benefit to having a larger pool since our usages tend to fall into two cases, a) where we only create a single OutputStream and b) where we create a huge number of OutputStreams with the same lifecycle, then destroy the KryoSerializerInstance (this is what's happening in the bypassMergeSort code path that my regression test hits).

@SparkQA
Copy link

SparkQA commented May 26, 2015

Test build #33528 has finished for PR 6415 at commit 7350886.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@pwendell
Copy link
Contributor

/cc @rxin

@pwendell
Copy link
Contributor

Jenkins, retest this please.

@JoshRosen
Copy link
Contributor Author

Pushed a new test demonstrating another problem: back-to-back deserialize / deserializeStream calls aren't safe, even if you close the stream first. This was a problem in the old code, too, and can lead to silent data corruption.

@SparkQA
Copy link

SparkQA commented May 26, 2015

Test build #33529 has finished for PR 6415 at commit 9816e8f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • implicit class DslLogicalPlan(val logicalPlan: LogicalPlan)

This makes it safe to invoke all SerializerInstance methods at any time,
including the creation of multiple open OutputStreams from the same
KryoSerializerInstance.
@SparkQA
Copy link

SparkQA commented May 26, 2015

Test build #33534 has finished for PR 6415 at commit ab457ca.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KryoSerializationStream(
    • class KryoDeserializationStream(

@JoshRosen
Copy link
Contributor Author

Alright, this should be ready for a first pass of reviews. I'm going to work on updating the comments shortly.

@JoshRosen
Copy link
Contributor Author

Whoops, meant to post this comment earlier:

I think that the root problem here is that KryoSerializerInstance can be used in a way which is unsafe even within a single thread, e.g. by creating multiple open OutputStreams from the same instance or by interleaving deserialize and deserializeStream calls. I considered a smaller patch which adds assertions to guard against this type of "misuse" but abandoned that approach after I realized how convoluted the Scaladoc became.

I just pushed a WIP commit which illustrates my proposed fix. In a nutshell, I think that rather than using a single Kryo instance in KryoSerializerInstance, we should implement a borrowKryo() / releaseKryo() API that's backed by a "pool" of capacity 1. In the common case, every call to a KryoSerializerInstnace method, do its work, then release the serializer instance back to the pool. If the pool is empty and we need an instance, we'll allocate a new Kryo on-demand. This makes it safe for multiple OutputStreams to be opened from the same serializer. If we try to release a Kryo back to the pool but the pool already contains a Kryo, then we'll just discard the new Kryo. I don't think there's a clear benefit to having a larger pool since our usages tend to fall into two cases, a) where we only create a single OutputStream and b) where we create a huge number of OutputStreams with the same lifecycle, then destroy the KryoSerializerInstance (this is what's happening in the bypassMergeSort code path that my failing test hits).

@SparkQA
Copy link

SparkQA commented May 26, 2015

Test build #33542 has finished for PR 6415 at commit 3f1da96.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KryoSerializationStream(
    • class KryoDeserializationStream(

@zsxwing
Copy link
Member

zsxwing commented May 27, 2015

LGTM

@JoshRosen
Copy link
Contributor Author

Thanks for reviewing. Don't merge this yet; I need to update the description and comments.

@JoshRosen
Copy link
Contributor Author

I've revised the pull request description and pushed a new commit which adds a few more comments.

@JoshRosen JoshRosen changed the title [SPARK-7873] [WIP] Fix another bug related to KryoSerializerInstance re-use in sort-shuffle [SPARK-7873] Fix another bug related to KryoSerializerInstance re-use in sort-shuffle May 27, 2015
@JoshRosen JoshRosen changed the title [SPARK-7873] Fix another bug related to KryoSerializerInstance re-use in sort-shuffle [SPARK-7873] Allow KryoSerializerInstance to create multiple streams at the same time May 27, 2015
@SparkQA
Copy link

SparkQA commented May 27, 2015

Test build #33603 has finished for PR 6415 at commit ba55d20.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KryoSerializationStream(
    • class KryoDeserializationStream(

@JoshRosen
Copy link
Contributor Author

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented May 27, 2015

Test build #33610 has finished for PR 6415 at commit ba55d20.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented May 27, 2015

Test build #33620 has finished for PR 6415 at commit 00b402e.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KryoSerializationStream(
    • class KryoDeserializationStream(

@JoshRosen
Copy link
Contributor Author

Jenkins, retest this please.

@JoshRosen
Copy link
Contributor Author

Flaky Python test...

@SparkQA
Copy link

SparkQA commented May 28, 2015

Test build #33628 has finished for PR 6415 at commit 00b402e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KryoSerializationStream(
    • class KryoDeserializationStream(

@JoshRosen
Copy link
Contributor Author

@zsxwing @pwendell unless you have additional feedback, I think that this should now be good to go for the next RC.

@zsxwing
Copy link
Member

zsxwing commented May 28, 2015

LGTM

@pwendell
Copy link
Contributor

Okay, merging this - thanks guys.

asfgit pushed a commit that referenced this pull request May 28, 2015
…at the same time

This is a somewhat obscure bug, but I think that it will seriously impact KryoSerializer users who use custom registrators which disabled auto-reset. When auto-reset is disabled, then this breaks things in some of our shuffle paths which actually end up creating multiple OutputStreams from the same shared SerializerInstance (which is unsafe).

This was introduced by a patch (SPARK-3386) which enables serializer re-use in some of the shuffle paths, since constructing new serializer instances is actually pretty costly for KryoSerializer.  We had already fixed another corner-case (SPARK-7766) bug related to this, but missed this one.

I think that the root problem here is that KryoSerializerInstance can be used in a way which is unsafe even within a single thread, e.g. by creating multiple open OutputStreams from the same instance or by interleaving deserialize and deserializeStream calls. I considered a smaller patch which adds assertions to guard against this type of "misuse" but abandoned that approach after I realized how convoluted the Scaladoc became.

This patch fixes this bug by making it legal to create multiple streams from the same KryoSerializerInstance.  Internally, KryoSerializerInstance now implements a  `borrowKryo()` / `releaseKryo()` API that's backed by a "pool" of capacity 1. Each call to a KryoSerializerInstance method will borrow the Kryo, do its work, then release the serializer instance back to the pool. If the pool is empty and we need an instance, it will allocate a new Kryo on-demand. This makes it safe for multiple OutputStreams to be opened from the same serializer. If we try to release a Kryo back to the pool but the pool already contains a Kryo, then we'll just discard the new Kryo. I don't think there's a clear benefit to having a larger pool since our usages tend to fall into two cases, a) where we only create a single OutputStream and b) where we create a huge number of OutputStreams with the same lifecycle, then destroy the KryoSerializerInstance (this is what's happening in the bypassMergeSort code path that my regression test hits).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #6415 from JoshRosen/SPARK-7873 and squashes the following commits:

00b402e [Josh Rosen] Initialize eagerly to fix a failing test
ba55d20 [Josh Rosen] Add explanatory comments
3f1da96 [Josh Rosen] Guard against duplicate close()
ab457ca [Josh Rosen] Sketch a loan/release based solution.
9816e8f [Josh Rosen] Add a failing test showing how deserialize() and deserializeStream() can interfere.
7350886 [Josh Rosen] Add failing regression test for SPARK-7873

(cherry picked from commit 852f4de)
Signed-off-by: Patrick Wendell <patrick@databricks.com>
@asfgit asfgit closed this in 852f4de May 28, 2015
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015
…at the same time

This is a somewhat obscure bug, but I think that it will seriously impact KryoSerializer users who use custom registrators which disabled auto-reset. When auto-reset is disabled, then this breaks things in some of our shuffle paths which actually end up creating multiple OutputStreams from the same shared SerializerInstance (which is unsafe).

This was introduced by a patch (SPARK-3386) which enables serializer re-use in some of the shuffle paths, since constructing new serializer instances is actually pretty costly for KryoSerializer.  We had already fixed another corner-case (SPARK-7766) bug related to this, but missed this one.

I think that the root problem here is that KryoSerializerInstance can be used in a way which is unsafe even within a single thread, e.g. by creating multiple open OutputStreams from the same instance or by interleaving deserialize and deserializeStream calls. I considered a smaller patch which adds assertions to guard against this type of "misuse" but abandoned that approach after I realized how convoluted the Scaladoc became.

This patch fixes this bug by making it legal to create multiple streams from the same KryoSerializerInstance.  Internally, KryoSerializerInstance now implements a  `borrowKryo()` / `releaseKryo()` API that's backed by a "pool" of capacity 1. Each call to a KryoSerializerInstance method will borrow the Kryo, do its work, then release the serializer instance back to the pool. If the pool is empty and we need an instance, it will allocate a new Kryo on-demand. This makes it safe for multiple OutputStreams to be opened from the same serializer. If we try to release a Kryo back to the pool but the pool already contains a Kryo, then we'll just discard the new Kryo. I don't think there's a clear benefit to having a larger pool since our usages tend to fall into two cases, a) where we only create a single OutputStream and b) where we create a huge number of OutputStreams with the same lifecycle, then destroy the KryoSerializerInstance (this is what's happening in the bypassMergeSort code path that my regression test hits).

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#6415 from JoshRosen/SPARK-7873 and squashes the following commits:

00b402e [Josh Rosen] Initialize eagerly to fix a failing test
ba55d20 [Josh Rosen] Add explanatory comments
3f1da96 [Josh Rosen] Guard against duplicate close()
ab457ca [Josh Rosen] Sketch a loan/release based solution.
9816e8f [Josh Rosen] Add a failing test showing how deserialize() and deserializeStream() can interfere.
7350886 [Josh Rosen] Add failing regression test for SPARK-7873
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015
…at the same time

This is a somewhat obscure bug, but I think that it will seriously impact KryoSerializer users who use custom registrators which disabled auto-reset. When auto-reset is disabled, then this breaks things in some of our shuffle paths which actually end up creating multiple OutputStreams from the same shared SerializerInstance (which is unsafe).

This was introduced by a patch (SPARK-3386) which enables serializer re-use in some of the shuffle paths, since constructing new serializer instances is actually pretty costly for KryoSerializer.  We had already fixed another corner-case (SPARK-7766) bug related to this, but missed this one.

I think that the root problem here is that KryoSerializerInstance can be used in a way which is unsafe even within a single thread, e.g. by creating multiple open OutputStreams from the same instance or by interleaving deserialize and deserializeStream calls. I considered a smaller patch which adds assertions to guard against this type of "misuse" but abandoned that approach after I realized how convoluted the Scaladoc became.

This patch fixes this bug by making it legal to create multiple streams from the same KryoSerializerInstance.  Internally, KryoSerializerInstance now implements a  `borrowKryo()` / `releaseKryo()` API that's backed by a "pool" of capacity 1. Each call to a KryoSerializerInstance method will borrow the Kryo, do its work, then release the serializer instance back to the pool. If the pool is empty and we need an instance, it will allocate a new Kryo on-demand. This makes it safe for multiple OutputStreams to be opened from the same serializer. If we try to release a Kryo back to the pool but the pool already contains a Kryo, then we'll just discard the new Kryo. I don't think there's a clear benefit to having a larger pool since our usages tend to fall into two cases, a) where we only create a single OutputStream and b) where we create a huge number of OutputStreams with the same lifecycle, then destroy the KryoSerializerInstance (this is what's happening in the bypassMergeSort code path that my regression test hits).

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#6415 from JoshRosen/SPARK-7873 and squashes the following commits:

00b402e [Josh Rosen] Initialize eagerly to fix a failing test
ba55d20 [Josh Rosen] Add explanatory comments
3f1da96 [Josh Rosen] Guard against duplicate close()
ab457ca [Josh Rosen] Sketch a loan/release based solution.
9816e8f [Josh Rosen] Add a failing test showing how deserialize() and deserializeStream() can interfere.
7350886 [Josh Rosen] Add failing regression test for SPARK-7873
@JoshRosen JoshRosen deleted the SPARK-7873 branch July 7, 2017 23:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants