[SPARK-1981] Add AWS Kinesis streaming support #1434
@@ -0,0 +1,28 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

name := "Kinesis Test"

version := "1.0"

scalaVersion := System.getenv.get("SCALA_VERSION")

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % System.getenv.get("SPARK_VERSION")

resolvers ++= Seq(
  "Spark Release Repository" at System.getenv.get("SPARK_RELEASE_REPOSITORY"),
  "Spray Repository" at "http://repo.spray.cc/")
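The build file above reads SCALA_VERSION, SPARK_VERSION, and SPARK_RELEASE_REPOSITORY from the environment; if any are unset, `System.getenv.get` returns null and the build fails with an unhelpful error. A minimal sketch of failing fast instead (the `BuildEnv` helper is ours for illustration, not part of this PR; it takes the environment as a `Map` so it can be exercised without touching the real process environment):

```scala
// Hypothetical helper: fail fast with a clear message when a required
// build variable is missing, instead of propagating a null into sbt.
object BuildEnv {
  def require(env: Map[String, String], name: String): String =
    env.getOrElse(name, sys.error(s"Required environment variable $name is not set"))
}
```

In a build this would be called as `BuildEnv.require(sys.env, "SPARK_VERSION")`.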
@@ -0,0 +1,33 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main.scala

import scala.util.Try

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val foundKinesis = Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisUtils")).isSuccess
    if (!foundKinesis) {
      println("Kinesis not loaded via kinesis-asl")
      System.exit(-1)
    }
  }
}
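SimpleApp relies on `Try(Class.forName(...))` to confirm the kinesis-asl artifact is on the classpath. The same reflection check works for any optional dependency; a minimal runnable sketch of the pattern (the class names in the usage below are stdlib stand-ins, not Spark classes):

```scala
import scala.util.Try

object ClasspathCheck {
  // True when the named class can be loaded from the current classpath;
  // Class.forName throws ClassNotFoundException otherwise, which Try absorbs.
  def isOnClasspath(className: String): Boolean =
    Try(Class.forName(className)).isSuccess
}
```

For example, `ClasspathCheck.isOnClasspath("java.lang.String")` is true on any JVM, while a missing class name yields false instead of an exception.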
@@ -0,0 +1,58 @@
---
layout: global
title: Spark Streaming Kinesis Receiver
---

### Kinesis
Build notes:
<li>Spark supports a Kinesis Streaming Receiver which is not included in the default build due to licensing restrictions.</li>
<li>_**Note that by embedding this library you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your Spark package**_.</li>
<li>The Spark Kinesis Streaming Receiver source code, examples, tests, and artifacts live in $SPARK_HOME/extras/kinesis-asl.</li>
<li>To build with Kinesis, you must run the maven or sbt builds with `-Pkinesis-asl`.</li>
<li>Applications will need to link to the `spark-streaming-kinesis-asl` artifact.</li>

Kinesis examples notes:
<li>To build the Kinesis examples, you must run the maven or sbt builds with `-Pkinesis-asl`.</li>
<li>These examples automatically determine the number of local threads and KinesisReceivers to spin up based on the number of shards for the stream.</li>
<li>KinesisWordCountProducerASL will generate random data to put onto the Kinesis stream for testing.</li>
<li>Checkpointing is disabled (no checkpoint dir is set). The examples as written will not recover from a driver failure.</li>

Deployment and runtime notes:
<li>A single KinesisReceiver can process many shards of a stream.</li>
<li>Each shard of a stream is processed by one or more KinesisReceivers managed by the Kinesis Client Library (KCL) Worker.</li>
<li>You never need more KinesisReceivers than the number of shards in your stream.</li>
<li>You can horizontally scale the receiving by creating more KinesisReceiver/DStreams (up to the number of shards for a given stream).</li>
<li>The Kinesis libraries must be present on all worker nodes, as they will need access to the Kinesis Client Library.</li>
<li>This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence:<br/>
1) Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY<br/>
2) Java System Properties - aws.accessKeyId and aws.secretKey<br/>
3) Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs<br/>
4) Instance profile credentials - delivered through the Amazon EC2 metadata service<br/>
</li>
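The precedence above amounts to "try each source in order, take the first hit". As an illustrative stdlib-only model of that lookup order (not the actual DefaultAWSCredentialsProviderChain implementation, whose sources are AWS SDK classes):

```scala
// Illustrative model of the documented lookup order. Each source is a
// function returning Some(credential) when that source can supply one;
// the chain lazily takes the first hit, mirroring the precedence list above.
object CredentialChain {
  def resolve(sources: Seq[() => Option[String]]): Option[String] =
    sources.view.flatMap(s => s()).headOption
}
```

With an environment source and a system-property source, the environment wins whenever both are set, because it appears earlier in the sequence.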
<li>You need to set up a Kinesis stream with 1 or more shards per the following:<br/>
http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html</li>
<li>Valid Kinesis endpoint urls can be found here: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region</li>
<li>When you first start up the KinesisReceiver, the Kinesis Client Library (KCL) needs ~30s to establish connectivity with the AWS Kinesis service,
retrieve any checkpoint data, and negotiate with other KCLs reading from the same stream.</li>
<li>Be careful when changing the app name. Kinesis maintains a mapping table in DynamoDB based on this app name (http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app.html#kinesis-record-processor-initialization).
Changing the app name could lead to Kinesis errors, as only 1 logical application can process a stream. In order to start fresh,
it's always best to delete the DynamoDB table that matches your app name. This DynamoDB table lives in us-east-1 regardless of the Kinesis endpoint URL.</li>
|
||
Failure recovery notes: | ||
<li>The combination of Spark Streaming and Kinesis creates 3 different checkpoints as follows:<br/> | ||
1) RDD data checkpoint (Spark Streaming) - frequency is configurable with DStream.checkpoint(Duration)<br/> | ||
2) RDD metadata checkpoint (Spark Streaming) - frequency is every DStream batch<br/> | ||
3) Kinesis checkpointing (Kinesis) - frequency is controlled by the developer calling ICheckpointer.checkpoint() directly<br/> | ||
</li> | ||
<li>Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling</li> | ||
<li>Upon startup, a KinesisReceiver will begin processing records with sequence numbers greater than the last checkpoint sequence number recorded per shard.</li> | ||
<li>If no checkpoint info exists, the worker will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) | ||
or from the tip/latest (InitialPostitionInStream.LATEST). This is configurable.</li> | ||
<li>When pulling from the stream tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts.</li> | ||
<li>InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no KinesisReceivers are running.</li> | ||
<li>In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data | ||
depending on the checkpoint frequency.</li> | ||
<li>InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency.</li> | ||
<li>Record processing should be idempotent when possible.</li> | ||
<li>Failed or latent KinesisReceivers will be detected and automatically shutdown/load-balanced by the KCL.</li> | ||
<li>If possible, explicitly shutdown the worker if a failure occurs in order to trigger the final checkpoint.</li> |
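Because TRIM_HORIZON can replay already-seen records, per-shard processing should tolerate duplicates. A stdlib-only sketch of the resume rule described above, skipping records at or below the last checkpointed sequence number (the `Record` type is hypothetical; real Kinesis sequence numbers are ordered strings, modeled here as Longs for simplicity):

```scala
// Hypothetical record shape; real Kinesis sequence numbers are strings
// with a total order, modeled as Longs here for brevity.
case class Record(sequenceNumber: Long, data: String)

object IdempotentProcessor {
  // Keep only records strictly newer than the last checkpointed sequence
  // number, mirroring how a restarted KinesisReceiver resumes per shard.
  def newRecords(batch: Seq[Record], lastCheckpointed: Long): Seq[Record] =
    batch.filter(_.sequenceNumber > lastCheckpointed)
}
```

After a restart with checkpoint at sequence 2, a replayed batch of records 1..3 reduces to just record 3, so reprocessing the batch is harmless.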
@@ -9,7 +9,7 @@ title: Spark Streaming Programming Guide
 # Overview
 Spark Streaming is an extension of the core Spark API that enables high-throughput,
 fault-tolerant stream processing of live data streams. Data can be ingested from many sources
-like Kafka, Flume, Twitter, ZeroMQ or plain old TCP sockets and be processed using complex
+like Kafka, Flume, Twitter, ZeroMQ, Kinesis or plain old TCP sockets and be processed using complex
 algorithms expressed with high-level functions like `map`, `reduce`, `join` and `window`.
 Finally, processed data can be pushed out to filesystems, databases,
 and live dashboards. In fact, you can apply Spark's in-built
@@ -38,7 +38,7 @@ stream of results in batches.

 Spark Streaming provides a high-level abstraction called *discretized stream* or *DStream*,
 which represents a continuous stream of data. DStreams can be created either from input data
-stream from sources such as Kafka and Flume, or by applying high-level
+stream from sources such as Kafka, Flume, and Kinesis, or by applying high-level
 operations on other DStreams. Internally, a DStream is represented as a sequence of
 [RDDs](api/scala/index.html#org.apache.spark.rdd.RDD).
@@ -313,7 +313,7 @@ To write your own Spark Streaming program, you will have to add the following de
 artifactId = spark-streaming_{{site.SCALA_BINARY_VERSION}}
 version = {{site.SPARK_VERSION}}

-For ingesting data from sources like Kafka and Flume that are not present in the Spark
+For ingesting data from sources like Kafka, Flume, and Kinesis that are not present in the Spark
 Streaming core
 API, you will have to add the corresponding
 artifact `spark-streaming-xyz_{{site.SCALA_BINARY_VERSION}}` to the dependencies. For example,
@@ -327,6 +327,7 @@ some of the common ones are as follows.
 <tr><td> Twitter </td><td> spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}} </td></tr>
 <tr><td> ZeroMQ </td><td> spark-streaming-zeromq_{{site.SCALA_BINARY_VERSION}} </td></tr>
 <tr><td> MQTT </td><td> spark-streaming-mqtt_{{site.SCALA_BINARY_VERSION}} </td></tr>
+<tr><td> Kinesis<br/>(built separately)</td><td> kinesis-asl_{{site.SCALA_BINARY_VERSION}} </td></tr>
 <tr><td> </td><td></td></tr>
 </table>
@@ -442,7 +443,7 @@ see the API documentations of the relevant functions in
 Scala and [JavaStreamingContext](api/scala/index.html#org.apache.spark.streaming.api.java.JavaStreamingContext)
 for Java.

-Additional functionality for creating DStreams from sources such as Kafka, Flume, and Twitter
+Additional functionality for creating DStreams from sources such as Kafka, Flume, Kinesis, and Twitter
 can be imported by adding the right dependencies as explained in an
 [earlier](#linking) section. To take the
 case of Kafka, after adding the artifact `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` to the
@@ -467,6 +468,9 @@ For more details on these additional sources, see the corresponding [API documen
 Furthermore, you can also implement your own custom receiver for your sources. See the
 [Custom Receiver Guide](streaming-custom-receivers.html).

+### Kinesis
+[Kinesis](streaming-kinesis.html)
+
 ## Operations
 There are two kinds of DStream operations - _transformations_ and _output operations_. Similar to
 RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams

Review comment on the `### Kinesis` section:
> This is not the best place to add documentation about Kinesis. I am planning to do a bit of an overhaul and add sections on Flume, Kafka, etc., though I still haven't thought it through. Till then, can you add this documentation in a separate streaming-kinesis.md file, and I will incorporate it in my updates later.

Reply:
> moved
@@ -0,0 +1,96 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements. See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License. You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent</artifactId>
    <version>1.1.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

  <!-- Kinesis integration is not included by default due to ASL-licensed code. -->
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
  <packaging>jar</packaging>
  <name>Spark Kinesis Integration</name>

  <properties>
    <sbt.project.name>kinesis-asl</sbt.project.name>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>amazon-kinesis-client</artifactId>
      <version>${aws.kinesis.client.version}</version>
    </dependency>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk</artifactId>
      <version>${aws.java.sdk.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.binary.version}</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-all</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_${scala.binary.version}</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.easymock</groupId>
      <artifactId>easymockclassextension</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.novocode</groupId>
      <artifactId>junit-interface</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
    <plugins>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>
Review comment:
Nice! Good that you added this!