Skip to content

Commit 0a2f32b

Browse files
committed
Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImpl
1 parent 424d8c6 commit 0a2f32b

File tree

1 file changed

+8
-3
lines changed

1 file changed

+8
-3
lines changed

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.graphx.impl
1919

2020
import scala.reflect.{classTag, ClassTag}
2121

22-
import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
22+
import org.apache.spark.{OneToOneDependency, HashPartitioner, TaskContext}
2323
import org.apache.spark.rdd.RDD
2424
import org.apache.spark.storage.StorageLevel
2525

@@ -45,8 +45,13 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
4545
* [[PartitionID]]s in `partitionsRDD` correspond to the actual partitions and create a new
4646
* partitioner that allows co-partitioning with `partitionsRDD`.
4747
*/
48-
override val partitioner =
49-
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
48+
override val partitioner = {
49+
if (partitionsRDD.partitioner.isDefined) {
50+
partitionsRDD.partitioner
51+
} else {
52+
Some(new HashPartitioner(partitions.size))
53+
}
54+
}
5055

5156
override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
5257

0 commit comments

Comments
 (0)