
Commit 45330de

fix comments
1 parent 74bfb87 commit 45330de

4 files changed: +55 -52 lines changed


external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala

Lines changed: 1 addition & 2 deletions
@@ -30,8 +30,7 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
 
   def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
     rowWriter.reset()
-    rowWriter.unsetNullAt(0)
-    rowWriter.unsetNullAt(1)
+    rowWriter.zeroOutNullBytes()
 
     if (record.key == null) {
       rowWriter.setNullAt(0)
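For context (not part of the commit itself), a minimal sketch of what the converter method looks like after this change. The column count and the non-null/trailing-column branches are assumptions based on the surrounding file, not shown in this diff; only reset(), zeroOutNullBytes(), and setNullAt(0) appear above.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter

private[kafka010] class KafkaRecordToUnsafeRowConverter {
  // Assumed: the Kafka source row has 7 columns (key, value, topic, partition,
  // offset, timestamp, timestampType); the real file defines the exact count.
  private val rowWriter = new UnsafeRowWriter(7)

  def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
    rowWriter.reset()
    // One call clears every null-tracking bit left over from the previous row,
    // replacing the per-ordinal unsetNullAt(0) / unsetNullAt(1) pair removed above.
    rowWriter.zeroOutNullBytes()

    if (record.key == null) {
      rowWriter.setNullAt(0)          // Kafka permits null keys
    } else {
      rowWriter.write(0, record.key)  // assumed non-null branch, as in the original file
    }
    // ... value and the remaining metadata columns are written the same way
    rowWriter.getRow()
  }
}
```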

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala

Lines changed: 2 additions & 46 deletions
@@ -170,52 +170,8 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo
     }
   }
 
-  test("read kafka record containing null key/values") {
-    val table = "kafka_null_key_value_source_test"
-    withTable(table) {
-      val topic = newTopic()
-      testUtils.createTopic(topic)
-      testUtils.withProducer { producer =>
-        val df = spark
-          .readStream
-          .format("kafka")
-          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-          .option("startingOffsets", "earliest")
-          .option("subscribe", topic)
-          .load()
-          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-          .as[(String, String)]
-
-        val q = df
-          .writeStream
-          .format("memory")
-          .queryName(table)
-          .trigger(ContinuousTrigger(100))
-          .start()
-        try {
-          val expected1 = (1 to 5).map { _ =>
-            producer.send(new ProducerRecord[String, String](topic, null, null)).get()
-            (null, null)
-          }.asInstanceOf[Seq[(String, String)]]
-
-          val expected2 = (6 to 10).map { i =>
-            producer.send(new ProducerRecord[String, String](topic, i.toString, null)).get()
-            (i.toString, null)
-          }.asInstanceOf[Seq[(String, String)]]
-
-          val expected3 = (11 to 15).map { i =>
-            producer.send(new ProducerRecord[String, String](topic, null, i.toString)).get()
-            (null, i.toString)
-          }.asInstanceOf[Seq[(String, String)]]
-
-          eventually(timeout(streamingTimeout)) {
-            checkAnswer(spark.table(table), (expected1 ++ expected2 ++ expected3).toDF())
-          }
-        } finally {
-          q.stop()
-        }
-      }
-    }
+  test(s"SPARK-27494: read kafka record containing null key/values in continuous mode") {
+    testNullableKeyValue(ContinuousTrigger(100))
   }
 }

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 52 additions & 0 deletions
@@ -1040,6 +1040,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       q.stop()
     }
   }
+
+  test(s"SPARK-27494: read kafka record containing null key/values in micro-batch mode") {
+    testNullableKeyValue(Trigger.ProcessingTime(100))
+  }
 }

@@ -1511,6 +1515,54 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
     )
   }
+
+  protected def testNullableKeyValue(trigger: Trigger): Unit = {
+    val table = "kafka_null_key_value_source_test"
+    withTable(table) {
+      val topic = newTopic()
+      testUtils.createTopic(topic)
+      testUtils.withProducer { producer =>
+        val df = spark
+          .readStream
+          .format("kafka")
+          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+          .option("startingOffsets", "earliest")
+          .option("subscribe", topic)
+          .load()
+          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+          .as[(String, String)]
+
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName(table)
+          .trigger(trigger)
+          .start()
+        try {
+          val expected1 = (1 to 5).map { _ =>
+            producer.send(new ProducerRecord[String, String](topic, null, null)).get()
+            (null, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected2 = (6 to 10).map { i =>
+            producer.send(new ProducerRecord[String, String](topic, i.toString, null)).get()
+            (i.toString, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected3 = (11 to 15).map { i =>
+            producer.send(new ProducerRecord[String, String](topic, null, i.toString)).get()
+            (null, i.toString)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          eventually(timeout(streamingTimeout)) {
+            checkAnswer(spark.table(table), (expected1 ++ expected2 ++ expected3).toDF())
+          }
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
 }
 
 object KafkaSourceSuite {

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java

Lines changed: 0 additions & 4 deletions
@@ -111,10 +111,6 @@ public void setNullAt(int ordinal) {
     write(ordinal, 0L);
   }
 
-  public void unsetNullAt(int ordinal) {
-    BitSetMethods.unset(getBuffer(), startingOffset, ordinal);
-  }
-
   @Override
   public void setNull1Bytes(int ordinal) {
     setNullAt(ordinal);
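The per-ordinal unsetNullAt helper can go because UnsafeRowWriter already exposes zeroOutNullBytes(), which clears the whole null-bit region before any fields are written. Below is a small, hedged sketch of driving the writer directly with the remaining API; it uses only methods that appear in this diff plus the commonly used write/getRow accessors, and the object name is purely illustrative.

```scala
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
import org.apache.spark.unsafe.types.UTF8String

object NullBitsSketch {
  def main(args: Array[String]): Unit = {
    // Build a 2-column row where the key column is null and the value column is set.
    val writer = new UnsafeRowWriter(2)
    writer.reset()
    // Clears both null bits at once; before this commit, a reused writer had to call
    // unsetNullAt(0) and unsetNullAt(1) to undo null bits left by the previous row.
    writer.zeroOutNullBytes()
    writer.setNullAt(0)
    writer.write(1, UTF8String.fromString("some value"))

    val row = writer.getRow()
    assert(row.isNullAt(0))
    assert(row.getUTF8String(1).toString == "some value")
  }
}
```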

0 commit comments
