Skip to content

Commit 2da5e87

Browse files
committed
Restore object re-use in EdgePartition.
1 parent 0182f2b commit 2da5e87

File tree

1 file changed

+20
-4
lines changed

1 file changed

+20
-4
lines changed

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,18 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
6262
* applied to each edge
6363
*/
6464
def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2] = {
65-
val newData = (0 until data.size).map(i => f(Edge(srcIds(i), dstIds(i), data(i))))
66-
new EdgePartition(srcIds, dstIds, newData.toArray, index)
65+
val newData = new Array[ED2](data.size)
66+
val edge = new Edge[ED]()
67+
val size = data.size
68+
var i = 0
69+
while (i < size) {
70+
edge.srcId = srcIds(i)
71+
edge.dstId = dstIds(i)
72+
edge.attr = data(i)
73+
newData(i) = f(edge)
74+
i += 1
75+
}
76+
new EdgePartition(srcIds, dstIds, newData, index)
6777
}
6878

6979
/**
@@ -175,12 +185,15 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
175185
* @return an iterator over edges in the partition
176186
*/
177187
def iterator = new Iterator[Edge[ED]] {
188+
private[this] val edge = new Edge[ED]
178189
private[this] var pos = 0
179190

180191
override def hasNext: Boolean = pos < EdgePartition.this.size
181192

182193
override def next(): Edge[ED] = {
183-
val edge = Edge(srcIds(pos), dstIds(pos), data(pos))
194+
edge.srcId = srcIds(pos)
195+
edge.dstId = dstIds(pos)
196+
edge.attr = data(pos)
184197
pos += 1
185198
edge
186199
}
@@ -199,6 +212,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
199212
* cluster must start at position `index`.
200213
*/
201214
private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
215+
private[this] val edge = new Edge[ED]
202216
private[this] var pos = index
203217

204218
override def hasNext: Boolean = {
@@ -207,7 +221,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
207221

208222
override def next(): Edge[ED] = {
209223
assert(srcIds(pos) == srcId)
210-
val edge = Edge(srcIds(pos), dstIds(pos), data(pos))
224+
edge.srcId = srcIds(pos)
225+
edge.dstId = dstIds(pos)
226+
edge.attr = data(pos)
211227
pos += 1
212228
edge
213229
}

0 commit comments

Comments
 (0)