Skip to content

Commit

Permalink
add strongly connected components, etc.
Browse files Browse the repository at this point in the history
  • Loading branch information
ceteri committed Jan 10, 2015
1 parent ff75dea commit fe4e541
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 2 deletions.
4 changes: 2 additions & 2 deletions exsto/adhoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ def nitems (replier, senders):
# Prepare for Sender/Reply Graph Analysis

edge = top_convo.map(lambda (a, b): (whoMap.get(b[0]), whoMap.get(b[1]), a,))
edgeSchema = edge.map(lambda p: Row(replier=p[0], sender=p[1], count=int(p[2])))
edgeSchema = edge.map(lambda p: Row(replier=long(p[0]), sender=long(p[1]), num=int(p[2])))
edgeTable = sqlCtx.inferSchema(edgeSchema)
edgeTable.saveAsParquetFile("reply_edge.parquet")

node = who.map(lambda (a, b): (b, a))
nodeSchema = node.map(lambda p: Row(id=int(p[0]), sender=p[1]))
nodeSchema = node.map(lambda p: Row(id=long(p[0]), sender=p[1]))
nodeTable = sqlCtx.inferSchema(nodeSchema)
nodeTable.saveAsParquetFile("reply_node.parquet")

Expand Down
60 changes: 60 additions & 0 deletions exsto/graph2.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sqlCtx = new org.apache.spark.sql.SQLContext(sc)
import sqlCtx._

val edge = sqlCtx.parquetFile("reply_edge.parquet")
edge.registerTempTable("edge")

val node = sqlCtx.parquetFile("reply_node.parquet")
node.registerTempTable("node")

edge.schemaString
node.schemaString


val sql = "SELECT id, sender FROM node"

val n = sqlCtx.sql(sql).distinct()
val nodes: RDD[(Long, String)] = n.map{ p =>
(p(0).asInstanceOf[Long], p(1).asInstanceOf[String])
}
nodes.collect()


val sql = "SELECT replier, sender, num FROM edge"

val e = sqlCtx.sql(sql).distinct()
val edges: RDD[Edge[Int]] = e.map{ p =>
Edge(p(0).asInstanceOf[Long], p(1).asInstanceOf[Long], p(2).asInstanceOf[Int])
}
edges.collect()


// run graph analytics

val g: Graph[String, Int] = Graph(nodes, edges)
val r = g.pageRank(0.0001).vertices

r.join(nodes).sortBy(_._2._1, ascending=false).foreach(println)

// define a reduce operation to compute the highest degree vertex

def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}

// compute the max degrees

val maxInDegree: (VertexId, Int) = g.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = g.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = g.degrees.reduce(max)

val node_map: scala.collection.Map[Long, String] = node.
map(p => (p(0).asInstanceOf[Long], p(1).asInstanceOf[String])).collectAsMap()

// connected components

val scc = g.stronglyConnectedComponents(10).vertices
node.join(scc).foreach(println)

0 comments on commit fe4e541

Please sign in to comment.