
Commit 1efbaa4

Saving Kafka offsets to Cassandra (#2)
* todo: fix stream provider
* fix: StreamSinkProvider must not be abstract
* trying update format for cassandra stream provider
* wip: saving offset to Kafka when checkpointing is not possible
* wip: comments about sync ops
* wip: get offsets from kafka and save in it 2 transactions
* saving offsets to Cassandra finally works
* change saving method to cassandra
* cleaner code
* doc: add blog article about kafka
1 parent 1aba223 commit 1efbaa4

11 files changed (+229, -71 lines)

README.md

Lines changed: 40 additions & 3 deletions
@@ -12,9 +12,15 @@ Then, Kafka to Cassandra
 ## Output data
 Stored inside Kafka and Cassandra for example only.
 Cassandra's Sinks uses the [ForeachWriter](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter) and also the [StreamSinkProvider](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.sources.StreamSinkProvider) to compare both sinks.
-One is using the Datastax's Cassandra saveToCassandra method. The other another method, more messy, that uses CQL on a custom foreach loop.
+
+One uses Datastax's saveToCassandra method. The other uses a messier (untyped) method that runs CQL in a custom foreach loop.
+
+From Spark's doc about batch duration:
+> Trigger interval: Optionally, specify the trigger interval. If it is not specified, the system will check for availability of new data as soon as the previous processing has completed. If a trigger time is missed because the previous processing has not completed, then the system will attempt to trigger at the next trigger point, not immediately after the processing has completed.
+
 ### Kafka topic
-topic:test
+One topic "test" with only one partition
+
 ### Cassandra Table
 A table for the ForeachWriter
 ```
@@ -38,7 +44,18 @@ CREATE TABLE test.radioOtherSink (
 );
 ```

+A third sink to store **Kafka metadata** in case checkpointing is not available (during an application upgrade, for example)
+```
+CREATE TABLE test.kafkaMetadata (
+  partition int,
+  offset bigint,
+  PRIMARY KEY (partition)
+);
+```
+

+#### Table Content
+##### Radio
 ```
 cqlsh> SELECT * FROM test.radio;

@@ -56,16 +73,36 @@ cqlsh> SELECT * FROM test.radio;

 ```

+##### Kafka Metadata
+When doing an application upgrade, we cannot use [checkpointing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing), so we need to store our offsets in an external datasource; here, Cassandra is chosen.
+Then, when starting our Kafka source, we need to use the option "startingOffsets" with a JSON string such as
+```
+""" {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """
+```
+Learn more [in the official Spark doc](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries).
+
+If no Kafka metadata is stored inside Cassandra, **earliest** is used.
+
+```
+cqlsh> SELECT * FROM test.kafkametadata;
+ partition | offset
+-----------+--------
+         0 |    171
+```
+
 ## Useful links
+* [Processing Data in Apache Kafka with Structured Streaming in Apache Spark 2.2](https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html)
 * https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html
 * https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach
 * https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes

 ## Inspired by
 * https://github.com/ansrivas/spark-structured-streaming
-* From Holden Karau's High Performance Spark : https://github.com/holdenk/spark-structured-streaming-ml/blob/master/src/main/scala/com/high-performance-spark-examples/structuredstreaming/CustomSink.scala#L66
+* [Holden Karau's High Performance Spark](https://github.com/holdenk/spark-structured-streaming-ml/blob/master/src/main/scala/com/high-performance-spark-examples/structuredstreaming/CustomSink.scala#L66)
+* [Jay Kreps' blog articles](https://medium.com/@jaykreps/exactly-once-support-in-apache-kafka-55e1fdd0a35f)

 ## Requirements
+@TODO docker compose
 * Cassandra 3.10
 * Kafka 0.10+ (with Zookeeper)
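A minimal sketch of how the "startingOffsets" JSON above is consumed: it is simply passed as an option when building the Kafka source. The object name, broker address, and option wiring below are assumptions for illustration only; the repository's own `KafkaSource.read(startingOption, partitionsAndOffsets)` appears in the Main.scala diff that follows.

```scala
// Sketch only (not the repository's KafkaSource): feed the offsets JSON
// built from Cassandra into a Structured Streaming Kafka source.
import org.apache.spark.sql.{DataFrame, SparkSession}

object KafkaSourceSketch {
  def read(spark: SparkSession, startingOption: String, partitionsAndOffsets: String): DataFrame =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
      .option("subscribe", "test")                         // the single "test" topic with one partition
      .option(startingOption, partitionsAndOffsets)        // e.g. ("startingOffsets", """{"test":{"0":171}}""") or ("startingOffsets", "earliest")
      .load()
}
```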

src/main/scala/Main.scala

Lines changed: 11 additions & 14 deletions
@@ -1,41 +1,38 @@
 package main
 
 import cassandra.CassandraDriver
-import kafka.{KafkaService, KafkaSink, KafkaSource}
+import kafka.{KafkaSink, KafkaSource}
 import parquetHelper.ParquetService
 import spark.SparkHelper
 
 object Main {
 
-
   def main(args: Array[String]) {
     val spark = SparkHelper.getAndConfigureSparkSession()
 
     //Classic Batch
-    ParquetService.batchWay()
+    //ParquetService.batchWay()
 
-    //Stream
+    //Generate a "fake" stream from a parquet file
     val staticInputDF = ParquetService.streamingWay()
 
-    //Stream To Kafka
+    //Send it to Kafka for our example
     val queryToKafka = KafkaSink.writeStream(staticInputDF)
 
-    //Read from Kafka
-    val kafkaInputDF = KafkaSource.read()
+    //Finally read it from kafka, in case checkpointing is not available we read last offsets saved from Cassandra
+    val (startingOption, partitionsAndOffsets) = CassandraDriver.getKafaMetadata()
+    val kafkaInputDF = KafkaSource.read(startingOption, partitionsAndOffsets)
 
-    //Debug Kafka input Stream
+    //Just debugging Kafka source into our console
     KafkaSink.debugStream(kafkaInputDF)
 
-    CassandraDriver.getTestInfo()
-    //Saving using the foreach method
-    CassandraDriver.saveForeach(kafkaInputDF)
-
     //Saving using Datastax connector's saveToCassandra method
     CassandraDriver.saveStreamSinkProvider(kafkaInputDF)
 
-    //@TODO debug
-    CassandraDriver.debug()
+    //Saving using the foreach method
+    //CassandraDriver.saveForeach(kafkaInputDF) //Untype/unsafe method using CQL --> just here for example
 
+    //Wait for all streams to finish
     spark.streams.awaitAnyTermination()
   }
 }

src/main/scala/cassandra/CassandraDriver.scala

Lines changed: 58 additions & 11 deletions
@@ -3,7 +3,8 @@ package cassandra
 import org.apache.spark.sql._
 import org.apache.spark.sql.cassandra._
 import com.datastax.spark.connector._
-import kafka.KafkaService
+import com.datastax.spark.connector.cql.CassandraConnector
+import kafka.{KafkaMetadata, KafkaService}
 import radio.SimpleSongAggregation
 import spark.SparkHelper
 import sink._
@@ -12,23 +13,31 @@ object CassandraDriver {
   private val spark = SparkHelper.getSparkSession()
   import spark.implicits._
 
+  val connector = CassandraConnector(SparkHelper.getSparkSession().sparkContext.getConf)
+
   val namespace = "test"
   val foreachTableSink = "radio"
-  val StreamProviderTableSink = "radioOtherSink"
+  val StreamProviderTableSink = "radioothersink"
+  val kafkaMetadata = "kafkametadata"
 
   def getTestInfo() = {
-    val rdd = spark.sparkContext.cassandraTable("test", "kv")
-    println(rdd.count)
-    println(rdd.first)
-    println(rdd.map(_.getInt("value")).sum)
+    val rdd = spark.sparkContext.cassandraTable(namespace, kafkaMetadata)
+
+    if( !rdd.isEmpty ) {
+      println(rdd.count)
+      println(rdd.first)
+    } else {
+      println(s"$namespace, $kafkaMetadata is empty in cassandra")
+    }
   }
 
 
   /**
    * remove kafka metadata and only focus on business structure
    */
-  private def getDatasetForCassandra(df: DataFrame) = {
-    df.select(KafkaService.radioStructureName + ".*").as[SimpleSongAggregation]
+  def getDatasetForCassandra(df: DataFrame) = {
+    df.select(KafkaService.radioStructureName + ".*")
+      .as[SimpleSongAggregation]
   }
 
   //https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach
@@ -39,7 +48,7 @@ object CassandraDriver {
     ds
      .writeStream
      .queryName("KafkaToCassandraForeach")
-      .format("update")
+      //.outputMode("update")
      .foreach(new CassandraSinkForeach())
      .start()
  }
@@ -48,13 +57,51 @@ object CassandraDriver {
    df
      .writeStream
      .format("cassandra.sink.CassandraSinkProvider")
+      .outputMode("update")
      .queryName("KafkaToCassandraStreamSinkProvider")
-      .format("update") //@TODO check how to handle this in a custom StreakSnkProvider
      .start()
  }
 
+  /**
+   * @TODO handle more topic name, for our example we only use the topic "test"
+   *
+   * we can use collect here as kafkameta data is not big at all
+   *
+   * if no metadata are found, we would use the earliest offsets.
+   *
+   * @see https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-batch
+   * assign json string {"topicA":[0,1],"topicB":[2,4]}
+   * Specific TopicPartitions to consume. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source.
+   */
+  def getKafaMetadata() = {
+    val kafkaMetadataRDD = spark.sparkContext.cassandraTable(namespace, kafkaMetadata)
+
+    val output = if(kafkaMetadataRDD.isEmpty) {
+      ("startingOffsets", "earliest")
+    } else {
+      ("startingOffsets", transformKafkaMetadataArrayToJson( kafkaMetadataRDD.collect() ) )
+    }
+
+    println("getKafkaMetadata " + output.toString)
+
+    output
+  }
+
+  /**
+   * @param array
+   * @return {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}
+   */
+  def transformKafkaMetadataArrayToJson(array: Array[CassandraRow]) : String = {
+    s"""{"${KafkaService.topicName}":
+          {
+            "${array(0).getLong("partition")}": ${array(0).getLong("offset")}
+          }
        }
+    """.replaceAll("\n", "").replaceAll(" ", "")
+  }
+
   def debug() = {
-    val output = spark.sparkContext.cassandraTable("test", "radio")
+    val output = spark.sparkContext.cassandraTable(namespace, foreachTableSink)
 
     println(output.count)
     /* output
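The `@TODO` above notes that `transformKafkaMetadataArrayToJson` only handles one topic, and it also only reads `array(0)`, i.e. a single partition. A hypothetical generalization (not part of this commit) that folds every partition row into the same JSON shape could look like:

```scala
import com.datastax.spark.connector.CassandraRow

object OffsetsJsonSketch {
  // Builds {"<topic>":{"<partition>": <offset>, ...}} from all rows, not just array(0).
  def forAllPartitions(topic: String, rows: Array[CassandraRow]): String = {
    val perPartition = rows
      .map(row => s""""${row.getLong("partition")}": ${row.getLong("offset")}""")
      .mkString(", ")
    s"""{"$topic": {$perPartition}}"""
  }
}
```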
Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
+package cassandra
+
+import kafka.KafkaMetadata
+
+object CassandraKafkaMetadata {
+  private def cql(metadata: KafkaMetadata): String = s"""
+    INSERT INTO test.kafkametadata (partition, offset)
+    VALUES(${metadata.partition}, ${metadata.offset})
+  """
+
+  //https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connection-pooling
+  def save(metadata: KafkaMetadata) = {
+    CassandraDriver.connector.withSessionDo(session =>
+      session.execute(cql(metadata))
+    )
+  }
+}
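A hypothetical call site for this helper (not part of the commit), recording that partition 0 has been processed up to offset 171:

```scala
import cassandra.CassandraKafkaMetadata
import kafka.KafkaMetadata

// Persist the highest processed offset for partition 0 of the topic.
CassandraKafkaMetadata.save(KafkaMetadata(partition = 0, offset = 171))
```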
Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+package cassandra.sink
+
+import cassandra.{CassandraDriver, CassandraKafkaMetadata}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.functions.max
+import spark.SparkHelper
+import cassandra.CassandraDriver
+import com.datastax.spark.connector._
+import kafka.KafkaMetadata
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.types.LongType
+
+/**
+ * must be idempotent and synchronous (@TODO check asynchronous/synchronous from Datastax's Spark connector) sink
+ */
+class CassandraSink() extends Sink {
+  private val spark = SparkHelper.getSparkSession()
+  import spark.implicits._
+  import org.apache.spark.sql.functions._
+
+  private def saveToCassandra(df: DataFrame) = {
+    println("Saving this DF to Cassandra")
+    val ds = CassandraDriver.getDatasetForCassandra(df)
+    ds.show() //Debug only
+
+    ds.rdd.saveToCassandra(CassandraDriver.namespace,
+      CassandraDriver.StreamProviderTableSink,
+      SomeColumns("title", "artist", "radio", "count")
+    )
+
+    saveKafkaMetaData(df)
+  }
+
+  /*
+   * As per SPARK-16020 arbitrary transformations are not supported, but
+   * converting to an RDD allows us to do magic.
+   */
+  override def addBatch(batchId: Long, df: DataFrame) = {
+    println(s"saveToCassandra batchId : ${batchId}")
+    saveToCassandra(df)
+  }
+
+  /**
+   * saving the highest value of offset per partition when checkpointing is not available (application upgrade for example)
+   * http://docs.datastax.com/en/cassandra/3.0/cassandra/dml/dmlTransactionsDiffer.html
+   * should be done in the same transaction as the data linked to the offsets
+   */
+  private def saveKafkaMetaData(df: DataFrame) = {
+    val kafkaMetadata = df.groupBy($"partition").agg(max($"offset").cast(LongType).as("offset")).as[KafkaMetadata]
+
+    println("saveKafkaMetaData")
+    kafkaMetadata.show()
+
+    kafkaMetadata.rdd.saveToCassandra(CassandraDriver.namespace,
+      CassandraDriver.kafkaMetadata,
+      SomeColumns("partition", "offset")
+    )
+
+    //Otherway to save offset inside Cassandra
+    //kafkaMetadata.collect().foreach(CassandraKafkaMetadata.save)
+  }
+}
+
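The comment on `saveKafkaMetaData` says the offsets "should be done in the same transaction as the data linked to the offsets". Cassandra has no multi-table transactions, but a logged batch is atomic (all statements eventually apply or none do, without isolation), so one hedged alternative, not used by this commit, would be to write an aggregate row and its offset in a single CQL batch:

```scala
// Hypothetical sketch (not in this commit): pair one aggregate row with the
// matching offset in a single logged batch via the connector's session.
import cassandra.CassandraDriver
import kafka.KafkaMetadata
import radio.SimpleSongAggregation

object BatchedOffsetWriteSketch {
  def save(record: SimpleSongAggregation, meta: KafkaMetadata): Unit = {
    val cql = s"""
      BEGIN BATCH
        INSERT INTO test.radioothersink (title, artist, radio, count)
        VALUES ('${record.title}', '${record.artist}', '${record.radio}', ${record.count});
        INSERT INTO test.kafkametadata (partition, offset)
        VALUES (${meta.partition}, ${meta.offset});
      APPLY BATCH
    """
    CassandraDriver.connector.withSessionDo(session => session.execute(cql))
  }
}
```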

src/main/scala/cassandra/sink/CassandraSinkForeach.scala

Lines changed: 4 additions & 5 deletions
@@ -1,5 +1,6 @@
 package cassandra.sink
 
+import cassandra.CassandraDriver
 import com.datastax.spark.connector.cql.CassandraConnector
 import org.apache.spark.sql.ForeachWriter
 import radio.SimpleSongAggregation
@@ -11,9 +12,7 @@ import spark.SparkHelper
  * https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach
  */
 class CassandraSinkForeach() extends ForeachWriter[SimpleSongAggregation] {
-  private val connector = CassandraConnector(SparkHelper.getSparkSession().sparkContext.getConf)
-
-  private def cql(record: SimpleSongAggregation): String = s"""
+  private def cqlRadio(record: SimpleSongAggregation): String = s"""
     insert into test.radio (title, artist, radio, count)
     values('${record.title}', '${record.artist}', '${record.radio}', ${record.count})"""
 
@@ -26,8 +25,8 @@ class CassandraSinkForeach() extends ForeachWriter[SimpleSongAggregation] {
   //https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connection-pooling
   def process(record: SimpleSongAggregation) = {
     println(s"Saving record: $record")
-    connector.withSessionDo(session =>
-      session.execute(cql(record))
+    CassandraDriver.connector.withSessionDo(session =>
+      session.execute(cqlRadio(record))
     )
   }
 
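Only `process` appears in the hunks above; a Spark `ForeachWriter` also requires `open` and `close`, which the commit's `CassandraSinkForeach` necessarily implements elsewhere in the file. A minimal sketch of the full lifecycle, with illustrative bodies:

```scala
// Sketch of the ForeachWriter contract; bodies here are for illustration only.
import org.apache.spark.sql.ForeachWriter
import radio.SimpleSongAggregation

class LoggingForeachWriterSketch extends ForeachWriter[SimpleSongAggregation] {
  // Called once per partition and epoch; return false to skip this partition.
  override def open(partitionId: Long, version: Long): Boolean = true

  // Called for every row of the partition.
  override def process(record: SimpleSongAggregation): Unit =
    println(s"Would save: $record")

  // Called when the partition is done, with the error if one occurred.
  override def close(errorOrNull: Throwable): Unit = ()
}
```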

Lines changed: 1 addition & 26 deletions
@@ -1,8 +1,5 @@
 package cassandra.sink
 
-import cassandra.CassandraDriver
-import com.datastax.spark.connector._
-import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.sources.StreamSinkProvider
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -19,26 +16,4 @@ class CassandraSinkProvider extends StreamSinkProvider {
                  outputMode: OutputMode): CassandraSink = {
     new CassandraSink()
   }
-}
-
-/**
- * must be idempotent and synchronous (@TODO check asynchronous/synchronous from Datastax's Spark connector) sink
- */
-class CassandraSink() extends Sink {
-  def saveToCassandra(df: DataFrame) = {
-    df.show()
-    df.rdd.saveToCassandra(CassandraDriver.namespace,
-      CassandraDriver.StreamProviderTableSink,
-      SomeColumns("title", "artist", "radio", "count")
-    )
-  }
-
-  /*
-   * As per SPARK-16020 arbitrary transformations are not supported, but
-   * converting to an RDD allows us to do magic.
-   */
-  override def addBatch(batchId: Long, df: DataFrame) = {
-    println(s"saveToCassandra batchId : ${batchId}")
-    saveToCassandra(df)
-  }
-}
+}
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+package kafka
+
+case class KafkaMetadata(partition: Long, offset: Long)
