
Commit d2656aa

uncleGen authored and cloud-fan committed
[SPARK-27494][SS] Null values don't work in Kafka source v2
## What changes were proposed in this pull request?

Right now Kafka source v2 doesn't support null values. The issue is in org.apache.spark.sql.kafka010.KafkaRecordToUnsafeRowConverter.toUnsafeRow, which doesn't handle null values.

## How was this patch tested?

Added new unit tests.

Closes #24441 from uncleGen/SPARK-27494.

Authored-by: uncleGen <hustyugm@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent d5dbf05 commit d2656aa
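
For context, here is a minimal reproduction sketch of the bug from the user side. Everything below is illustrative, not part of the commit: the broker address and topic name are placeholders, and an active `SparkSession` named `spark` is assumed. Before this patch, a record with a null value made the conversion fail, since `toUnsafeRow` wrote `record.value` without a null check; a null key was already handled.

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical producer setup; broker address is a placeholder.
val props = new java.util.Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

// A tombstone-style record: non-null key, null value.
val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("demo-topic", "some-key", null)).get()
producer.close()

// With this fix, the null value surfaces as a SQL NULL instead of an error.
val records = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "demo-topic")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")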

File tree

3 files changed: +68 -1 lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala

Lines changed: 6 additions & 1 deletion
@@ -30,13 +30,18 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
 
   def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
     rowWriter.reset()
+    rowWriter.zeroOutNullBytes()
 
     if (record.key == null) {
       rowWriter.setNullAt(0)
     } else {
       rowWriter.write(0, record.key)
     }
-    rowWriter.write(1, record.value)
+    if (record.value == null) {
+      rowWriter.setNullAt(1)
+    } else {
+      rowWriter.write(1, record.value)
+    }
     rowWriter.write(2, UTF8String.fromString(record.topic))
     rowWriter.write(3, record.partition)
     rowWriter.write(4, record.offset)
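
Why the new `zeroOutNullBytes()` call: the converter reuses a single `UnsafeRowWriter` across records, and `setNullAt` flips a bit in the row's null-tracking bitset that `reset()` alone does not clear, so a null bit set for one record could otherwise leak into the next. A minimal sketch of the reuse pattern, assuming Spark's internal `UnsafeRowWriter` API (two fields for brevity; the real converter writes more columns):

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter

// One writer instance, reused for every incoming record.
val writer = new UnsafeRowWriter(2)

def convert(key: Array[Byte], value: Array[Byte]): UnsafeRow = {
  writer.reset()             // rewind the variable-length data cursor
  writer.zeroOutNullBytes()  // clear null bits left over from the previous record
  if (key == null) writer.setNullAt(0) else writer.write(0, key)
  if (value == null) writer.setNullAt(1) else writer.write(1, value)
  writer.getRow
}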

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala

Lines changed: 4 additions & 0 deletions
@@ -169,6 +169,10 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo
       }
     }
   }
+
+  test("SPARK-27494: read kafka record containing null key/values.") {
+    testNullableKeyValue(ContinuousTrigger(100))
+  }
 }
 
 class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
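
Both suites delegate to a shared `testNullableKeyValue` helper (added to `KafkaSourceSuiteBase` in the next file), differing only in the trigger, so the null-handling path is exercised under both execution modes:

// Continuous processing: 100 ms is the checkpoint interval.
testNullableKeyValue(ContinuousTrigger(100))

// Micro-batch processing: a batch is planned every 100 ms.
testNullableKeyValue(Trigger.ProcessingTime(100))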

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 58 additions & 0 deletions
@@ -1040,6 +1040,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       q.stop()
     }
   }
+
+  test("SPARK-27494: read kafka record containing null key/values.") {
+    testNullableKeyValue(Trigger.ProcessingTime(100))
+  }
 }
 
 
@@ -1511,6 +1515,60 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
     )
   }
+
+  protected def testNullableKeyValue(trigger: Trigger): Unit = {
+    val table = "kafka_null_key_value_source_test"
+    withTable(table) {
+      val topic = newTopic()
+      testUtils.createTopic(topic)
+      testUtils.withTranscationalProducer { producer =>
+        val df = spark
+          .readStream
+          .format("kafka")
+          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+          .option("kafka.isolation.level", "read_committed")
+          .option("startingOffsets", "earliest")
+          .option("subscribe", topic)
+          .load()
+          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+          .as[(String, String)]
+
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName(table)
+          .trigger(trigger)
+          .start()
+        try {
+          var idx = 0
+          producer.beginTransaction()
+          val expected1 = Seq.tabulate(5) { _ =>
+            producer.send(new ProducerRecord[String, String](topic, null, null)).get()
+            (null, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected2 = Seq.tabulate(5) { _ =>
+            idx += 1
+            producer.send(new ProducerRecord[String, String](topic, idx.toString, null)).get()
+            (idx.toString, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected3 = Seq.tabulate(5) { _ =>
+            idx += 1
+            producer.send(new ProducerRecord[String, String](topic, null, idx.toString)).get()
+            (null, idx.toString)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          producer.commitTransaction()
+          eventually(timeout(streamingTimeout)) {
+            checkAnswer(spark.table(table), (expected1 ++ expected2 ++ expected3).toDF())
+          }
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
 }
 
 object KafkaSourceSuite {
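
With the fix in place, null keys and values surface as ordinary SQL NULLs, so downstream queries can handle Kafka tombstones explicitly. A small sketch, assuming `df` is a DataFrame loaded from the Kafka source as in the test above:

// Keep records with null keys but drop tombstones (null values),
// a common pattern when consuming compacted topics.
val nonTombstones = df
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .where("value IS NOT NULL")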
