Commit 0a43433

Finalize Streaming, Cleaning, and Visualization
1 parent d9d835e commit 0a43433

1 file changed: +76 -1 lines changed

Project/README.md

Lines changed: 76 additions & 1 deletion
@@ -23,16 +23,91 @@ We use Streaming Context API socketTextStream to receive tweet Streaming through
raw_tweets = ssc.socketTextStream('localhost',5555)
```

## Clean Tweets

The tweets arrive as a DStream, which can be thought of as a stream of strings, each one corresponding to a single tweet.
```python
raw_tweets = ssc.socketTextStream('172.17.0.2',5555)
```
Next, a sample of `raw_tweets` (the first tweet of each batch) is written to a log file for future analysis.
```python
def writeRDD(rdd):
    # `logs` (a writable log file object) and `timeHeader()` are assumed to be defined elsewhere in the project
    global logs
    logs.write(timeHeader())
    # take(num=1) returns only the first tweet of the batch
    logs.write("\nRaw Tweets:\n{}".format(rdd.take(num=1)))
    logs.flush()

# Run writeRDD on every RDD (batch) in the DStream
raw_tweets.foreachRDD(writeRDD)
```

The following chained statement pre-processes the incoming tweets by applying a sequence of transformations to each tweet in the DStream:
1. Split the string by whitespace
```
clean_tweets = raw_tweets\
    .map(lambda x: x.split())\
```
2. Use a regular expression to remove any non-alphanumeric characters (underscores included)
```
    .map(lambda x: [re.sub(r'([^\s\w]|_)+', '', y) for y in x])\
```
3. Ensure all words are lower case
```
    .map(lambda x: [word.lower() for word in x])\
```
4. Remove any empty strings
```
    .map(lambda x: [word for word in x if word != ''])\
```
5. Filter out any words included in STOPWORDS, which is loaded from a file during initialization
```
    .map(lambda x: [word for word in x if word not in STOPWORDS])\
```
6. If any tweets are empty now, remove them:
```
    .filter(lambda x: x != [])\
```
7. Assign a label to each tweet
```
    .map(assign_label)\
```
8. Remove tweets that don't belong to any label
```
    .filter(lambda x: x is not None)
```
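Cleaning steps 1 to 6 can be tried on plain Python strings, without Spark, to see what each transformation does to a tweet. The sketch below is only an illustration: the helper name `clean_tweet`, the tiny stop-word set, and the sample tweets are hypothetical and not part of the project.

```python
import re

# Hypothetical stand-in for the STOPWORDS collection the project loads from a file.
STOPWORDS = {"the", "a", "is", "to", "and"}

def clean_tweet(text, stopwords=STOPWORDS):
    """Apply cleaning steps 1-5 to one raw tweet string and return its words."""
    words = text.split()                                      # 1. split on whitespace
    words = [re.sub(r'([^\s\w]|_)+', '', w) for w in words]   # 2. strip punctuation and underscores
    words = [w.lower() for w in words]                        # 3. lower-case every word
    words = [w for w in words if w != '']                     # 4. drop empty strings
    words = [w for w in words if w not in stopwords]          # 5. drop stop words
    return words

# Made-up sample tweets, used only for illustration.
sample = [
    "Time to #RESIST the new bill!!!",
    "!!! ... ???",
    "The #MAGA rally is on_going",
]
cleaned = [clean_tweet(t) for t in sample]
cleaned = [words for words in cleaned if words != []]         # 6. drop tweets that became empty
print(cleaned)
```

In the streaming job the same lambdas run inside `map` and `filter` on the DStream, so each batch of tweets is transformed in the same way.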

The `assign_label` function used above is defined as follows:

```python
def assign_label(words):
    """Assign a label to a tweet. If the tweet contains the word 'resist',
    label it 'resist'. Otherwise, if it contains the word 'maga', label it
    'maga'. If it contains neither word, return None. The label word is
    also removed from the word list.

    Inputs: words: one tweet as a list of cleaned words.
    Output: labeled tweet: (label, words)
    """
    if 'resist' in words:
        words = [x for x in words if x != 'resist']
        return ('resist', words)
    if 'maga' in words:
        words = [x for x in words if x != 'maga']
        return ('maga', words)
    # Neither label word found: make the implicit None explicit
    return None
```

The function takes a tweet's word list as input and returns `('resist', tweet_word_list)` if 'resist' is contained in the tweet; otherwise it returns `('maga', tweet_word_list)` if 'maga' is contained in the tweet. In both cases the label word itself is removed from the list, and a tweet containing neither word yields `None`.

The final output is a DStream of tuples with the structure `(label, [tweet words])`.
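To make the shape of the labeled output concrete, the labeling and filtering steps (7 and 8) can be run on ordinary Python lists. This is a sketch under stated assumptions: `assign_label` is restated compactly with the same logic as the function above, and the cleaned tweets are made up for illustration.

```python
def assign_label(words):
    """Same logic as the project's function, restated so this sketch is self-contained."""
    if 'resist' in words:
        return ('resist', [w for w in words if w != 'resist'])
    if 'maga' in words:
        return ('maga', [w for w in words if w != 'maga'])
    return None

# Hypothetical cleaned tweets (already split, lower-cased, and filtered).
cleaned = [
    ['time', 'resist', 'new', 'bill'],
    ['maga', 'rally', 'tonight'],
    ['resist', 'maga', 'debate'],    # contains both words: 'resist' is checked first
    ['nice', 'weather', 'today'],    # contains neither word: will be dropped
]

labeled = [assign_label(words) for words in cleaned]        # step 7: label each tweet
labeled = [pair for pair in labeled if pair is not None]    # step 8: drop unlabeled tweets

for label, words in labeled:
    print(label, words)
```

Because 'resist' is checked first, a tweet containing both words is labeled 'resist' and keeps 'maga' in its word list.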

## Extract Feature Words

## Calculate Informativeness of Features

## Visualization

The `TweetsStreamingPlot.py` file defines the streaming plotting class. We import this file, use it to initialize an object called `plot`, and produce a streaming plot with the method `plot.start(data)`. The method takes `data` as a new column data source and pushes the updated plot to the Jupyter notebook.
![streaming_plot](../images/streaming_plot.png)

---

# Project Contributors:
