spark_app.py
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SQLContext
import sys
import requests

# Configure the Spark application and reduce log noise.
conf = SparkConf()
conf.setAppName("TwitterStreamApp")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

# Create a streaming context with a 2-second batch interval and enable
# checkpointing, which updateStateByKey requires.
ssc = StreamingContext(sc, 2)
ssc.checkpoint("checkpoint_TwitterApp")

# Read incoming tweet text, one line per tweet, from a TCP socket on port 9009.
dataStream = ssc.socketTextStream("localhost", 9009)


def aggregate_tags_count(new_values, total_sum):
    # Add the counts from the current batch to the running total for a hashtag.
    return sum(new_values) + (total_sum or 0)


def get_sql_context_instance(spark_context):
    # Lazily create a single SQLContext and reuse it across batches.
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
    return globals()['sqlContextSingletonInstance']


def send_df_to_dashboard(df):
    # Collect the hashtags and their counts, then POST them to the dashboard.
    top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()]
    tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()]
    url = 'http://localhost:5001/updateData'
    request_data = {'label': str(top_tags), 'data': str(tags_count)}
    response = requests.post(url, data=request_data)


def process_rdd(time, rdd):
    print("----------- %s -----------" % str(time))
    try:
        # Convert the (hashtag, count) RDD into a DataFrame and query the top 10.
        sql_context = get_sql_context_instance(rdd.context)
        row_rdd = rdd.map(lambda w: Row(hashtag=w[0].encode("utf-8"), hashtag_count=w[1]))
        hashtags_df = sql_context.createDataFrame(row_rdd)
        hashtags_df.registerTempTable("hashtags")
        hashtag_counts_df = sql_context.sql(
            "select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10")
        hashtag_counts_df.show()
        send_df_to_dashboard(hashtag_counts_df)
    except:
        e = sys.exc_info()[0]
        print("Error: %s" % e)


# Split each line into words, keep only the hashtags, and pair each with a count of 1.
words = dataStream.flatMap(lambda line: line.split(" "))
hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))

# Maintain a running count per hashtag across batches, and for every batch
# print the current top 10 and push them to the dashboard.
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
tags_totals.foreachRDD(process_rdd)

ssc.start()
ssc.awaitTermination()
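
The app above assumes some external process is already writing raw tweet text, newline-delimited, to a TCP socket on localhost:9009; that producer is not part of this file. The sketch below is only an illustration of the expected protocol, with a local tweets.txt file used as a hypothetical stand-in for the live tweet source.

# Hypothetical stand-in for the producer this app expects on port 9009.
# It replays newline-delimited text from a local file to the first client
# that connects; the real source would push live tweet text instead.
import socket
import time

HOST, PORT = "localhost", 9009

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((HOST, PORT))
server.listen(1)
print("Waiting for the Spark app to connect on %s:%d ..." % (HOST, PORT))

conn, addr = server.accept()
print("Connected:", addr)

with open("tweets.txt", "r") as f:          # tweets.txt is an assumed sample file
    for line in f:
        conn.sendall(line.encode("utf-8"))  # one tweet per line, newline-terminated
        time.sleep(0.1)                     # throttle so each 2-second batch gets some lines

conn.close()
server.close()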
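
Similarly, send_df_to_dashboard() POSTs two form fields, label and data, to http://localhost:5001/updateData, but the dashboard service itself lives elsewhere. A minimal receiver that would accept those fields might look like the sketch below; the Flask app and the ast-based parsing are assumptions for illustration, not part of the original project.

# Hypothetical minimal receiver for the POSTs made by send_df_to_dashboard().
# The Spark app sends the hashtag list and counts as stringified Python lists,
# so ast.literal_eval is used here to turn them back into real lists.
import ast
from flask import Flask, request

app = Flask(__name__)

labels = []   # most recent top hashtags
values = []   # their running counts


@app.route('/updateData', methods=['POST'])
def update_data():
    global labels, values
    labels = ast.literal_eval(request.form['label'])  # e.g. "['#spark', '#python']"
    values = ast.literal_eval(request.form['data'])   # e.g. "[12, 7]"
    print("labels:", labels)
    print("values:", values)
    return "ok", 200


if __name__ == '__main__':
    app.run(host='localhost', port=5001)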