@@ -57,6 +57,7 @@ object DataflowPagerank extends Logging {
5757 case " naive" => naiveVersion(sc, fname, partitions, iters)
5858 case " optimized" => optimizedSpark(sc, fname, partitions, iters)
5959 case " cc" => connectedComponents(sc, fname, partitions)
60+ case " ccOpt" => ccSlightlyOpt(sc, fname, partitions)
6061 case _ => throw new UnsupportedOperationException
6162 }
6263
@@ -147,19 +148,20 @@ object DataflowPagerank extends Logging {
147148
148149
149150 // initialize ccIDs to IDs
150- var ccs : RDD [(Long , Long )] = edges.map{ case (src : Long , ( dst : Long , _ : Int ) ) => (src, src)}
151- .union(edges.map{ case (src : Long , ( dst : Long , _ : Int ) ) => (dst, dst)})
151+ var ccs : RDD [(Long , Long )] = edges.map{ case (src : Long , dst : Long ) => (src, src)}
152+ .union(edges.map{ case (src : Long , dst : Long ) => (dst, dst)})
152153 .distinct()
153154 var numUpdates = Long .MaxValue
154155
155156 logWarning(" Starting CC iterations" )
157+ var i = 0
156158 while (numUpdates > 0 ) {
157159
158160 val newCCs = edges
159161 // get src property
160162 .join(ccs)
161163 // rekey by dst
162- .map {case (src : Long , (dst : Long , srcCC : Long )) => (dst, (src, srcCC)}
164+ .map {case (src : Long , (dst : Long , srcCC : Long )) => (dst, (src, srcCC)) }
163165 // get dst property
164166 .join(ccs)
165167 // emit min ccId to src and adst
@@ -173,14 +175,72 @@ object DataflowPagerank extends Logging {
173175 // check for convergence
174176 numUpdates = newCCs.join(ccs)
175177 .filter{case (vid, (newCC, oldCC)) => newCC != oldCC }.count()
176- ccs = newCCs
177-
178178
179179 logWarning(s " CC iter $i with $numUpdates updates " )
180+ ccs = newCCs
181+ i += 1
180182
181183 }
182184 val numCCs = ccs.map{ case (_, id) => id}.distinct().count()
183185 logWarning(s " Num connected components: $numCCs" )
184186
185187 }
188+
189+ def ccSlightlyOpt (sc : SparkContext , fname : String , partitions : Int ) {
190+
191+ val lines = sc.textFile(fname).repartition(partitions)
192+ val edges : RDD [(Long , Long )] = lines.map{ s =>
193+ val parts = s.split(" \\ s+" )
194+ (parts(0 ).toLong, parts(1 ).toLong)
195+ }.cache()
196+ logWarning(" CC started" )
197+
198+
199+ // initialize ccIDs to IDs
200+ var ccs : RDD [(Long , Long )] = edges.map{ case (src : Long , dst : Long ) => (src, src)}
201+ .union(edges.map{ case (src : Long , dst : Long ) => (dst, dst)})
202+ .distinct()
203+ var numUpdates = Long .MaxValue
204+
205+ logWarning(" Starting CC iterations" )
206+ var i = 0
207+ while (numUpdates > 0 ) {
208+
209+ val newCCs = edges
210+ // get src property
211+ .join(ccs)
212+ // rekey by dst
213+ .map {case (src : Long , (dst : Long , srcCC : Long )) => (dst, (src, srcCC))}
214+ // get dst property
215+ .join(ccs)
216+ // emit min ccId to src and adst
217+ .flatMap { case (dst : Long , ((src : Long , srcCC : Long ), dstCC)) =>
218+ if (srcCC < dstCC) {
219+ Iterator (dst, srcCC)
220+ } else if (dstCC < srcCC) {
221+ Iterator (src, dstCC)
222+ } else {
223+ Iterator .empty
224+ }
225+ // Iterator((src, min(srcCC, dstCC)), (dst, min(srcCC, dstCC)))
226+ }
227+ .reduceByKey(min(_, _))
228+
229+
230+ ccs = ccs.join(newCCs).map{ case (vid, (oldCC, newCC)) => (vid, min(oldCC, newCC)) }.cache()
231+ // .join(ranks)
232+ // .map { case (id: Long, (incomingRanks: Double, myRank: Double)) => (id, alpha*myRank + (1.0-alpha)*incomingRanks)}
233+
234+ // check for convergence
235+ // numUpdates = newCCs.join(ccs)
236+ // .filter{case (vid, (newCC, oldCC)) => newCC != oldCC }.count()
237+ numUpdates = newCCs.count()
238+
239+ logWarning(s " CC iter $i with $numUpdates updates " )
240+ // ccs = newCCs
241+ i += 1
242+ }
243+ val numCCs = ccs.map{ case (_, id) => id}.distinct().count()
244+ logWarning(s " Num connected components: $numCCs" )
245+ }
186246}
0 commit comments