Skip to content

Commit 51a79a7

Browse files
committed
[SPARK-6274][Streaming][Examples] Added examples streaming + sql examples.
Added Scala, Java and Python streaming examples showing DataFrame and SQL operations within streaming. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #4975 from tdas/streaming-sql-examples and squashes the following commits: 705cba1 [Tathagata Das] Fixed python lint error 75a3fad [Tathagata Das] Fixed python lint error 5fbf789 [Tathagata Das] Removed empty lines at the end 874b943 [Tathagata Das] Added examples streaming + sql examples.
1 parent 55c4831 commit 51a79a7

File tree

4 files changed

+336
-0
lines changed

4 files changed

+336
-0
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.examples.streaming;
19+
20+
/** Java Bean class to be used with the example JavaSqlNetworkWordCount. */
21+
public class JavaRecord implements java.io.Serializable {
22+
private String word;
23+
24+
public String getWord() {
25+
return word;
26+
}
27+
28+
public void setWord(String word) {
29+
this.word = word;
30+
}
31+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.examples.streaming;
19+
20+
import java.util.regex.Pattern;
21+
22+
import com.google.common.collect.Lists;
23+
24+
import org.apache.spark.SparkConf;
25+
import org.apache.spark.SparkContext;
26+
import org.apache.spark.api.java.JavaRDD;
27+
import org.apache.spark.api.java.function.FlatMapFunction;
28+
import org.apache.spark.api.java.function.Function;
29+
import org.apache.spark.api.java.function.Function2;
30+
import org.apache.spark.sql.SQLContext;
31+
import org.apache.spark.sql.DataFrame;
32+
import org.apache.spark.api.java.StorageLevels;
33+
import org.apache.spark.streaming.Durations;
34+
import org.apache.spark.streaming.Time;
35+
import org.apache.spark.streaming.api.java.JavaDStream;
36+
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
37+
import org.apache.spark.streaming.api.java.JavaStreamingContext;
38+
39+
/**
40+
* Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
41+
* network every second.
42+
*
43+
* Usage: JavaSqlNetworkWordCount <hostname> <port>
44+
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
45+
*
46+
* To run this on your local machine, you need to first run a Netcat server
47+
* `$ nc -lk 9999`
48+
* and then run the example
49+
* `$ bin/run-example org.apache.spark.examples.streaming.JavaSqlNetworkWordCount localhost 9999`
50+
*/
51+
52+
public final class JavaSqlNetworkWordCount {
53+
private static final Pattern SPACE = Pattern.compile(" ");
54+
55+
public static void main(String[] args) {
56+
if (args.length < 2) {
57+
System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
58+
System.exit(1);
59+
}
60+
61+
StreamingExamples.setStreamingLogLevels();
62+
63+
// Create the context with a 1 second batch size
64+
SparkConf sparkConf = new SparkConf().setAppName("JavaSqlNetworkWordCount");
65+
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
66+
67+
// Create a JavaReceiverInputDStream on target ip:port and count the
68+
// words in input stream of \n delimited text (eg. generated by 'nc')
69+
// Note that no duplication in storage level only for running locally.
70+
// Replication necessary in distributed scenario for fault tolerance.
71+
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
72+
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
73+
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
74+
@Override
75+
public Iterable<String> call(String x) {
76+
return Lists.newArrayList(SPACE.split(x));
77+
}
78+
});
79+
80+
// Convert RDDs of the words DStream to DataFrame and run SQL query
81+
words.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
82+
@Override
83+
public Void call(JavaRDD<String> rdd, Time time) {
84+
SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
85+
86+
// Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
87+
JavaRDD<JavaRecord> rowRDD = rdd.map(new Function<String, JavaRecord>() {
88+
public JavaRecord call(String word) {
89+
JavaRecord record = new JavaRecord();
90+
record.setWord(word);
91+
return record;
92+
}
93+
});
94+
DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRecord.class);
95+
96+
// Register as table
97+
wordsDataFrame.registerTempTable("words");
98+
99+
// Do word count on table using SQL and print it
100+
DataFrame wordCountsDataFrame =
101+
sqlContext.sql("select word, count(*) as total from words group by word");
102+
System.out.println("========= " + time + "=========");
103+
wordCountsDataFrame.show();
104+
return null;
105+
}
106+
});
107+
108+
ssc.start();
109+
ssc.awaitTermination();
110+
}
111+
}
112+
113+
/** Lazily instantiated singleton instance of SQLContext */
114+
class JavaSQLContextSingleton {
115+
static private transient SQLContext instance = null;
116+
static public SQLContext getInstance(SparkContext sparkContext) {
117+
if (instance == null) {
118+
instance = new SQLContext(sparkContext);
119+
}
120+
return instance;
121+
}
122+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
"""
19+
Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
20+
network every second.
21+
22+
Usage: sql_network_wordcount.py <hostname> <port>
23+
<hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
24+
25+
To run this on your local machine, you need to first run a Netcat server
26+
`$ nc -lk 9999`
27+
and then run the example
28+
`$ bin/spark-submit examples/src/main/python/streaming/sql_network_wordcount.py localhost 9999`
29+
"""
30+
31+
import os
32+
import sys
33+
34+
from pyspark import SparkContext
35+
from pyspark.streaming import StreamingContext
36+
from pyspark.sql import SQLContext, Row
37+
38+
39+
def getSqlContextInstance(sparkContext):
40+
if ('sqlContextSingletonInstance' not in globals()):
41+
globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
42+
return globals()['sqlContextSingletonInstance']
43+
44+
45+
if __name__ == "__main__":
46+
if len(sys.argv) != 3:
47+
print >> sys.stderr, "Usage: sql_network_wordcount.py <hostname> <port> "
48+
exit(-1)
49+
host, port = sys.argv[1:]
50+
sc = SparkContext(appName="PythonSqlNetworkWordCount")
51+
ssc = StreamingContext(sc, 1)
52+
53+
# Create a socket stream on target ip:port and count the
54+
# words in input stream of \n delimited text (eg. generated by 'nc')
55+
lines = ssc.socketTextStream(host, int(port))
56+
words = lines.flatMap(lambda line: line.split(" "))
57+
58+
# Convert RDDs of the words DStream to DataFrame and run SQL query
59+
def process(time, rdd):
60+
print "========= %s =========" % str(time)
61+
62+
try:
63+
# Get the singleton instance of SQLContext
64+
sqlContext = getSqlContextInstance(rdd.context)
65+
66+
# Convert RDD[String] to RDD[Row] to DataFrame
67+
rowRdd = rdd.map(lambda w: Row(word=w))
68+
wordsDataFrame = sqlContext.createDataFrame(rowRdd)
69+
70+
# Register as table
71+
wordsDataFrame.registerTempTable("words")
72+
73+
# Do word count on table using SQL and print it
74+
wordCountsDataFrame = \
75+
sqlContext.sql("select word, count(*) as total from words group by word")
76+
wordCountsDataFrame.show()
77+
except:
78+
pass
79+
80+
words.foreachRDD(process)
81+
ssc.start()
82+
ssc.awaitTermination()
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.examples.streaming
19+
20+
import org.apache.spark.SparkConf
21+
import org.apache.spark.SparkContext
22+
import org.apache.spark.rdd.RDD
23+
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
24+
import org.apache.spark.util.IntParam
25+
import org.apache.spark.sql.SQLContext
26+
import org.apache.spark.storage.StorageLevel
27+
28+
/**
29+
* Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
30+
* network every second.
31+
*
32+
* Usage: SqlNetworkWordCount <hostname> <port>
33+
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
34+
*
35+
* To run this on your local machine, you need to first run a Netcat server
36+
* `$ nc -lk 9999`
37+
* and then run the example
38+
* `$ bin/run-example org.apache.spark.examples.streaming.SqlNetworkWordCount localhost 9999`
39+
*/
40+
41+
object SqlNetworkWordCount {
42+
def main(args: Array[String]) {
43+
if (args.length < 2) {
44+
System.err.println("Usage: NetworkWordCount <hostname> <port>")
45+
System.exit(1)
46+
}
47+
48+
StreamingExamples.setStreamingLogLevels()
49+
50+
// Create the context with a 2 second batch size
51+
val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
52+
val ssc = new StreamingContext(sparkConf, Seconds(2))
53+
54+
// Create a socket stream on target ip:port and count the
55+
// words in input stream of \n delimited text (eg. generated by 'nc')
56+
// Note that no duplication in storage level only for running locally.
57+
// Replication necessary in distributed scenario for fault tolerance.
58+
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
59+
val words = lines.flatMap(_.split(" "))
60+
61+
// Convert RDDs of the words DStream to DataFrame and run SQL query
62+
words.foreachRDD((rdd: RDD[String], time: Time) => {
63+
// Get the singleton instance of SQLContext
64+
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
65+
import sqlContext.implicits._
66+
67+
// Convert RDD[String] to RDD[case class] to DataFrame
68+
val wordsDataFrame = rdd.map(w => Record(w)).toDF()
69+
70+
// Register as table
71+
wordsDataFrame.registerTempTable("words")
72+
73+
// Do word count on table using SQL and print it
74+
val wordCountsDataFrame =
75+
sqlContext.sql("select word, count(*) as total from words group by word")
76+
println(s"========= $time =========")
77+
wordCountsDataFrame.show()
78+
})
79+
80+
ssc.start()
81+
ssc.awaitTermination()
82+
}
83+
}
84+
85+
86+
/** Case class for converting RDD to DataFrame */
87+
case class Record(word: String)
88+
89+
90+
/** Lazily instantiated singleton instance of SQLContext */
91+
object SQLContextSingleton {
92+
93+
@transient private var instance: SQLContext = _
94+
95+
def getInstance(sparkContext: SparkContext): SQLContext = {
96+
if (instance == null) {
97+
instance = new SQLContext(sparkContext)
98+
}
99+
instance
100+
}
101+
}

0 commit comments

Comments
 (0)