Skip to content

Commit efae765

Browse files
committed
fix mistakes: should be able to call with or without mergeFunc
1 parent b2422f9 commit efae765

File tree

2 files changed

+11
-2
lines changed

2 files changed

+11
-2
lines changed

graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ object VertexRDD {
419419
(vertexIter, routingTableIter) =>
420420
val routingTable =
421421
if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
422-
Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal))
422+
Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal, mergeFunc))
423423
}
424424
new VertexRDD(vertexPartitions)
425425
}

graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,16 @@ private[graphx]
3636
object ShippableVertexPartition {
3737
/** Construct a `ShippableVertexPartition` from the given vertices without any routing table. */
3838
def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): ShippableVertexPartition[VD] =
39-
apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD])
39+
apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD], (a, b) => a)
40+
41+
/**
42+
* Construct a `ShippableVertexPartition` from the given vertices with the specified routing
43+
* table, filling in missing vertices mentioned in the routing table using `defaultVal`.
44+
*/
45+
def apply[VD: ClassTag](
46+
iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
47+
: ShippableVertexPartition[VD] =
48+
apply(iter, routingTable, defaultVal, (a, b) => a)
4049

4150
/**
4251
* Construct a `ShippableVertexPartition` from the given vertices with the specified routing

0 commit comments

Comments
 (0)