[SPARK-1981] Add AWS Kinesis streaming support #1434

Closed · wants to merge 18 commits
3 changes: 2 additions & 1 deletion bin/run-example
@@ -29,7 +29,8 @@ if [ -n "$1" ]; then
else
echo "Usage: ./bin/run-example <example-class> [example-args]" 1>&2
echo " - set MASTER=XX to use a specific master" 1>&2
echo " - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" 1>&2
echo " - can use abbreviated example class name relative to com.apache.spark.examples" 1>&2
echo " (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)" 1>&2
exit 1
fi

3 changes: 2 additions & 1 deletion bin/run-example2.cmd
@@ -32,7 +32,8 @@ rem Test that an argument was given
if not "x%1"=="x" goto arg_given
echo Usage: run-example ^<example-class^> [example-args]
echo - set MASTER=XX to use a specific master
echo - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)
echo - can use abbreviated example class name relative to org.apache.spark.examples
echo (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)
goto exit
:arg_given

4 changes: 2 additions & 2 deletions dev/audit-release/audit_release.py
@@ -105,7 +105,7 @@ def get_url(url):
"spark-core", "spark-bagel", "spark-mllib", "spark-streaming", "spark-repl",
"spark-graphx", "spark-streaming-flume", "spark-streaming-kafka",
"spark-streaming-mqtt", "spark-streaming-twitter", "spark-streaming-zeromq",
"spark-catalyst", "spark-sql", "spark-hive"
"spark-catalyst", "spark-sql", "spark-hive", "spark-streaming-kinesis-asl"
]
modules = map(lambda m: "%s_%s" % (m, SCALA_BINARY_VERSION), modules)

@@ -136,7 +136,7 @@ def ensure_path_not_present(x):
os.chdir(original_dir)

# SBT application tests
for app in ["sbt_app_core", "sbt_app_graphx", "sbt_app_streaming", "sbt_app_sql", "sbt_app_hive"]:
for app in ["sbt_app_core", "sbt_app_graphx", "sbt_app_streaming", "sbt_app_sql", "sbt_app_hive", "sbt_app_kinesis"]:
os.chdir(app)
ret = run_cmd("sbt clean run", exit_on_failure=False)
test(ret == 0, "sbt application (%s)" % app)
7 changes: 7 additions & 0 deletions dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala
@@ -50,5 +50,12 @@ object SimpleApp {
println("Ganglia sink was loaded via spark-core")
System.exit(-1)
}

// Kinesis is excluded from the default build due to the ASL license issue; verify spark-core does not pull it in
Contributor: Nice! Good that you added this!

val foundKinesis = Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisUtils")).isSuccess
if (foundKinesis) {
println("Kinesis was loaded via spark-core")
System.exit(-1)
}
}
}
28 changes: 28 additions & 0 deletions dev/audit-release/sbt_app_kinesis/build.sbt
@@ -0,0 +1,28 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

name := "Kinesis Test"

version := "1.0"

scalaVersion := System.getenv.get("SCALA_VERSION")

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % System.getenv.get("SPARK_VERSION")

resolvers ++= Seq(
"Spark Release Repository" at System.getenv.get("SPARK_RELEASE_REPOSITORY"),
"Spray Repository" at "http://repo.spray.cc/")
33 changes: 33 additions & 0 deletions dev/audit-release/sbt_app_kinesis/src/main/scala/SparkApp.scala
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main.scala

import scala.util.Try

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
def main(args: Array[String]) {
val foundKinesis = Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisUtils")).isSuccess
if (!foundKinesis) {
println("Kinesis not loaded via kinesis-asl")
System.exit(-1)
}
}
}
4 changes: 2 additions & 2 deletions dev/create-release/create-release.sh
@@ -53,15 +53,15 @@ if [[ ! "$@" =~ --package-only ]]; then
-Dusername=$GIT_USERNAME -Dpassword=$GIT_PASSWORD \
-Dmaven.javadoc.skip=true \
-Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
-Dtag=$GIT_TAG -DautoVersionSubmodules=true \
-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
--batch-mode release:prepare

mvn -DskipTests \
-Darguments="-DskipTests=true -Dmaven.javadoc.skip=true -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -Dgpg.passphrase=${GPG_PASSPHRASE}" \
-Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-Dmaven.javadoc.skip=true \
-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
release:perform

cd ..
3 changes: 3 additions & 0 deletions dev/run-tests
@@ -36,6 +36,9 @@ fi
if [ -z "$SBT_MAVEN_PROFILES_ARGS" ]; then
export SBT_MAVEN_PROFILES_ARGS="-Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0"
fi

export SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Pkinesis-asl"

echo "SBT_MAVEN_PROFILES_ARGS=\"$SBT_MAVEN_PROFILES_ARGS\""

# Remove work directory
4 changes: 2 additions & 2 deletions docs/streaming-custom-receivers.md
@@ -4,7 +4,7 @@ title: Spark Streaming Custom Receivers
---

Spark Streaming can receive streaming data from any arbitrary data source beyond
the one's for which it has in-built support (that is, beyond Flume, Kafka, files, sockets, etc.).
the ones for which it has in-built support (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.).
This requires the developer to implement a *receiver* that is customized for receiving data from
the concerned data source. This guide walks through the process of implementing a custom receiver
and using it in a Spark Streaming application.
@@ -174,7 +174,7 @@ val words = lines.flatMap(_.split(" "))
...
{% endhighlight %}

The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala).
The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala).

</div>
<div data-lang="java" markdown="1">
58 changes: 58 additions & 0 deletions docs/streaming-kinesis.md
@@ -0,0 +1,58 @@
---
layout: global
title: Spark Streaming Kinesis Receiver
---

### Kinesis
Build notes:
<li>Spark supports a Kinesis Streaming Receiver which is not included in the default build due to licensing restrictions.</li>
<li>_**Note that by embedding this library you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your Spark package**_.</li>
<li>The Spark Kinesis Streaming Receiver source code, examples, tests, and artifacts live in $SPARK_HOME/extras/kinesis-asl.</li>
<li>To build with Kinesis, you must run the Maven or sbt builds with `-Pkinesis-asl`.</li>
<li>Applications will need to link to the `spark-streaming-kinesis-asl` artifact (see the linking sketch below).</li>
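For sbt users, linking might look like the following minimal sketch (an editorial illustration, not part of this patch; the version string is a placeholder for whichever Spark release you build against):

```scala
// build.sbt sketch: depend on the separately built, ASL-licensed Kinesis artifact.
// The artifact name matches this PR's pom.xml; the version shown is hypothetical.
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.1.0"
```

Because the artifact is excluded from the default build, it has to come from a local `-Pkinesis-asl` build (e.g. via sbt `publish-local` or `mvn install`) or from a repository that publishes it.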

Kinesis examples notes:
<li>To build the Kinesis examples, you must run the Maven or sbt builds with `-Pkinesis-asl`.</li>
<li>These examples automatically determine the number of local threads and KinesisReceivers to spin up based on the number of shards for the stream.</li>
<li>KinesisWordCountProducerASL will generate random data to put onto the Kinesis stream for testing.</li>
<li>Checkpointing is disabled (no checkpoint dir is set). The examples as written will not recover from a driver failure.</li>

Deployment and runtime notes:
<li>A single KinesisReceiver can process many shards of a stream.</li>
<li>Each shard of a stream is processed by one or more KinesisReceivers managed by the Kinesis Client Library (KCL) Worker.</li>
<li>You never need more KinesisReceivers than the number of shards in your stream.</li>
<li>You can horizontally scale the receiving by creating more KinesisReceiver/DStreams (up to the number of shards for a given stream).</li>
<li>The Kinesis libraries must be present on all worker nodes, as they will need access to the Kinesis Client Library.</li>
<li>This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence:<br/>
1) Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY<br/>
2) Java System Properties - aws.accessKeyId and aws.secretKey<br/>
3) Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs<br/>
4) Instance profile credentials - delivered through the Amazon EC2 metadata service<br/>
</li>
<li>You need to set up a Kinesis stream with one or more shards per the following:<br/>
http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html</li>
<li>Valid Kinesis endpoint URLs can be found here: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region</li>
<li>When you first start up the KinesisReceiver, the Kinesis Client Library (KCL) needs ~30s to establish connectivity with the AWS Kinesis service,
retrieve any checkpoint data, and negotiate with other KCL workers reading from the same stream.</li>
<li>Be careful when changing the app name. Kinesis maintains a mapping table in DynamoDB based on this app name (http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app.html#kinesis-record-processor-initialization).
Changing the app name could lead to Kinesis errors as only 1 logical application can process a stream. In order to start fresh,
it's always best to delete the DynamoDB table that matches your app name. This DynamoDB table lives in us-east-1 regardless of the Kinesis endpoint URL.</li>

Failure recovery notes:
<li>The combination of Spark Streaming and Kinesis creates 3 different checkpoints as follows:<br/>
1) RDD data checkpoint (Spark Streaming) - frequency is configurable with DStream.checkpoint(Duration)<br/>
2) RDD metadata checkpoint (Spark Streaming) - frequency is every DStream batch<br/>
3) Kinesis checkpointing (Kinesis) - frequency is controlled by the developer calling ICheckpointer.checkpoint() directly<br/>
</li>
<li>Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling.</li>
<li>Upon startup, a KinesisReceiver will begin processing records with sequence numbers greater than the last checkpoint sequence number recorded per shard.</li>
<li>If no checkpoint info exists, the worker will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON)
or from the tip/latest (InitialPositionInStream.LATEST). This is configurable.</li>
<li>When pulling from the stream tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts.</li>
<li>InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no KinesisReceivers are running.</li>
<li>In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data
depending on the checkpoint frequency.</li>
<li>InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency.</li>
<li>Record processing should be idempotent when possible.</li>
<li>Failed or latent KinesisReceivers will be detected and automatically shut down or load-balanced by the KCL.</li>
<li>If possible, explicitly shut down the worker if a failure occurs in order to trigger the final checkpoint. A minimal end-to-end receiver sketch follows.</li>
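To make the receiver setup concrete, here is a minimal end-to-end sketch (an editorial illustration, not part of this patch). It assumes the `KinesisUtils.createStream(ssc, streamName, endpointUrl, checkpointInterval, initialPosition, storageLevel)` signature added by this PR; the stream name, endpoint URL, and shard count are hypothetical placeholders:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

object KinesisSketch {
  def main(args: Array[String]) {
    // Hypothetical stream name, endpoint, and shard count; substitute your own.
    val streamName = "myKinesisStream"
    val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"
    val numShards = 2  // one receiver per shard is the useful upper bound

    val ssc = new StreamingContext(new SparkConf().setAppName("KinesisSketch"), Seconds(2))

    // One KinesisReceiver/DStream per shard, unioned into a single DStream.
    val kinesisStreams = (0 until numShards).map { _ =>
      KinesisUtils.createStream(ssc, streamName, endpointUrl,
        Seconds(2),                            // Kinesis checkpoint interval (checkpoint type 3 above)
        InitialPositionInStream.TRIM_HORIZON,  // or LATEST; see the notes above
        StorageLevel.MEMORY_AND_DISK_2)
    }
    val union = ssc.union(kinesisStreams)

    // Records arrive as raw byte arrays; decode and run a trivial word count.
    union.flatMap(bytes => new String(bytes).split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that the Kinesis checkpoint interval passed to `createStream` is independent of Spark Streaming's own RDD checkpointing, which (as noted above) the bundled examples leave disabled.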
12 changes: 8 additions & 4 deletions docs/streaming-programming-guide.md
@@ -9,7 +9,7 @@ title: Spark Streaming Programming Guide
# Overview
Spark Streaming is an extension of the core Spark API that enables high-throughput,
fault-tolerant stream processing of live data streams. Data can be ingested from many sources
like Kafka, Flume, Twitter, ZeroMQ or plain old TCP sockets and be processed using complex
like Kafka, Flume, Twitter, ZeroMQ, Kinesis or plain old TCP sockets and be processed using complex
algorithms expressed with high-level functions like `map`, `reduce`, `join` and `window`.
Finally, processed data can be pushed out to filesystems, databases,
and live dashboards. In fact, you can apply Spark's in-built
@@ -38,7 +38,7 @@ stream of results in batches.

Spark Streaming provides a high-level abstraction called *discretized stream* or *DStream*,
which represents a continuous stream of data. DStreams can be created either from input data
stream from sources such as Kafka and Flume, or by applying high-level
stream from sources such as Kafka, Flume, and Kinesis, or by applying high-level
operations on other DStreams. Internally, a DStream is represented as a sequence of
[RDDs](api/scala/index.html#org.apache.spark.rdd.RDD).

@@ -313,7 +313,7 @@ To write your own Spark Streaming program, you will have to add the following de
artifactId = spark-streaming_{{site.SCALA_BINARY_VERSION}}
version = {{site.SPARK_VERSION}}

For ingesting data from sources like Kafka and Flume that are not present in the Spark
For ingesting data from sources like Kafka, Flume, and Kinesis that are not present in the Spark
Streaming core
API, you will have to add the corresponding
artifact `spark-streaming-xyz_{{site.SCALA_BINARY_VERSION}}` to the dependencies. For example,
@@ -327,6 +327,7 @@ some of the common ones are as follows.
<tr><td> Twitter </td><td> spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}} </td></tr>
<tr><td> ZeroMQ </td><td> spark-streaming-zeromq_{{site.SCALA_BINARY_VERSION}} </td></tr>
<tr><td> MQTT </td><td> spark-streaming-mqtt_{{site.SCALA_BINARY_VERSION}} </td></tr>
<tr><td> Kinesis<br/>(built separately)</td><td> spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} </td></tr>
<tr><td> </td><td></td></tr>
</table>

@@ -442,7 +443,7 @@ see the API documentations of the relevant functions in
Scala and [JavaStreamingContext](api/scala/index.html#org.apache.spark.streaming.api.java.JavaStreamingContext)
for Java.

Additional functionality for creating DStreams from sources such as Kafka, Flume, and Twitter
Additional functionality for creating DStreams from sources such as Kafka, Flume, Kinesis, and Twitter
can be imported by adding the right dependencies as explained in an
[earlier](#linking) section. To take the
case of Kafka, after adding the artifact `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` to the
@@ -467,6 +468,9 @@ For more details on these additional sources, see the corresponding [API documen
Furthermore, you can also implement your own custom receiver for your sources. See the
[Custom Receiver Guide](streaming-custom-receivers.html).

### Kinesis
Contributor: This is not the best place to add documentation about Kinesis. I am planning to do a bit of an overhaul and add sections on Flume, Kafka, etc., though I still haven't thought it through. Until then, can you add this documentation in a separate streaming-kinesis.md file, and I will incorporate it into my updates later.

Contributor (author): Moved.

[Kinesis](streaming-kinesis.html)

## Operations
There are two kinds of DStream operations - _transformations_ and _output operations_. Similar to
RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams
13 changes: 13 additions & 0 deletions examples/pom.xml
@@ -34,6 +34,19 @@
<name>Spark Project Examples</name>
<url>http://spark.apache.org/</url>

<profiles>
<profile>
<id>kinesis-asl</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
96 changes: 96 additions & 0 deletions extras/kinesis-asl/pom.xml
@@ -0,0 +1,96 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<!-- Kinesis integration is not included by default due to ASL-licensed code. -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Kinesis Integration</name>

<properties>
<sbt.project.name>kinesis-asl</sbt.project.name>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>${aws.kinesis.client.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>${aws.java.sdk.version}</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymockclassextension</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>