[SPARK-6606][CORE]Accumulator deserialized twice because the NarrowCoGroupSplitDep contains rdd object. #5259


Closed
wants to merge 1 commit into from

Conversation

suyanNone
Contributor

  1. Using code like the example below, we found that the accumulator is deserialized twice.
    First:
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

Second:

val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

which shows that the first deserialization is not what we expect.
It happens because ResultTask and ShuffleMapTask each carry a partition object.
In the class

CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)

the CoGroupPartition may contain a CoGroupSplitDep:

NarrowCoGroupSplitDep(
    rdd: RDD[_],
    splitIndex: Int,
    var split: Partition
  ) extends CoGroupSplitDep {

Because NarrowCoGroupSplitDep holds a reference to the rdd object, serializing the task also serializes the RDD (and any accumulators reachable from it), which triggers the first, unintended deserialization.
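The double deserialization can be illustrated outside Spark: Java serialization walks the full object graph, so an object reachable from two independently serialized payloads is deserialized once per payload. A minimal sketch, where `Counter` and `Holder` are hypothetical stand-ins for an accumulator with a side-effecting readObject and a partition that references the RDD:

```scala
import java.io._

// Hypothetical stand-ins: `Counter` plays the role of an accumulator whose
// readObject has an observable side effect, and `Holder` plays the role of a
// Partition that keeps a reference to the object graph containing the counter.
object Counter { var totalDeserializations = 0 }

class Counter extends Serializable {
  // Invoked reflectively by Java serialization on every deserialization.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    Counter.totalDeserializations += 1
  }
}

case class Holder(counter: Counter)

def roundTrip[T](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  new ObjectOutputStream(bytes).writeObject(obj)
  new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[T]
}

val counter = new Counter
// The counter is reachable from two independently serialized payloads, so its
// readObject side effect runs once per payload -- twice in total, just as the
// accumulator is deserialized once via the Task (through the Partition's rdd
// reference) and once via the broadcast taskBinary.
roundTrip(Holder(counter))
roundTrip(counter)
println(Counter.totalDeserializations) // prints 2
```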

Example:

val acc1 = sc.accumulator(0, "test1")
val acc2 = sc.accumulator(0, "test2")
val rdd1 = sc.parallelize((1 to 10).toSeq, 3)
val rdd2 = sc.parallelize((1 to 10).toSeq, 3)

val combine1 = rdd1.map { a => (a, 1) }.combineByKey(
  a => {
    acc1 += 1
    a
  },
  (a: Int, b: Int) => a + b,
  (a: Int, b: Int) => a + b,
  new HashPartitioner(3), mapSideCombine = false)

val combine2 = rdd2.map { a => (a, 1) }.combineByKey(
  a => {
    acc2 += 1
    a
  },
  (a: Int, b: Int) => a + b,
  (a: Int, b: Int) => a + b,
  new HashPartitioner(3), mapSideCombine = false)

combine1.cogroup(combine2, new HashPartitioner(3)).count()

@SparkQA

SparkQA commented Mar 30, 2015

Test build #29393 has started for PR 5259 at commit 2fde066.

@SparkQA

SparkQA commented Mar 30, 2015

Test build #29393 has finished for PR 5259 at commit 2fde066.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class CoGroupedRDD[K](var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
  • This patch does not change any dependencies.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29393/

@sryza
Contributor

sryza commented Mar 30, 2015

Is this the same as the issue in SPARK-5360? @kayousterhout

@JoshRosen
Contributor

Just to understand this issue a little better, does this cause a correctness issue where we'd give the wrong answer? If so, can we add a regression test?

@kayousterhout
Contributor

@sryza thanks for noticing this -- this is a subset of SPARK-5360 (#4145 also fixes the issue that the ShuffleHandle is included twice), so I'd be in favor of merging #4145 so we can just fix both issues.

@suyanNone
Contributor Author

@kayousterhout I had searched for "accumulators" in the global issue list just before opening this pull request. I have read your patch, and I also prefer SPARK-5360.

@suyanNone suyanNone closed this Mar 31, 2015
@suyanNone
Contributor Author

Duplicate of @kayousterhout's SPARK-5360, and SPARK-5360 is the better solution.
