Commit e95d342

Fixed #16 Added option to configure mapping of fields in SinkRecord to CQL columns
1 parent 294ec20 commit e95d342

11 files changed, +226 -40 lines changed

README.md

Lines changed: 20 additions & 1 deletion

````diff
@@ -84,7 +84,8 @@ All the others (BLOB, INET, UUID, TIMEUUID, LIST, SET, MAP, CUSTOM, UDT, TUPLE,
 
 ## CassandraSink
 It stores Kafka SinkRecord in Cassandra tables. Currently, we only support STRUCT type in the SinkRecord.
-The STRUCT can have multiple fields with primitive fieldtypes. We assume one-to-one mapping between the column names in the Cassandra sink table and the field names.
+The STRUCT can have multiple fields with primitive fieldtypes.
+By default, we assume one-to-one mapping between the column names in the Cassandra sink table and the field names.
 
 Say, the SinkRecords has the following STRUCT value
 ```
@@ -97,6 +98,23 @@ Say, the SinkRecords has the following STRUCT value
 
 Then the Cassandra table should have the columns - id, username, text
 
+We also support specifying the field name mapping to column names, using the property `cassandra.sink.field.mapping`
+Say, the SinkRecords has the following STRUCT value
+```
+{
+  'id': 1,
+  'user': {
+    'id': 123,
+    'name': 'Foo',
+    'email': 'foo@bar.com'
+  },
+  'text': 'This is my first tweet'
+}
+```
+and the `cassandra.sink.field.mapping` has the value `{'id': 'id', 'user': {'id': 'uid', 'name': 'username'}, 'text': 'tweet_text'}`
+Then the Cassandra table should have the columns - id, uid, username, tweet_text.
+Note that since we did not specify any mapping for 'user.email', it is ignored and not inserted in the Cassandra Sink table.
+
 Note: The library does not create the Cassandra tables - users are expected to create those before starting the sink
 
 ## Configuration
@@ -132,6 +150,7 @@ Refer `examples/config` for sample configuration files
 |-------- |----------------------------|-----------------------|
 | cassandra.sink.route.\<topic_name\> | The table to write the SinkRecords to, \<keyspace\>.\<tableName\> | |
 | cassandra.sink.consistency | The consistency level for writes to Cassandra. | LOCAL_QUORUM |
+| cassandra.sink.field.mapping | The JSON String mapping field names to column names. | |
 
 
 ## Building from Source
````
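To illustrate the new property end to end: a minimal sketch (mirroring the `JSONObject` usage in the integration test below) of building the `cassandra.sink.field.mapping` value for the README's tweet example. The topic and table names here are hypothetical, not part of the commit:

```scala
import scala.util.parsing.json.JSONObject

// Nested field-name -> column-name mapping from the README example above,
// serialized to the JSON string the cassandra.sink.field.mapping property expects.
val fieldMapping: String = JSONObject(Map(
  "id"   -> "id",
  "user" -> JSONObject(Map("id" -> "uid", "name" -> "username")),
  "text" -> "tweet_text"
)).toString()

// Hypothetical sink properties; "tweets" and "test.tweets" are placeholders.
val sinkProps = Map(
  "cassandra.sink.route.tweets"  -> "test.tweets",
  "cassandra.sink.field.mapping" -> fieldMapping
)
```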

build.sbt

Lines changed: 10 additions & 8 deletions

```diff
@@ -33,14 +33,16 @@ libraryDependencies ++= Seq(
   "org.joda" % "joda-convert" % "1.8.1",
   "org.scalatest" %% "scalatest" % "2.2.6" % "test,it",
   "org.mockito" % "mockito-core" % "2.0.34-beta" % "test,it",
-  "ch.qos.logback" % "logback-classic" % "1.1.3" % "test,it",
-  CrossVersion.partialVersion(scalaVersion.value) match {
-    case Some((2, minor)) if minor < 11 =>
-      "org.slf4j" % "slf4j-api" % "1.7.13"
-    case _ =>
-      "com.typesafe.scala-logging" %% "scala-logging" % "3.1.0"
-  }
-)
+  "ch.qos.logback" % "logback-classic" % "1.1.3" % "test,it"
+) ++ (CrossVersion.partialVersion(scalaVersion.value) match {
+  case Some((2, minor)) if minor < 11 => Seq(
+    "org.slf4j" % "slf4j-api" % "1.7.13"
+  )
+  case _ => Seq(
+    "com.typesafe.scala-logging" %% "scala-logging" % "3.1.0",
+    "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4"
+  )
+})
 
 publishMavenStyle := true
```
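Worth noting: `scala.util.parsing.json`, which the field-mapping code below relies on, moved out of the Scala standard library into the `scala-parser-combinators` module as of Scala 2.11, which is presumably why that dependency is added only to the 2.11-and-later branch of the match.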

src/it/resources/setup.cql

Lines changed: 7 additions & 0 deletions

```diff
@@ -26,6 +26,13 @@ CREATE TABLE IF NOT EXISTS test.kv (
   value int,
   PRIMARY KEY (key));
 
+CREATE TABLE IF NOT EXISTS test.fieldmap (
+  new_key text,
+  new_value int,
+  new_nested text,
+  new_dnested text,
+  PRIMARY KEY (new_key));
+
 CREATE TABLE test.playlists (
   id bigint,
   song_order int,
```
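The new `test.fieldmap` table supplies the target columns (`new_key`, `new_value`, `new_nested`, `new_dnested`) that the custom field-mapping integration test added below writes to.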

src/it/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTaskSpec.scala

Lines changed: 80 additions & 4 deletions

```diff
@@ -19,16 +19,17 @@
 package com.tuplejump.kafka.connect.cassandra
 
 import scala.collection.JavaConverters._
+import scala.util.parsing.json.JSONObject
 import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}
 import org.apache.kafka.connect.sink.{SinkRecord, SinkTaskContext}
 
 class CassandraSinkTaskSpec extends AbstractFlatSpec {
 
-  val topicName = "test_kv_topic"
-  val tableName = "test.kv"
-  val config = sinkProperties(Map(topicName -> tableName))
-
   it should "start sink task" in {
+    val topicName = "test_kv_topic"
+    val tableName = "test.kv"
+    val config = sinkProperties(Map(topicName -> tableName))
+
     val sinkTask = new CassandraSinkTask()
     val mockContext = mock[SinkTaskContext]
 
@@ -38,6 +39,10 @@ class CassandraSinkTaskSpec extends AbstractFlatSpec {
   }
 
   it should "save records in cassandra" in {
+    val topicName = "test_kv_topic"
+    val tableName = "test.kv"
+    val config = sinkProperties(Map(topicName -> tableName))
+
     val sinkTask = new CassandraSinkTask()
     val mockContext = mock[SinkTaskContext]
 
@@ -64,5 +69,76 @@ class CassandraSinkTaskSpec extends AbstractFlatSpec {
     rowCount should be(2)
     cc.shutdown()
   }
+
+
+  it should "save records in cassandra with custom field mapping" in {
+    val topicName = "test_fieldmap_topic"
+    val tableName = "test.fieldmap"
+    val config = sinkProperties(Map(topicName -> tableName))
+
+    val sinkTask = new CassandraSinkTask()
+    val mockContext = mock[SinkTaskContext]
+
+    val fieldMapping: JSONObject = JSONObject(Map(
+      "key" -> "new_key",
+      "value" -> "new_value",
+      "nvalue" -> JSONObject(Map(
+        "blah1" -> "new_nested",
+        "blah2" -> JSONObject(Map(
+          "blah2" -> "new_dnested"
+        ))
+      ))
+    ))
+
+    sinkTask.initialize(mockContext)
+    sinkTask.start((config + ("cassandra.sink.field.mapping" -> fieldMapping.toString())).asJava)
+
+    val doubleNestedSchema = SchemaBuilder.struct.name("dnestedSchema").version(1)
+      .field("blah1", Schema.STRING_SCHEMA)
+      .field("blah2", Schema.STRING_SCHEMA).build
+    val nestedSchema = SchemaBuilder.struct.name("nestedSchema").version(1)
+      .field("blah1", Schema.STRING_SCHEMA)
+      .field("blah2", doubleNestedSchema).build
+    val valueSchema = SchemaBuilder.struct.name("record").version(1)
+      .field("key", Schema.STRING_SCHEMA)
+      .field("value", Schema.INT32_SCHEMA)
+      .field("nvalue", nestedSchema).build
+
+    val dnestedValue1 = new Struct(doubleNestedSchema)
+      .put("blah1", "dnes_blah1_1")
+      .put("blah2", "dnes_blah2_1")
+    val nestedValue1 = new Struct(nestedSchema)
+      .put("blah1", "nes_blah1_1")
+      .put("blah2", dnestedValue1)
+    val value1 = new Struct(valueSchema)
+      .put("key", "pqr")
+      .put("value", 15)
+      .put("nvalue", nestedValue1)
+
+    val dnestedValue2 = new Struct(doubleNestedSchema)
+      .put("blah1", "dnes_blah1_2")
+      .put("blah2", "dnes_blah2_2")
+    val nestedValue2 = new Struct(nestedSchema)
+      .put("blah1", "nes_blah1_2")
+      .put("blah2", dnestedValue2)
+    val value2 = new Struct(valueSchema)
+      .put("key", "abc")
+      .put("value", 17)
+      .put("nvalue", nestedValue2)
+
+    val record1 = new SinkRecord(topicName, 1, SchemaBuilder.struct.build, "key", valueSchema, value1, 0)
+    val record2 = new SinkRecord(topicName, 1, SchemaBuilder.struct.build, "key", valueSchema, value2, 0)
+
+    sinkTask.put(List(record1, record2).asJavaCollection)
+
+    sinkTask.stop()
+
+    val cc = CassandraCluster.local
+    val session = cc.session
+    val result = session.execute(s"select count(1) from $tableName").one()
+    val rowCount = result.getLong(0)
+    rowCount should be(2)
+    cc.shutdown()
+  }
 }
```

src/main/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTask.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -52,7 +52,7 @@ class CassandraSinkTask extends SinkTask with CassandraTask {
   private def write(sc: SinkConfig, byTopic: Iterable[SinkRecord]): Unit = {
     // TODO needs ticket: if (byTopic.size > 1) boundWrite(sc, byTopic) else
     for (record <- byTopic) {
-      val query = record.as(sc.schema.namespace)
+      val query = record.as(sc.schema.namespace, sc.options.fieldMapping)
       Try(session.executeAsync(query.cql)) recover { case NonFatal(e) =>
         throw new ConnectException(
           s"Error executing ${byTopic.size} records for schema '${sc.schema}'.", e)
@@ -64,7 +64,7 @@ class CassandraSinkTask extends SinkTask with CassandraTask {
   private def boundWrite(sc: SinkConfig, byTopic: Iterable[SinkRecord]): Unit = {
     val statement = prepare(session, sc)
     val futures = for (record <- byTopic) yield {
-      val query = record.as(sc.schema.namespace)
+      val query = record.as(sc.schema.namespace, sc.options.fieldMapping)
       try {
         val bs = statement.bind(query.cql)
         session.executeAsync(bs)
```

src/main/scala/com/tuplejump/kafka/connect/cassandra/ConnectorSyntax.scala

Lines changed: 6 additions & 3 deletions

```diff
@@ -107,9 +107,12 @@ private[cassandra] object Syntax {
     namespace.length >= 3 || namespace.contains(".")
   }
 
-  def apply(namespace: String, columnNames: List[ColumnName], columnValues: String): SinkQuery = {
-    val columns = columnNames.mkString(",")
-    SinkQuery(s"INSERT INTO $namespace($columns) VALUES($columnValues)")
+  def apply(namespace: String, columnNamesVsValues: Map[ColumnName, String]): SinkQuery = {
+    val query = columnNamesVsValues.view.map(e => Vector(e._1, e._2)).transpose match {
+      case columnNames :: columnValues :: Nil =>
+        s"INSERT INTO ${namespace}(${columnNames.mkString(",")}) VALUES(${columnValues.mkString(",")})"
+    }
+    SinkQuery(query)
   }
 }
```
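A standalone sketch of the transpose trick the new `apply` uses, assuming values arrive already CQL-escaped; the table and column names below are placeholders, not part of the commit:

```scala
// A Map of column names to already-escaped CQL values is split into two
// parallel lists that feed the INSERT's column and VALUES sections.
val colNamesVsValues = Map("new_key" -> "'abc'", "new_value" -> "17")

val List(columnNames, columnValues) =
  colNamesVsValues.toList.map { case (k, v) => List(k, v) }.transpose

val cql = s"INSERT INTO test.fieldmap(${columnNames.mkString(",")})" +
  s" VALUES(${columnValues.mkString(",")})"
// cql: INSERT INTO test.fieldmap(new_key,new_value) VALUES('abc',17)
```

Names and values stay aligned because both lists come from a single traversal of the same map.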

src/main/scala/com/tuplejump/kafka/connect/cassandra/TaskConfig.scala

Lines changed: 20 additions & 5 deletions

```diff
@@ -18,8 +18,10 @@ package com.tuplejump.kafka.connect.cassandra
 
 import scala.collection.immutable
 import scala.util.control.NonFatal
+import scala.util.parsing.json.JSON
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.connect.connector.Task
+import org.apache.kafka.connect.errors.DataException
 import com.datastax.driver.core.{TableMetadata, ConsistencyLevel}
 import InternalConfig._
 
@@ -123,6 +125,9 @@ object TaskConfig {
   final val SinkConsistency: Key = "cassandra.sink.consistency"
   final val DefaultSinkConsistency = ConsistencyLevel.LOCAL_QUORUM
 
+  final val FieldMapping: Key = "cassandra.sink.field.mapping"
+  final val DefaultFieldMapping = Map.empty[String, String]
+
   /* **** Task config **** */
   final val TaskParallelismStrategy: Key = "cassandra.task.parallelism"
 
@@ -156,6 +161,10 @@ private[cassandra] object InternalConfig {
   def toInt(a: String): Int = a.toInt
   def toLong(a: String): Long = a.toLong
   def toConsistency(a: String): ConsistencyLevel = ConsistencyLevel.valueOf(a)
+  def toMap(a: String): Map[String, Any] = JSON.parseFull(a) collect {
+    case data: Map[_, _] => data.asInstanceOf[Map[String, Any]]
+  } getOrElse(throw new DataException(s"Field mapping type for '$a' is not supported."))
+
 
   /** A Cassandra `keyspace.table` to Kafka topic mapping.
    *
@@ -319,15 +328,21 @@ private[cassandra] object InternalConfig {
   sealed trait ClusterQueryOptions
 
   /** Settings related for individual queries, can be set per keyspace.table. */
-  final case class WriteOptions(consistency: ConsistencyLevel) extends ClusterQueryOptions
+  final case class WriteOptions(consistency: ConsistencyLevel,
+    fieldMapping: Map[String, Any]) extends ClusterQueryOptions
 
   object WriteOptions {
 
-    def apply(config: Map[String,String]): WriteOptions =
-      WriteOptions(config.valueOr[ConsistencyLevel](
-        SinkConsistency, toConsistency, DefaultSourceConsistency))
+    def apply(config: Map[String, String]): WriteOptions = {
+      WriteOptions(
+        consistency = config.valueOr[ConsistencyLevel](
+          SinkConsistency, toConsistency, DefaultSourceConsistency),
+        fieldMapping = config.valueOr[Map[String, Any]](
+          FieldMapping, toMap, DefaultFieldMapping
+        )
+      )
+    }
   }
-
   /** Settings related for individual queries, can be set per keyspace.table. */
   final case class ReadOptions(splitSize: Int,
     fetchSize: Int,
```
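For reference, a minimal standalone sketch of what `toMap` produces, using the same `JSON.parseFull` call on a sample mapping string (the field and column names are illustrative only):

```scala
import scala.util.parsing.json.JSON

// JSON.parseFull returns Option[Any]; a JSON object parses to a Map[String, Any],
// with nested objects appearing as nested Maps - the shape the sink's
// field-mapping walk consumes.
val raw = """{"key": "new_key", "nvalue": {"blah1": "new_nested"}}"""
val parsed = JSON.parseFull(raw) collect {
  case data: Map[_, _] => data.asInstanceOf[Map[String, Any]]
}
// parsed == Some(Map("key" -> "new_key", "nvalue" -> Map("blah1" -> "new_nested")))
```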

src/main/scala/com/tuplejump/kafka/connect/cassandra/package.scala

Lines changed: 76 additions & 13 deletions

```diff
@@ -16,6 +16,8 @@
 
 package com.tuplejump.kafka.connect
 
+import org.apache.kafka.connect.data.Field
+
 /** Common package operations. */
 package object cassandra {
   import java.util.{List => JList, Map => JMap, Date => JDate}
@@ -69,29 +71,90 @@ package object cassandra {
 
   implicit class SinkRecordOps(record: SinkRecord) {
 
-    def as(namespace: String): SinkQuery = {
+    def as(namespace: String, fieldMapping: Map[String, Any]): SinkQuery = {
+      val colNamesVsValues: Map[String, String] = {
+        if (fieldMapping.isEmpty) {
+          toCqlData
+        } else {
+          toCqlData(fieldMapping)
+        }
+      }
+      SinkQuery(namespace, colNamesVsValues)
+    }
+
+    def toCqlData(): (Map[String, String]) = {
       val schema = record.valueSchema
-      val columnNames = schema.asColumnNames
-      val columnValues = schema.`type`() match {
+      schema.`type`() match {
         case STRUCT =>
-          val struct: Struct = record.value.asInstanceOf[Struct]
-          columnNames.map(convert(schema, struct, _)).mkString(",")
-        case other => throw new DataException(
-          s"Unable to create insert statement with unsupported value schema type $other.")
+          schema.fields.asScala.map { field =>
+            field.name -> convert(schema, record.value.asInstanceOf[Struct], field)
+          }.toMap
+        case other =>
+          throw new DataException(
+            s"Unable to create insert statement with unsupported value schema type $other.")
+      }
+    }
+
+    def toCqlData(fieldMapping: Map[String, Any]): Map[String, String] = {
+      record.valueSchema.`type`() match {
+        case STRUCT =>
+          toColNamesVsValues(Map.empty[String, String], record.value.asInstanceOf[Struct], fieldMapping)
+        case other =>
+          throw new DataException(
+            s"Unable to create insert statement with unsupported value schema type $other.")
+      }
+    }
+
+    // scalastyle:off
+    private def toColNamesVsValues(colNameVsValues: Map[String, String],
+      struct: Struct, fieldMapping: Map[String, Any]): Map[String, String] = {
+      lazy val exception = new DataException(s"Mismatch between fieldMapping and Schema")
+      var result: Map[String, String] = colNameVsValues
+      struct.schema.fields.asScala.foreach { field =>
+        val fieldMappingValue = fieldMapping.get(field.name)
+        field.schema.`type`() match {
+          case STRUCT =>
+            fieldMappingValue match {
+              case Some(value) =>
+                value match {
+                  case newMap: Map[_, _] =>
+                    result = toColNamesVsValues(
+                      result,
+                      struct.get(field).asInstanceOf[Struct],
+                      newMap.asInstanceOf[Map[String, Any]]
+                    )
+                  case _ =>
+                    throw exception
+                }
+              case None =>
+            }
+          case _ =>
+            fieldMappingValue match {
+              case Some(value) =>
+                value match {
+                  case strValue: String =>
+                    result += (strValue -> convert(field.schema, struct, field))
+                  case _ =>
+                    throw exception
+                }
+              case None =>
+            }
+        }
       }
-      SinkQuery(namespace, columnNames, columnValues)
+      result
     }
+    // scalastyle:on
 
     /* TODO support all types. */
-    def convert(schema: Schema, result: Struct, col: String): AnyRef =
-      schema.field(col).schema match {
+    def convert(schema: Schema, result: Struct, field: Field): String =
+      field.schema match {
         case x if x.`type`() == Schema.STRING_SCHEMA.`type`() =>
-          s"'${result.get(col).toString}'"
+          s"'${result.get(field).toString}'"
         case x if x.name() == Timestamp.LOGICAL_NAME =>
-          val time = Timestamp.fromLogical(x, result.get(col).asInstanceOf[JDate])
+          val time = Timestamp.fromLogical(x, result.get(field).asInstanceOf[JDate])
           s"$time"
         case y =>
-          result.get(col)
+          String.valueOf(result.get(field))
       }
 
     def asColumnNames: List[ColumnName] =
```
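The recursive `toColNamesVsValues` above is the core of the change. A simplified sketch of the same walk, with plain Maps standing in for Kafka Structs and the mismatch `DataException` elided, shows the rename/recurse/drop behaviour:

```scala
// String mapping values rename leaf fields to columns, nested Maps recurse into
// nested records, and fields with no mapping entry are silently dropped.
def flatten(struct: Map[String, Any], mapping: Map[String, Any]): Map[String, String] =
  struct.foldLeft(Map.empty[String, String]) {
    case (acc, (name, nested: Map[_, _])) =>
      mapping.get(name) match {
        case Some(m: Map[_, _]) =>
          acc ++ flatten(nested.asInstanceOf[Map[String, Any]], m.asInstanceOf[Map[String, Any]])
        case _ => acc // unmapped nested record: skipped
      }
    case (acc, (name, value)) =>
      mapping.get(name) match {
        case Some(col: String) => acc + (col -> String.valueOf(value))
        case _ => acc // unmapped leaf field: skipped
      }
  }

flatten(
  Map("key" -> "abc", "nvalue" -> Map("blah1" -> "x", "blah2" -> "y")),
  Map("key" -> "new_key", "nvalue" -> Map("blah1" -> "new_nested"))
)
// => Map("new_key" -> "abc", "new_nested" -> "x")
```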
