
Commit d00bd74

uncleGen authored and kai-chi committed
[SPARK-27494][SS] Null values don't work in Kafka source v2
## What changes were proposed in this pull request?

Right now Kafka source v2 doesn't support null values. The issue is in org.apache.spark.sql.kafka010.KafkaRecordToUnsafeRowConverter.toUnsafeRow, which doesn't handle null values.

## How was this patch tested?

Added new unit tests.

Closes apache#24441 from uncleGen/SPARK-27494.

Authored-by: uncleGen <hustyugm@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit d2656aa)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
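For context, Kafka itself treats a null key or a null value as legal — a record with a null value is a "tombstone" used by log-compacted topics — so the source has to tolerate both. A minimal sketch of producing such records with the plain Kafka client (the broker address and topic name are placeholders, not from this patch):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder broker
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
// Legal in Kafka: null key, null value, or both (a tombstone).
producer.send(new ProducerRecord[String, String]("events", null, null)).get()
producer.close()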
1 parent 4541114 commit d00bd74

File tree: 3 files changed (+68 −1 lines)


external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala

Lines changed: 6 additions & 1 deletion
@@ -30,13 +30,18 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
 
   def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
     rowWriter.reset()
+    rowWriter.zeroOutNullBytes()
 
     if (record.key == null) {
       rowWriter.setNullAt(0)
     } else {
       rowWriter.write(0, record.key)
     }
-    rowWriter.write(1, record.value)
+    if (record.value == null) {
+      rowWriter.setNullAt(1)
+    } else {
+      rowWriter.write(1, record.value)
+    }
     rowWriter.write(2, UTF8String.fromString(record.topic))
     rowWriter.write(3, record.partition)
     rowWriter.write(4, record.offset)
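The subtle part of this hunk is zeroOutNullBytes(): the converter reuses a single UnsafeRowWriter across records, so null bits set for a previous record must be cleared before the next one, and a null field is flagged with setNullAt rather than written (write(ordinal, bytes) dereferences the array and would throw on null). A hedged sketch of the pattern, assuming the catalyst UnsafeRowWriter API; the class name and two-field schema are illustrative — the real converter writes all seven Kafka columns:

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter

// Illustrative stand-in for KafkaRecordToUnsafeRowConverter, reduced to two fields.
class NullSafeConverter {
  private val rowWriter = new UnsafeRowWriter(2)

  def toUnsafeRow(key: Array[Byte], value: Array[Byte]): UnsafeRow = {
    rowWriter.reset()
    rowWriter.zeroOutNullBytes() // clear null bits left over from the previous record
    if (key == null) rowWriter.setNullAt(0) else rowWriter.write(0, key)
    if (value == null) rowWriter.setNullAt(1) else rowWriter.write(1, value)
    rowWriter.getRow
  }
}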

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala

Lines changed: 4 additions & 0 deletions
@@ -169,6 +169,10 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo
       }
     }
   }
+
+  test("SPARK-27494: read kafka record containing null key/values.") {
+    testNullableKeyValue(ContinuousTrigger(100))
+  }
 }
 
 class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 58 additions & 0 deletions
@@ -988,6 +988,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       q.stop()
     }
   }
+
+  test("SPARK-27494: read kafka record containing null key/values.") {
+    testNullableKeyValue(Trigger.ProcessingTime(100))
+  }
 }
 
 
@@ -1461,6 +1465,60 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
     )
   }
+
+  protected def testNullableKeyValue(trigger: Trigger): Unit = {
+    val table = "kafka_null_key_value_source_test"
+    withTable(table) {
+      val topic = newTopic()
+      testUtils.createTopic(topic)
+      testUtils.withTranscationalProducer { producer =>
+        val df = spark
+          .readStream
+          .format("kafka")
+          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+          .option("kafka.isolation.level", "read_committed")
+          .option("startingOffsets", "earliest")
+          .option("subscribe", topic)
+          .load()
+          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+          .as[(String, String)]
+
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName(table)
+          .trigger(trigger)
+          .start()
+        try {
+          var idx = 0
+          producer.beginTransaction()
+          val expected1 = Seq.tabulate(5) { _ =>
+            producer.send(new ProducerRecord[String, String](topic, null, null)).get()
+            (null, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected2 = Seq.tabulate(5) { _ =>
+            idx += 1
+            producer.send(new ProducerRecord[String, String](topic, idx.toString, null)).get()
+            (idx.toString, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected3 = Seq.tabulate(5) { _ =>
+            idx += 1
+            producer.send(new ProducerRecord[String, String](topic, null, idx.toString)).get()
+            (null, idx.toString)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          producer.commitTransaction()
+          eventually(timeout(streamingTimeout)) {
+            checkAnswer(spark.table(table), (expected1 ++ expected2 ++ expected3).toDF())
+          }
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
 }
 
 object KafkaSourceSuite {
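With the converter fixed, a query over a topic containing tombstones surfaces SQL NULLs instead of failing the scan. A hedged usage sketch (broker address and topic are placeholders; this mirrors what the new test asserts through the memory sink):

val df = spark
  .read // the streaming reader behaves the same way via readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "events")                       // placeholder topic
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Records with a null key or value now arrive as (NULL, value) / (key, NULL) rows.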
