
[SPARK-5351][GraphX] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImpl #4136


Closed
maropu wants to merge 1 commit into master from maropu:EdgePartitionBugFix

Conversation

maropu
Member

@maropu maropu commented Jan 21, 2015

If the value of 'spark.default.parallelism' does not match the number of partitions in EdgePartition (EdgeRDDImpl),
the following error occurs in ReplicatedVertexView.scala:72:

import scala.reflect.ClassTag
import org.apache.spark.Logging
import org.apache.spark.graphx._

object GraphTest extends Logging {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): VertexRDD[Int] = {
    graph.aggregateMessages(
      ctx => {
        ctx.sendToSrc(1)
        ctx.sendToDst(2)
      },
      _ + _)
  }
}

val g = GraphLoader.edgeListFile(sc, "graph.txt")
val rdd = GraphTest.run(g)

java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:82)
at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:193)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
...
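
The patch avoids Partitioner.defaultPartitioner (which consults spark.default.parallelism) when EdgeRDDImpl reports its partitioner. A minimal sketch of that idea, assuming the override lives in EdgeRDDImpl and hedging on the exact expression used in the merged commit:

import org.apache.spark.{HashPartitioner, Partitioner}

// Prefer the partitioner of the underlying partitionsRDD; if it has none,
// fall back to a HashPartitioner sized to this RDD's own partition count
// rather than to Partitioner.defaultPartitioner, so zipPartitions against
// the replicated vertex view always sees matching partition counts.
override val partitioner: Option[Partitioner] =
  partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitions.length)))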

@ankurdave
Contributor

ok to test

@ankurdave
Contributor

This is the same fix as was originally proposed for SPARK-2823: #1763. The problem is that it seems to trigger a Spark bug (SPARK-3400, SPARK-3630) that causes deserialization errors.

@SparkQA

SparkQA commented Jan 21, 2015

Test build #25910 has finished for PR 4136 at commit 0a2f32b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor

Jenkins, retest this please.

@JoshRosen
Contributor

@ankurdave Does it still trigger that bug? How reliably? I've spent a huge amount of time fighting nondeterministic Snappy stream corruption errors in Spark, so I'd be very interested if you've found a semi-reliable reproduction in the current Spark master.

@SparkQA

SparkQA commented Jan 21, 2015

Test build #25916 has finished for PR 4136 at commit 0a2f32b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor

This is a spurious test failure due to a build break in SQL. Let's retest:

@JoshRosen
Contributor

Jenkins, retest this please.

@SparkQA

SparkQA commented Jan 23, 2015

Test build #26004 has finished for PR 4136 at commit 0a2f32b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ankurdave
Contributor

@JoshRosen No, it doesn't seem to trigger the Snappy error! After the previous attempted fix (#1763, 9b225ac), the GraphX unit tests (for i in {1..10}; do sbt/sbt 'graphx/test:test-only org.apache.spark.graphx.*'; done) would fail 3 out of 10 times, but they always succeed now.

I think we can merge this! I'm just going to bisect to see what fixed the error.

@JoshRosen
Contributor

@ankurdave Don't know if you've already seen this, but check out https://issues.apache.org/jira/browse/SPARK-3630 and its linked tickets for some related Snappy errors and how they've been fixed. It's possible that one of those patches may have fixed the GraphX manifestation of the Snappy problem.

@ankurdave
Contributor

@JoshRosen Actually, it seems the test failures still occur, but only when I add a unit test that sets spark.default.parallelism.

Adding the test causes subsequent tests within the same run to fail with exceptions like

java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0

and

java.io.IOException: PARSING_ERROR(2)

The exception traces always occur in TorrentBroadcast.

It seems like setting spark.default.parallelism is causing some kind of side effect that corrupts broadcasts in later unit tests, which is strange since (1) each unit test should have its own SparkContext and therefore its own temp directory, and (2) I'm only passing spark.default.parallelism to SparkConf/SparkContext, not setting it as a system property.

@JoshRosen
Contributor

@ankurdave The exception from the new unit test sounds suspiciously similar to https://issues.apache.org/jira/browse/SPARK-4133. Your new test creates a new sc local variable and never stops it, so if that test runs first, its leaked context will keep running and will interfere with contexts created in the other tests.

Because some SparkSQL tests could not pass otherwise, our unit tests set spark.driver.allowMultipleContexts=true to disable the multiple-contexts check, so this might be hard to notice. If you have unit-tests.log, though, I'd take a look to see whether there are any warning messages about multiple contexts.

I'd check to see if those failures still persist after properly cleaning up the SparkContext created in your new test.
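
A hypothetical sketch of what the amended test boils down to (the parallelism value, the edge partition count, and the file path are placeholders; the essential part is stopping the context in the finally block):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.GraphLoader

val conf = new SparkConf()
  .setMaster("local")
  .setAppName("non-default-parallelism")
  .set("spark.default.parallelism", "8")  // deliberately != edge partition count
val sc = new SparkContext(conf)
try {
  val g = GraphLoader.edgeListFile(sc, "graph.txt", numEdgePartitions = 2)
  GraphTest.run(g).count()  // threw "Can't zip RDDs ..." before this patch
} finally {
  sc.stop()  // without this, the leaked context corrupts broadcasts in later tests
}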

JoshRosen referenced this pull request Jan 24, 2015
If the user sets “spark.default.parallelism” and its value differs from the EdgeRDD partition number, GraphX jobs will throw:
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions

Author: luluorta <luluorta@gmail.com>

Closes #1763 from luluorta/fix-graph-zip and squashes the following commits:

8338961 [luluorta] fix GraphX EdgeRDD zipPartitions
@ankurdave
Contributor

Oh, thanks! Looks like that was the problem all along; stopping the SparkContext fixes it. I'm going to merge this with the amended test now.

@asfgit asfgit closed this in e224dbb Jan 24, 2015
asfgit pushed a commit that referenced this pull request Jan 24, 2015
[SPARK-5351][GraphX] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImpl


Author: Takeshi Yamamuro <linguin.m.s@gmail.com>

Closes #4136 from maropu/EdgePartitionBugFix and squashes the following commits:

0cd8942 [Ankur Dave] Use more concise getOrElse
aad4a2c [Ankur Dave] Add unit test for non-default number of edge partitions
0a2f32b [Takeshi Yamamuro] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImpl

(cherry picked from commit e224dbb)
Signed-off-by: Ankur Dave <ankurdave@gmail.com>
@ankurdave
Contributor

Merged into master & branch-1.2.

@maropu
Member Author

maropu commented Jan 24, 2015

Thanks for the quick commits! I ran into this bug in my GraphX application :)

@maropu maropu deleted the EdgePartitionBugFix branch July 5, 2017 11:46