Skip to content

Commit 476770b

Browse files
committed
ShippableVertexPartition.initFrom: Don't run mergeFunc on default values
1 parent 614059f commit 476770b

File tree

2 files changed

+13
-9
lines changed

2 files changed

+13
-9
lines changed

graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala

-2
Original file line number | Diff line number | Diff line change
@@ -406,8 +406,6 @@ object VertexRDD {
406 406
* @param edges the [[EdgeRDD]] that these vertices may be joined with
407 407
* @param defaultVal the vertex attribute to use when creating missing vertices
408 408
* @param mergeFunc the commutative, associative duplicate vertex attribute merge function
409-
* note that all vertices with default value created upon construction in VertexPartition
410-
* so it will appear as b in (a, b) pair for mergeFunc.
411 409
*/
412 410
def apply[VD: ClassTag](
413 411
vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD

graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala

+13 -7
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,7 @@ object ShippableVertexPartition {
44 44
*/
45 45
def apply[VD: ClassTag](
46 46
iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
47-
: ShippableVertexPartition[VD] =
47+
: ShippableVertexPartition[VD] =
48 48
apply(iter, routingTable, defaultVal, (a, b) => a)
49 49

50 50
/**
@@ -54,12 +54,18 @@ object ShippableVertexPartition {
54 54
*/
55 55
def apply[VD: ClassTag](
56 56
iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD,
57-
mergeFunc: (VD, VD) => VD
58-
)
59-
: ShippableVertexPartition[VD] = {
60-
val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal))
61-
val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, mergeFunc)
62-
new ShippableVertexPartition(index, values, mask, routingTable)
57+
mergeFunc: (VD, VD) => VD): ShippableVertexPartition[VD] = {
58+
val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
59+
// Merge the given vertices using mergeFunc
60+
iter.foreach { pair =>
61+
map.setMerge(pair._1, pair._2, mergeFunc)
62+
}
63+
// Fill in missing vertices mentioned in the routing table
64+
routingTable.iterator.foreach { vid =>
65+
map.changeValue(vid, defaultVal, identity)
66+
}
67+
68+
new ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable)
63 69
}
64 70

65 71
import scala.language.implicitConversions

0 commit comments

Comments
 (0)