Commit 1e36690

Aaron Kimball authored and tdas committed

[SPARK-1285] Backporting updates to streaming docs to branch 0.9
Cherrypicked updates that have been added to master branch

Author: Aaron Kimball <aaron@magnify.io>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Chen Chao <crazyjvm@gmail.com>
Author: Andrew Or <andrewor14@gmail.com>

Closes #183 from tdas/branch-0.9-streaming-docs and squashes the following commits:

e1a988f [Tathagata Das] Added clean to run-tests
98c3e98 [Tathagata Das] Merge remote-tracking branch 'apache-github/branch-0.9' into branch-0.9-streaming-docs
d792351 [Chen Chao] maintain arbitrary state data for each key
e708f74 [Aaron Kimball] SPARK-1173. (#2) Fix typo in Java streaming example.
156bcd7 [Aaron Kimball] SPARK-1173. Improve scala streaming docs.
8849a96 [Andrew Or] Fix typos in Spark Streaming programming guide
fbd66a5 [Chen Chao] Merge pull request #579 from CrazyJvm/patch-1.

1 parent 1cc979e commit 1e36690

File tree

2 files changed (+50, -23 lines)


dev/run-tests

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ set -e
 echo "========================================================================="
 echo "Running Spark unit tests"
 echo "========================================================================="
-sbt/sbt assembly test
+sbt/sbt clean assembly test
 
 echo "========================================================================="
 echo "Running PySpark tests"
docs/streaming-programming-guide.md

Lines changed: 49 additions & 22 deletions
@@ -58,11 +58,21 @@ do is as follows.
 
 <div class="codetabs">
 <div data-lang="scala" markdown="1" >
+First, we import the names of the Spark Streaming classes, and some implicit
+conversions from StreamingContext into our environment, to add useful methods to
+other classes we need (like DStream).
 
-First, we create a
-[StreamingContext](api/streaming/index.html#org.apache.spark.streaming.StreamingContext) object,
-which is the main entry point for all streaming
-functionality. Besides Spark's configuration, we specify that any DStream will be processed
+[StreamingContext](api/streaming/index.html#org.apache.spark.streaming.StreamingContext) is the
+main entry point for all streaming functionality.
+
+{% highlight scala %}
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
+{% endhighlight %}
+
+Then we create a
+[StreamingContext](api/streaming/index.html#org.apache.spark.streaming.StreamingContext) object.
+Besides Spark's configuration, we specify that any DStream will be processed
 in 1 second batches.
 
 {% highlight scala %}
@@ -88,7 +98,7 @@ val words = lines.flatMap(_.split(" "))
 {% endhighlight %}
 
 `flatMap` is a one-to-many DStream operation that creates a new DStream by
-generating multiple new records from each record int the source DStream. In this case,
+generating multiple new records from each record in the source DStream. In this case,
 each line will be split into multiple words and the stream of words is represented as the
 `words` DStream. Next, we want to count these words.
 
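For context, a minimal sketch of the quick-example code these first hunks edit (not part of this commit; the `local[2]` master, app name, and `localhost:9999` socket source are illustrative assumptions taken from the guide's NetworkWordCount example):

{% highlight scala %}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

// Create a context that processes incoming data in 1-second batches
val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))

// Read lines of text from a TCP socket and split each line into words
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
{% endhighlight %}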
@@ -98,7 +108,7 @@ val pairs = words.map(word => (word, 1))
 val wordCounts = pairs.reduceByKey(_ + _)
 
 // Print a few of the counts to the console
-wordCount.print()
+wordCounts.print()
 {% endhighlight %}
 
 The `words` DStream is further mapped (one-to-one transformation) to a DStream of `(word,
@@ -178,7 +188,7 @@ JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
 return i1 + i2;
 }
 });
-wordCount.print(); // Print a few of the counts to the console
+wordCounts.print(); // Print a few of the counts to the console
 {% endhighlight %}
 
 The `words` DStream is further mapped (one-to-one transformation) to a DStream of `(word,
@@ -262,6 +272,24 @@ Time: 1357008430000 ms
 </td>
 </table>
 
+If you plan to run the Scala code for Spark Streaming-based use cases in the Spark
+shell, you should start the shell with the SparkConfiguration pre-configured to
+discard old batches periodically:
+
+{% highlight bash %}
+$ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=10000 bin/spark-shell
+{% endhighlight %}
+
+... and create your StreamingContext by wrapping the existing interactive shell
+SparkContext object, `sc`:
+
+{% highlight scala %}
+val ssc = new StreamingContext(sc, Seconds(1))
+{% endhighlight %}
+
+When working with the shell, you may also need to send a `^D` to your netcat session
+to force the pipeline to print the word counts to the console at the sink.
+
 ***************************************************************************************************
 
 # Basics
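As a rough end-to-end illustration of the shell workflow added above (not part of this commit; the socket source and the `nc -lk 9999` feeder are assumptions carried over from the guide's word-count example):

{% highlight scala %}
// Inside spark-shell started with spark.cleaner.ttl set as shown; `sc` already exists
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)   // e.g. fed by `nc -lk 9999`
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()   // prints the word counts of each 1-second batch until the context is stopped
{% endhighlight %}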
@@ -428,9 +456,9 @@ KafkaUtils.createStream(javaStreamingContext, kafkaParams, ...);
 </div>
 </div>
 
-For more details on these additional sources, see the corresponding [API documentation]
-(#where-to-go-from-here). Furthermore, you can also implement your own custom receiver
-for your sources. See the [Custom Receiver Guide](streaming-custom-receivers.html).
+For more details on these additional sources, see the corresponding [API documentation](#where-to-go-from-here).
+Furthermore, you can also implement your own custom receiver for your sources. See the
+[Custom Receiver Guide](streaming-custom-receivers.html).
 
 ## Operations
 There are two kinds of DStream operations - _transformations_ and _output operations_. Similar to
@@ -511,7 +539,7 @@ common ones are as follows.
 <td> <b>updateStateByKey</b>(<i>func</i>) </td>
 <td> Return a new "state" DStream where the state for each key is updated by applying the
 given function on the previous state of the key and the new values for the key. This can be
-used to maintain arbitrary state data for each ket.</td>
+used to maintain arbitrary state data for each key.</td>
 </tr>
 <tr><td></td><td></td></tr>
 </table>
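To make the corrected sentence concrete, here is a small sketch of keeping a running count per key with `updateStateByKey` (not part of this commit; the `pairs` DStream and the local checkpoint path are assumptions drawn from the guide's word-count example, and stateful operations require a checkpoint directory):

{% highlight scala %}
// Merge the values seen for a key in this batch with its previous running count
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  Some(runningCount.getOrElse(0) + newValues.sum)
}

ssc.checkpoint("checkpoint")   // required for stateful transformations
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
runningCounts.print()
{% endhighlight %}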
@@ -520,9 +548,8 @@ The last two transformations are worth highlighting again.
 
 <h4>UpdateStateByKey Operation</h4>
 
-The `updateStateByKey` operation allows
-you to main arbitrary stateful computation, where you want to maintain some state data and
-continuously update it with new information. To use this, you will have to do two steps.
+The `updateStateByKey` operation allows you to maintain arbitrary state while continuously updating
+it with new information. To use this, you will have to do two steps.
 
 1. Define the state - The state can be of arbitrary data type.
 1. Define the state update function - Specify with a function how to update the state using the
@@ -925,7 +952,7 @@ exception saying so.
 ## Monitoring
 Besides Spark's in-built [monitoring capabilities](monitoring.html),
 the progress of a Spark Streaming program can also be monitored using the [StreamingListener]
-(streaming/index.html#org.apache.spark.scheduler.StreamingListener) interface,
+(api/streaming/index.html#org.apache.spark.scheduler.StreamingListener) interface,
 which allows you to get statistics of batch processing times, queueing delays,
 and total end-to-end delays. Note that this is still an experimental API and it is likely to be
 improved upon (i.e., more information reported) in the future.
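A possible sketch of hooking in such a listener (not part of this commit; it assumes the 0.9-era `org.apache.spark.streaming.scheduler` package, its batch-info fields, and the `addStreamingListener` method, so verify the names against the linked API docs):

{% highlight scala %}
import org.apache.spark.streaming.scheduler._

// Log batch processing time and total end-to-end delay as each batch completes
class BatchStatsListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val info = batchCompleted.batchInfo
    println("processing delay: " + info.processingDelay.getOrElse(-1L) + " ms, " +
            "total delay: " + info.totalDelay.getOrElse(-1L) + " ms")
  }
}

ssc.addStreamingListener(new BatchStatsListener)
{% endhighlight %}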
@@ -1000,11 +1027,11 @@ Since all data is modeled as RDDs with their lineage of deterministic operations
 for output operations.
 
 ## Failure of the Driver Node
-To allows a streaming application to operate 24/7, Spark Streaming allows a streaming computation
+For a streaming application to operate 24/7, Spark Streaming allows a streaming computation
 to be resumed even after the failure of the driver node. Spark Streaming periodically writes the
 metadata information of the DStreams setup through the `StreamingContext` to a
 HDFS directory (can be any Hadoop-compatible filesystem). This periodic
-*checkpointing* can be enabled by setting a the checkpoint
+*checkpointing* can be enabled by setting the checkpoint
 directory using `ssc.checkpoint(<checkpoint directory>)` as described
 [earlier](#rdd-checkpointing). On failure of the driver node,
 the lost `StreamingContext` can be recovered from this information, and restarted.
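A hedged sketch of the recovery pattern described here, using the `StreamingContext.getOrCreate` helper that the surrounding guide text refers to (not part of this commit; the HDFS path, master, and app name are illustrative assumptions):

{% highlight scala %}
// Build a fresh context, set up DStreams, and enable metadata checkpointing
def createContext(): StreamingContext = {
  val ssc = new StreamingContext("local[2]", "RecoverableWordCount", Seconds(1))
  ssc.checkpoint("hdfs://namenode:8020/checkpoints/wordcount")   // hypothetical checkpoint directory
  // ... set up input DStreams and transformations here ...
  ssc
}

// Recover the lost StreamingContext from checkpoint metadata if present, else create a new one
val ssc = StreamingContext.getOrCreate("hdfs://namenode:8020/checkpoints/wordcount", createContext _)
ssc.start()
{% endhighlight %}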
@@ -1105,8 +1132,8 @@ classes. So, if you are using `getOrCreate`, then make sure that the checkpoint
 explicitly deleted every time recompiled code needs to be launched.
 
 This failure recovery can be done automatically using Spark's
-[standalone cluster mode](spark-standalone.html), which allows any Spark
-application's driver to be as well as, ensures automatic restart of the driver on failure (see
+[standalone cluster mode](spark-standalone.html), which allows the driver of any Spark application
+to be launched within the cluster and be restarted on failure (see
 [supervise mode](spark-standalone.html#launching-applications-inside-the-cluster)). This can be
 tested locally by launching the above example using the supervise mode in a
 local standalone cluster and killing the java process running the driver (will be shown as
@@ -1123,7 +1150,7 @@ There are two different failure behaviors based on which input sources are used.
 1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can
 re-computed and therefore no data will be lost due to any failure.
 1. _Using any input source that receives data through a network_ - The received input data is
-replicated in memory to multiple nodes. Since, all the data in the Spark worker's memory is lost
+replicated in memory to multiple nodes. Since all the data in the Spark worker's memory is lost
 when the Spark driver fails, the past input data will not be accessible and driver recovers.
 Hence, if stateful and window-based operations are used
 (like `updateStateByKey`, `window`, `countByValueAndWindow`, etc.), then the intermediate state
@@ -1133,11 +1160,11 @@ In future releases, we will support full recoverability for all input sources. N
 non-stateful transformations like `map`, `count`, and `reduceByKey`, with _all_ input streams,
 the system, upon restarting, will continue to receive and process new data.
 
-To better understand the behavior of the system under driver failure with a HDFS source, lets
+To better understand the behavior of the system under driver failure with a HDFS source, let's
 consider what will happen with a file input stream. Specifically, in the case of the file input
 stream, it will correctly identify new files that were created while the driver was down and
 process them in the same way as it would have if the driver had not failed. To explain further
-in the case of file input stream, we shall use an example. Lets say, files are being generated
+in the case of file input stream, we shall use an example. Let's say, files are being generated
 every second, and a Spark Streaming program reads every new file and output the number of lines
 in the file. This is what the sequence of outputs would be with and without a driver failure.
 
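The file-input scenario in this last hunk could look roughly like the following (not part of this commit; the monitored HDFS directory is hypothetical, and the count is per batch of newly arrived files rather than strictly per file):

{% highlight scala %}
// Watch a directory for new files and print the number of lines seen in each batch
val lines = ssc.textFileStream("hdfs://namenode:8020/streaming/input")
lines.count().print()
{% endhighlight %}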