@@ -52,6 +52,23 @@ object DataflowPagerank extends Logging {

    // val sc = new SparkContext(host, "DataflowPagerank(" + fname + ")", conf)
    val sc = new SparkContext(host, "PageRank(" + fname + ")", conf)
+
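+    // args(4) selects which implementation to run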
+    val algo = args(4) match {
+      case "naive" => naiveVersion(sc, fname, partitions, iters)
+      case "optimized" => optimizedSpark(sc, fname, partitions, iters)
+      case "cc" => connectedComponents(sc, fname, partitions)
+      case _ => throw new UnsupportedOperationException
+    }
+
+    sc.stop()
+
+    System.exit(0)
+  }
+
+  def optimizedSpark(sc: SparkContext, fname: String, partitions: Int, iters: Int) {
+
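+    // Parse each "src dst" line; links keeps one (src, Seq[dst]) adjacency entry per source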
    val lines = sc.textFile(fname).repartition(partitions)
    val links: RDD[(Long, Seq[Long])] = lines.map{ s =>
      val parts = s.split("\\s+")
@@ -74,10 +91,96 @@ object DataflowPagerank extends Logging {
    // output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))
    val totalRank = ranks.map{ case (_, r) => r }.reduce(_ + _)
    logWarning(s"Total Pagerank: $totalRank")
-    sc.stop()
-
+  }

+  // For now edge data is arbitrarily a string
+  def naiveVersion(sc: SparkContext, fname: String, partitions: Int, iters: Int) {
+
+    val lines = sc.textFile(fname).repartition(partitions)
+    val edges: RDD[(Long, (Long, Int))] = lines.map{ s =>
+      val parts = s.split("\\s+")
+      (parts(0).toLong, (parts(1).toLong, 1))
+    }
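+    // alpha: fraction of a vertex's previous rank retained each iteration (see update below)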
+    val alpha = 0.15
+    val initialRank = 1.0
+    logWarning("Pagerank entered.")
+
+    // get outdegree of each vertex and make edge weight 1/outdegree
+    val weightedEdges: RDD[(Long, (Long, Double))] = edges
+      .map { case (src: Long, (dst: Long, _: Int)) => (src, 1.0) }
+      .reduceByKey(_ + _)
+      .join(edges)
+      .map{ case (src: Long, (outDegree: Double, (dst: Long, _: Int))) =>
+        (src, (dst, 1.0 / outDegree))
+      }.cache()  // reused on every iteration
+
+    // initialize ranks
+    var ranks: RDD[(Long, Double)] = edges.map{ case (src: Long, (dst: Long, _: Int)) => (src, initialRank) }
+      .union(edges.map{ case (src: Long, (dst: Long, _: Int)) => (dst, initialRank) })
+      .distinct()
+
+    logWarning("Starting pagerank iterations")
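+    // Each iteration: scatter rank along weighted edges, sum incoming
+    // contributions per vertex, then blend with the previous rank:
+    //   newRank = alpha * oldRank + (1 - alpha) * incomingRank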
+    for (i <- 1 to iters) {
+      ranks = weightedEdges.join(ranks)
+        .map { case (src: Long, ((dst: Long, weight: Double), rank: Double)) => (dst, weight * rank) }
+        .reduceByKey(_ + _)
+        .join(ranks)
+        .map { case (id: Long, (incomingRanks: Double, myRank: Double)) => (id, alpha * myRank + (1.0 - alpha) * incomingRanks) }
+
+      ranks.count()  // action forces this iteration to execute now
+      logWarning("Finished iteration: " + i)
+    }
+    val totalRank = ranks.map{ case (_, r) => r }.reduce(_ + _)
+    logWarning(s"Total Pagerank: $totalRank")
+
+  }
+
+  def connectedComponents(sc: SparkContext, fname: String, partitions: Int) {
+
+    val lines = sc.textFile(fname).repartition(partitions)
+    val edges: RDD[(Long, Long)] = lines.map{ s =>
+      val parts = s.split("\\s+")
+      (parts(0).toLong, parts(1).toLong)
+    }.cache()
+    logWarning("CC started")
+
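+    // Min-label propagation: each vertex starts with its own id as its
+    // component id and repeatedly adopts the smallest id among itself and
+    // its neighbors; once no label changes, the components have converged.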
+    // initialize ccIDs to vertex IDs
+    var ccs: RDD[(Long, Long)] = edges.map{ case (src, _) => (src, src) }
+      .union(edges.map{ case (_, dst) => (dst, dst) })
+      .distinct()
+    var numUpdates = Long.MaxValue
+    var iter = 0
+
+    logWarning("Starting CC iterations")
+    while (numUpdates > 0) {
+      iter += 1
+
+      val newCCs = edges
+        // get src property
+        .join(ccs)
+        // rekey by dst
+        .map { case (src: Long, (dst: Long, srcCC: Long)) => (dst, (src, srcCC)) }
+        // get dst property
+        .join(ccs)
+        // emit min ccId to both src and dst
+        .flatMap { case (dst: Long, ((src: Long, srcCC: Long), dstCC: Long)) =>
+          Iterator((src, math.min(srcCC, dstCC)), (dst, math.min(srcCC, dstCC)))
+        }
+        .reduceByKey(math.min(_, _)).cache()
+
+      // check for convergence: count vertices whose label changed this round
+      numUpdates = newCCs.join(ccs)
+        .filter{ case (vid, (newCC, oldCC)) => newCC != oldCC }.count()
+      ccs = newCCs
+
+      logWarning(s"CC iter $iter with $numUpdates updates")
+    }
+    val numCCs = ccs.map{ case (_, id) => id }.distinct().count()
+    logWarning(s"Num connected components: $numCCs")

-    System.exit(0)
  }
}