The Spark-Riak connector comes with several sample programs and demos:
- Simple Scala example
- Simple Scala Riak TS example
- Simple Scala DataFrame example
- Simple Scala Riak TS DataFrame example
- Simple Java example
- Simple Java Riak TS example
- OFAC demo
- Scala RiakTS Parquet Example
- Spark Streaming Examples
- All of the examples assume that you have a Riak KV or Riak TS cluster installed and running on localhost:8087. You can follow these guides to set up a Riak KV or Riak TS cluster: Installing Riak KV and Installing Riak TS.
- If you don't have SBT installed, go to the SBT download page and follow the installation instructions for your OS.
- Then, install the dependencies (we will skip integration tests to speed things up):
sbt clean package assembly
- Go to the examples REPL folder:
cd examples/src/main/repl
- Update the conf/config.sh file with your settings, if needed.
- Run the example or demo that you want by running bin/run-example <class>. For example, bin/run-example SimpleScalaRiakExample will run the SimpleScalaRiakExample example locally.
This Scala example demonstrates how to use the Riak Spark connector to query all data from a bucket. The example creates test data in Riak, pulls it back into Spark as a RiakRDD, and counts the number of values loaded from the Riak bucket.
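In outline, the example's job looks something like the following minimal sketch (not the actual source; the test-data bucket name, key scheme, and local connection settings are assumptions):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.basho.riak.spark._ // adds riakBucket to SparkContext and saveToRiak to RDDs

object FullBucketReadSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Simple Scala Riak example (sketch)")
      .setIfMissing("spark.master", "local[2]")
      .setIfMissing("spark.riak.connection.host", "127.0.0.1:8087")
    val sc = new SparkContext(conf)

    // Write a handful of (key, value) test pairs, then read the whole bucket back and count it.
    sc.parallelize(1 to 4).map(i => (s"key-$i", s"value-$i")).saveToRiak("test-data")
    val count = sc.riakBucket[String]("test-data").queryAll().count()
    println(s"Loaded $count values from the test-data bucket")

    sc.stop()
  }
}
```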
Run it locally:
bin/run-example SimpleScalaRiakExample
Sources SimpleScalaRiakExample.scala
This Scala example demonstrates how to use the Riak Spark connector to perform range queries against Riak TS, and how to do the same against Riak KV.
Before running, you will need to create and activate a TS table called ts_weather_demo
if it does not already exist. You can find more information on creating and activating TS tables here, or you can run the following:
riak-admin bucket-type create ts_weather_demo '{"props":{"n_val":3, "table_def": "CREATE TABLE ts_weather_demo (weather VARCHAR NOT NULL, family VARCHAR NOT NULL, time TIMESTAMP NOT NULL, temperature DOUBLE, humidity DOUBLE, pressure DOUBLE, PRIMARY KEY ((weather, family, QUANTUM(time, 1, 'h')), weather, family, time))"}}'
riak-admin bucket-type activate ts_weather_demo
You can run the example locally with:
bin/run-example SimpleScalaRiakTSExample
Sources SimpleScalaRiakTSExample.scala
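The two query styles can be sketched roughly as follows (a hedged illustration, not the example's source; the bucket name test-data, the 2i index name creationIndex, and the time bounds are assumptions):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.Row
import com.basho.riak.spark._

object RangeQuerySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf()
      .setAppName("TS/KV range query sketch")
      .setIfMissing("spark.master", "local[2]")
      .setIfMissing("spark.riak.connection.host", "127.0.0.1:8087"))

    val from = 1451624400000L // illustrative epoch-millisecond bounds
    val to   = 1451635200000L

    // Riak TS: a range query expressed as SQL over the quantized time column.
    val tsRows = sc.riakTSTable[Row]("ts_weather_demo")
      .sql(s"SELECT * FROM ts_weather_demo WHERE time >= $from AND time <= $to " +
           "AND weather = 'sunny' AND family = 'f'")
    println(s"TS range query returned ${tsRows.count()} rows")

    // Riak KV: the equivalent range query over a secondary (2i) index.
    val kvValues = sc.riakBucket[String]("test-data").query2iRange("creationIndex", from, to)
    println(s"KV 2i range query returned ${kvValues.count()} values")

    sc.stop()
  }
}
```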
This Scala example demonstrates how to use Spark DataFrames with Riak KV.
You can run the example locally with:
bin/run-example dataframes.SimpleScalaRiakDataframesExample
Sources SimpleScalaRiakDataframesExample.scala
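A minimal sketch of the DataFrame-over-KV flow is shown below; the bucket name, key scheme, and the WeatherReading schema are assumptions rather than the example's actual source:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.basho.riak.spark._

object KvDataframesSketch {
  case class WeatherReading(weather: String, family: String, time: String, temperature: Double)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf()
      .setAppName("KV DataFrames sketch")
      .setIfMissing("spark.master", "local[2]")
      .setIfMissing("spark.riak.connection.host", "127.0.0.1:8087"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val readings = Seq(
      WeatherReading("sunny", "f", "2016-01-01 08:30:00.000", 25.0),
      WeatherReading("rainy", "f", "2016-01-01 09:30:00.000", 19.5))

    // Store the records in a KV bucket, keyed by an assumed composite key.
    sc.parallelize(readings).map(r => (s"${r.family}-${r.time}", r)).saveToRiak("test-data")

    // Pull the bucket back and expose it as a DataFrame for querying.
    val df = sc.riakBucket[WeatherReading]("test-data").queryAll().toDF()
    df.where($"temperature" > 20.0).show()

    sc.stop()
  }
}
```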
This Scala example demonstrates how to use Spark DataFrames with Riak TS.
Before running, you will need to create and activate a TS table called ts_weather_demo
if it does not already exist. You can find more information on creating and activating TS tables here, or you can run the following:
riak-admin bucket-type create ts_weather_demo '{"props":{"n_val":3, "table_def": "CREATE TABLE ts_weather_demo (weather VARCHAR NOT NULL, family VARCHAR NOT NULL, time TIMESTAMP NOT NULL, temperature DOUBLE, humidity DOUBLE, pressure DOUBLE, PRIMARY KEY ((weather, family, QUANTUM(time, 1, 'h')), weather, family, time))"}}'
riak-admin bucket-type activate ts_weather_demo
You can run the example locally with:
bin/run-example dataframes.SimpleScalaRiakTSDataframesExample
Sources SimpleScalaRiakTSDataframesExample.scala
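The TS DataFrame flow can be sketched like this, using the connector's org.apache.spark.sql.riak data source; the column values, epoch-millisecond timestamps, and filter bounds are illustrative assumptions:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

object TsDataframesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf()
      .setAppName("TS DataFrames sketch")
      .setIfMissing("spark.master", "local[2]")
      .setIfMissing("spark.riak.connection.host", "127.0.0.1:8087"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Write a small DataFrame into the TS table; columns must match the table definition,
    // and time is given here as epoch milliseconds.
    val df = sc.parallelize(Seq(("sunny", "f", 1451637000000L, 25.0, 67.0, 30.20)))
      .toDF("weather", "family", "time", "temperature", "humidity", "pressure")
    df.write
      .format("org.apache.spark.sql.riak")
      .mode(SaveMode.Append)
      .save("ts_weather_demo")

    // Read it back with a range filter on the primary key columns.
    val readBack = sqlContext.read
      .format("org.apache.spark.sql.riak")
      .load("ts_weather_demo")
      .filter("time >= 1451635200000 AND time <= 1451638800000 " +
              "AND weather = 'sunny' AND family = 'f'")
    readBack.show()

    sc.stop()
  }
}
```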
This Java example demonstrates how to use the Riak Spark connector to query Riak KV. The example creates test data in Riak and pulls it back into Spark using features such as full bucket read, 2i range query, 2i keys query, and query by keys.
Run it locally:
bin/run-example SimpleJavaRiakExample
Sources SimpleJavaRiakExample.java
This Java example demonstrates how to use the Riak Spark connector to query Riak TS. The example creates test data in Riak and pulls it back into Spark using a range scan query.
Before running, you will need to create and activate a TS table called ts_weather_demo
if it does not already exist. You can find more information on creating and activating TS tables here, or you can run the following:
riak-admin bucket-type create ts_weather_demo '{"props":{"n_val":3, "table_def": "CREATE TABLE ts_weather_demo (weather VARCHAR NOT NULL, family VARCHAR NOT NULL, time TIMESTAMP NOT NULL, temperature DOUBLE, humidity DOUBLE, pressure DOUBLE, PRIMARY KEY ((weather, family, QUANTUM(time, 1, 'h')), weather, family, time))"}}'
riak-admin bucket-type activate ts_weather_demo
You can run the example locally with:
bin/run-example SimpleJavaRiakTSExample
Sources SimpleJavaRiakTSExample.java
This demo shows how Riak and Spark can be used to analyze semi-structured data using Scala.
As part of its enforcement efforts, the Office of Foreign Assets Control (OFAC) publishes a list of individuals and companies owned, controlled by, or acting for/on behalf of targeted countries. It also lists individuals, groups, and entities, such as terrorists and narcotics traffickers, designated under programs that are not country-specific. Collectively, such individuals and companies are called "Specially Designated Nationals" (SDNs). Their assets are blocked and U.S. persons are generally prohibited from dealing with them.
More information about the OFAC list can be found at www.treasury.gov.
All of the data required for this demo will be downloaded automatically from publicly available copies on the internet.
We will download a public copy of the SDN list from OFAC. To connect SDNs to their specific locations, we will also download an address list. Finally, we will download a stop-words list for proper word frequency analysis.
In the demo we are going to generate descriptive and summary statistics from the OFAC dataset. As part of our analysis, we are going to calculate the following:
- How many unique SDNs are there in total? (see output during execution)
- Which distinct SDN Types does this dataset contain? (see output during execution)
- How many banned individuals per country are on the OFAC list? (supporting chart: bar plot)
- How many marine vessels are on the list by country and by vessel type? (supporting chart: heat map)
- What is the probability distribution of the vessel tonnage for vessels on the list? (supporting chart: histogram)
- What are the most common titles of the individuals on the list? (see output during execution)
You can run the demo locally with:
bin/run-example demos.ofac.OFACDemo
Sources OFACDemo.scala
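As a rough illustration of one of these calculations, the per-country count of banned individuals boils down to a join-and-count over the SDN and address records, along the lines of the sketch below (the field names and sample data are purely illustrative, not the demo's actual schema):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object OfacCountSketch {
  case class Sdn(id: Long, name: String, sdnType: String)
  case class Address(sdnId: Long, country: String)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf()
      .setAppName("OFAC per-country count sketch").setMaster("local[2]"))

    // Stand-ins for the parsed SDN and address lists.
    val sdns = sc.parallelize(Seq(Sdn(1, "A", "individual"), Sdn(2, "B", "vessel")))
    val addresses = sc.parallelize(Seq(Address(1, "Cuba"), Address(2, "Panama")))

    // Join individuals to their addresses and count them per country.
    val individualsPerCountry = sdns
      .filter(_.sdnType == "individual")
      .map(s => (s.id, s.name))
      .join(addresses.map(a => (a.sdnId, a.country)))
      .map { case (_, (_, country)) => (country, 1) }
      .reduceByKey(_ + _)

    individualsPerCountry.collect().foreach(println)
    sc.stop()
  }
}
```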
This simple demo illustrates how data can be extracted from Riak TS and saved as a Parquet file.
Before running, you will need to create and activate a TS table called parquet_demo
if it does not already exist. You can find more information on creating and activating TS tables here, or you can run the following:
riak-admin bucket-type create parquet_demo '{"props":{"n_val":3, "table_def":"CREATE TABLE parquet_demo (site VARCHAR NOT NULL, species VARCHAR NOT NULL, measurementDate TIMESTAMP NOT NULL, latitude DOUBLE, longitude DOUBLE, value DOUBLE, PRIMARY KEY ((site, species, QUANTUM(measurementDate, 24, h)),site, species, measurementDate))"}}'
riak-admin bucket-type activate parquet_demo
You can run the example locally with:
bin/run-example parquet.ScalaRiakParquetExample
Sources ScalaRiakParquetExample.scala
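The core of the flow is reading a key range from the TS table into a DataFrame and handing it to Spark's Parquet writer, roughly as in this sketch (the filter values, time bounds, and output path are assumptions):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object TsToParquetSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf()
      .setAppName("TS to Parquet sketch")
      .setIfMissing("spark.master", "local[2]")
      .setIfMissing("spark.riak.connection.host", "127.0.0.1:8087"))
    val sqlContext = new SQLContext(sc)

    // Read one day of measurements for an (assumed) site and species from the TS table.
    val df = sqlContext.read
      .format("org.apache.spark.sql.riak")
      .load("parquet_demo")
      .filter("measurementDate >= 1451606400000 AND measurementDate <= 1451692800000 " +
              "AND site = 'station-1' AND species = 'PM10'")

    // Write the result out as a Parquet file.
    df.write.parquet("/tmp/parquet_demo_output")

    sc.stop()
  }
}
```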
The Spark-Riak Connector can be used with Spark Streaming. To demonstrate this usage, we will work through two small Scala examples, one for Riak KV and the other for Riak TS.
These examples require Kafka. Please install Kafka and set up a Kafka broker prior to running them. We will assume that there is a Kafka broker running at 127.0.0.1:9092. Instructions for setting up Kafka topics can be found in this guide. You can start a broker with the following:
path/to/kafka/bin/zookeeper-server-start.sh config/zookeeper.properties
path/to/kafka/bin/kafka-server-start.sh config/server.properties
We also assume Riak KV/TS is installed and that there is a Riak KV/TS node running at 127.0.0.1:8087. You can find instructions for doing so here.
Riak KV, Kafka and Spark master hostnames must be specified in config.sh prior to running the examples.
Important note: config.sh will attempt to detect your local Scala version. This version number is used by run-example to pull the appropriate spark-streaming-kafka and kafka libraries from spark-packages.org. This can cause an error if your local Spark and the Spark-Riak Connector were built with a Scala version different from your local Scala. For example, Spark 1.6.2 is built with Scala 2.10 by default. For the streaming examples to work, the Scala version that config.sh picks up must match the Scala version that Spark and the Spark-Riak Connector were built with. If your local Scala version is different, change the Scala version set in config.sh to the version that Spark and the Spark-Riak Connector were built with.
This example will start a stream from the Kafka topic ingest-kv into the KV bucket test-data. The stream will run until terminated. Whenever a message is produced for the Kafka topic ingest-kv, the Spark Streaming context will automatically stream the message from the topic into the KV bucket. To see this in action, we first need to create the ingest-kv topic:
path/to/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ingest-kv
Then, start the example:
bin/run-example streaming.StreamingKVExample
Next, we need to send a message to the Kafka topic ingest-kv with the Kafka console producer script, which can be found in the Kafka directory:
/path/to/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ingest-kv
This script reads messages from the terminal and passes them to the topic. From the topic, the Spark Streaming context will write each message to the Riak KV bucket test-data. As an example, put the following into the terminal:
{"time": "2016-01-01 08:30:00.000", "weather": "sunny", "temperature": 25.0, "humidity": 67.0, "pressure": 30.20, "family": "f"}
You should now be able to see this data entry in the KV bucket test-data.
Sources StreamingKVExample.scala
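The KV streaming flow amounts to a direct Kafka stream whose batches are saved to the bucket, roughly as in this sketch of the Spark 1.6-era streaming API (the broker address, topic, bucket name, key scheme, and batch interval are assumptions based on the values above; this is not the example's exact source):

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.basho.riak.spark._

object StreamingKvSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf()
      .setAppName("Streaming KV sketch")
      .setIfMissing("spark.master", "local[2]")
      .setIfMissing("spark.riak.connection.host", "127.0.0.1:8087"))
    val ssc = new StreamingContext(sc, Durations.seconds(15))

    val kafkaParams = Map[String, String]("metadata.broker.list" -> "127.0.0.1:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("ingest-kv"))

    // Each Kafka message value is written to the test-data bucket, keyed here by a
    // generated UUID (an assumption; the actual example may key messages differently).
    stream.map(_._2).foreachRDD { rdd =>
      rdd.map(value => (java.util.UUID.randomUUID().toString, value)).saveToRiak("test-data")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```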
Having seen how Spark Streaming works with KV buckets, let's now look at the TS table example.
This example will start a stream from the Kafka topic ingest-ts into the TS table ts_weather_demo. The stream will run until terminated. Whenever a message is produced for the Kafka topic ingest-ts, the Spark Streaming context will automatically stream the message from the topic into the TS table. To see this in action, we first need to create and activate the TS table. You can find more information on creating and activating TS tables here. For this demo we will create and activate the table with the following:
riak-admin bucket-type create ts_weather_demo '{"props":{"n_val":3, "table_def": "CREATE TABLE ts_weather_demo (weather VARCHAR NOT NULL, family VARCHAR NOT NULL, time TIMESTAMP NOT NULL, temperature DOUBLE, humidity DOUBLE, pressure DOUBLE, PRIMARY KEY ((weather, family, QUANTUM(time, 1, 'h')), weather, family, time))"}}'
riak-admin bucket-type activate ts_weather_demo
Now that we have created a TS table, we need to create the ingest-ts topic:
path/to/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ingest-ts
Now, we can run the StreamingTSExample.scala example with:
bin/run-example streaming.StreamingTSExample
Now that the stream is up and running, we need to send data to the Kafka topic. Let's start the Kafka console producer, which will allow us to stream messages from the terminal into the Kafka ingest-ts topic:
/path/to/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ingest-ts
Now paste the following message into the terminal:
{"time": "2016-01-01 08:30:00.000", "weather": "sunny", "temperature": 25.0, "humidity": 67.0, "pressure": 30.20, "family": "f"}
You can check that this worked by running a simple SQL query against the ts_weather_demo table for the example data.
Sources StreamingTSExample.scala
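The TS streaming flow is similar, except each JSON message has to be turned into a row that matches the ts_weather_demo column order before it is saved. A hedged sketch follows; JSON parsing via json4s and the timestamp handling are assumptions, and the actual example may differ:

```scala
import java.sql.Timestamp
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.Row
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import com.basho.riak.spark._

object StreamingTsSketch {
  // Mirrors the JSON message shown above.
  case class WeatherMessage(time: String, weather: String, temperature: Double,
                            humidity: Double, pressure: Double, family: String)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf()
      .setAppName("Streaming TS sketch")
      .setIfMissing("spark.master", "local[2]")
      .setIfMissing("spark.riak.connection.host", "127.0.0.1:8087"))
    val ssc = new StreamingContext(sc, Durations.seconds(15))

    val kafkaParams = Map[String, String]("metadata.broker.list" -> "127.0.0.1:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("ingest-ts"))

    stream.map(_._2).foreachRDD { rdd =>
      // Build rows in the ts_weather_demo column order:
      // weather, family, time, temperature, humidity, pressure.
      val rows = rdd.map { json =>
        implicit val formats = DefaultFormats
        val m = parse(json).extract[WeatherMessage]
        Row(m.weather, m.family, Timestamp.valueOf(m.time), m.temperature, m.humidity, m.pressure)
      }
      rows.saveToRiakTS("ts_weather_demo")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```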