Skip to content

Commit e27da44

Browse files
committed
Check for empty iterators
1 parent 5ec645d commit e27da44

File tree

3 files changed

+118
-75
lines changed

3 files changed

+118
-75
lines changed

graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala

Lines changed: 19 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,12 @@ class EdgeRDD[@specialized ED: ClassTag](
4545
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
4646

4747
override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
48-
firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator
48+
val partIter = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
49+
if (partIter.hasNext) {
50+
partIter.next._2.iterator
51+
} else {
52+
Iterator.empty
53+
}
4954
}
5055

5156
override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
@@ -69,8 +74,12 @@ class EdgeRDD[@specialized ED: ClassTag](
6974
private[graphx] def mapEdgePartitions[ED2: ClassTag](
7075
f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = {
7176
new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
72-
val (pid, ep) = iter.next()
73-
Iterator(Tuple2(pid, f(pid, ep)))
77+
if (iter.hasNext) {
78+
val (pid, ep) = iter.next()
79+
Iterator(Tuple2(pid, f(pid, ep)))
80+
} else {
81+
Iterator.empty
82+
}
7483
}, preservesPartitioning = true))
7584
}
7685

@@ -107,9 +116,13 @@ class EdgeRDD[@specialized ED: ClassTag](
107116
val ed3Tag = classTag[ED3]
108117
new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
109118
(thisIter, otherIter) =>
110-
val (pid, thisEPart) = thisIter.next()
111-
val (_, otherEPart) = otherIter.next()
112-
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
119+
if (thisIter.hasNext && otherIter.hasNext) {
120+
val (pid, thisEPart) = thisIter.next()
121+
val (_, otherEPart) = otherIter.next()
122+
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
123+
} else {
124+
Iterator.empty
125+
}
113126
})
114127
}
115128

graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala

Lines changed: 28 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -59,9 +59,13 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
5959
val edTag = classTag[ED]
6060
edges.partitionsRDD.zipPartitions(
6161
replicatedVertexView.get(true, true), true) { (ePartIter, vPartIter) =>
62-
val (pid, ePart) = ePartIter.next()
63-
val (_, vPart) = vPartIter.next()
64-
new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag)
62+
if (ePartIter.hasNext && vPartIter.hasNext) {
63+
val (pid, ePart) = ePartIter.next()
64+
val (_, vPart) = vPartIter.next()
65+
new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag)
66+
} else {
67+
Iterator.empty
68+
}
6569
}
6670
}
6771

@@ -132,22 +136,26 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
132136
val newEdgePartitions =
133137
edges.partitionsRDD.zipPartitions(replicatedVertexView.get(true, true), true) {
134138
(ePartIter, vTableReplicatedIter) =>
135-
val (ePid, edgePartition) = ePartIter.next()
136-
val (vPid, vPart) = vTableReplicatedIter.next()
137-
assert(!vTableReplicatedIter.hasNext)
138-
assert(ePid == vPid)
139-
val et = new EdgeTriplet[VD, ED]
140-
val inputIterator = edgePartition.iterator.map { e =>
141-
et.set(e)
142-
et.srcAttr = vPart(e.srcId)
143-
et.dstAttr = vPart(e.dstId)
144-
et
145-
}
146-
// Apply the user function to the vertex partition
147-
val outputIter = f(ePid, inputIterator)
148-
// Consume the iterator to update the edge attributes
149-
val newEdgePartition = edgePartition.map(outputIter)
150-
Iterator((ePid, newEdgePartition))
139+
if (ePartIter.hasNext && vTableReplicatedIter.hasNext) {
140+
val (ePid, edgePartition) = ePartIter.next()
141+
val (vPid, vPart) = vTableReplicatedIter.next()
142+
assert(!vTableReplicatedIter.hasNext)
143+
assert(ePid == vPid)
144+
val et = new EdgeTriplet[VD, ED]
145+
val inputIterator = edgePartition.iterator.map { e =>
146+
et.set(e)
147+
et.srcAttr = vPart(e.srcId)
148+
et.dstAttr = vPart(e.dstId)
149+
et
150+
}
151+
// Apply the user function to the vertex partition
152+
val outputIter = f(ePid, inputIterator)
153+
// Consume the iterator to update the edge attributes
154+
val newEdgePartition = edgePartition.map(outputIter)
155+
Iterator((ePid, newEdgePartition))
156+
} else {
157+
Iterator.empty
158+
}
151159
}
152160
new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), routingTable, replicatedVertexView)
153161
}
@@ -217,7 +225,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
217225

