Commit 5032924 (parent f483ca4)

Added dataflow pagerank. Having issues with it and in-memory shuffle.

1 file changed: 83 additions, 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.graphx.lib

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

/**
 * Computes the PageRank of URLs from an input file. The input file should
 * be in the format:
 *   URL neighbor URL
 *   URL neighbor URL
 *   URL neighbor URL
 *   ...
 * where URLs and their neighbors are separated by space(s).
 */
object DataflowPagerank extends Logging {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: DataflowPagerank <master> <file> <number_of_iterations> <partitions>")
      System.exit(1)
    }
    val host = args(0)
    val fname = args(1)
    val iters = args(2).toInt
    val partitions = args(3).toInt
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
      .set("spark.locality.wait", "100000")
    val sc = new SparkContext(host, "PageRank(" + fname + ")", conf)

    // Parse each "src dst" edge line and group the edges into adjacency lists.
    val lines = sc.textFile(fname).repartition(partitions)
    val links: RDD[(Long, Seq[Long])] = lines.map { s =>
      val parts = s.split("\\s+")
      (parts(0).toLong, parts(1).toLong)
    }.groupByKey().cache()
    var ranks: RDD[(Long, Double)] = links.mapValues(v => 1.0)
    logWarning("Graph loaded")

    for (i <- 1 to iters) {
      // Each vertex splits its rank evenly among its out-neighbors;
      // contributions are then summed per vertex and damped.
      val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
        val size = urls.size
        urls.map(url => (url, rank / size))
      }
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
      ranks.count() // force evaluation so each iteration's shuffle runs now
      logWarning(s"Pagerank finished iteration $i")
    }

    val totalRank = ranks.map { case (_, r) => r }.reduce(_ + _)
    logWarning(s"Total Pagerank: $totalRank")
    sc.stop()
    System.exit(0)
  }
}
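
For reference (not part of the commit), here is a minimal, self-contained sketch of the same iteration on a toy three-vertex graph, using plain Scala collections in place of RDDs; it mirrors the join → flatMap → reduceByKey → mapValues pipeline above. The object name PagerankSketch, the edge set, and the fixed ten iterations are illustrative assumptions only.

object PagerankSketch {
  def main(args: Array[String]): Unit = {
    // Toy adjacency lists: 1 -> {2, 3}, 2 -> {3}, 3 -> {1}
    val links: Map[Long, Seq[Long]] =
      Map(1L -> Seq(2L, 3L), 2L -> Seq(3L), 3L -> Seq(1L))
    var ranks: Map[Long, Double] = links.map { case (url, _) => url -> 1.0 }

    for (i <- 1 to 10) {
      // Each vertex splits its rank evenly among its out-neighbors.
      val contribs: Seq[(Long, Double)] = links.toSeq.flatMap { case (url, neighbors) =>
        neighbors.map(n => (n, ranks(url) / neighbors.size))
      }
      // Sum the contributions per vertex and apply the 0.15/0.85 damping.
      ranks = contribs.groupBy(_._1).map { case (url, cs) =>
        url -> (0.15 + 0.85 * cs.map(_._2).sum)
      }
    }
    ranks.toSeq.sortBy(_._1).foreach { case (url, r) => println(s"$url has rank $r") }
  }
}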
