-
Notifications
You must be signed in to change notification settings - Fork 0
/
PeopleCorrelationAnalysis.scala
119 lines (98 loc) · 3.22 KB
/
PeopleCorrelationAnalysis.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId, EdgeDirection}
import org.apache.spark.rdd.RDD
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
* Created by mac on 3/3/16.
*/
/**
  * Small GraphX job over a social network stored as two CSV files.
  *
  * It builds a property graph of people and their relationships, prints the
  * vertices reachable from a fixed vertex within three hops, and offers a
  * PageRank based "top 10" helper.
  *
  * NOTE(review): Spark's documentation recommends defining `main()` instead of
  * extending `scala.App`; `App` is kept here so that the public members of this
  * object (`sc`, `social`, `neighbors`, ...) stay exactly as they were.
  */
object PeopleCorrelationAnalysis extends App {

  /**
    * Configuration of Spark.
    */
  val conf = new SparkConf().setAppName("PeopleCorrelationAnalysis")
  conf.setMaster("spark://10.10.18.226:7077")
  val sc = new SparkContext(conf)
  sc.addJar("/Users/mac/Documents/GraphXSurvey/SparkTest/out/artifacts/..")

  /** Vertex attribute: columns 2 and 3 of a vertex CSV row. */
  case class Person(name: String, sex: String)

  /** Edge attribute: columns 3 and 4 of an edge CSV row. */
  case class Link(relationship: String, happenDate: String)

  /**
    * Create a graph from two comma separated files.
    *
    * Vertex rows are read as `id,name,sex` and edge rows as
    * `srcId,dstId,relationship,happenDate`. Blank lines are skipped. Rows are
    * split with a limit of -1 so that a trailing empty field (for example a
    * missing `happenDate`) is kept as an empty string instead of making the row
    * too short. Rows that are still malformed (too few columns, non-numeric id)
    * fail the job, as before.
    *
    * @param vertexFilePath file path of vertexs.csv
    * @param edgeFilePath   file path of edges.csv
    * @return the graph whose vertices carry a [[Person]] and whose edges carry a [[Link]]
    */
  def createGraph(vertexFilePath: String, edgeFilePath: String): Graph[Person, Link] = {
    // Build the vertex RDD.
    val verticesRDD: RDD[(VertexId, Person)] =
      sc.textFile(vertexFilePath)
        .filter(line => line.trim.nonEmpty)
        .map { line =>
          val row = line.split(",", -1)
          (row(0).trim.toLong, Person(row(1), row(2)))
        }

    // Build the edge RDD.
    val linksRDD: RDD[Edge[Link]] =
      sc.textFile(edgeFilePath)
        .filter(line => line.trim.nonEmpty)
        .map { line =>
          val row = line.split(",", -1)
          Edge(row(0).trim.toLong, row(1).trim.toLong, Link(row(2), row(3)))
        }

    // Build the graph.
    Graph(verticesRDD, linksRDD)
  }

  /**
    * The main graph.
    */
  var social: Graph[Person, Link] = createGraph("/Users/mac/Documents/GraphXSurvey/GraphX/SocialNetwork/vertexs.csv","/Users/mac/Documents/GraphXSurvey/GraphX/SocialNetwork/edges.csv")
  social.cache()

  /**
    * Neighbours' vertex ids of each vertex id, following edges in either
    * direction (VertexId is an alias of Long).
    */
  val neighbors: RDD[(VertexId, Array[VertexId])] =
    social.collectNeighborIds(EdgeDirection.Either)
  neighbors.cache()

  /**
    * Find the neighbours of the specified vertex that lie within `n` layers.
    *
    * The search proceeds layer by layer: the neighbour lists of the current
    * frontier are looked up in [[neighbors]], everything found is added to the
    * result, and the vertices that have not been expanded yet form the next
    * frontier. A vertex id that has no entry in [[neighbors]] simply
    * contributes nothing. Since edges are followed in both directions, the
    * source vertex can itself appear in the result (it is a neighbour of its
    * own neighbours).
    *
    * @param n     the number of layers; values <= 0 give an empty set
    * @param srcId the specified VertexId
    * @return the set of vertices found in layers 1 to n
    */
  def findNLayers(n: Int, srcId: Long): mutable.Set[Long] = {
    val result = mutable.Set[Long]()
    // Vertices whose neighbour list was already fetched. Every lookup runs a
    // Spark job, so a vertex is never expanded twice.
    val expanded = mutable.Set[Long]()
    var frontier: Set[Long] = Set(srcId)
    var layer = 0

    while (layer < n && frontier.nonEmpty) {
      expanded ++= frontier

      val reached = mutable.Set[Long]()
      for (id <- frontier) {
        // lookup returns all values stored under the key; take the single
        // neighbour array if there is one and ignore unknown vertices.
        neighbors.lookup(id).headOption.foreach(ids => reached ++= ids)
      }

      result ++= reached
      // Only vertices that have not been expanded yet form the next layer.
      frontier = reached.filterNot(expanded.contains).toSet
      layer += 1
    }

    result
  }

  println(findNLayers(3,78000000000008000L))

  /**
    * Run PageRank with an error tolerance of 0.0001, print the ten vertices
    * with the highest rank as "id rank" lines and return them.
    *
    * @return up to ten (vertex id, rank) pairs, highest rank first
    */
  def getTop10Vertices(): Array[(VertexId, Double)] = {
    val ranks = social
      .pageRank(0.0001)
      .vertices
      .sortBy(_._2, ascending = false)
      .take(10)
    ranks.foreach { case (id, rank) => println(s"$id $rank") }
    ranks
  }
}