[SPARK-13990] Automatically pick serializer when caching RDDs #11801
Conversation
/cc @rxin

Test build #53478 has finished for PR 11801 at commit

@rxin is it okay to change this?

Yes - that should be an internal module.

Test build #53481 has finished for PR 11801 at commit

Test build #53494 has finished for PR 11801 at commit

Test build #53663 has finished for PR 11801 at commit

Test build #53694 has finished for PR 11801 at commit
```diff
@@ -703,13 +710,13 @@ private[spark] class BlockManager(
    *
    * @return true if the block was stored or false if an error occurred.
    */
-  def putBytes(
+  def putBytes[T: ClassTag]( // TODO(josh)
```
Is this addressed?
Yes. During the original design I wasn't sure whether I wanted this to be an implicit ClassTag, but in this case it's actually desired / fine.
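For readers outside the review thread, here is a minimal hypothetical sketch of what the implicit ClassTag buys; the object and method bodies below are invented for illustration, and only the `[T: ClassTag]` pattern reflects the PR. The context bound makes the compiler supply the tag at each call site, so a put-style method can record it without callers passing it explicitly:

```scala
import scala.reflect.ClassTag

object ClassTagCaptureDemo {
  // Hypothetical stand-in for a put-style method: the [T: ClassTag]
  // context bound makes the compiler supply the tag implicitly at the
  // call site, so it can be recorded alongside the stored block.
  def putBytes[T: ClassTag](blockId: String, data: Array[Byte]): Unit = {
    val tag = implicitly[ClassTag[T]]
    println(s"storing $blockId (${data.length} bytes) with class tag $tag")
  }

  def main(args: Array[String]): Unit = {
    putBytes[Array[Byte]]("rdd_0_0", Array[Byte](1, 2, 3)) // tag: Array[byte]
    putBytes[String]("rdd_0_1", Array[Byte](4, 5))         // tag: java.lang.String
  }
}
```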
LGTM
```scala
      blockId: BlockId,
      data: ManagedBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Boolean
```
indentation
Alright, I'm going to merge this now and will address any problems in followups.
Building on the `SerializerManager` introduced in SPARK-13926 / apache#11755, this patch modifies Spark's BlockManager to use an RDD's ClassTag in order to select the best serializer to use when caching RDD blocks.

When storing a local block, the BlockManager `put()` methods use implicits to record ClassTags and store those tags in the blocks' BlockInfo records. When reading a local block, the stored ClassTag is used to pick the appropriate serializer. When a block is stored with replication, the class tag is written into the block transfer metadata and will also be stored in the remote BlockManager.

There are two or three places where we don't properly pass ClassTags, including TorrentBroadcast and BlockRDD. I think this happens to work because the missing ClassTag always happens to be `ClassTag.Any`, but it might be worth looking more carefully at those places to see whether we should be more explicit.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#11801 from JoshRosen/pick-best-serializer-for-caching.
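As a rough illustration of the selection rule described above, here is a sketch with invented names (`SerializerChoiceDemo`, `kryoSafeTags`, `serializerFor`); it is not the actual `SerializerManager` code. The idea is that a stored ClassTag can be checked against the types Kryo is known to handle safely, falling back to Java serialization for everything else:

```scala
import scala.reflect.ClassTag

object SerializerChoiceDemo {
  // Sketch of a selection rule: Kryo is known to be safe for
  // primitives, some primitive arrays, and strings, so those class
  // tags get Kryo; everything else falls back to Java serialization.
  // (ClassTag equality is based on the underlying runtime class.)
  private val kryoSafeTags: Set[ClassTag[_]] = Set(
    ClassTag.Boolean, ClassTag.Byte, ClassTag.Char, ClassTag.Short,
    ClassTag.Int, ClassTag.Long, ClassTag.Float, ClassTag.Double,
    ClassTag(classOf[Array[Byte]]), ClassTag(classOf[Array[Int]]),
    ClassTag(classOf[String]))

  def serializerFor(tag: ClassTag[_]): String =
    if (kryoSafeTags.contains(tag)) "KryoSerializer" else "JavaSerializer"

  def main(args: Array[String]): Unit = {
    println(serializerFor(ClassTag(classOf[Array[Byte]]))) // KryoSerializer
    println(serializerFor(ClassTag.Any))                   // JavaSerializer
  }
}
```

Under a rule like this, a PySpark-style `RDD[Array[Byte]]` is routed to Kryo, which is why a matching class tag on the read side matters (see the follow-up fix below).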
…Values()

## What changes were proposed in this pull request?

This patch fixes a `java.io.StreamCorruptedException` error affecting remote reads of cached values when certain data types are used. The problem stems from #11801 / SPARK-13990, a patch to have Spark automatically pick the "best" serializer when caching RDDs. If PySpark cached a PythonRDD, then it would be cached as an `RDD[Array[Byte]]` and the automatic serializer selection would pick KryoSerializer for replication and block transfer. However, the `getRemoteValues()` / `getRemoteBytes()` code path did not pass proper class tags to enable the same serializer to be used during deserialization, causing Java serialization to be used inappropriately instead of Kryo and leading to the StreamCorruptedException.

We already fixed a similar bug in #14311, which dealt with similar issues in block replication. Prior to that patch, it seems that we had no tests to ensure that block replication actually succeeded. Similarly, prior to this bug fix patch it looks like we had no tests to perform remote reads of cached data, which is why this bug was able to remain latent for so long.

This patch addresses the bug by modifying `BlockManager`'s `get()` and `getRemoteValues()` methods to accept ClassTags, allowing the proper class tag to be threaded in the `getOrElseUpdate` code path (which is used by `rdd.iterator`).

## How was this patch tested?

Extended the caching tests in `DistributedSuite` to exercise the `getRemoteValues` path, plus manual testing to verify that the PySpark bug reproduction in SPARK-17110 is fixed.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #14952 from JoshRosen/SPARK-17110.

(cherry picked from commit 29cfab3)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
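To make the shape of the fix concrete, here is a minimal toy sketch; everything in it (`ClassTagThreadingDemo`, the string-returning methods, the in-memory map standing in for remote storage) is invented for illustration and is not the real `BlockManager` API. It shows the ClassTag traveling from `getOrElseUpdate` down to the remote read so that both sides agree on the serializer:

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

object ClassTagThreadingDemo {
  // Toy "remote" store: serialized blocks keyed by block id.
  private val remoteStore = mutable.Map[String, Array[Byte]]()

  // Mirrors the caching-side rule: byte-array blocks (e.g. PySpark's
  // RDD[Array[Byte]]) are Kryo-encoded. Before the fix, the read path
  // had no tag and effectively assumed Java serialization, which is
  // the StreamCorruptedException scenario described above.
  private def serializerFor(tag: ClassTag[_]): String =
    if (tag.runtimeClass == classOf[Array[Byte]]) "Kryo" else "Java"

  // The tag is threaded from getOrElseUpdate down to the remote read,
  // so deserialization uses the same serializer as serialization.
  def getRemoteValues[T: ClassTag](blockId: String): Option[String] =
    remoteStore.get(blockId).map { bytes =>
      s"read $blockId: ${bytes.length} bytes via ${serializerFor(implicitly[ClassTag[T]])}"
    }

  def getOrElseUpdate[T: ClassTag](blockId: String)(compute: => Array[Byte]): String =
    getRemoteValues[T](blockId).getOrElse {
      remoteStore(blockId) = compute
      s"computed and stored $blockId"
    }

  def main(args: Array[String]): Unit = {
    println(getOrElseUpdate[Array[Byte]]("rdd_1_0")(Array[Byte](1, 2, 3))) // computes
    println(getOrElseUpdate[Array[Byte]]("rdd_1_0")(Array[Byte](1, 2, 3))) // reads via Kryo
  }
}
```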