[SPARK-5360] [SPARK-6606] Eliminate duplicate objects in serialized CoGroupedRDD #4145


Closed
kayousterhout wants to merge 3 commits into apache:master from kayousterhout:SPARK-5360

Conversation

kayousterhout
Contributor

CoGroupPartition, part of CoGroupedRDD, includes references to each RDD that the CoGroupedRDD narrowly depends on, and a reference to the ShuffleHandle. The partition is serialized separately from the RDD, so when the RDD and partition arrive on the worker, the references in the partition and in the RDD no longer point to the same object.
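
To see the mechanism concretely, here is a minimal, self-contained sketch using plain Java serialization rather than Spark (Handle, Rdd, and Part are made-up stand-ins for the ShuffleHandle, the RDD, and the partition, not Spark classes): shared references survive a single serialization pass, but are duplicated across two separate passes.

```scala
import java.io._

// Hypothetical stand-ins for a ShuffleHandle shared by an RDD and one of its partitions.
class Handle(val id: Int) extends Serializable
class Rdd(val handle: Handle) extends Serializable
class Part(val handle: Handle) extends Serializable

object SeparateSerializationDemo {
  // Serialize and immediately deserialize an object graph.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(obj)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val handle = new Handle(42)
    val rdd = new Rdd(handle)
    val part = new Part(handle)

    // One stream: the shared handle is written once, so reference equality survives.
    val (rdd1, part1) = roundTrip((rdd, part))
    println(rdd1.handle eq part1.handle) // true

    // Two streams, as with the separately serialized RDD and Partition:
    // the handle is written twice and comes back as two distinct objects.
    val (rdd2, part2) = (roundTrip(rdd), roundTrip(part))
    println(rdd2.handle eq part2.handle) // false
  }
}
```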

This is a relatively minor performance issue (the closure can be 2x larger than it needs to be because the RDDs and partitions are serialized twice; see numbers below) but is more annoying as a developer issue (this is how I ran into it): if any state is stored in the RDD or ShuffleHandle on the worker side, subtle bugs can appear because the references to the RDD / ShuffleHandle in the RDD and in the partition point to separate objects. I'm not sure whether this is enough of a potential future problem to justify changing this old and central part of the code, so I'm hoping to get input from others here.

I did some simple experiments to see how much this affects closure size. For this example:
$ val a = sc.parallelize(1 to 10).map((_, 1))
$ val b = sc.parallelize(1 to 2).map(x => (x, 2*x))
$ a.cogroup(b).collect()
the closure was 1902 bytes with current Spark, and 1129 bytes after my change. The difference comes from eliminating duplicate serialization of the shuffle handle.

For this example:
$ val sortedA = a.sortByKey()
$ val sortedB = b.sortByKey()
$ sortedA.cogroup(sortedB).collect()
the closure was 3491 bytes with current Spark, and 1333 bytes after my change. Here, the difference comes from eliminating duplicate serialization of the two RDDs for the narrow dependencies.

The ShuffleHandle includes the ShuffleDependency, so this difference will get larger if a ShuffleDependency includes a serializer, a key ordering, or an aggregator (all set to None by default). It would also get bigger for a big RDD -- although I can't think of any examples where the RDD object gets large. The difference is not affected by the size of the function the user specifies, which (based on my understanding) is typically the source of large task closures.
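
For anyone who wants to reproduce these numbers approximately, a rough spark-shell sketch is below. It Java-serializes the cogrouped RDD together with one of its partitions, which is close to, but not exactly, what Spark ships in a task, so expect byte counts in the same ballpark rather than matching ones. serializedSize is a hypothetical helper, not a Spark API, and an active SparkContext `sc` is assumed.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Measure the Java-serialized size of an object graph.
def serializedSize(obj: AnyRef): Int = {
  val buf = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buf)
  out.writeObject(obj)
  out.close()
  buf.size()
}

val a = sc.parallelize(1 to 10).map((_, 1))
val b = sc.parallelize(1 to 2).map(x => (x, 2 * x))
val cogrouped = a.cogroup(b)

// RDD and partition serialized together, mirroring roughly what a task carries;
// before this patch the partition dragged in duplicate copies of the shuffle handle.
println(serializedSize((cogrouped, cogrouped.partitions(0))))
```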

@SparkQA

SparkQA commented Jan 21, 2015

Test build #25917 has finished for PR 4145 at commit 912d48d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@suyanNone
Contributor

@JoshRosen Can someone verify this patch?

@kayousterhout kayousterhout changed the title [SPARK-5360] Eliminate duplicate objects in serialized CoGroupedRDD [SPARK-5360] [SPARK-6606] Eliminate duplicate objects in serialized CoGroupedRDD Apr 6, 2015

/** The references to rdd and splitIndex are transient because redundant information is stored
* in the CoGroupedRDD object. Because CoGroupedRDD is serialized separately from
* CoGrpupPartition, if rdd and splitIndex aren't transient, they'll be included twice in the
* task closure. */
Contributor
nit pick: spelling, should be CoGroupPartition

* corresponding index.
*/
private[spark] class CoGroupPartition(
idx: Int, val narrowDeps: Array[Option[NarrowCoGroupSplitDep]])
Contributor

as discussed offline, let's make it explicit that the size of the array == number of parents.
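
For context on the pattern being reviewed here: marking a field @transient keeps it out of the serialized partition so it isn't shipped a second time, and the enclosing, separately serialized object is then responsible for supplying it on the worker. A generic sketch of that idea follows (made-up class names, not the actual patch):

```scala
import java.io._

// `data` lives in the enclosing object, so this stand-in parent is ordinary.
class Parent(val data: Vector[Int]) extends Serializable

// `parent` can be re-derived from the separately serialized enclosing object,
// so it is marked @transient and skipped during serialization.
class Child(@transient val parent: Parent, val index: Int) extends Serializable

object TransientDemo {
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(obj)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val child = roundTrip(new Child(new Parent(Vector(1, 2, 3)), 0))
    println(child.parent) // null: not serialized, so not duplicated in the closure;
                          // the owning object must restore it on the remote side
    println(child.index)  // 0: non-transient fields round-trip normally
  }
}
```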

@rxin
Contributor

rxin commented Apr 7, 2015

LGTM otherwise!

@SparkQA

SparkQA commented Apr 7, 2015

Test build #29812 has finished for PR 4145 at commit 229d263.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 13, 2015

Test build #30189 has finished for PR 4145 at commit 85156c3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Assignment(id: Long, cluster: Int)
  • This patch does not change any dependencies.

@kayousterhout
Contributor Author

Jenkins, retest this please

@SparkQA

SparkQA commented Apr 20, 2015

Test build #30604 has finished for PR 4145 at commit 85156c3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@asfgit asfgit closed this in c035c0f Apr 21, 2015
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015
[SPARK-5360] [SPARK-6606] Eliminate duplicate objects in serialized CoGroupedRDD


Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes apache#4145 from kayousterhout/SPARK-5360 and squashes the following commits:

85156c3 [Kay Ousterhout] Better comment the narrowDeps parameter
cff0209 [Kay Ousterhout] Fixed spelling issue
658e1af [Kay Ousterhout] [SPARK-5360] Eliminate duplicate objects in serialized CoGroupedRDD
@kayousterhout kayousterhout deleted the SPARK-5360 branch April 12, 2017 00:42