|
17 | 17 |
|
18 | 18 | package org.apache.spark.examples.streaming;
|
19 | 19 |
|
20 |
| -import static java.util.Arrays.asList; |
21 |
| - |
| 20 | +import java.util.Arrays; |
22 | 21 | import java.util.List;
|
23 | 22 | import java.util.regex.Pattern;
|
24 | 23 |
|
| 24 | +import scala.Tuple2; |
| 25 | + |
| 26 | +import com.google.common.base.Optional; |
| 27 | +import com.google.common.collect.Lists; |
| 28 | + |
25 | 29 | import org.apache.spark.HashPartitioner;
|
26 | 30 | import org.apache.spark.SparkConf;
|
27 | 31 | import org.apache.spark.api.java.JavaPairRDD;
|
|
35 | 39 | import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
|
36 | 40 | import org.apache.spark.streaming.api.java.JavaStreamingContext;
|
37 | 41 |
|
38 |
| -import com.google.common.base.Optional; |
39 |
| -import com.google.common.collect.Lists; |
40 |
| - |
41 |
| -import scala.Tuple2; |
42 | 42 |
|
43 | 43 | /**
|
44 | 44 | * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
|
|
54 | 54 | * org.apache.spark.examples.streaming.JavaStatefulNetworkWordCount localhost 9999`
|
55 | 55 | */
|
56 | 56 | public class JavaStatefulNetworkWordCount {
|
57 |
| - private static final Pattern SPACE = Pattern.compile(" "); |
58 |
| - |
59 |
| - public static void main(String[] args) { |
60 |
| - if (args.length < 2) { |
61 |
| - System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>"); |
62 |
| - System.exit(1); |
63 |
| - } |
64 |
| - |
65 |
| - StreamingExamples.setStreamingLogLevels(); |
66 |
| - |
67 |
| - // Update the cumulative count function |
68 |
| - final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = new |
69 |
| - Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { |
70 |
| - @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { |
71 |
| - Integer newSum = state.or(0); |
72 |
| - for (Integer value : values) { |
73 |
| - newSum += value; |
74 |
| - } |
75 |
| - |
76 |
| - return Optional.of(newSum); |
77 |
| - } |
78 |
| - }; |
79 |
| - |
80 |
| - // Create the context with a 1 second batch size |
81 |
| - SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount"); |
82 |
| - JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); |
83 |
| - ssc.checkpoint("."); |
84 |
| - |
85 |
| - // Initial RDD input to updateStateByKey |
86 |
| - JavaPairRDD<String, Integer> initialRDD = ssc.sc() |
87 |
| - .parallelizePairs(asList(new Tuple2<String, Integer>("hello", 1), new Tuple2<String, Integer> |
88 |
| - ("world", 1))); |
89 |
| - |
90 |
| - JavaReceiverInputDStream<String> lines = ssc.socketTextStream( |
91 |
| - args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2); |
92 |
| - |
93 |
| - JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { |
94 |
| - @Override |
95 |
| - public Iterable<String> call(String x) { |
96 |
| - return Lists.newArrayList(SPACE.split(x)); |
97 |
| - } |
98 |
| - }); |
99 |
| - |
100 |
| - JavaPairDStream<String, Integer> wordsDstream = words.mapToPair( |
101 |
| - new PairFunction<String, String, Integer>() { |
102 |
| - @Override |
103 |
| - public Tuple2<String, Integer> call(String s) { |
104 |
| - return new Tuple2<String, Integer>(s, 1); |
105 |
| - } |
106 |
| - }); |
107 |
| - |
108 |
| - // This will give a Dstream made of state (which is the cumulative count of the words) |
109 |
| - JavaPairDStream<String, Integer> stateDstream = wordsDstream.updateStateByKey(updateFunction, new |
110 |
| - HashPartitioner(ssc.sc().defaultParallelism()), initialRDD); |
111 |
| - |
112 |
| - stateDstream.print(); |
113 |
| - ssc.start(); |
114 |
| - ssc.awaitTermination(); |
| 57 | + private static final Pattern SPACE = Pattern.compile(" "); |
| 58 | + |
| 59 | + public static void main(String[] args) { |
| 60 | + if (args.length < 2) { |
| 61 | + System.err.println("Usage: JavaStatefulNetworkWordCount <hostname> <port>"); |
| 62 | + System.exit(1); |
115 | 63 | }
|
| 64 | + |
| 65 | + StreamingExamples.setStreamingLogLevels(); |
| 66 | + |
| 67 | + // Update the cumulative count function |
| 68 | + final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = new |
| 69 | + Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { |
| 70 | + @Override |
| 71 | + public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { |
| 72 | + Integer newSum = state.or(0); |
| 73 | + for (Integer value : values) { |
| 74 | + newSum += value; |
| 75 | + } |
| 76 | + return Optional.of(newSum); |
| 77 | + } |
| 78 | + }; |
| 79 | + |
| 80 | + // Create the context with a 1 second batch size |
| 81 | + SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount"); |
| 82 | + JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); |
| 83 | + ssc.checkpoint("."); |
| 84 | + |
| 85 | + // Initial RDD input to updateStateByKey |
| 86 | + JavaPairRDD<String, Integer> initialRDD = ssc.sc() |
| 87 | + .parallelizePairs(Arrays.asList(new Tuple2<String, Integer>("hello", 1), new Tuple2<String, Integer> |
| 88 | + ("world", 1))); |
| 89 | + |
| 90 | + JavaReceiverInputDStream<String> lines = ssc.socketTextStream( |
| 91 | + args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2); |
| 92 | + |
| 93 | + JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { |
| 94 | + @Override |
| 95 | + public Iterable<String> call(String x) { |
| 96 | + return Lists.newArrayList(SPACE.split(x)); |
| 97 | + } |
| 98 | + }); |
| 99 | + |
| 100 | + JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(new PairFunction<String, String, Integer>() { |
| 101 | + @Override |
| 102 | + public Tuple2<String, Integer> call(String s) { |
| 103 | + return new Tuple2<String, Integer>(s, 1); |
| 104 | + } |
| 105 | + }); |
| 106 | + |
| 107 | + // This will give a Dstream made of state (which is the cumulative count of the words) |
| 108 | + JavaPairDStream<String, Integer> stateDstream = wordsDstream.updateStateByKey(updateFunction, new |
| 109 | + HashPartitioner(ssc.sc().defaultParallelism()), initialRDD); |
| 110 | + |
| 111 | + stateDstream.print(); |
| 112 | + ssc.start(); |
| 113 | + ssc.awaitTermination(); |
| 114 | + } |
116 | 115 | }
|
0 commit comments