Skip to content

Commit 92db405

Browse files
committed
Fix Streaming Programming Guide. Change files according to the selected language
1 parent 1768bd5 commit 92db405

File tree

2 files changed

+128
-4
lines changed

2 files changed

+128
-4
lines changed

docs/streaming-programming-guide.md

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -878,6 +878,12 @@ This is applied on a DStream containing words (say, the `pairs` DStream containi
878878
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
879879
{% endhighlight %}
880880

881+
The update function will be called for each word, with `newValues` having a sequence of 1's (from
882+
the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete
883+
Scala code, take a look at the example
884+
[StatefulNetworkWordCount.scala]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala).
886+
881887
</div>
882888
<div data-lang="java" markdown="1">
883889

@@ -899,6 +905,13 @@ This is applied on a DStream containing words (say, the `pairs` DStream containi
899905
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
900906
{% endhighlight %}
901907

908+
The update function will be called for each word, with `newValues` having a sequence of 1's (from
909+
the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete
910+
Java code, take a look at the example
911+
[JavaStatefulNetworkWordCount.java]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java).
914+
902915
</div>
903916
<div data-lang="python" markdown="1">
904917

@@ -916,14 +929,14 @@ This is applied on a DStream containing words (say, the `pairs` DStream containi
916929
runningCounts = pairs.updateStateByKey(updateFunction)
917930
{% endhighlight %}
918931

919-
</div>
920-
</div>
921-
922932
The update function will be called for each word, with `newValues` having a sequence of 1's (from
923933
the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete
924-
Scala code, take a look at the example
934+
Python code, take a look at the example
925935
[stateful_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/stateful_network_wordcount.py).
926936

937+
</div>
938+
</div>
939+
927940
Note that using `updateStateByKey` requires the checkpoint directory to be configured, which is
928941
discussed in detail in the [checkpointing](#checkpointing) section.
929942

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.examples.streaming;
19+
20+
import java.util.List;
21+
import java.util.regex.Pattern;
22+
23+
import scala.Tuple2;
24+
25+
import com.google.common.base.Optional;
26+
import com.google.common.collect.Lists;
27+
28+
import org.apache.spark.SparkConf;
29+
import org.apache.spark.api.java.StorageLevels;
30+
import org.apache.spark.api.java.function.FlatMapFunction;
31+
import org.apache.spark.api.java.function.Function2;
32+
import org.apache.spark.api.java.function.PairFunction;
33+
import org.apache.spark.streaming.Durations;
34+
import org.apache.spark.streaming.api.java.JavaDStream;
35+
import org.apache.spark.streaming.api.java.JavaPairDStream;
36+
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
37+
import org.apache.spark.streaming.api.java.JavaStreamingContext;
38+
39+
/**
40+
* Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
41+
* second starting with initial value of word count.
42+
* Usage: JavaStatefulNetworkWordCount <hostname> <port>
43+
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
44+
* data.
45+
* <p/>
46+
* To run this on your local machine, you need to first run a Netcat server
47+
* `$ nc -lk 9999`
48+
* and then run the example
49+
* `$ bin/run-example
50+
* org.apache.spark.examples.streaming.JavaStatefulNetworkWordCount localhost 9999`
51+
*/
52+
public class JavaStatefulNetworkWordCount {
53+
private static final Pattern SPACE = Pattern.compile(" ");
54+
55+
public static void main(String[] args) {
56+
if (args.length < 2) {
57+
System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>");
58+
System.exit(1);
59+
}
60+
61+
StreamingExamples.setStreamingLogLevels();
62+
63+
// Create the context with a 1 second batch size
64+
SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount");
65+
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
66+
ssc.checkpoint(".");
67+
68+
// Create a JavaReceiverInputDStream on target ip:port and count the
69+
// words in input stream of \n delimited text (eg. generated by 'nc')
70+
// Note that no duplication in storage level only for running locally.
71+
// Replication necessary in distributed scenario for fault tolerance.
72+
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
73+
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2);
74+
75+
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
76+
@Override
77+
public Iterable<String> call(String x) {
78+
return Lists.newArrayList(SPACE.split(x));
79+
}
80+
});
81+
82+
JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(
83+
new PairFunction<String, String, Integer>() {
84+
@Override
85+
public Tuple2<String, Integer> call(String s) {
86+
return new Tuple2<String, Integer>(s, 1);
87+
}
88+
});
89+
90+
// Update the cumulative count function
91+
final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = new
92+
Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
93+
@Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state)
94+
throws Exception {
95+
Integer newSum = state.or(0);
96+
for (Integer value : values) {
97+
newSum += value;
98+
}
99+
100+
return Optional.of(newSum);
101+
}
102+
};
103+
104+
// This will give a Dstream made of state (which is the cumulative count of the words)
105+
JavaPairDStream<String, Integer> stateDstream = wordsDstream.updateStateByKey(updateFunction);
106+
107+
stateDstream.print();
108+
ssc.start();
109+
ssc.awaitTermination();
110+
}
111+
}

0 commit comments

Comments
 (0)