[SPARK-5351][GraphX] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImpl #4136
Conversation
ok to test
This is the same fix as was originally proposed for SPARK-2823: #1763. The problem is that it seems to trigger a Spark bug (SPARK-3400, SPARK-3630) that causes deserialization errors.
Test build #25910 has finished for PR 4136 at commit
Jenkins, retest this please.
@ankurdave Does it still trigger that bug? How reliably? I've spent a huge amount of time fighting nondeterministic Snappy stream corruption errors in Spark, so I'd be very interested if you've found a semi-reliable reproduction in the current Spark master.
Test build #25916 has finished for PR 4136 at commit
This is a spurious test failure due to a build break in SQL. Let's retest:
Jenkins, retest this please.
Test build #26004 has finished for PR 4136 at commit
@JoshRosen No, it doesn't seem to trigger the Snappy error! After the previous attempted fix (#1763, 9b225ac), the GraphX unit tests […] I think we can merge this! I'm just going to bisect to see what fixed the error.
@ankurdave Don't know if you've already seen this, but check out https://issues.apache.org/jira/browse/SPARK-3630 and its linked tickets for some related Snappy errors and how they've been fixed. It's possible that one of those patches may have fixed the GraphX manifestation of the Snappy problem.
@JoshRosen Actually, it seems the test failures still occur, but only when I add a unit test that sets spark.default.parallelism. Adding the test causes subsequent tests within the same run to fail with exceptions like […] and […]. The exception traces always occur in TorrentBroadcast. It seems like setting spark.default.parallelism causes some kind of side effect that corrupts broadcasts in later unit tests, which is strange, since (1) each unit test should have its own SparkContext and therefore its own temp directory, and (2) I'm only passing spark.default.parallelism to SparkConf/SparkContext, not setting it as a system property.
@ankurdave The exception from the new unit test sounds suspiciously similar to https://issues.apache.org/jira/browse/SPARK-4133. Your new test creates a new SparkContext but doesn't stop it. Because some SparkSQL tests could not pass without it, our unit tests set […]. I'd check to see if those failures still persist after properly cleaning up the SparkContext created in your new test.
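A minimal sketch of that cleanup pattern, for reference (the helper name and the conf values here are illustrative, not taken from this PR):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical test helper: always stop the SparkContext a test creates,
// even if the test body throws, so a leaked context can't affect later tests.
def withSparkContext[T](conf: SparkConf)(body: SparkContext => T): T = {
  val sc = new SparkContext(conf)
  try body(sc)
  finally sc.stop() // releases the driver's block manager and temp directories
}

// Illustrative usage mirroring the new unit test's setup.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("edge-partition-test")
  .set("spark.default.parallelism", "4")

withSparkContext(conf) { sc =>
  // ... build the graph under test and run assertions here ...
}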
If the user sets "spark.default.parallelism" and the value is different from the EdgeRDD partition number, GraphX jobs will throw:

java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions

Author: luluorta <luluorta@gmail.com>

Closes #1763 from luluorta/fix-graph-zip and squashes the following commits:

8338961 [luluorta] fix GraphX EdgeRDD zipPartitions
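For context, a small sketch of how the mismatch arises: Partitioner.defaultPartitioner honors spark.default.parallelism when it is set, regardless of the input RDD's own partition count. The values below are illustrative:

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("default-partitioner-demo")
  .set("spark.default.parallelism", "8")
val sc = new SparkContext(conf)
try {
  // An RDD with 3 partitions, standing in for an EdgeRDD with 3 edge partitions.
  val edges = sc.parallelize(Seq((1L, 2L), (2L, 3L)), numSlices = 3)
  // With spark.default.parallelism set, defaultPartitioner returns a
  // HashPartitioner(8) rather than one sized to the RDD's 3 partitions --
  // the mismatch behind "Can't zip RDDs with unequal numbers of partitions".
  val p: Partitioner = Partitioner.defaultPartitioner(edges)
  println(p.numPartitions) // expected: 8, not 3
} finally {
  sc.stop()
}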
Oh, thanks! Looks like that was the problem all along; stopping the SparkContext fixes the problem. I'm going to merge this with the amended test now.
[SPARK-5351][GraphX] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImpl

If the value of 'spark.default.parallelism' does not match the number of partitions in EdgePartition (EdgeRDDImpl), the following error occurs in ReplicatedVertexView.scala:72 (the full reproduction and stack trace are in the PR description below):

java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions

Author: Takeshi Yamamuro <linguin.m.s@gmail.com>

Closes #4136 from maropu/EdgePartitionBugFix and squashes the following commits:

0cd8942 [Ankur Dave] Use more concise getOrElse
aad4a2c [Ankur Dave] Add unit test for non-default number of edge partitions
0a2f32b [Takeshi Yamamuro] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImpl

(cherry picked from commit e224dbb)
Signed-off-by: Ankur Dave <ankurdave@gmail.com>
Merged into master & branch-1.2.
Thanks for your quick commits! I ran into this bug in my GraphX application :))
If the value of 'spark.default.parallelism' does not match the number of partitions in EdgePartition (EdgeRDDImpl), the following error occurs in ReplicatedVertexView.scala:72:
object GraphTest extends Logging {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): VertexRDD[Int] = {
    graph.aggregateMessages(
      ctx => {
        ctx.sendToSrc(1)
        ctx.sendToDst(2)
      },
      _ + _)
  }
}

val g = GraphLoader.edgeListFile(sc, "graph.txt")
val rdd = GraphTest.run(g)
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:82)
at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:193)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
...
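For reference, a sketch of the idea named in the commit title, not the exact merged diff: fall back to a HashPartitioner sized to the EdgeRDD's own partitionsRDD instead of calling Partitioner.defaultPartitioner, which honors spark.default.parallelism. The class name below is hypothetical.

import scala.reflect.ClassTag

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD

// Simplified stand-in for EdgeRDDImpl: derive the fallback partitioner from
// the wrapped partitionsRDD's own partition count, so zipPartitions against
// it always sees matching partition counts, whatever spark.default.parallelism
// is set to.
abstract class EdgeRDDSketch[ED: ClassTag](partitionsRDD: RDD[_])
  extends RDD[ED](partitionsRDD) {

  override val partitioner: Option[Partitioner] =
    partitionsRDD.partitioner.orElse(
      Some(new HashPartitioner(partitionsRDD.partitions.length)))
}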