
Commit b20d90a

Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
2 parents: 4e5d15e + c77f406

File tree

10 files changed: +166 -105 lines

docs/streaming-kinesis.md

Lines changed: 49 additions & 48 deletions
@@ -3,56 +3,57 @@ layout: global
 title: Spark Streaming Kinesis Receiver
 ---

-### Kinesis
-Build notes:
-<li>Spark supports a Kinesis Streaming Receiver which is not included in the default build due to licensing restrictions.</li>
-<li>_**Note that by embedding this library you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your Spark package**_.</li>
-<li>The Spark Kinesis Streaming Receiver source code, examples, tests, and artifacts live in $SPARK_HOME/extras/kinesis-asl.</li>
-<li>To build with Kinesis, you must run the maven or sbt builds with -Pkinesis-asl.</li>
-<li>Applications will need to link to the spark-streaming-kinesis-asl artifact.</li>
+## Kinesis
+### Design
+<li>The KinesisReceiver uses the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License.</li>
+<li>The KCL builds on top of the Apache 2.0-licensed AWS Java SDK and provides load balancing, fault tolerance, and checkpointing through the concepts of Workers, Checkpoints, and Shard Leases.</li>
+<li>The KCL uses DynamoDB to maintain all state. A DynamoDB table is created in the us-east-1 region (regardless of the Kinesis stream region) during KCL initialization for each Kinesis application name.</li>
+<li>A single KinesisReceiver can process many shards of a stream by spinning up multiple KinesisRecordProcessor threads.</li>
+<li>You never need more KinesisReceivers than the number of shards in your stream, as each will spin up at least one KinesisRecordProcessor thread.</li>
+<li>Horizontal scaling is achieved by autoscaling additional KinesisReceivers (separate processes) or by spinning up new KinesisRecordProcessor threads within each KinesisReceiver, up to the number of current shards for a given stream. Don't forget to autoscale back down!</li>

-Kinesis examples notes:
-<li>To build the Kinesis examples, you must run the maven or sbt builds with -Pkinesis-asl.</li>
-<li>These examples automatically determine the number of local threads and KinesisReceivers to spin up based on the number of shards for the stream.</li>
-<li>KinesisWordCountProducerASL will generate random data to put onto the Kinesis stream for testing.</li>
-<li>Checkpointing is disabled (no checkpoint dir is set). The examples as written will not recover from a driver failure.</li>
+### Build
+<li>Spark supports a Streaming KinesisReceiver, but it is not included in the default build due to Amazon Software License (ASL) restrictions.</li>
+<li>To build with the Kinesis Streaming Receiver and supporting ASL-licensed code, you must run the maven or sbt builds with the **-Pkinesis-asl** profile.</li>
+<li>All KinesisReceiver-related code, examples, tests, and artifacts live in **$SPARK_HOME/extras/kinesis-asl/**.</li>
+<li>Kinesis-based Spark applications will need to link to the **spark-streaming-kinesis-asl** artifact that is built when **-Pkinesis-asl** is specified.</li>
+<li>_**Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your Spark package**_.</li>

-Deployment and runtime notes:
-<li>A single KinesisReceiver can process many shards of a stream.</li>
-<li>Each shard of a stream is processed by one or more KinesisReceivers managed by the Kinesis Client Library (KCL) Worker.</li>
-<li>You never need more KinesisReceivers than the number of shards in your stream.</li>
-<li>You can horizontally scale the receiving by creating more KinesisReceiver/DStreams (up to the number of shards for a given stream).</li>
-<li>The Kinesis libraries must be present on all worker nodes, as they will need access to the Kinesis Client Library.</li>
-<li>This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence:<br/>
-    1) Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY<br/>
-    2) Java System Properties - aws.accessKeyId and aws.secretKey<br/>
-    3) Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs<br/>
-    4) Instance profile credentials - delivered through the Amazon EC2 metadata service<br/>
-</li>
-<li>You need to set up a Kinesis stream with 1 or more shards per the following:<br/>
-    http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html</li>
-<li>Valid Kinesis endpoint urls can be found here: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region</li>
-<li>When you first start up the KinesisReceiver, the Kinesis Client Library (KCL) needs ~30s to establish connectivity with the AWS Kinesis service, retrieve any checkpoint data, and negotiate with other KCLs reading from the same stream.</li>
-<li>Be careful when changing the app name. Kinesis maintains a mapping table in DynamoDB based on this app name (http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app.html#kinesis-record-processor-initialization). Changing the app name could lead to Kinesis errors, as only 1 logical application can process a stream. To start fresh, it's always best to delete the DynamoDB table that matches your app name. This DynamoDB table lives in us-east-1 regardless of the Kinesis endpoint URL.</li>
+### Example
+<li>To build the Kinesis example, you must run the maven or sbt builds with the **-Pkinesis-asl** profile.</li>
+<li>You need to set up a Kinesis stream at one of the valid Kinesis endpoints with 1 or more shards per the following: http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html</li>
+<li>Valid Kinesis endpoints can be found here: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region</li>
+<li>When running **locally**, the example automatically determines the number of threads and KinesisReceivers to spin up based on the number of shards configured for the stream. Therefore, **local[n]** is not needed when starting the example as it is with other streaming examples.</li>
+<li>While this example could use a single KinesisReceiver which spins up multiple KinesisRecordProcessor threads to process multiple shards, I wanted to demonstrate unioning multiple KinesisReceivers as a single DStream. (It's a bit confusing in local mode.)</li>
+<li>**KinesisWordCountProducerASL** is provided to generate random records into the Kinesis stream for testing.</li>
+<li>The example has been configured to immediately replicate incoming stream data to another node by using StorageLevel.MEMORY_AND_DISK_2.</li>
+<li>Spark checkpointing is disabled because the example does not use any stateful or window-based DStream operations such as updateStateByKey and reduceByWindow. If those operations are introduced, you would need to enable checkpointing or risk losing data in the case of a failure.</li>
+<li>Kinesis checkpointing is enabled. This means that the example will recover from a Kinesis failure.</li>
+<li>The example uses the InitialPositionInStream.LATEST strategy to pull from the latest tip of the stream if no Kinesis checkpoint info exists.</li>
+<li>In our example, **KinesisWordCount** is the Kinesis application name for both the Scala and Java versions. The use of this application name is described next.</li>

-Failure recovery notes:
-<li>The combination of Spark Streaming and Kinesis creates 3 different checkpoints as follows:<br/>
-    1) RDD data checkpoint (Spark Streaming) - frequency is configurable with DStream.checkpoint(Duration)<br/>
-    2) RDD metadata checkpoint (Spark Streaming) - frequency is every DStream batch<br/>
-    3) Kinesis checkpointing (Kinesis) - frequency is controlled by the developer calling ICheckpointer.checkpoint() directly<br/>
+### Deployment and Runtime
+<li>A Kinesis application name must be unique for a given account and region.</li>
+<li>A DynamoDB table and CloudWatch namespace are created during KCL initialization using this Kinesis application name. http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app.html#kinesis-record-processor-initialization</li>
+<li>This DynamoDB table lives in the us-east-1 region regardless of the Kinesis endpoint URL.</li>
+<li>Changing the app name or stream name could lead to Kinesis errors, as only a single logical application can process a single stream.</li>
+<li>If you are seeing errors after changing the app name or stream name, it may be necessary to manually delete the DynamoDB table and start from scratch.</li>
+<li>The Kinesis libraries must be present on all worker nodes, as they will need access to the KCL.</li>
+<li>The KinesisReceiver uses the DefaultAWSCredentialsProviderChain for AWS credentials, which searches for credentials in the following order of precedence:<br/>
+    1) Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY<br/>
+    2) Java System Properties - aws.accessKeyId and aws.secretKey<br/>
+    3) Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs<br/>
+    4) Instance profile credentials - delivered through the Amazon EC2 metadata service
 </li>
-<li>Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling.</li>
-<li>Upon startup, a KinesisReceiver will begin processing records with sequence numbers greater than the last checkpoint sequence number recorded per shard.</li>
-<li>If no checkpoint info exists, the worker will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) or from the tip/latest (InitialPositionInStream.LATEST). This is configurable.</li>
-<li>When pulling from the stream tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts.</li>
-<li>InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no KinesisReceivers are running.</li>
-<li>In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON, which will read up to 24 hours (Kinesis limit) of previous stream data, depending on the checkpoint frequency.</li>
-<li>InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records, depending on the checkpoint frequency.</li>
+
+### Fault-Tolerance
+<li>The combination of Spark Streaming and Kinesis creates 2 different checkpoints that may occur at different intervals.</li>
+<li>Checkpointing too frequently against Kinesis will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling. The provided example handles this throttling with a random-backoff retry strategy.</li>
+<li>Upon startup, a KinesisReceiver will begin processing records with sequence numbers greater than the last Kinesis checkpoint sequence number recorded per shard (stored in the DynamoDB table).</li>
+<li>If no Kinesis checkpoint info exists, the KinesisReceiver will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) or from the latest tip (InitialPositionInStream.LATEST). This is configurable.</li>
+<li>InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no KinesisReceivers are running (and no checkpoint info is being stored).</li>
+<li>In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON, which will read up to 24 hours (Kinesis limit) of previous stream data.</li>
+<li>InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records, where the impact is dependent on checkpoint frequency.</li>
 <li>Record processing should be idempotent when possible.</li>
-<li>Failed or latent KinesisReceivers will be detected and automatically shut down or load-balanced by the KCL.</li>
-<li>If possible, explicitly shut down the worker if a failure occurs in order to trigger the final checkpoint.</li>
+<li>A failed or latent KinesisRecordProcessor within the KinesisReceiver will be detected and automatically restarted by the KCL.</li>
+<li>If possible, the KinesisReceiver should be shut down cleanly in order to trigger a final checkpoint of all KinesisRecordProcessors and avoid duplicate record processing.</li>
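
The Example and Fault-Tolerance notes above describe unioning several KinesisReceivers into one DStream and starting from InitialPositionInStream.LATEST when no Kinesis checkpoint exists. Below is a minimal Scala sketch of that setup, not the bundled KinesisWordCountASL example itself; it assumes the spark-streaming-kinesis-asl artifact is on the classpath, that KinesisUtils.createStream takes (ssc, stream name, endpoint URL, checkpoint interval, initial position, storage level) and returns a DStream of raw record bytes, and that the Kinesis application name is derived from the Spark app name. The stream name, endpoint, and object name are placeholders.

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

object KinesisUnionSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder stream name and endpoint; substitute your own.
    val streamName = "myKinesisStream"
    val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"
    val numReceivers = 2  // roughly one receiver per shard, per the notes above

    // The Kinesis application name (e.g. KinesisWordCount) comes from the Spark app name.
    val ssc = new StreamingContext(
      new SparkConf().setAppName("KinesisWordCount"), Seconds(2))

    // One receiver per shard, replicated to a second node (MEMORY_AND_DISK_2),
    // starting from the latest tip when no Kinesis checkpoint info exists.
    val streams = (0 until numReceivers).map { _ =>
      KinesisUtils.createStream(ssc, streamName, endpointUrl, Seconds(2),
        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
    }

    // Union the per-receiver DStreams into a single DStream of raw record bytes.
    val unioned = ssc.union(streams)
    unioned.map(bytes => new String(bytes)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}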

external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala

Lines changed: 8 additions & 0 deletions
@@ -131,6 +131,14 @@ class SparkSink extends AbstractSink with Logging with Configurable {
     blockingLatch.await()
     Status.BACKOFF
   }
+
+  private[flume] def getPort(): Int = {
+    serverOpt
+      .map(_.getPort)
+      .getOrElse(
+        throw new RuntimeException("Server was not started!")
+      )
+  }
 }

 /**
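
The new getPort() accessor lets a caller configure the sink with port 0 (an OS-assigned ephemeral port) and then ask which port was actually bound, which is how the updated FlumePollingStreamSuite below uses it. A minimal sketch of that pattern, assuming an already-configured Flume channel; the helper object and method names are hypothetical, and the object sits in the flume package because getPort() is private[flume]:

package org.apache.spark.streaming.flume

import org.apache.flume.{Channel, Context}
import org.apache.flume.conf.Configurables
import org.apache.spark.streaming.flume.sink.{SparkSink, SparkSinkConfig}

// Hypothetical helper mirroring the test changes below.
object EphemeralSinkSketch {
  def startSinkOnEphemeralPort(channel: Channel): (SparkSink, Int) = {
    val context = new Context()
    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
    // Port 0 asks the OS for any free port, avoiding hard-coded test ports.
    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))

    val sink = new SparkSink()
    Configurables.configure(sink, context)
    sink.setChannel(channel)
    sink.start()

    // getPort() reports the port the sink actually bound to,
    // or throws if the server was never started.
    (sink, sink.getPort())
  }
}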

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala

Lines changed: 26 additions & 29 deletions
@@ -22,6 +22,8 @@ import java.net.InetSocketAddress
 import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
 import java.util.Random

+import org.apache.spark.TestUtils
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}

@@ -39,9 +41,6 @@ import org.apache.spark.util.Utils

 class FlumePollingStreamSuite extends TestSuiteBase {

-  val random = new Random()
-  /** Return a port in the ephemeral range. */
-  def getTestPort = random.nextInt(16382) + 49152
   val batchCount = 5
   val eventsPerBatch = 100
   val totalEventsPerChannel = batchCount * eventsPerBatch
@@ -77,17 +76,6 @@ class FlumePollingStreamSuite extends TestSuiteBase {
   }

   private def testFlumePolling(): Unit = {
-    val testPort = getTestPort
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)),
-        StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
-    outputStream.register()
-
     // Start the channel and sink.
     val context = new Context()
     context.put("capacity", channelCapacity.toString)
@@ -98,10 +86,19 @@ class FlumePollingStreamSuite extends TestSuiteBase {

     val sink = new SparkSink()
     context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
     Configurables.configure(sink, context)
     sink.setChannel(channel)
     sink.start()
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", sink.getPort())),
+        StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    outputStream.register()
     ssc.start()

     writeAndVerify(Seq(channel), ssc, outputBuffer)
@@ -111,18 +108,6 @@ class FlumePollingStreamSuite extends TestSuiteBase {
   }

   private def testFlumePollingMultipleHost(): Unit = {
-    val testPort = getTestPort
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))
-    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
-        eventsPerBatch, 5)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
-    outputStream.register()
-
     // Start the channel and sink.
     val context = new Context()
     context.put("capacity", channelCapacity.toString)
@@ -136,17 +121,29 @@ class FlumePollingStreamSuite extends TestSuiteBase {

     val sink = new SparkSink()
     context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
     Configurables.configure(sink, context)
     sink.setChannel(channel)
     sink.start()

     val sink2 = new SparkSink()
     context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort + 1))
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
     Configurables.configure(sink2, context)
     sink2.setChannel(channel2)
     sink2.start()
+
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val addresses = Seq(sink.getPort(), sink2.getPort()).map(new InetSocketAddress("localhost", _))
+    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
+        eventsPerBatch, 5)
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    outputStream.register()
+
     ssc.start()
     writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
     assertChannelIsEmpty(channel)

mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala

Lines changed: 1 addition & 1 deletion
@@ -155,7 +155,7 @@ object Statistics {
    * :: Experimental ::
    * Conduct Pearson's independence test for every feature against the label across the input RDD.
    * For each feature, the (feature, label) pairs are converted into a contingency matrix for which
-   * the chi-squared statistic is computed.
+   * the chi-squared statistic is computed. All label and feature values must be categorical.
    *
    * @param data an `RDD[LabeledPoint]` containing the labeled dataset with categorical features.
    *             Real-valued features will be treated as categorical for each distinct value.
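
The amended scaladoc belongs to the chi-squared independence test that takes an RDD[LabeledPoint]. Below is a minimal sketch of calling it, assuming the method is Statistics.chiSqTest(data: RDD[LabeledPoint]) and returns one test result per feature; the tiny in-line dataset and the ChiSqTestSketch object name are illustrative only.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.Statistics

object ChiSqTestSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ChiSqTestSketch").setMaster("local[2]"))

    // A tiny, hand-made dataset: categorical labels (0.0/1.0) and categorical feature values,
    // matching the doc note that all label and feature values must be categorical.
    val data = sc.parallelize(Seq(
      LabeledPoint(0.0, Vectors.dense(0.0, 1.0)),
      LabeledPoint(0.0, Vectors.dense(0.0, 2.0)),
      LabeledPoint(1.0, Vectors.dense(1.0, 1.0)),
      LabeledPoint(1.0, Vectors.dense(1.0, 2.0))
    ))

    // One test result per feature: chi-squared statistic, degrees of freedom, p-value, method.
    val results = Statistics.chiSqTest(data)
    results.zipWithIndex.foreach { case (result, i) =>
      println(s"feature $i: $result")
    }

    sc.stop()
  }
}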

Comments (0)