218226
// Map and combine.
219227
val preAgg = edges.partitionsRDD.zipPartitions(vs, true) { (ePartIter, vPartIter) =>
220-
if (ePartIter.hasNext) {
228+
if (ePartIter.hasNext && vPartIter.hasNext) {
221229
val (ePid, edgePartition) = ePartIter.next()
222230
val (vPid, vPart) = vPartIter.next()
223231
assert(!vPartIter.hasNext)

graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala

Lines changed: 71 additions & 49 deletions
Original file line number | Diff line number | Diff line change
@@ -103,9 +103,13 @@ class ReplicatedVertexView[VD: ClassTag](
103103
// Update the view with shippedActives, setting activeness flags in the resulting
104104
// VertexPartitions
105105
get(includeSrc, includeDst).zipPartitions(shippedActives) { (viewIter, shippedActivesIter) =>
106-
val (pid, vPart) = viewIter.next()
107-
val newPart = vPart.replaceActives(shippedActivesIter.flatMap(_._2.iterator))
108-
Iterator((pid, newPart))
106+
if (viewIter.hasNext) {
107+
val (pid, vPart) = viewIter.next()
108+
val newPart = vPart.replaceActives(shippedActivesIter.flatMap(_._2.iterator))
109+
Iterator((pid, newPart))
110+
} else {
111+
Iterator.empty
112+
}
109113
}
110114
}
111115

@@ -126,30 +130,40 @@ class ReplicatedVertexView[VD: ClassTag](
126130
// VertexPartitions
127131
prevView.get(includeSrc, includeDst).zipPartitions(shippedVerts) {
128132
(prevViewIter, shippedVertsIter) =>
129-
val (pid, prevVPart) = prevViewIter.next()
130-
val newVPart = prevVPart.innerJoinKeepLeftDestructive(shippedVertsIter.flatMap(_._2.iterator))
131-
Iterator((pid, newVPart))
133+
if (prevViewIter.hasNext) {
134+
val (pid, prevVPart) = prevViewIter.next()
135+
val newVPart =
136+
if (destructiveLocal) prevVPart.innerJoinKeepLeftDestructive(shippedVertsIter.flatMap(_._2.iterator))
137+
else prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator))
138+
Iterator((pid, newVPart))
139+
} else {
140+
Iterator.empty
141+
}
132142
}.cache().setName("ReplicatedVertexView delta %s %s".format(includeSrc, includeDst))
133143

134144
case None =>
135145
// Within each edge partition, place the shipped vertex attributes into the correct
136146
// locations specified in localVertexIdMap
137147
localVertexIdMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
138-
val (pid, vidToIndex) = mapIter.next()
139-
assert(!mapIter.hasNext)
140-
// Populate the vertex array using the vidToIndex map
141-
val vertexArray = vdTag.newArray(vidToIndex.capacity)
142-
for ((_, block) <- shippedVertsIter) {
143-
for (i <- 0 until block.vids.size) {
144-
val vid = block.vids(i)
145-
val attr = block.attrs(i)
146-
val ind = vidToIndex.getPos(vid)
147-
vertexArray(ind) = attr
148+
if (mapIter.hasNext) {
149+
val (pid, vidToIndex) = mapIter.next()
150+
assert(!mapIter.hasNext)
151+
// Populate the vertex array using the vidToIndex map
152+
val vertexArray = vdTag.newArray(vidToIndex.capacity)
153+
for ((_, block) <- shippedVertsIter) {
154+
for (i <- 0 until block.vids.size) {
155+
val vid = block.vids(i)
156+
val attr = block.attrs(i)
157+
val ind = vidToIndex.getPos(vid)
158+
vertexArray(ind) = attr
159+
}
148160
}
161+
val newVPart = new VertexPartition(
162+
vidToIndex, vertexArray, vidToIndex.getBitSet)(vdTag)
163+
Iterator((pid, newVPart))
164+
} else {
165+
Iterator.empty
149166
}
150-
val newVPart = new VertexPartition(
151-
vidToIndex, vertexArray, vidToIndex.getBitSet)(vdTag)
152-
Iterator((pid, newVPart))
153167
}.cache().setName("ReplicatedVertexView %s %s".format(includeSrc, includeDst))
154168
}
155169
}
@@ -159,47 +173,55 @@ private object ReplicatedVertexView {
159173
protected def buildBuffer[VD: ClassTag](
160174
pid2vidIter: Iterator[Array[Array[VertexId]]],
161175
vertexPartIter: Iterator[VertexPartition[VD]]) = {
162-
val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
163-
val vertexPart: VertexPartition[VD] = vertexPartIter.next()
164-
165-
Iterator.tabulate(pid2vid.size) { pid =>
166-
val vidsCandidate = pid2vid(pid)
167-
val size = vidsCandidate.length
168-
val vids = new PrimitiveVector[VertexId](pid2vid(pid).size)
169-
val attrs = new PrimitiveVector[VD](pid2vid(pid).size)
170-
var i = 0
171-
while (i < size) {
172-
val vid = vidsCandidate(i)
173-
if (vertexPart.isDefined(vid)) {
174-
vids += vid
175-
attrs += vertexPart(vid)
176+
if (pid2vidIter.hasNext && vertexPartIter.hasNext) {
177+
val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
178+
val vertexPart: VertexPartition[VD] = vertexPartIter.next()
179+
180+
Iterator.tabulate(pid2vid.size) { pid =>
181+
val vidsCandidate = pid2vid(pid)
182+
val size = vidsCandidate.length
183+
val vids = new PrimitiveVector[VertexId](pid2vid(pid).size)
184+
val attrs = new PrimitiveVector[VD](pid2vid(pid).size)
185+
var i = 0
186+
while (i < size) {
187+
val vid = vidsCandidate(i)
188+
if (vertexPart.isDefined(vid)) {
189+
vids += vid
190+
attrs += vertexPart(vid)
191+
}
192+
i += 1
176193
}
177-
i += 1
194+
(pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array))
178195
}
179-
(pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array))
196+
} else {
197+
Iterator.empty
180198
}
181199
}
182200

183201
protected def buildActiveBuffer(
184202
pid2vidIter: Iterator[Array[Array[VertexId]]],
185203
activePartIter: Iterator[VertexPartition[_]])
186204
: Iterator[(Int, Array[VertexId])] = {
187-
val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
188-
val activePart: VertexPartition[_] = activePartIter.next()
189-
190-
Iterator.tabulate(pid2vid.size) { pid =>
191-
val vidsCandidate = pid2vid(pid)
192-
val size = vidsCandidate.length
193-
val actives = new PrimitiveVector[VertexId](vidsCandidate.size)
194-
var i = 0
195-
while (i < size) {
196-
val vid = vidsCandidate(i)
197-
if (activePart.isDefined(vid)) {
198-
actives += vid
205+
if (pid2vidIter.hasNext && activePartIter.hasNext) {
206+
val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
207+
val activePart: VertexPartition[_] = activePartIter.next()
208+
209+
Iterator.tabulate(pid2vid.size) { pid =>
210+
val vidsCandidate = pid2vid(pid)
211+
val size = vidsCandidate.length
212+
val actives = new PrimitiveVector[VertexId](vidsCandidate.size)
213+
var i = 0
214+
while (i < size) {
215+
val vid = vidsCandidate(i)
216+
if (activePart.isDefined(vid)) {
217+
actives += vid
218+
}
219+
i += 1
199220
}
200-
i += 1
221+
(pid, actives.trim().array)
201222
}
202-
(pid, actives.trim().array)
223+
} else {
224+
Iterator.empty
203225
}
204226
}
205227
}

0 commit comments

Comments
 (0)