Added Cassandra examples
hakanilter committed Mar 24, 2018
1 parent 237e990 commit 490da34
Showing 8 changed files with 279 additions and 90 deletions.
10 changes: 10 additions & 0 deletions README.md
@@ -1,2 +1,12 @@
 # Kafka Spark Streaming
 An example project for integrating Kafka and Spark Streaming in order to run streaming SQL queries.
+
+## NetworkQualityStreamingJob
+An example Spark Streaming app which consumes network signal data and executes a continuous SQL query.
+
+## NetworkQualityCassandraJob
+An example Spark Streaming app which consumes network signal data and writes it to Cassandra with a foreach writer.
+
+## NetworkQualityAnalysisJob
+An example Spark app which creates a DataFrame from Cassandra and executes an aggregation SQL query.
+
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
     <kafka-avro-serializer.version>3.3.0</kafka-avro-serializer.version>
     <avro.version>1.8.1</avro.version>
     <spark.version>2.3.0</spark.version>
+    <cassandra.version>2.0.7</cassandra.version>
     <gson.version>2.6</gson.version>
     <slf4j.version>1.7.25</slf4j.version>
     <junit.version>4.10</junit.version>
@@ -80,7 +81,13 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql-kafka-0-10_${scala.tools.version}</artifactId>
-      <version>2.3.0</version>
+      <version>${spark.version}</version>
     </dependency>
+    <!-- Cassandra -->
+    <dependency>
+      <groupId>com.datastax.spark</groupId>
+      <artifactId>spark-cassandra-connector_${scala.tools.version}</artifactId>
+      <version>${cassandra.version}</version>
+    </dependency>
     <!-- Gson -->
     <dependency>
RandomNetworkDataProducer.java
@@ -20,7 +20,7 @@
  */
 public class RandomNetworkDataProducer implements Runnable {

-    private static final long INCOMING_DATA_INTERVAL = 500;
+    private static final long INCOMING_DATA_INTERVAL = 10;

     private static final Logger LOGGER = LoggerFactory.getLogger(RandomNetworkDataProducer.class);

@@ -45,9 +45,8 @@ public void run() {
         List<String> deviceIds = IntStream.range(0, deviceCount)
                 .mapToObj(i -> UUID.randomUUID().toString())
                 .collect(Collectors.toList());

-        final int count = Integer.MAX_VALUE;
-        for (int i = 0; i < count; i++) {
+        for (int i = 0; i < Integer.MAX_VALUE; i++) {
             NetworkData networkData = new NetworkData();

             networkData.setDeviceId(deviceIds.get(random.nextInt(deviceCount-1)));
@@ -60,6 +59,8 @@ public void run() {
                 networkSignal.setRxSpeed((double) random.nextInt(100));
                 networkSignal.setTxSpeed((double) random.nextInt(100));
                 networkSignal.setTime(System.currentTimeMillis());
+                networkSignal.setLatitude((Math.random() * 10) + 1);
+                networkSignal.setLongitude((Math.random() * 10) + 1);
                 networkData.getSignals().add(networkSignal);
             }

@@ -72,7 +73,9 @@ public void run() {
             producer.send(record);

             try {
-                Thread.sleep(INCOMING_DATA_INTERVAL);
+                if (INCOMING_DATA_INTERVAL > 0) {
+                    Thread.sleep(INCOMING_DATA_INTERVAL);
+                }
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
7 changes: 7 additions & 0 deletions src/main/resources/cassandra.properties
@@ -0,0 +1,7 @@
spark.master=local[*]
topic.names=network-data
bootstrap.servers=localhost:9092
cassandra.host=localhost
cassandra.keyspace=test
cassandra.table=network_signals
processing.time=10 seconds
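
The jobs below read this file through ConfigUtil.getConfig("cassandra"). ConfigUtil itself is not part of this commit; the following is only a minimal sketch of a loader consistent with that call, assuming the file is resolved from the classpath:

import java.util.Properties

object ConfigUtil {
  // loads "/<name>.properties" from the classpath, e.g. getConfig("cassandra")
  def getConfig(name: String): Properties = {
    val props = new Properties()
    val in = getClass.getResourceAsStream(s"/$name.properties")
    try props.load(in) finally in.close()
    props
  }
}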
40 changes: 40 additions & 0 deletions src/main/scala/com/datapyro/spark/NetworkQualityAnalysisJob.scala
@@ -0,0 +1,40 @@
package com.datapyro.spark

import com.datapyro.kafka.util.ConfigUtil
import org.apache.spark.sql.SparkSession

/**
  * Cassandra DataFrame Example
  * (Works with Spark 2.2.0)
  */
object NetworkQualityAnalysisJob extends App {

  val config = ConfigUtil.getConfig("cassandra")

  // spark config
  val spark: SparkSession = SparkSession.builder
    .master(config.getProperty("spark.master"))
    .appName(getClass.getSimpleName)
    .config("spark.cassandra.connection.host", config.getProperty("cassandra.host"))
    .getOrCreate()

  // prepare cassandra df
  val df = spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(Map(
      "table" -> config.getProperty("cassandra.table"),
      "keyspace" -> config.getProperty("cassandra.keyspace")))
    .load()

  df.printSchema()
  df.createOrReplaceTempView("network_signals")

  // execute sql
  val sql =
    """
      SELECT networkType, COUNT(*), AVG(rxSpeed), AVG(txSpeed), SUM(rxData), SUM(txData)
      FROM network_signals
      GROUP BY networkType
    """
  spark.sql(sql).show()

  spark.close()

}
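
The SQL above can equally be expressed with the DataFrame API; a small sketch against the same df (Spark resolves the camelCase names case-insensitively to Cassandra's lower-cased column names):

import org.apache.spark.sql.functions.{avg, count, sum}

df.groupBy("networkType")
  .agg(count("*"), avg("rxSpeed"), avg("txSpeed"), sum("rxData"), sum("txData"))
  .show()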
121 changes: 121 additions & 0 deletions src/main/scala/com/datapyro/spark/NetworkQualityCassandraJob.scala
@@ -0,0 +1,121 @@
package com.datapyro.spark

import java.util.UUID

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import com.datapyro.kafka.util.ConfigUtil
import com.datastax.driver.core._
import com.datastax.driver.core.querybuilder.{QueryBuilder => QB}

/**
* Cassandra Spark Streaming Foreach Example
*
* CREATE KEYSPACE test WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
*
* CREATE TABLE network_signals (id TEXT PRIMARY KEY, deviceId TEXT, time TIMESTAMP, networkType TEXT, rxSpeed DOUBLE, txSpeed DOUBLE, rxData DOUBLE, txData DOUBLE, latitude DOUBLE, longitude DOUBLE);
*/
object NetworkQualityCassandraJob extends App {

  val config = ConfigUtil.getConfig("cassandra")

  // spark config
  val spark: SparkSession = SparkSession.builder
    .master(config.getProperty("spark.master"))
    .appName(getClass.getSimpleName)
    .getOrCreate()

  import spark.implicits._

  // define schema for json
  val schema = StructType(
    List(
      StructField("deviceId", StringType, true),
      StructField("time", LongType, true),
      StructField("signals", ArrayType(StructType(Array(
        StructField("time", LongType, true),
        StructField("networkType", StringType, true),
        StructField("rxSpeed", DoubleType, true),
        StructField("txSpeed", DoubleType, true),
        StructField("rxData", LongType, true),
        StructField("txData", LongType, true),
        StructField("latitude", DoubleType, true),
        StructField("longitude", DoubleType, true)
      ))))
    )
  )
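
  // For reference, one message from RandomNetworkDataProducer looks roughly
  // like this (illustrative values; field order follows the schema above):
  // {"deviceId":"3f6c...","time":1521900000000,"signals":[{"time":1521900000000,
  //  "networkType":"LTE","rxSpeed":42.0,"txSpeed":17.0,"rxData":1024,"txData":512,
  //  "latitude":5.3,"longitude":7.8}]}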

  // create stream
  val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", config.getProperty("bootstrap.servers"))
    .option("subscribe", config.getProperty("topic.names"))
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .select(from_json($"value", schema).alias("data"))

  df.createOrReplaceTempView("network_signals")

  // flatten: one output row per element of data.signals, keyed by the parent deviceId
  val sql = "SELECT x.deviceId, x.signal.* FROM (SELECT data.deviceId, EXPLODE(data.signals) AS signal FROM network_signals) x"
  val generateUUID = udf(() => UUID.randomUUID().toString)
  val query = spark.sql(sql).withColumn("id", generateUUID())

  // cassandra
  val keyspace = config.getProperty("cassandra.keyspace")
  val table = config.getProperty("cassandra.table")
  val host = config.getProperty("cassandra.host")

  val writer = new ForeachWriter[Row] {
    var cluster: Cluster = null
    var session: Session = null
    var records: Int = 0
    var start: Long = 0

    override def open(partitionId: Long, version: Long) = {
      start = System.currentTimeMillis()
      // Cluster is not serializable, so it is built here on the executor
      // rather than captured from the driver
      cluster = Cluster.builder()
        .addContactPoint(host)
        .build()
      session = cluster.connect(keyspace)
      session != null
    }

    override def process(row: Row) = {
      val query = {
        QB.insertInto(table)
          .value("deviceId", row.getString(0))
          .value("time", row.getLong(1))
          .value("networkType", row.getString(2))
          .value("rxSpeed", row.getDouble(3))
          .value("txSpeed", row.getDouble(4))
          .value("rxData", row.getLong(5))
          .value("txData", row.getLong(6))
          .value("latitude", row.getDouble(7))
          .value("longitude", row.getDouble(8))
          .value("id", row.getString(9))
      }
      // fire-and-forget: the insert's future is not awaited
      session.executeAsync(query)
      records += 1
    }

    override def close(errorOrNull: Throwable) = {
      if (session != null) session.close()
      if (cluster != null) cluster.close()
      println(records + " records processed in " + (System.currentTimeMillis() - start) + " ms")
    }
  }

  val result = query.writeStream
    .trigger(Trigger.ProcessingTime(config.getProperty("processing.time")))
    .outputMode(OutputMode.Append())
    .foreach(writer)
    .start()

  result.awaitTermination()

}
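
A note on the writer above: executeAsync returns immediately, so close() can run while inserts are still in flight. A minimal sketch of a stricter variant that drains the driver's futures before closing; it assumes the same host, keyspace, and table values and the imports from the job above, and the batch size of 1000 is an arbitrary choice:

import scala.collection.mutable.ArrayBuffer

val drainingWriter = new ForeachWriter[Row] {
  var cluster: Cluster = null
  var session: Session = null
  val pending = new ArrayBuffer[ResultSetFuture]()

  override def open(partitionId: Long, version: Long) = {
    cluster = Cluster.builder().addContactPoint(host).build()
    session = cluster.connect(keyspace)
    true
  }

  override def process(row: Row) = {
    // abbreviated to two columns; the full column list is in the writer above
    val insert = QB.insertInto(table)
      .value("id", row.getString(9))
      .value("deviceId", row.getString(0))
    pending += session.executeAsync(insert)
    if (pending.size >= 1000) { // bound memory per partition
      pending.foreach(_.getUninterruptibly())
      pending.clear()
    }
  }

  override def close(errorOrNull: Throwable) = {
    pending.foreach(_.getUninterruptibly()) // wait for in-flight inserts
    if (session != null) session.close()
    if (cluster != null) cluster.close()
  }
}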


This file was deleted.

