
Commit fad630f

more debugging. Seems like we have empty partitions.
1 parent b788bfb commit fad630f

5 files changed: +137 additions, -72 deletions

conf/core-site.xml

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@

   <property>
     <name>fs.default.name</name>
-    <value>hdfs://ec2-184-72-130-69.compute-1.amazonaws.com:9000</value>
+    <value>hdfs://ec2-54-80-197-211.compute-1.amazonaws.com:9000</value>
   </property>

   <property>
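This property points Hadoop clients at the cluster's NameNode, so the change simply retargets the default filesystem at the new master host. As a rough illustration (not part of this commit, and assuming the Hadoop client jars and this conf/ directory are on the classpath), a driver can confirm which NameNode unqualified paths will resolve against:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem

    // Configuration() picks up core-site.xml from the classpath, so
    // fs.default.name decides where unqualified hdfs paths are sent.
    val hadoopConf = new Configuration()
    val fs = FileSystem.get(hadoopConf)
    println(fs.getUri)  // expected: hdfs://ec2-54-80-197-211.compute-1.amazonaws.com:9000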

conf/slaves

Lines changed: 16 additions & 16 deletions
@@ -1,16 +1,16 @@
-ec2-54-80-151-251.compute-1.amazonaws.com
-ec2-54-242-226-224.compute-1.amazonaws.com
-ec2-54-205-35-26.compute-1.amazonaws.com
-ec2-184-72-164-33.compute-1.amazonaws.com
-ec2-107-20-33-174.compute-1.amazonaws.com
-ec2-54-80-2-210.compute-1.amazonaws.com
-ec2-50-17-77-137.compute-1.amazonaws.com
-ec2-174-129-164-255.compute-1.amazonaws.com
-ec2-23-20-81-32.compute-1.amazonaws.com
-ec2-54-80-236-6.compute-1.amazonaws.com
-ec2-54-226-129-134.compute-1.amazonaws.com
-ec2-54-221-94-96.compute-1.amazonaws.com
-ec2-23-22-81-129.compute-1.amazonaws.com
-ec2-23-23-43-146.compute-1.amazonaws.com
-ec2-54-196-107-67.compute-1.amazonaws.com
-ec2-23-20-48-31.compute-1.amazonaws.com
+ec2-54-81-55-13.compute-1.amazonaws.com
+ec2-54-80-42-122.compute-1.amazonaws.com
+ec2-107-20-119-33.compute-1.amazonaws.com
+ec2-54-224-98-126.compute-1.amazonaws.com
+ec2-54-80-27-215.compute-1.amazonaws.com
+ec2-54-221-106-106.compute-1.amazonaws.com
+ec2-23-21-15-4.compute-1.amazonaws.com
+ec2-50-16-37-65.compute-1.amazonaws.com
+ec2-54-80-199-27.compute-1.amazonaws.com
+ec2-54-227-141-97.compute-1.amazonaws.com
+ec2-54-80-200-145.compute-1.amazonaws.com
+ec2-23-22-155-180.compute-1.amazonaws.com
+ec2-54-197-154-251.compute-1.amazonaws.com
+ec2-54-227-50-5.compute-1.amazonaws.com
+ec2-54-197-117-246.compute-1.amazonaws.com
+ec2-54-196-135-53.compute-1.amazonaws.com

graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala

Lines changed: 2 additions & 1 deletion
@@ -52,8 +52,9 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[LongWritable])
     kryo.register(classOf[Text])
     kryo.register(classOf[WikiArticle])
-    kryo.register(classOf[JHashSet[VertexId]])
+    // kryo.register(classOf[JHashSet[VertexId]])
     kryo.register(classOf[JTreeSet[VertexId]])
+    kryo.register(classOf[TrackCounts])
     // kryo.register(classOf[MakeString])
     // kryo.register(classOf[PrePostProcessWikipedia])
     // kryo.register(classOf[(LongWritable, Text)])
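Registering TrackCounts here only takes effect if the job actually runs with Kryo and this registrator enabled. A minimal sketch of the driver-side wiring, using standard Spark settings (the app name and this snippet are illustrative assumptions, not part of the commit):

    import org.apache.spark.{SparkConf, SparkContext}

    // Tell Spark to serialize with Kryo and to apply the GraphX registrator,
    // which now includes classOf[TrackCounts].
    val conf = new SparkConf()
      .setAppName("wikipedia-prepostprocess")  // illustrative name
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
    val sc = new SparkContext(conf)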

graphx/src/main/scala/org/apache/spark/graphx/PrePostProcessWiki.scala

Lines changed: 70 additions & 12 deletions
@@ -14,6 +14,43 @@ import org.apache.spark.Logging
 import java.util.{HashSet => JHashSet, TreeSet => JTreeSet}
 // import org.apache.spark.graphx.MakeString

+class TrackCounts extends Serializable {
+
+  var red: Long = 0
+  var stub: Long = 0
+  var disambig: Long = 0
+  var notFound: Long = 0
+  var titleNull: Long = 0
+  var relevant: Long = 0
+  var total: Long = 0
+
+  def update(o: TrackCounts) {
+    red += o.red
+    stub += o.stub
+    disambig += o.disambig
+    notFound += o.notFound
+    titleNull += o.titleNull
+    relevant += o.relevant
+    total += o.total
+  }
+
+  def addArticle(art: WikiArticle) {
+    if (art.redirect) red += 1
+    if (art.stub) stub += 1
+    if (art.disambig) disambig += 1
+    if (art.title == WikiArticle.notFoundString) notFound += 1
+    if (art.title == null) titleNull += 1
+    if (art.relevant) relevant += 1
+    total += 1
+  }
+
+  override def toString: String = {
+    s"Redirects: $red, Stubs: $stub, Disambig: $disambig, Not Found: $notFound, Null: $titleNull, RELEVANT: $relevant, TOTAL: $total"
+
+  }
+
+}
+

 object PrePostProcessWikipedia extends Logging {

@@ -104,19 +141,39 @@ object PrePostProcessWikipedia extends Logging {
     logWarning(s"XML RDD counted. Found ${xmlRDD.count} raw articles.")

     val allArtsRDD = xmlRDD.map { raw => new WikiArticle(raw) }.cache
-    val numRedirects = allArtsRDD.filter { art => art.redirect }.count
-    val numStubs = allArtsRDD.filter { art => art.stub }.count
-    val numDisambig = allArtsRDD.filter { art => art.disambig }.count
-    val numTitleNotFound = allArtsRDD.filter { art => art.title == WikiArticle.notFoundString }.count
-    logWarning(s"Filter results:\tRedirects: $numRedirects \tStubs: $numStubs \tDisambiguations: $numDisambig \t Title not found: $numTitleNotFound")
+    // val numRedirects = allArtsRDD.filter { art => art.redirect }.count
+    // val numStubs = allArtsRDD.filter { art => art.stub }.count
+    // val numDisambig = allArtsRDD.filter { art => art.disambig }.count
+    // val numTitleNotFound = allArtsRDD.filter { art => art.title == WikiArticle.notFoundString }.count
+    // logWarning(s"Filter results:\tRedirects: $numRedirects \tStubs: $numStubs \tDisambiguations: $numDisambig \t Title not found: $numTitleNotFound")

-    val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128)
-    logWarning(s"wikiRDD counted. Found ${wikiRDD.count} relevant articles.")
+    val wikiRDD = allArtsRDD.filter { art => art.relevant }.cache //.repartition(128)
+    wikiRDD.repartition(128)
+    // val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128)
+    val wikiRDDCount = wikiRDD.count
+    logWarning(s"wikiRDD counted. Found ${wikiRDDCount} relevant articles.")
+    // logWarning("Counting differently")
+
+    // count: redirects, stubs, disambigs, titlenotfound, titlenull, relevant, total
+    // val zeroCount = new TrackCounts
+    // val countSeqOp = (curCount: TrackCounts, art: WikiArticle) => {
+    //   curCount.addArticle(art)
+    //   curCount
+    // }
+    // val countCombOp = (c1: TrackCounts, c2: TrackCounts) => {
+    //   c1.update(c2)
+    //   c1
+    // }
+    //
+    // val cr = allArtsRDD.aggregate(zeroCount)(countSeqOp, countCombOp)
+    // logWarning(s"Different count results: $cr")
+    // System.exit(0)
+
     val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
     val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
     logWarning("creating graph")
     val g = Graph(vertices, edges)
-    val cleanG = g.subgraph(x => true, (vid, vd) => vd != null)
+    val cleanG = g.subgraph(x => true, (vid, vd) => vd != null).cache
     logWarning(s"DIRTY graph has ${g.triplets.count()} EDGES, ${g.vertices.count()} VERTICES")
     logWarning(s"CLEAN graph has ${cleanG.triplets.count()} EDGES, ${cleanG.vertices.count()} VERTICES")
     val resultG = pagerankConnComponentsAlt(numIters, cleanG)

@@ -134,12 +191,13 @@
     var currentGraph = g
     logWarning("starting iterations")
     for (i <- 0 to numRepetitions) {
+      currentGraph.cache
       val startTime = System.currentTimeMillis
       logWarning("starting pagerank")
-      val pr = PageRank.run(currentGraph, 20)
+      val pr = PageRank.run(currentGraph, 20).cache
       pr.vertices.count
       logWarning("Pagerank completed")
-      val prAndTitle = currentGraph.outerJoinVertices(pr.vertices)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
+      val prAndTitle = currentGraph.outerJoinVertices(pr.vertices)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))}).cache
       prAndTitle.vertices.count
       logWarning("join completed.")
       val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (VertexId, (String, Double))) => entry._2._2))

@@ -149,8 +207,8 @@
       val filterTop20 = {(v: VertexId, d: String) =>
         !top20verts.contains(v)
       }
-      val newGraph = currentGraph.subgraph(x => true, filterTop20)
-      val ccGraph = ConnectedComponents.run(newGraph)
+      val newGraph = currentGraph.subgraph(x => true, filterTop20).cache
+      val ccGraph = ConnectedComponents.run(newGraph).cache
       // val zeroVal = new mutable.HashSet[VertexId]()
       // val seqOp = (s: mutable.HashSet[VertexId], vtuple: (VertexId, VertexId)) => {
       //   s.add(vtuple._2)
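The commented-out block above replaces four separate filter(...).count jobs with a single pass over allArtsRDD, using RDD.aggregate and the new TrackCounts accumulator. A self-contained sketch of that seqOp/combOp pattern, with a simplified stand-in for WikiArticle since this is only an illustration of the structure, not the project's actual types:

    import org.apache.spark.SparkContext

    // Hypothetical stand-in for WikiArticle, just to make the sketch runnable.
    case class Article(redirect: Boolean, stub: Boolean, relevant: Boolean)

    class Counts extends Serializable {
      var red = 0L; var stub = 0L; var relevant = 0L; var total = 0L
      // seqOp: fold one record into the per-partition accumulator.
      def add(a: Article): Counts = {
        if (a.redirect) red += 1
        if (a.stub) stub += 1
        if (a.relevant) relevant += 1
        total += 1
        this
      }
      // combOp: merge two per-partition accumulators.
      def merge(o: Counts): Counts = {
        red += o.red; stub += o.stub; relevant += o.relevant; total += o.total
        this
      }
    }

    val sc = new SparkContext("local[*]", "trackcounts-sketch")
    val arts = sc.parallelize(Seq(
      Article(redirect = true, stub = false, relevant = false),
      Article(redirect = false, stub = false, relevant = true)))
    // One job over the data instead of one count per category.
    val totals = arts.aggregate(new Counts)((c, a) => c.add(a), (c1, c2) => c1.merge(c2))
    println(s"relevant=${totals.relevant} of ${totals.total}")

Compared with repeated filter(...).count calls, this touches the article RDD only once, which matters when the parsed articles do not fully fit in cache.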

graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala

Lines changed: 48 additions & 42 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
 import scala.reflect.{classTag, ClassTag}

 import org.apache.spark.util.collection.PrimitiveVector
-import org.apache.spark.{HashPartitioner, Partitioner}
+import org.apache.spark.{HashPartitioner, Partitioner, Logging}
 import org.apache.spark.SparkContext._
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.impl.GraphImpl._

@@ -29,6 +29,7 @@ import org.apache.spark.graphx.util.BytecodeUtils
 import org.apache.spark.rdd.{ShuffledRDD, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.ClosureCleaner
+import java.util.NoSuchElementException


 /**

@@ -47,7 +48,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
    @transient val edges: EdgeRDD[ED],
    @transient val routingTable: RoutingTable,
    @transient val replicatedVertexView: ReplicatedVertexView[VD])
-  extends Graph[VD, ED] with Serializable {
+  extends Graph[VD, ED] with Serializable with Logging {

   /** Default constructor is provided to support serialization */
   protected def this() = this(null, null, null, null)

@@ -216,50 +217,55 @@

     // Map and combine.
     val preAgg = edges.partitionsRDD.zipPartitions(vs, true) { (ePartIter, vPartIter) =>
-      val (ePid, edgePartition) = ePartIter.next()
-      val (vPid, vPart) = vPartIter.next()
-      assert(!vPartIter.hasNext)
-      assert(ePid == vPid)
-      // Choose scan method
-      val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
-      val edgeIter = activeDirectionOpt match {
-        case Some(EdgeDirection.Both) =>
-          if (activeFraction < 0.8) {
-            edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
-              .filter(e => vPart.isActive(e.dstId))
-          } else {
-            edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
+      if (ePartIter.hasNext) {
+        val (ePid, edgePartition) = ePartIter.next()
+        val (vPid, vPart) = vPartIter.next()
+        assert(!vPartIter.hasNext)
+        assert(ePid == vPid)
+        // Choose scan method
+        val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
+        val edgeIter = activeDirectionOpt match {
+          case Some(EdgeDirection.Both) =>
+            if (activeFraction < 0.8) {
+              edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
+                .filter(e => vPart.isActive(e.dstId))
+            } else {
+              edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
+            }
+          case Some(EdgeDirection.Either) =>
+            // TODO: Because we only have a clustered index on the source vertex ID, we can't filter
+            // the index here. Instead we have to scan all edges and then do the filter.
+            edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
+          case Some(EdgeDirection.Out) =>
+            if (activeFraction < 0.8) {
+              edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
+            } else {
+              edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
+            }
+          case Some(EdgeDirection.In) =>
+            edgePartition.iterator.filter(e => vPart.isActive(e.dstId))
+          case _ => // None
+            edgePartition.iterator
+        }
+
+        // Scan edges and run the map function
+        val et = new EdgeTriplet[VD, ED]
+        val mapOutputs = edgeIter.flatMap { e =>
+          et.set(e)
+          if (mapUsesSrcAttr) {
+            et.srcAttr = vPart(e.srcId)
           }
-        case Some(EdgeDirection.Either) =>
-          // TODO: Because we only have a clustered index on the source vertex ID, we can't filter
-          // the index here. Instead we have to scan all edges and then do the filter.
-          edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
-        case Some(EdgeDirection.Out) =>
-          if (activeFraction < 0.8) {
-            edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
-          } else {
-            edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
+          if (mapUsesDstAttr) {
+            et.dstAttr = vPart(e.dstId)
           }
-        case Some(EdgeDirection.In) =>
-          edgePartition.iterator.filter(e => vPart.isActive(e.dstId))
-        case _ => // None
-          edgePartition.iterator
-      }
-
-      // Scan edges and run the map function
-      val et = new EdgeTriplet[VD, ED]
-      val mapOutputs = edgeIter.flatMap { e =>
-        et.set(e)
-        if (mapUsesSrcAttr) {
-          et.srcAttr = vPart(e.srcId)
-        }
-        if (mapUsesDstAttr) {
-          et.dstAttr = vPart(e.dstId)
+          mapFunc(et)
         }
-        mapFunc(et)
+        // Note: This doesn't allow users to send messages to arbitrary vertices.
+        vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
+      } else {
+        logError("preAgg in mapReduceTriplets tried to iterate over empty partition.")
+        Iterator.empty
       }
-      // Note: This doesn't allow users to send messages to arbitrary vertices.
-      vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
     }

     // do the final reduction reusing the index map
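The change above wraps the whole zipPartitions closure in an if (ePartIter.hasNext) guard: with empty partitions (which the commit message suggests exist here), the unconditional ePartIter.next() throws java.util.NoSuchElementException and fails the task. A minimal sketch of the failure mode and the guard, outside GraphX (illustrative only, not code from this repo):

    import org.apache.spark.SparkContext

    val sc = new SparkContext("local[2]", "empty-partition-sketch")
    // Two partitions but only one element, so one partition is empty.
    val left  = sc.parallelize(Seq(1), numSlices = 2)
    val right = sc.parallelize(Seq(10, 20), numSlices = 2)

    val zipped = left.zipPartitions(right) { (lIter, rIter) =>
      // An unguarded lIter.next() would throw NoSuchElementException on the empty partition.
      if (lIter.hasNext) Iterator((lIter.next(), rIter.toSeq))
      else Iterator.empty  // skip empty partitions instead of failing the task
    }
    zipped.collect().foreach(println)

In the commit, the guarded branch also logs an error, which fits the "more debugging" intent: the job keeps running while flagging which partitions come up empty.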
