
Commit 7a036cb

Fixed compilation issues.
1 parent d3bbfd0 commit 7a036cb

4 files changed, +170 -148 lines

Lines changed: 116 additions & 116 deletions
@@ -1,116 +1,116 @@
-package org.apache.spark.examples.graphx
-
-import org.apache.spark._
-import org.apache.spark.graph._
-import org.apache.spark.graph.algorithms._
-import org.apache.spark.rdd.NewHadoopRDD
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.conf.Configuration
-import org.apache.mahout.text.wikipedia._
-import org.apache.spark.rdd.RDD
-import java.util.Calendar
-import scala.math.Ordering.Implicits._
-
-
-object AnalyzeWikipedia extends Logging {
-
-def main(args: Array[String]) = {
-
-
-
-
-val host = args(0)
-val fname = args(1)
-// val numparts = {
-// if (args.length >= 3) {
-// args(2).toInt
-// } else {
-// 64
-// }
-// }
-// val preformattedFname = args(2)
-
-val serializer = "org.apache.spark.serializer.KryoSerializer"
-System.setProperty("spark.serializer", serializer)
-System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
-
-val sc = new SparkContext(host, "AnalyzeWikipedia")
-// val top10 = sc.parallelize(1 to 1000, 10).map(x => (x.toString, x)).top(10)(Ordering.by(_._2))
-
-
-// val conf = new Configuration
-// conf.set("key.value.separator.in.input.line", " ");
-// conf.set("xmlinput.start", "<page>");
-// conf.set("xmlinput.end", "</page>");
-
-// val xmlRDD = sc.newAPIHadoopFile(fname, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
-// .map(stringify)
-
-// println("XML pages: " + xmlRDD.count)
-// // .repartition(numparts)
-
-// val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
-// .filter { art => art.relevant }
-
-// println("Relevant pages: " + wikiRDD.count)
-
-// val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
-// val justVids = wikiRDD.map { art => art.vertexID }
-// // println("taking top vids")
-// // val topvids = justVids.top(10)
-// // sc.stop()
-// // System.exit(0)
-
-// // val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
-// val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
-// println("Edges: " + edges.count)
-// println("Creating graph: " + Calendar.getInstance().getTime())
-
-// val g = Graph(vertices, edges)
-// val g = Graph.fromEdges(edges, 1)
-// val g = Graph(edges, 1)
-val g = GraphLoader.edgeListAndVertexListFiles(sc, fname + "_edges", fname + "_vertices",
-minEdgePartitions = 128).cache()
-println("Triplets: " + g.triplets.count)
-
-println("starting pagerank " + Calendar.getInstance().getTime())
-val startTime = System.currentTimeMillis
-val pr = PageRank.run(g, 20)
-
-println("PR numvertices: " + pr.vertices.count + "\tOriginal numVertices " + g.vertices.count)
-println("Pagerank runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
-val prAndTitle = g.outerJoinVertices(pr.vertices)({(id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
-println("finished join.")
-
-val topArticles = prAndTitle.vertices.top(30)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2))
-println("Top articles:\n" + topArticles.deep.mkString("\n"))
-// for(v <- topArticles) {
-// println(v)
-// }
-val article_name = "JohnsHopkinsUniversity"
-//
-//Find relevant vertices
-g.mapTriplets(e => {
-if ((e.srcAttr contains article_name) || (e.dstAttr contains article_name)) { 1.0 }
-else { e.attr }
-})
-val coarsenedGraph = g.contractEdges({ e => e.attr == 1.0 }, {et => et.srcAttr + " " + et.dstAttr },
-{ (v1: String , v2: String) => v1 + "\n" + v2 })
-
-// filter only vertices whose title contains JHU
-val relevant = coarsenedGraph.vertices.filter( {case (vid: Vid, data: String) => data contains article_name}).collect
-println("Articles matching " + article_name)
-println(relevant.deep.mkString("New Article\n"))
-
-sc.stop()
-}
-
-
-def stringify(tup: (org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)): String = {
-tup._2.toString
-}
-
-
-
-}
+//package org.apache.spark.examples.graphx
+//
+//import org.apache.spark._
+//import org.apache.spark.graph._
+//import org.apache.spark.graph.algorithms._
+//import org.apache.spark.rdd.NewHadoopRDD
+//import org.apache.hadoop.io.LongWritable
+//import org.apache.hadoop.io.Text
+//import org.apache.hadoop.conf.Configuration
+//import org.apache.mahout.text.wikipedia._
+//import org.apache.spark.rdd.RDD
+//import java.util.Calendar
+//import scala.math.Ordering.Implicits._
+//
+//
+//object AnalyzeWikipedia extends Logging {
+//
+// def main(args: Array[String]) = {
+//
+//
+//
+//
+// val host = args(0)
+// val fname = args(1)
+// // val numparts = {
+// // if (args.length >= 3) {
+// // args(2).toInt
+// // } else {
+// // 64
+// // }
+// // }
+// // val preformattedFname = args(2)
+//
+// val serializer = "org.apache.spark.serializer.KryoSerializer"
+// System.setProperty("spark.serializer", serializer)
+// System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
+//
+// val sc = new SparkContext(host, "AnalyzeWikipedia")
+// // val top10 = sc.parallelize(1 to 1000, 10).map(x => (x.toString, x)).top(10)(Ordering.by(_._2))
+//
+//
+// // val conf = new Configuration
+// // conf.set("key.value.separator.in.input.line", " ");
+// // conf.set("xmlinput.start", "<page>");
+// // conf.set("xmlinput.end", "</page>");
+//
+// // val xmlRDD = sc.newAPIHadoopFile(fname, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
+// // .map(stringify)
+//
+// // println("XML pages: " + xmlRDD.count)
+// // // .repartition(numparts)
+//
+// // val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
+// // .filter { art => art.relevant }
+//
+// // println("Relevant pages: " + wikiRDD.count)
+//
+// // val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
+// // val justVids = wikiRDD.map { art => art.vertexID }
+// // // println("taking top vids")
+// // // val topvids = justVids.top(10)
+// // // sc.stop()
+// // // System.exit(0)
+//
+// // // val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
+// // val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
+// // println("Edges: " + edges.count)
+// // println("Creating graph: " + Calendar.getInstance().getTime())
+//
+// // val g = Graph(vertices, edges)
+// // val g = Graph.fromEdges(edges, 1)
+// // val g = Graph(edges, 1)
+// val g = GraphLoader.edgeListAndVertexListFiles(sc, fname + "_edges", fname + "_vertices",
+// minEdgePartitions = 128).cache()
+// println("Triplets: " + g.triplets.count)
+//
+// println("starting pagerank " + Calendar.getInstance().getTime())
+// val startTime = System.currentTimeMillis
+// val pr = PageRank.run(g, 20)
+//
+// println("PR numvertices: " + pr.vertices.count + "\tOriginal numVertices " + g.vertices.count)
+// println("Pagerank runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
+// val prAndTitle = g.outerJoinVertices(pr.vertices)({(id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
+// println("finished join.")
+//
+// val topArticles = prAndTitle.vertices.top(30)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2))
+// println("Top articles:\n" + topArticles.deep.mkString("\n"))
+// // for(v <- topArticles) {
+// // println(v)
+// // }
+// val article_name = "JohnsHopkinsUniversity"
+// //
+// //Find relevant vertices
+// g.mapTriplets(e => {
+// if ((e.srcAttr contains article_name) || (e.dstAttr contains article_name)) { 1.0 }
+// else { e.attr }
+// })
+// val coarsenedGraph = g.contractEdges({ e => e.attr == 1.0 }, {et => et.srcAttr + " " + et.dstAttr },
+// { (v1: String , v2: String) => v1 + "\n" + v2 })
+//
+// // filter only vertices whose title contains JHU
+// val relevant = coarsenedGraph.vertices.filter( {case (vid: Vid, data: String) => data contains article_name}).collect
+// println("Articles matching " + article_name)
+// println(relevant.deep.mkString("New Article\n"))
+//
+// sc.stop()
+// }
+//
+//
+// def stringify(tup: (org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)): String = {
+// tup._2.toString
+// }
+//
+//
+//
+//}

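Note: the file above (the AnalyzeWikipedia example) is disabled wholesale rather than ported; every line is commented out because it still targets the old org.apache.spark.graph API (Vid, edgeListAndVertexListFiles, contractEdges), which no longer compiles after the rename to org.apache.spark.graphx. For reference, a minimal sketch of the PageRank portion of that example against the renamed API might look like the following; the file-path layout and the 20-iteration count are carried over from the old code, while GraphLoader.edgeListFile and the tab-separated vertex parsing are assumptions, not part of this commit.

    import org.apache.spark._
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.lib.PageRank
    import org.apache.spark.rdd.RDD

    object AnalyzeWikipediaSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(args(0), "AnalyzeWikipedia")
        val fname = args(1)
        // Edge list in "srcId dstId" text form (hypothetical path layout from the old example).
        val g = GraphLoader.edgeListFile(sc, fname + "_edges").cache()
        // Vertex file assumed to hold "vertexId<TAB>title" lines, as written by the prep step.
        val titles: RDD[(VertexId, String)] = sc.textFile(fname + "_vertices").map { line =>
          val fields = line.split('\t')
          (fields(0).toLong, fields(1))
        }
        val pr = PageRank.run(g, 20)
        // Attach titles to ranks; vertices without a title fall back to their numeric ID.
        val prAndTitle = pr.outerJoinVertices(titles) {
          (id: VertexId, rank: Double, title: Option[String]) => (title.getOrElse(id.toString), rank)
        }
        val topArticles = prAndTitle.vertices.top(30)(
          Ordering.by((entry: (VertexId, (String, Double))) => entry._2._2))
        println(topArticles.mkString("\n"))
        sc.stop()
      }
    }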
examples/src/main/scala/org/apache/spark/examples/graphx/PrePostProcessWiki.scala

Lines changed: 32 additions & 27 deletions
@@ -3,16 +3,12 @@ package org.apache.spark.examples.graphx
 import org.apache.spark._
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.lib._
-import org.apache.spark.graph.algorithms._
-import org.apache.spark.rdd.NewHadoopRDD
 import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.conf.Configuration
 import org.apache.mahout.text.wikipedia._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
-import java.util.Calendar
-import scala.math.Ordering.Implicits._
 import org.apache.spark.Logging
 import scala.collection.mutable
 
@@ -46,15 +42,15 @@ object PrePostProcessWikipedia extends Logging {
 case "graphx" => {
 val rawData = args(2)
 val result = graphx(sc, rawData)
-logWarning(result)
+// logWarning(result)
 }
 case "prep" => {
 val rawData = args(2)
 val outBase = args(3)
 prep(sc, rawData, outBase)
 }
 
-case _ => throw new IllegalArgumentException("Please provide a valid process")
+case _ => throw new IllegalArgumentException("Please proVertexIde a valid process")
 }
 logWarning(process + "\tTIMEX: " + (System.currentTimeMillis - start)/1000.0)
 sc.stop()
@@ -77,36 +73,36 @@ object PrePostProcessWikipedia extends Logging {
 .map(stringify)
 val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
 .filter { art => art.relevant }.repartition(128)
-val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
+val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
 val verticesToSave = vertices.map {v => v._1 + "\t"+ v._2}
 verticesToSave.saveAsTextFile(vertPath)
 val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
-val g = Graph(vertices, edges, partitionStrategy = EdgePartition1D)
-val pr = PageRank.runStandalone(g, 0.01)
-val prToSave = pr.map {v => v._1 + "\t"+ v._2}
+val g = Graph(vertices, edges) //TODO what to do about partitionStrategy???
+val pr = PageRank.run(g, 20)
+val prToSave = pr.vertices.map {v => v._1 + "\t"+ v._2}
 prToSave.saveAsTextFile(rankPath)
 }
 
 def graphx(sc: SparkContext, rawData: String) {
 
 val conf = new Configuration
-conf.set("key.value.separator.in.input.line", " ");
-conf.set("xmlinput.start", "<page>");
-conf.set("xmlinput.end", "</page>");
+conf.set("key.value.separator.in.input.line", " ")
+conf.set("xmlinput.start", "<page>")
+conf.set("xmlinput.end", "</page>")
 
 val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
 .map(stringify)
 val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
 .filter { art => art.relevant }.repartition(128)
-val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
+val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
 val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
-val g = Graph(vertices, edges, partitionStrategy = EdgePartition1D)
+val g = Graph(vertices, edges)
 val resultG = pagerankConnComponentsAlt(4, g)
-logWarning(s"Final graph has ${resultG.triplets.count} EDGES, ${resultG.vertices.count} VERTICES")
+logWarning(s"Final graph has ${resultG.triplets.count()} EDGES, ${resultG.vertices.count()} VERTICES")
 // val pr = PageRank.run(g, 20)
 // val prAndTitle = g
-// .outerJoinVertices(pr)({(id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
-// val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2))
+// .outerJoinVertices(pr)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
+// val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (VertexId, (String, Double))) => entry._2._2))
 // top20.mkString("\n")
 
 }
@@ -115,15 +111,26 @@ object PrePostProcessWikipedia extends Logging {
 var currentGraph = g
 for (i <- 0 to numRepetitions) {
 val pr = PageRank.run(currentGraph, 20)
-val prAndTitle = currentGraph
-.outerJoinVertices(pr)({(id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
-val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2))
+val prAndTitle = currentGraph.outerJoinVertices(pr.vertices)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
+val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (VertexId, (String, Double))) => entry._2._2))
 logWarning(s"Top20 for iteration $i:\n${top20.mkString("\n")}")
 val top20verts = top20.map(_._1).toSet
 // filter out top 20 vertices
-val newGraph = currentGraph.subgraph(vpred = ((v, d) => !top20Verts.contains(v)))
+val filterTop20 = {(v: VertexId, d: String) =>
+!top20verts.contains(v)
+}
+val newGraph = currentGraph.subgraph(x => true, filterTop20)
 val ccGraph = ConnectedComponents.run(newGraph)
-val numCCs = ccGraph.vertices.aggregate(new mutable.HashSet())(((set, vtuple) => set += vtuple._2), ((set1, set2) => set1 union set2)).size
+val zeroVal = new mutable.HashSet[VertexId]()
+val seqOp = (s: mutable.HashSet[VertexId], vtuple: (VertexId, VertexId)) => {
+s.add(vtuple._2)
+s
+}
+val combOp = (s1: mutable.HashSet[VertexId], s2: mutable.HashSet[VertexId]) => { s1 union s2}
+val numCCs = ccGraph.vertices.aggregate(zeroVal)(seqOp, combOp)
+//(new mutable.HashSet[Int]())((s: mutable.HashSet[Int], vtuple: (VertexId, Int)) => { s.add(vtuple._2); s },(s1: mutable.HashSet[Int], s2: mutable.HashSet[Int]) => { s1 union s2})
+
+//(((set, vtuple) => set.add(vtuple._2)), ((set1, set2) => set1 union set2)).size
 logWarning(s"Number of connected components for iteration $i: $numCCs")
 // TODO will this result in too much memory overhead???
 currentGraph = newGraph
@@ -143,7 +150,7 @@ object PrePostProcessWikipedia extends Logging {
 .map(stringify)
 val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
 .filter { art => art.relevant }.repartition(128)
-val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
+val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
 val verticesToSave = vertices.map {v => v._1 + "\t"+ v._2}
 verticesToSave.saveAsTextFile(outBase + "_vertices")
 val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
@@ -159,12 +166,10 @@ object PrePostProcessWikipedia extends Logging {
 
 // slightly cheating, but not really
 val ranksAndAttrs = ranks.join(attrs)
-val top20 = ranksAndAttrs.top(20)(Ordering.by((entry: (Vid, (Double, String))) => entry._2._1))
+val top20 = ranksAndAttrs.top(20)(Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1))
 top20.mkString("\n")
 }
 
-
-
 def stringify(tup: (org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)): String = {
 tup._2.toString
 }

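The largest functional change in PrePostProcessWiki.scala unrolls the single-line HashSet aggregate (presumably one of the compilation problems this commit fixes) into an explicitly typed zero value, seqOp, and combOp. A self-contained sketch of that pattern, using the same VertexId-keyed component IDs as the diff but otherwise illustrative, is:

    import org.apache.spark.graphx._
    import scala.collection.mutable

    // ccGraph.vertices is an RDD of (vertexId, componentId) pairs, so counting
    // connected components means counting distinct componentId values.
    def countComponents(ccGraph: Graph[VertexId, _]): Int = {
      val zeroVal = new mutable.HashSet[VertexId]()
      // seqOp: add one vertex's component ID to the partition-local set.
      val seqOp = (s: mutable.HashSet[VertexId], vtuple: (VertexId, VertexId)) => {
        s += vtuple._2
        s
      }
      // combOp: merge the per-partition sets as results arrive at the driver.
      val combOp = (s1: mutable.HashSet[VertexId], s2: mutable.HashSet[VertexId]) => s1 union s2
      ccGraph.vertices.aggregate(zeroVal)(seqOp, combOp).size
    }

An equivalent count could also be obtained with ccGraph.vertices.map(_._2).distinct().count(), which keeps the de-duplication in the cluster instead of merging per-partition sets at the driver.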