Commit d4a8073

luluorta authored and conviva-zz committed
[SPARK-2823][GraphX] Fix GraphX EdgeRDD zipPartitions

If the user sets "spark.default.parallelism" to a value that differs from the EdgeRDD's partition count, GraphX jobs throw:

java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions

Author: luluorta <luluorta@gmail.com>

Closes apache#1763 from luluorta/fix-graph-zip and squashes the following commits:

8338961 [luluorta] fix GraphX EdgeRDD zipPartitions
Parent: 51b0b82

File tree: 2 files changed (+18 −2)
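Before the diffs, some context on the failure mode the commit message describes. The sketch below is not part of the commit; the object name, toy data, and local master are illustrative assumptions. It shows how Partitioner.defaultPartitioner picks up spark.default.parallelism and can therefore disagree with an RDD's actual partition count (the zipPartitions mismatch), and how pinning a HashPartitioner to the RDD's own size, as the fix does, restores agreement.

import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits, Spark 1.x style

object DefaultParallelismMismatch {
  def main(args: Array[String]): Unit = {
    // Illustrative values: default parallelism (3) differs from the edge
    // RDD's actual partition count (4), as in the bug report.
    val conf = new SparkConf().set("spark.default.parallelism", "3")
    val sc = new SparkContext("local", "sketch", conf)

    val edgeLike = sc.parallelize(Seq((1L, 0L), (2L, 0L)), 4)

    // Pre-fix: with no parent partitioner available, defaultPartitioner
    // falls back to spark.default.parallelism and reports 3 partitions.
    val before = Partitioner.defaultPartitioner(edgeLike)

    // Post-fix: the partitioner is sized to the RDD's own partition count.
    val after = new HashPartitioner(edgeLike.partitions.size)

    // An RDD shuffled by `before` ends up with 3 partitions; zipping it
    // with the 4-partition edgeLike is what raised the exception.
    val shuffled = sc.parallelize(Seq((1L, 10))).partitionBy(before)
    assert(shuffled.partitions.size != edgeLike.partitions.size)
    assert(after.numPartitions == edgeLike.partitions.size)

    sc.stop()
  }
}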

graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala

Lines changed: 2 additions & 2 deletions

@@ -19,7 +19,7 @@ package org.apache.spark.graphx
 
 import scala.reflect.{classTag, ClassTag}
 
-import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
+import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -55,7 +55,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
    * partitioner that allows co-partitioning with `partitionsRDD`.
    */
   override val partitioner =
-    partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+    partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitionsRDD.partitions.size)))
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
     val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
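A note on the change itself: Partitioner.defaultPartitioner returns a HashPartitioner sized by spark.default.parallelism whenever that property is set, so the partitioner EdgeRDD advertised could disagree with the number of partitions partitionsRDD actually has. Sizing a HashPartitioner to partitionsRDD.partitions.size keeps the advertised partitioner and the real partition count in lockstep, which is what zipPartitions requires. The widened wildcard import is what brings HashPartitioner into scope.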

graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala

Lines changed: 16 additions & 0 deletions

@@ -19,6 +19,7 @@ package org.apache.spark.graphx
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph._
 import org.apache.spark.graphx.PartitionStrategy._
@@ -350,4 +351,19 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("non-default number of edge partitions") {
+    val n = 10
+    val defaultParallelism = 3
+    val numEdgePartitions = 4
+    assert(defaultParallelism != numEdgePartitions)
+    val conf = new SparkConf()
+      .set("spark.default.parallelism", defaultParallelism.toString)
+    val sc = new SparkContext("local", "test", conf)
+    val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
+      numEdgePartitions)
+    val graph = Graph.fromEdgeTuples(edges, 1)
+    val neighborAttrSums = graph.mapReduceTriplets[Int](
+      et => Iterator((et.dstId, et.srcAttr)), _ + _)
+    assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
+  }
 }
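To exercise just this regression suite, something like the following worked with the sbt build of that era (the exact task invocation is an assumption; adjust to your build):

sbt/sbt "graphx/test-only org.apache.spark.graphx.GraphSuite"

Note that the test constructs its own SparkContext with spark.default.parallelism set, rather than using the suite's withSpark helper, presumably because the repro depends on that property being present in the conf.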
