Skip to content

Commit c88b269

Browse files
committed
Revert upgradeIterator to if-in-a-loop
1 parent 0d3584c commit c88b269

File tree

2 files changed

+32
-29
lines changed

2 files changed

+32
-29
lines changed

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -293,32 +293,7 @@ class EdgePartition[
293293
def upgradeIterator(
294294
edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, includeDst: Boolean = true)
295295
: Iterator[EdgeTriplet[VD, ED]] = {
296-
val tripletIter = new Iterator[EdgeTriplet[VD, ED]] {
297-
private[this] val triplet = new EdgeTriplet[VD, ED]
298-
override def hasNext = edgeIter.hasNext
299-
override def next() = {
300-
triplet.set(edgeIter.next())
301-
}
302-
}
303-
val withSrc =
304-
if (includeSrc) {
305-
tripletIter.map { triplet =>
306-
triplet.srcAttr = EdgePartition.this.vertices(triplet.srcId)
307-
triplet
308-
}
309-
} else {
310-
tripletIter
311-
}
312-
val withDst =
313-
if (includeDst) {
314-
withSrc.map { triplet =>
315-
triplet.dstAttr = EdgePartition.this.vertices(triplet.dstId)
316-
triplet
317-
}
318-
} else {
319-
withSrc
320-
}
321-
withDst
296+
new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst)
322297
}
323298

324299
/**

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,8 @@ import org.apache.spark.graphx._
2323
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
2424

2525
/**
26-
* The Iterator type returned when constructing edge triplets. This class technically could be
27-
* an anonymous class in GraphImpl.triplets, but we name it here explicitly so it is easier to
28-
* debug / profile.
26+
* The Iterator type returned when constructing edge triplets. This could be an anonymous class in
27+
* EdgePartition.tripletIterator, but we name it here explicitly so it is easier to debug / profile.
2928
*/
3029
private[impl]
3130
class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
@@ -54,3 +53,32 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
5453
triplet
5554
}
5655
}
56+
57+
/**
58+
* An Iterator type for internal use that reuses EdgeTriplet objects. This could be an anonymous
59+
* class in EdgePartition.upgradeIterator, but we name it here explicitly so it is easier to debug /
60+
* profile.
61+
*/
62+
private[impl]
63+
class ReusingEdgeTripletIterator[VD: ClassTag, ED: ClassTag](
64+
val edgeIter: Iterator[Edge[ED]],
65+
val edgePartition: EdgePartition[ED, VD],
66+
val includeSrc: Boolean,
67+
val includeDst: Boolean)
68+
extends Iterator[EdgeTriplet[VD, ED]] {
69+
70+
private val triplet = new EdgeTriplet[VD, ED]
71+
72+
override def hasNext = edgeIter.hasNext
73+
74+
override def next() = {
75+
triplet.set(edgeIter.next())
76+
if (includeSrc) {
77+
triplet.srcAttr = edgePartition.vertices(triplet.srcId)
78+
}
79+
if (includeDst) {
80+
triplet.dstAttr = edgePartition.vertices(triplet.dstId)
81+
}
82+
triplet
83+
}
84+
}

0 commit comments

Comments
 (0)