
Commit 7d171df

Added reference to JavaReceiverInputDStream in examples and streaming guide.

1 parent: 49edd7c

File tree

5 files changed: +10 −6 lines

docs/streaming-programming-guide.md

Lines changed: 1 addition & 1 deletion
@@ -147,7 +147,7 @@ by specifying the IP address and port of the data server.
 
 {% highlight java %}
 // Create a DStream that will connect to serverIP:serverPort
-JavaDStream<String> lines = jssc.socketTextStream(serverIP, serverPort);
+JavaReceiverInputDStream<String> lines = jssc.socketTextStream(serverIP, serverPort);
 {% endhighlight %}
 
 This `lines` DStream represents the stream of data that will be received from the data
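
For context, and not part of the diff above: a minimal, self-contained sketch of how the guide's snippet is typically consumed after this change. Because JavaReceiverInputDStream is itself a JavaDStream, downstream transformations need no changes; the master URL, application name, host, and port below are placeholder assumptions.

import java.util.Arrays;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SocketLineCount {
  public static void main(String[] args) throws Exception {
    // Placeholder master, app name, and batch interval -- adjust for your setup.
    JavaStreamingContext jssc =
        new JavaStreamingContext("local[2]", "SocketLineCount", new Duration(1000));

    // socketTextStream now returns JavaReceiverInputDStream<String> (still a JavaDStream).
    JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

    // Existing DStream transformations keep working on the narrower type.
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String line) {
        return Arrays.asList(line.split(" "));
      }
    });

    words.print();
    jssc.start();
    jssc.awaitTermination();
  }
}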

examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java

Lines changed: 2 additions & 1 deletion
@@ -26,6 +26,7 @@
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.receiver.Receiver;
 import scala.Tuple2;
@@ -69,7 +70,7 @@ public static void main(String[] args) {
 
     // Create a input stream with the custom receiver on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
-    JavaDStream<String> lines = ssc.receiverStream(
+    JavaReceiverInputDStream<String> lines = ssc.receiverStream(
       new JavaCustomReceiver(args[1], Integer.parseInt(args[2])));
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override

examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java

Lines changed: 1 addition & 1 deletion
@@ -58,7 +58,7 @@ public static void main(String[] args) {
     JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
       System.getenv("SPARK_HOME"),
       JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
-    JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port);
+    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port);
 
     flumeStream.count();
 
examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java

Lines changed: 3 additions & 1 deletion
@@ -29,6 +29,7 @@
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.kafka.KafkaUtils;
 import scala.Tuple2;
@@ -73,7 +74,8 @@ public static void main(String[] args) {
       topicMap.put(topic, numThreads);
     }
 
-    JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, args[1], args[2], topicMap);
+    JavaPairReceiverInputDStream<String, String> messages =
+        KafkaUtils.createStream(jssc, args[1], args[2], topicMap);
 
     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
       @Override
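
As a further illustration, and not part of the diff above: a minimal sketch of consuming the JavaPairReceiverInputDStream returned by KafkaUtils.createStream, mirroring JavaKafkaWordCount. The ZooKeeper quorum, consumer group, and topic name are placeholder assumptions.

import java.util.HashMap;
import java.util.Map;

import scala.Tuple2;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class KafkaValueStream {
  public static void main(String[] args) throws Exception {
    // Placeholder master, app name, and batch interval.
    JavaStreamingContext jssc =
        new JavaStreamingContext("local[2]", "KafkaValueStream", new Duration(2000));

    // Placeholder topic map: one consumer thread for a hypothetical "events" topic.
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put("events", 1);

    // createStream now returns JavaPairReceiverInputDStream<K, V>; it is still a
    // pair DStream, so the usual transformations are unchanged.
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, "zkhost:2181", "example-group", topicMap);

    // Keep only the message values, as JavaKafkaWordCount does.
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> record) {
        return record._2();
      }
    });

    lines.print();
    jssc.start();
    jssc.awaitTermination();
  }
}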

examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java

Lines changed: 3 additions & 2 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.examples;
 
 import com.google.common.collect.Lists;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import scala.Tuple2;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function2;
@@ -57,9 +58,9 @@ public static void main(String[] args) {
         new Duration(1000), System.getenv("SPARK_HOME"),
         JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
 
-    // Create a NetworkInputDStream on target ip:port and count the
+    // Create a JavaReceiverInputDStream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
-    JavaDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
+    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
       public Iterable<String> call(String x) {
