You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: docs/streaming-kinesis.md
+49-48Lines changed: 49 additions & 48 deletions
Original file line number
Diff line number
Diff line change
@@ -3,56 +3,57 @@ layout: global
3
3
title: Spark Streaming Kinesis Receiver
4
4
---
5
5
6
-
### Kinesis
7
-
Build notes:
8
-
<li>Spark supports a Kinesis Streaming Receiver which is not included in the default build due to licensing restrictions.</li>
9
-
<li>_**Note that by embedding this library you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your Spark package**_.</li>
10
-
<li>The Spark Kinesis Streaming Receiver source code, examples, tests, and artifacts live in $SPARK_HOME/extras/kinesis-asl.</li>
11
-
<li>To build with Kinesis, you must run the maven or sbt builds with -Pkinesis-asl`.</li>
12
-
<li>Applications will need to link to the 'spark-streaming-kinesis-asl` artifact.</li>
6
+
## Kinesis
7
+
###Design
8
+
<li>The KinesisReceiver uses the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License.</li>
9
+
<li>The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load-balancing, fault-tolerance, checkpointing through the concept of Workers, Checkpoints, and Shard Leases.</li>
10
+
<li>The KCL uses DynamoDB to maintain all state. A DynamoDB table is created in the us-east-1 region (regardless of Kinesis stream region) during KCL initialization for each Kinesis application name.</li>
11
+
<li>A single KinesisReceiver can process many shards of a stream by spinning up multiple KinesisRecordProcessor threads.</li>
12
+
<li>You never need more KinesisReceivers than the number of shards in your stream as each will spin up at least one KinesisRecordProcessor thread.</li>
13
+
<li>Horizontal scaling is achieved by autoscaling additional KinesisReceiver (separate processes) or spinning up new KinesisRecordProcessor threads within each KinesisReceiver - up to the number of current shards for a given stream, of course. Don't forget to autoscale back down!</li>
13
14
14
-
Kinesis examples notes:
15
-
<li>To build the Kinesis examples, you must run the maven or sbt builds with -Pkinesis-asl`.</li>
16
-
<li>These examples automatically determine the number of local threads and KinesisReceivers to spin up based on the number of shards for the stream.</li>
17
-
<li>KinesisWordCountProducerASL will generate random data to put onto the Kinesis stream for testing.</li>
18
-
<li>Checkpointing is disabled (no checkpoint dir is set). The examples as written will not recover from a driver failure.</li>
15
+
### Build
16
+
<li>Spark supports a Streaming KinesisReceiver, but it is not included in the default build due to Amazon Software Licensing (ASL) restrictions.</li>
17
+
<li>To build with the Kinesis Streaming Receiver and supporting ASL-licensed code, you must run the maven or sbt builds with the **-Pkinesis-asl** profile.</li>
18
+
<li>All KinesisReceiver-related code, examples, tests, and artifacts live in **$SPARK_HOME/extras/kinesis-asl/**.</li>
19
+
<li>Kinesis-based Spark Applications will need to link to the **spark-streaming-kinesis-asl** artifact that is built when **-Pkinesis-asl** is specified.</li>
20
+
<li>_**Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your Spark package**_.</li>
19
21
20
-
Deployment and runtime notes:
21
-
<li>A single KinesisReceiver can process many shards of a stream.</li>
22
-
<li>Each shard of a stream is processed by one or more KinesisReceiver's managed by the Kinesis Client Library (KCL) Worker.</li>
23
-
<li>You never need more KinesisReceivers than the number of shards in your stream.</li>
24
-
<li>You can horizontally scale the receiving by creating more KinesisReceiver/DStreams (up to the number of shards for a given stream)</li>
25
-
<li>The Kinesis libraries must be present on all worker nodes, as they will need access to the Kinesis Client Library.</li>
26
-
<li>This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence:<br/>
27
-
1) Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY<br/>
28
-
2) Java System Properties - aws.accessKeyId and aws.secretKey<br/>
29
-
3) Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs<br/>
30
-
4) Instance profile credentials - delivered through the Amazon EC2 metadata service<br/>
31
-
</li>
32
-
<li>You need to setup a Kinesis stream with 1 or more shards per the following:<br/>
<li>Valid Kinesis endpoint urls can be found here: Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region</li>
35
-
<li>When you first start up the KinesisReceiver, the Kinesis Client Library (KCL) needs ~30s to establish connectivity with the AWS Kinesis service,
36
-
retrieve any checkpoint data, and negotiate with other KCL's reading from the same stream.</li>
37
-
<li>Be careful when changing the app name. Kinesis maintains a mapping table in DynamoDB based on this app name (http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app.html#kinesis-record-processor-initialization).
38
-
Changing the app name could lead to Kinesis errors as only 1 logical application can process a stream. In order to start fresh,
39
-
it's always best to delete the DynamoDB table that matches your app name. This DynamoDB table lives in us-east-1 regardless of the Kinesis endpoint URL.</li>
22
+
###Example
23
+
<li>To build the Kinesis example, you must run the maven or sbt builds with the **-Pkinesis-asl** profile.</li>
24
+
<li>You need to setup a Kinesis stream at one of the valid Kinesis endpoints with 1 or more shards per the following: http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html</li>
25
+
<li>Valid Kinesis endpoints can be found here: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region</li>
26
+
<li>When running **locally**, the example automatically determines the number of threads and KinesisReceivers to spin up based on the number of shards configured for the stream. Therefore, **local[n]** is not needed when starting the example as with other streaming examples.</li>
27
+
<li>While this example could use a single KinesisReceiver which spins up multiple KinesisRecordProcessor threads to process multiple shards, I wanted to demonstrate unioning multiple KinesisReceivers as a single DStream. (It's a bit confusing in local mode.)</li>
28
+
<li>**KinesisWordCountProducerASL** is provided to generate random records into the Kinesis stream for testing.</li>
29
+
<li>The example has been configured to immediately replicate incoming stream data to another node by using (StorageLevel.MEMORY_AND_DISK_2)
30
+
<li>Spark checkpointing is disabled because the example does not use any stateful or window-based DStream operations such as updateStateByKey and reduceByWindow. If those operations are introduced, you would need to enable checkpointing or risk losing data in the case of a failure.</li>
31
+
<li>Kinesis checkpointing is enabled. This means that the example will recover from a Kinesis failure.</li>
32
+
<li>The example uses InitialPositionInStream.LATEST strategy to pull from the latest tip of the stream if no Kinesis checkpoint info exists.</li>
33
+
<li>In our example, **KinesisWordCount** is the Kinesis application name for both the Scala and Java versions. The use of this application name is described next.</li>
40
34
41
-
Failure recovery notes:
42
-
<li>The combination of Spark Streaming and Kinesis creates 3 different checkpoints as follows:<br/>
43
-
1) RDD data checkpoint (Spark Streaming) - frequency is configurable with DStream.checkpoint(Duration)<br/>
44
-
2) RDD metadata checkpoint (Spark Streaming) - frequency is every DStream batch<br/>
45
-
3) Kinesis checkpointing (Kinesis) - frequency is controlled by the developer calling ICheckpointer.checkpoint() directly<br/>
35
+
###Deployment and Runtime
36
+
<li>A Kinesis application name must be unique for a given account and region.</li>
37
+
<li>A DynamoDB table and CloudWatch namespace are created during KCL initialization using this Kinesis application name. http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app.html#kinesis-record-processor-initialization</li>
38
+
<li>This DynamoDB table lives in the us-east-1 region regardless of the Kinesis endpoint URL.</li>
39
+
<li>Changing the app name or stream name could lead to Kinesis errors as only a single logical application can process a single stream.</li>
40
+
<li>If you are seeing errors after changing the app name or stream name, it may be necessary to manually delete the DynamoDB table and start from scratch.</li>
41
+
<li>The Kinesis libraries must be present on all worker nodes, as they will need access to the KCL.</li>
42
+
<li>The KinesisReceiver uses the DefaultAWSCredentialsProviderChain for AWS credentials which searches for credentials in the following order of precedence:</br>
43
+
1) Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY<br/>
44
+
2) Java System Properties - aws.accessKeyId and aws.secretKey<br/>
45
+
3) Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs<br/>
46
+
4) Instance profile credentials - delivered through the Amazon EC2 metadata service
46
47
</li>
47
-
<li>Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling</li>
48
-
<li>Upon startup, a KinesisReceiver will begin processing records with sequence numbers greater than the last checkpoint sequence number recorded per shard.</li>
49
-
<li>If no checkpoint info exists, the worker will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON)
50
-
or from the tip/latest (InitialPostitionInStream.LATEST). This is configurable.</li>
51
-
<li>When pulling from the stream tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts.</li>
52
-
<li>InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no KinesisReceivers are running.</li>
53
-
<li>In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data
54
-
depending on the checkpoint frequency.</li>
55
-
<li>InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency.</li>
48
+
49
+
###Fault-Tolerance
50
+
<li>The combination of Spark Streaming and Kinesis creates 2 different checkpoints that may occur at different intervals.</li>
51
+
<li>Checkpointing too frequently against Kinesis will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling. The provided example handles this throttling with a random backoff retry strategy.</li>
52
+
<li>Upon startup, a KinesisReceiver will begin processing records with sequence numbers greater than the last Kinesis checkpoint sequence number recorded per shard (stored in the DynamoDB table).</li>
53
+
<li>If no Kinesis checkpoint info exists, the KinesisReceiver will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) or from the latest tip (InitialPostitionInStream.LATEST). This is configurable.</li>
54
+
<li>InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no KinesisReceivers are running (and no checkpoint info is being stored.)</li>
55
+
<li>In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data.</li>
56
+
<li>InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records where the impact is dependent on checkpoint frequency.</li>
56
57
<li>Record processing should be idempotent when possible.</li>
57
-
<li>Failed or latent KinesisReceivers will be detected and automatically shutdown/load-balanced by the KCL.</li>
58
-
<li>If possible, explicitly shutdown the worker if a failure occurs in order to trigger the final checkpoint.</li>
58
+
<li>A failed or latent KinesisRecordProcessor within the KinesisReceiver will be detected and automatically restarted by the KCL.</li>
59
+
<li>If possible, the KinesisReceiver should be shutdown cleanly in order to trigger a final checkpoint of all KinesisRecordProcessors to avoid duplicate record processing.</li>
0 commit comments