adding PySpark workflow for ETL
ceteri committed Dec 31, 2014
1 parent 1b9017c commit 9a9ffe9
Showing 4 changed files with 181 additions and 7 deletions.
172 changes: 172 additions & 0 deletions exsto/ETL.md
@@ -0,0 +1,172 @@
## ETL in PySpark with Spark SQL

Let's use PySpark and Spark SQL to prepare the data for ML and graph
analysis.
We can perform *data discovery* while reshaping the data for later
work.
These early results can help guide our deeper analysis.

NB: if this ETL needs to run outside of the `bin/pyspark` shell, first
set up a `SparkContext` variable:

```python
from pyspark import SparkContext
sc = SparkContext(appName="Exsto", master="local[*]")
```

Import the JSON data produced by the scraper and register its schema
for ad-hoc SQL queries later.
Each message has the fields:
`date`, `sender`, `id`, `next_thread`, `prev_thread`, `next_url`, `subject`, `text`

```python
from pyspark.sql import SQLContext, Row
sqlCtx = SQLContext(sc)

msg = sqlCtx.jsonFile("data").cache()
msg.registerTempTable("msg")
```

NB: note the `.cache()` persistence applied to the JSON message data.
We may need to unpersist it at a later stage of this ETL work.
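
For example, once the downstream stages no longer need the raw
messages, the cached data can be released -- a minimal sketch:

```python
# release the cached JSON message data once ETL no longer needs it
msg.unpersist()
```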

### Question: Who are the senders?

Who are the people in the developer community sending email to the list?
We will use this as a dimension in our analysis and reporting.
Let's create a map, with a unique ID for each email address --
this will be required for the graph analysis.
It may come in handy later for some
[named-entity recognition](https://en.wikipedia.org/wiki/Named-entity_recognition).

```python
who = msg.map(lambda x: x.sender).distinct().zipWithUniqueId()
who.take(10)

whoMap = who.collectAsMap()
```
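
Assuming we later need to recover an email address from its unique ID,
e.g., to label nodes in the graph, the map can be inverted -- a small
sketch, where `idMap` is a hypothetical helper:

```python
# hypothetical reverse lookup: unique ID -> email address
idMap = dict((node_id, sender) for sender, node_id in whoMap.iteritems())
```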

### Question: Who are the top K senders?

A highly active open source developer community such as Apache Spark
will have several thousand people engaged.
Let's identify the most active ones.
Then we can show a leaderboard and track changes in it over time.

```python
from operator import add

top_sender = msg.map(lambda x: (x.sender, 1,)).reduceByKey(add) \
    .map(lambda (a, b): (b, a)) \
    .sortByKey(0, 1) \
    .map(lambda (a, b): (b, a))

top_sender.take(11)
```

You may notice that code... it comes from *word count*.

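Since `msg` was registered as a temp table, roughly the same
leaderboard could be expressed in Spark SQL -- a sketch, sorting the
aggregated rows on the driver:

```python
rows = sqlCtx.sql("SELECT sender, COUNT(*) AS n FROM msg GROUP BY sender").collect()
sorted(rows, key=lambda r: r.n, reverse=True)[:11]
```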

### Question: Which are the top K conversations?

Clearly, some people converse on the email list more than others.
Let's identify *who* those people are.
Later we can leverage our graph analysis to determine *what* they discuss.

NB: note the use case for the `groupByKey` transformation;
sometimes its usage is warranted.

```python
import itertools

def nitems (replier, senders):
    # NB: groupby only groups *consecutive* duplicates, so sort first
    for sender, g in itertools.groupby(sorted(senders)):
        yield len(list(g)), (replier, sender,)

senders = msg.map(lambda x: (x.id, x.sender,))
replies = msg.map(lambda x: (x.prev_thread, x.sender,))

convo = replies.join(senders).values() \
    .filter(lambda (a, b): a != b)

top_convo = convo.groupByKey() \
    .flatMap(lambda (a, b): list(nitems(a, b))) \
    .sortByKey(0)

top_convo.take(10)
```
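
For comparison, the same pairwise counts could be computed with
`reduceByKey`, which avoids shuffling whole groups -- a sketch based
on the `convo` pairs above:

```python
# count each (replier, sender) pair directly, no grouping required
top_convo_alt = convo.map(lambda pair: (pair, 1,)).reduceByKey(add) \
    .map(lambda (pair, n): (n, pair)) \
    .sortByKey(0)

top_convo_alt.take(10)
```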

### Prepare for Sender/Reply Graph Analysis

Given the RDDs that we have created to help answer some of the
questions so far, let's persist those data sets using
[Parquet](http://parquet.io) --
starting with the graph of sender/message/reply:

```python
edge = top_convo.map(lambda (a, b): (whoMap.get(b[0]), whoMap.get(b[1]), a,))
edgeSchema = edge.map(lambda p: Row(replier=p[0], sender=p[1], count=int(p[2])))
edgeTable = sqlCtx.inferSchema(edgeSchema)
edgeTable.saveAsParquetFile("reply_edge.parquet")

node = who.map(lambda (a, b): (b, a))
nodeSchema = node.map(lambda p: Row(id=int(p[0]), sender=p[1]))
nodeTable = sqlCtx.inferSchema(nodeSchema)
nodeTable.saveAsParquetFile("reply_node.parquet")
```
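
To sanity-check, those Parquet files can be loaded back and registered
for SQL queries -- a sketch using the Spark 1.x API:

```python
# reload the persisted edges and query them ad-hoc
edgeParquet = sqlCtx.parquetFile("reply_edge.parquet")
edgeParquet.registerTempTable("edge")

sqlCtx.sql("SELECT * FROM edge LIMIT 3").collect()
```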

---

*(TBD)*

Parse the text in email messages.
NB: FIX THIS (only one element for now)

```python
import exsto

def parse_text (id, text):
    for graf_text in exsto.filter_quotes(text):
        yield id, exsto.parse_graf(graf_text)[0]

msg1 = sc.parallelize(msg.take(1))
grafs = msg1.flatMap(lambda x: list(parse_text(x.id, x.text)))
grafs.collect()
```

Each paragraph has a [SHA-1](https://en.wikipedia.org/wiki/SHA-1)
digest of its parsed sentences, effectively making parts of our
semantic analysis *content addressable*.
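
As a rough sketch of that idea -- the actual digest comes from
`exsto.parse_graf` -- a paragraph key might be computed along these lines:

```python
import hashlib

def graf_sha1 (sentence_text):
    # hypothetical: hash the parsed sentence text, so that
    # identical paragraphs map to the same content key
    return hashlib.sha1(sentence_text.encode("utf-8")).hexdigest()
```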

Store a mapping of `(ID, SHA1)` to preserve the composition of each
message.

```python
sha1 = grafs.map(lambda (id, meta): (id, meta["sha1"],))
sha1Schema = sha1.map(lambda p: Row(id=p[0], sha1=p[1]))
sha1Table = sqlCtx.inferSchema(sha1Schema)
sha1Table.saveAsParquetFile("id_sha1.parquet")
```

Next, store a mapping of `(SHA1, parsed_graf)` for each paragraph --
so that we have a tree structure for the parse data.
This is useful to detect duplicated paragraphs and thus avoid
distorting the analytics.

```python
def csv_tiles (tiles):
    return "\t".join(["%d,%d" % (n0, n1) for n0, n1 in tiles])

def word_tuple (t):
    id, word, lemma, pos, keep, idx = t
    return ",".join([str(id), word, lemma, pos, str(keep), str(idx)])

def csv_words (words):
    return "\t".join([word_tuple(t) for t in words])

text = grafs.map(lambda (id, m): (m["sha1"], csv_tiles(m["tile"]), csv_words(m["graf"]), m["polr"], m["subj"],))

textSchema = text.map(lambda p: Row(sha1=p[0], tiles=p[1], words=p[2], polr=float(p[3]), subj=float(p[4])))
textTable = sqlCtx.inferSchema(textSchema)
textTable.saveAsParquetFile("sha1_graf.parquet")
```
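
With that mapping in hand, duplicated paragraphs surface as `sha1`
digests which occur more than once -- a sketch against the `sha1` RDD
defined above:

```python
# digests appearing in more than one message indicate duplicated text
dupes = sha1.map(lambda (id, h): (h, 1,)).reduceByKey(add) \
    .filter(lambda (h, n): n > 1)

dupes.take(10)
```
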
3 changes: 2 additions & 1 deletion exsto/README.md
@@ -85,7 +85,7 @@ file.
```


-# What's In A Name?
+# What's in a name?

The word [exsto](http://en.wiktionary.org/wiki/exsto) is the Latin
verb meaning "to stand out", in its present active form.
@@ -97,6 +97,7 @@ verb meaning "to stand out", in its present active form.

* [TextRank](http://web.eecs.umich.edu/~mihalcea/papers/mihalcea.emnlp04.pdf)
* [Word2Vec use cases](http://www.yseam.com/blog/WV.html)
+* [Word2Vec vs. GloVe](http://radimrehurek.com/2014/12/making-sense-of-word2vec/)

### microservices and containers

9 changes: 4 additions & 5 deletions exsto/exsto.py
@@ -110,16 +110,14 @@ def split_grafs (lines):
yield "\n".join(graf)


-def filter_quotes (line):
+def filter_quotes (text):
    """filter the quoted text out of a message"""
    global DEBUG
    global PAT_FORWARD, PAT_REPLIED, PAT_UNSUBSC

-    meta = json.loads(line)
-    text = filter(lambda x: x in string.printable, meta["text"])
+    text = filter(lambda x: x in string.printable, text)

    if DEBUG:
-        print line
        print text

    # strip off quoted text in a forward
@@ -161,7 +159,8 @@ def test_filter (path):
    for file in files:
        with open(path + file, 'r') as f:
            line = f.readline()
-            grafs = filter_quotes(line)
+            meta = json.loads(line)
+            grafs = filter_quotes(meta["text"])

            if not grafs or len(grafs) < 1:
                raise Exception("no results")
4 changes: 3 additions & 1 deletion exsto/filter.py
@@ -2,6 +2,7 @@
# encoding: utf-8

import exsto
+import json
import os
import sys

@@ -14,7 +15,8 @@ def main ():
    else:
        with open(path, 'r') as f:
            for line in f.readlines():
-                print exsto.pretty_print(exsto.filter_quotes(line))
+                meta = json.loads(line)
+                print exsto.pretty_print(exsto.filter_quotes(meta["text"]))


if __name__ == "__main__":
