
reworked as functions
ceteri committed Feb 11, 2015
1 parent e5d1cb6 commit beac1c2
Showing 1 changed file with 174 additions and 62 deletions.
236 changes: 174 additions & 62 deletions exsto/dbc/2.TextRank.scala
@@ -1,5 +1,26 @@
// Databricks notebook source exported at Mon, 9 Feb 2015 04:37:21 UTC
import org.apache.spark.graphx._
// Databricks notebook source exported at Mon, 9 Feb 2015 11:25:45 UTC
// MAGIC %md ## Augmented TextRank in Spark
// MAGIC The following is a
// MAGIC [Spark](http://spark.apache.org/)
// MAGIC implementation of
// MAGIC [TextRank](http://web.eecs.umich.edu/~mihalcea/papers/mihalcea.emnlp04.pdf)
// MAGIC by Mihalcea, et al.
// MAGIC The graph used in the algorithm is enriched by replacing the original authors' *Porter stemmer* approach with lemmatization from
// MAGIC [WordNet](http://wordnet.princeton.edu/).
// MAGIC
// MAGIC This algorithm generates a *graph* from a text document, linking together related words, then runs
// MAGIC [PageRank](http://en.wikipedia.org/wiki/PageRank)
// MAGIC on that graph to determine the high-ranked keyphrases.
// MAGIC Those keyphrases summarize the text document, similar to how a human editor would summarize an academic paper.
// MAGIC
// MAGIC See [https://github.com/ceteri/spark-exercises/tree/master/exsto](https://github.com/ceteri/spark-exercises/tree/master/exsto)
// MAGIC and also the earlier [Hadoop implementation](https://github.com/ceteri/textrank)
// MAGIC which leveraged *semantic relations* by extending the graph using
// MAGIC [hypernyms](https://en.wikipedia.org/wiki/Hyponymy_and_hypernymy) from WordNet as well.
// MAGIC
// MAGIC First, we need to create *base RDDs* from the Parquet files that we stored in DBFS during the ETL phase...
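// MAGIC
// MAGIC As background, the edges in those files link words that co-occur near one another in the parsed text. Below is a minimal sketch of that linking, assuming a pre-tokenized and lemmatized `Seq[String]` (a simplified illustration, not the actual ETL code)...

// COMMAND ----------

// Illustrative sketch only: pair each token with the tokens that follow it inside a small
// sliding window, yielding the kind of co-occurrence edges that TextRank ranks with PageRank.
def windowEdges (tokens: Seq[String], window: Int = 3): Seq[(String, String)] =
  tokens.zipWithIndex.flatMap { case (w, i) =>
    tokens.slice(i + 1, i + window).map(v => (w, v))
  }

// e.g., windowEdges(Seq("spark", "graph", "algorithm")) yields
// (spark,graph), (spark,algorithm), (graph,algorithm)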

// COMMAND ----------

val edge = sqlContext.parquetFile("/mnt/paco/exsto/graph/graf_edge.parquet")
edge.registerTempTable("edge")
@@ -17,8 +38,21 @@ val msg_id = "CA+B-+fyrBU1yGZAYJM_u=gnBVtzB=sXoBHkhmS-6L1n8K5Hhbw"

// COMMAND ----------

// MAGIC %md Our use of [GraphX](https://spark.apache.org/graphx/) requires some imports...

// COMMAND ----------

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// COMMAND ----------

// MAGIC %md Next we run a query in [Spark SQL](https://spark.apache.org/sql/) to deserialize just the fields that we need from the Parquet files to generate the graph nodes...

// COMMAND ----------

// use in parallelized version

val sql = """
SELECT node_id, root
FROM node
@@ -35,6 +69,12 @@ nodes.collect()

// COMMAND ----------

// MAGIC %md Likewise for the edges in the graph...

// COMMAND ----------

// use in parallelized version

val sql = """
SELECT node0, node1
FROM edge
@@ -51,25 +91,29 @@ edges.collect()

// COMMAND ----------

// MAGIC %md Next, we run PageRank with this graph...
// MAGIC %md ### Graph Analytics
// MAGIC We compose a graph from the `node` and `edge` RDDs and run [PageRank](http://spark.apache.org/docs/latest/graphx-programming-guide.html#pagerank) on it...

// COMMAND ----------

// use in parallelized version

val g: Graph[String, Int] = Graph(nodes, edges)
val r = g.pageRank(0.0001).vertices
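
// A quick sanity check (an illustrative sketch, not a required step): peek at the top-ranked
// lemmas by joining the PageRank scores back onto the node labels.
r.join(nodes)
  .map { case (_, (rank, word)) => (word, rank) }
  .sortBy(_._2, ascending = false)
  .take(10)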

// COMMAND ----------

r.join(nodes).sortBy(_._2._1, ascending=false).collect()
// MAGIC %md Save the resulting ranks for each word of interest...

// COMMAND ----------

// MAGIC %md Then save the resulting ranks for each word of interest...
// use in parallelized version

// COMMAND ----------
case class Rank(id: Int, rank: Double, word: String)

case class Rank(id: Int, rank: Float, word: String)
val rank = r.join(nodes).map(p => Rank(p._1.toInt, p._2._1.asInstanceOf[Float], p._2._2))
val rank = r.join(nodes).map {
case (node_id, (rank, word)) => Rank(node_id.toInt, rank, word)
}

rank.registerTempTable("rank")

@@ -87,15 +131,16 @@ rank.registerTempTable("rank")

// COMMAND ----------

// use in parallelized version

val rankMap = rank.map(r => (r.id, r.rank)).collectAsMap()

// COMMAND ----------

def median[T](s: Seq[T])(implicit n: Fractional[T]) = {
import n._
val (lower, upper) = s.sortWith(_<_).splitAt(s.size / 2)
if (s.size % 2 == 0) (lower.last + upper.head) / fromInt(2) else upper.head
}
// MAGIC %md ### Email Summarization: Extracting Key Phrases
// MAGIC Next we go back to the parsed text and use the *TextRank* rankings to extract key phrases for each email message.
// MAGIC
// MAGIC First, let's examine the parsed text for the example message...

// COMMAND ----------

@@ -108,88 +153,103 @@ parsed.printSchema

// COMMAND ----------

node.printSchema
// MAGIC %sql
// MAGIC SELECT graf, tile, size, polr, subj
// MAGIC FROM parsed
// MAGIC WHERE id='CA+B-+fyrBU1yGZAYJM_u=gnBVtzB=sXoBHkhmS-6L1n8K5Hhbw'

// COMMAND ----------

edge.printSchema
// MAGIC %md Fortunately, the stored Parquet files have that data available in an efficient way...

// COMMAND ----------

rank.printSchema
node.printSchema

// COMMAND ----------

// MAGIC %sql
// MAGIC SELECT graf, tile, size, polr, subj
// MAGIC FROM parsed
// MAGIC WHERE id='CA+B-+fyrBU1yGZAYJM_u=gnBVtzB=sXoBHkhmS-6L1n8K5Hhbw'

// COMMAND ----------
// use in parallelized version

val sql = """
SELECT num, node_id, raw, pos, keep
FROM node
WHERE id='%s'
ORDER BY num ASC
""".format(msg_id)

val para = sqlContext.sql(sql)

// COMMAND ----------

para.collect()
// MAGIC %md The parsed text for the given message looks like the following sequence...

// COMMAND ----------

var last_idx: Int = -1
var span: List[String] = List()
var rank_sum: Double = 0.0
var noun_count: Int = 0
var phrases: collection.mutable.Map[String, Double] = collection.mutable.Map()
// use in parallelized version

para.collect().foreach{ x =>
val w_idx = x(0).asInstanceOf[Int]
val node_id = x(1).asInstanceOf[Long].toInt
val word = x(2).asInstanceOf[String].toLowerCase()
val pos = x(3).asInstanceOf[String]
val keep = x(4).asInstanceOf[Int]

if (keep == 1) {
if (w_idx - last_idx > 1) {
if (noun_count > 0) {
phrases += (span.mkString(" ") -> rank_sum)
val paraSeq = para
.map(r => (r(0).asInstanceOf[Int], r(1).asInstanceOf[Long], r(2).toString, r(3).toString, r(4).asInstanceOf[Int]))
.collect
.toSeq

// COMMAND ----------

// MAGIC %md We define a function to extract key phrases from that sequence...

// COMMAND ----------

// use in parallelized version

def extractPhrases (s: Seq[(Int, Long, String, String, Int)]): Seq[(String, Double)] = {
var last_idx: Int = -1
var span: List[String] = List()
var rank_sum: Double = 0.0
var noun_count: Int = 0
var phrases: collection.mutable.Map[String, Double] = collection.mutable.Map()
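// Walk the tokens in document order, growing a span of contiguous "keep" tokens;
// whenever a gap in the word index appears, flush the span as a candidate phrase
// (only if it contained at least one noun), accumulating TextRank scores along the way.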

s.foreach { row =>
val(w_idx, node_id, word, pos, keep) = row

if (keep == 1) {
if (w_idx - last_idx > 1) {
if (noun_count > 0) phrases += (span.mkString(" ").toLowerCase() -> rank_sum)

span = List()
rank_sum = 0.0
noun_count = 0
}

span = List()
rank_sum = 0.0
noun_count = 0
}

val rank = rankMap.get(node_id).getOrElse(0.0).asInstanceOf[Float]
//println(w_idx, node_id, word, pos, rank)

last_idx = w_idx
span = span :+ word
rank_sum += rank

if (pos.startsWith("N")) noun_count += 1
val rank = rankMap.get(node_id.toInt).getOrElse(0.0).asInstanceOf[Float]
//println(w_idx, node_id, word, pos, rank)

last_idx = w_idx
span = span :+ word
rank_sum += rank

if (pos.startsWith("N")) noun_count += 1
}
}
}

if (noun_count > 0) {
phrases += (span.mkString(" ") -> rank_sum)
if (noun_count > 0) phrases += (span.mkString(" ").toLowerCase() -> rank_sum)

// normalize the ranks
val sum = phrases.values.reduceLeft[Double](_ + _)
val norm_ranks: collection.mutable.Map[String, Double] = collection.mutable.Map()

phrases foreach {case (phrase, rank) => norm_ranks += (phrase -> rank / sum)}
norm_ranks.toSeq
}

// COMMAND ----------

// MAGIC %md Now let's create an RDD from the extracted phrases and use SQL to show the results...

// COMMAND ----------

case class Phrase(phrase: String, norm_rank: Double)

val phraseRdd = sc.parallelize(phrases.toSeq).distinct()
val sum = phraseRdd.map(_._2).reduce(_ + _)

val phraseTable = phraseRdd.map(p => Phrase(p._1, p._2 / sum))
phraseTable.registerTempTable("phrase")
val phraseRdd = sc.parallelize(extractPhrases(paraSeq)).map(p => Phrase(p._1, p._2))
phraseRdd.registerTempTable("phrase")
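
// For a quick look from Scala (an illustrative query; the same table can also be
// inspected from a %sql cell, since it is registered as "phrase"):
sqlContext.sql("SELECT phrase, norm_rank FROM phrase ORDER BY norm_rank DESC").collect()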

// COMMAND ----------

@@ -199,14 +259,19 @@ phraseTable.registerTempTable("phrase")

// COMMAND ----------

// MAGIC %md ### Evaluation
// MAGIC How do those results compare with what a human reader might extract from the message text?

// COMMAND ----------

// MAGIC %sql
// MAGIC SELECT text
// MAGIC FROM msg
// MAGIC WHERE id='CA+B-+fyrBU1yGZAYJM_u=gnBVtzB=sXoBHkhmS-6L1n8K5Hhbw'

// COMMAND ----------

// MAGIC %md Just for kicks, let's compare the results of **TextRank** with the *term frequencies* that would result from a **WordCount** ...
// MAGIC %md Just for kicks, let's compare the results of *TextRank* with the *term frequencies* that would result from a **WordCount** ...

// COMMAND ----------
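
// A minimal word-count sketch for that comparison (illustrative only; assumes `para` exposes
// the raw token at column 2, and imports the pair-RDD implicits in case they are not in scope):
import org.apache.spark.SparkContext._

para.map(x => (x(2).asInstanceOf[String].toLowerCase(), 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .collect()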

@@ -222,4 +287,51 @@ para.map(x => (x(2).asInstanceOf[String].toLowerCase(), 1))

// MAGIC %md Um, yeah. So that happened.
// MAGIC
// MAGIC That's why you probably want to enrich text analytics results before other algorithms need to use them downstream.
// MAGIC That's why you probably want to clean up and enrich the results of text analytics before other algorithms consume them downstream as *features*.
// MAGIC Otherwise, `GIGO`, as they say.

// COMMAND ----------

// MAGIC %md ### Parallelized Version
// MAGIC
// MAGIC Let's pull all of these pieces together into a function that can be run in parallel at scale...

// COMMAND ----------

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

case class Rank(id: Int, rank: Double, word: String)

def textRank (msg_id: String): Seq[(String, Double)] = {
var sql = s"SELECT node_id, root FROM node WHERE id='%s' AND keep=1".format(msg_id)

val nodes: RDD[(Long, String)] = sqlContext.sql(sql).distinct()
.map{ p =>
(p(0).asInstanceOf[Long], p(1).asInstanceOf[String])
}

sql = s"SELECT node0, node1 FROM edge WHERE id='%s'".format(msg_id)

val edges: RDD[Edge[Int]] = sqlContext.sql(sql).distinct()
.map{ p =>
Edge(p(0).asInstanceOf[Long], p(1).asInstanceOf[Long], 0)
}

val g: Graph[String, Int] = Graph(nodes, edges)

val rankMap = g.pageRank(0.0001).vertices
.join(nodes)
.map {
case (node_id, (rank, word)) => (node_id.toInt, rank)
}.collectAsMap()

sql = s"SELECT num, node_id, raw, pos, keep FROM node WHERE id='%s' ORDER BY num ASC".format(msg_id)

val paraSeq = sqlContext.sql(sql)
.map(r => (r(0).asInstanceOf[Int], r(1).asInstanceOf[Long], r(2).toString, r(3).toString, r(4).asInstanceOf[Int]))
.collect
.toSeq

extractPhrases(paraSeq)
}
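
// COMMAND ----------

// MAGIC %md As an illustrative usage sketch, the function can be mapped over a collection of message ids to get the top phrases per message. The `msgIds` list below is a stand-in, and `textRank` issues Spark SQL queries, so it runs on the driver...

// COMMAND ----------

// illustrative usage: top five key phrases for each message id
val msgIds = Seq("CA+B-+fyrBU1yGZAYJM_u=gnBVtzB=sXoBHkhmS-6L1n8K5Hhbw")

val topPhrases = msgIds.map { id =>
  (id, textRank(id).sortBy(-_._2).take(5))
}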
