39
39
import org .apache .spark .streaming .api .java .JavaReceiverInputDStream ;
40
40
import org .apache .spark .streaming .api .java .JavaStreamingContext ;
41
41
42
-
43
42
/**
44
43
* Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
45
44
* second starting with initial value of word count.
@@ -65,17 +64,17 @@ public static void main(String[] args) {
65
64
StreamingExamples .setStreamingLogLevels ();
66
65
67
66
// Update the cumulative count function
68
- final Function2 <List <Integer >, Optional <Integer >, Optional <Integer >> updateFunction = new
69
- Function2 <List <Integer >, Optional <Integer >, Optional <Integer >>() {
70
- @ Override
71
- public Optional <Integer > call (List <Integer > values , Optional <Integer > state ) {
72
- Integer newSum = state .or (0 );
73
- for (Integer value : values ) {
74
- newSum += value ;
75
- }
76
- return Optional .of (newSum );
77
- }
78
- };
67
+ final Function2 <List <Integer >, Optional <Integer >, Optional <Integer >> updateFunction =
68
+ new Function2 <List <Integer >, Optional <Integer >, Optional <Integer >>() {
69
+ @ Override
70
+ public Optional <Integer > call (List <Integer > values , Optional <Integer > state ) {
71
+ Integer newSum = state .or (0 );
72
+ for (Integer value : values ) {
73
+ newSum += value ;
74
+ }
75
+ return Optional .of (newSum );
76
+ }
77
+ };
79
78
80
79
// Create the context with a 1 second batch size
81
80
SparkConf sparkConf = new SparkConf ().setAppName ("JavaStatefulNetworkWordCount" );
@@ -97,12 +96,13 @@ public Iterable<String> call(String x) {
97
96
}
98
97
});
99
98
100
- JavaPairDStream <String , Integer > wordsDstream = words .mapToPair (new PairFunction <String , String , Integer >() {
101
- @ Override
102
- public Tuple2 <String , Integer > call (String s ) {
103
- return new Tuple2 <String , Integer >(s , 1 );
104
- }
105
- });
99
+ JavaPairDStream <String , Integer > wordsDstream = words .mapToPair (
100
+ new PairFunction <String , String , Integer >() {
101
+ @ Override
102
+ public Tuple2 <String , Integer > call (String s ) {
103
+ return new Tuple2 <String , Integer >(s , 1 );
104
+ }
105
+ });
106
106
107
107
// This will give a Dstream made of state (which is the cumulative count of the words)
108
108
JavaPairDStream <String , Integer > stateDstream = wordsDstream .updateStateByKey (updateFunction ,
0 commit comments