[SPARK-23539][SS] Add support for Kafka headers in Structured Streaming #22282
As you can see, this PR consists of 3 parts:
I have the following questions:
Please have a look when you are free. Thanks.
Force-pushed from 229aac8 to 8a5ef14.
The change mostly looks good. I left some comments pointing out possible minor issues as well as style nits.
Force-pushed from 2254009 to 253a894.
Force-pushed from 253a894 to b0f64e9.
## What changes were proposed in this pull request?

Fix unused imports & outdated comments on `kafka-0-10-sql` module. (Found while I was working on [SPARK-23539](#22282))

## How was this patch tested?

Existing unit tests.

Closes #22342 from dongjinleekr/feature/fix-kafka-sql-trivials.

Authored-by: Lee Dongjin <dongjin@apache.org>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
(cherry picked from commit 458f501)
Signed-off-by: Sean Owen <sean.owen@databricks.com>
retest this please.
Test build #95906 has finished for PR 22282 at commit
Force-pushed from 220bd0a to 2c48aae.
cc/ @zsxwing @tdas @dongjoon-hyun @srowen Rebased onto the latest master. Please have a look when you are free. Thanks in advance.
Test build #98726 has finished for PR 22282 at commit
Some minor feedback to start
Force-pushed from 2c48aae to 9fad0ca.
Test build #102141 has finished for PR 22282 at commit
retest this please. @HyukjinKwon Could you have a look when you are free? cc/ @kiszk @zsxwing @tdas
retest this please
Test build #102174 has finished for PR 22282 at commit
If a user's Kafka cluster runs an old version that doesn't support Kafka headers, will their query fail?
A few small style items
@@ -131,9 +145,26 @@ private[kafka010] abstract class KafkaRowWriter(
        throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
          s"attribute unsupported type ${t.catalogString}")
    }
    val headersExpression = inputSchema
      .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
        Literal(CatalystTypeConverters.convertToCatalyst(null),
Should this be indented further?
I tried, but the formatter reverts the indentation to its current state.
The style checker or something else?
The code formatter of the IDE. Also, it passes the style checker of mvn.
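For context on the hunk discussed above: the writer appears to fall back to a null literal when the input schema has no headers column, which suggests the column is optional on the sink side. The following is a minimal sketch of a batch write that does supply headers. It assumes the column is named `headers` and has type `ARRAY<STRUCT<key:STRING,value:BINARY>>`, as in the Spark 3.0 Kafka integration guide; the broker address `localhost:9092`, the topic `events`, and the header `trace-id` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, lit, struct}

object KafkaHeadersWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("kafka-headers-write-sketch")
      .getOrCreate()
    import spark.implicits._

    // Each record carries one header; header values must be binary.
    val records = Seq(("k1", "v1"), ("k2", "v2")).toDF("key", "value")
      .withColumn("headers", array(struct(
        lit("trace-id").as("key"),                  // hypothetical header name
        lit("abc123").cast("binary").as("value")))) // hypothetical header value

    // Shows headers as an array of struct<key:string,value:binary>.
    records.printSchema()

    // Requires the spark-sql-kafka-0-10 module and a reachable broker.
    records.write
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed address
      .option("topic", "events")                           // assumed topic
      .save()

    spark.stop()
  }
}
```

Dropping the `withColumn("headers", ...)` step should leave the write valid, with records sent without headers.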
@srowen Fixed, with removing redundant
Test build #110456 has finished for PR 22282 at commit
I think it's OK @dongjinleekr, just needs a rebase now.
…TRUCT<key:STRING,value:BINARY>>)'
Move projection methods in `KafkaOffsetReader` to `KafkaRecordToRowConverter`.
Force-pushed from 243809f to de02de4.
@srowen Here is the rebase. Thanks for reviewing! 😄
Test build #110550 has finished for PR 22282 at commit
Merged to master
Great! @dongjinleekr, @srowen, @HeartSaVioR, @zsxwing!
Finally! Well done, @dongjinleekr!
## What changes were proposed in this pull request?

This update adds support for Kafka Headers functionality in Structured Streaming.

## How was this patch tested?

With following unit tests:

- KafkaRelationSuite: "default starting and ending offsets with headers" (new)
- KafkaSinkSuite: "batch - write to kafka" (updated)

Closes apache#22282 from dongjinleekr/feature/SPARK-23539.

Lead-authored-by: Lee Dongjin <dongjin@apache.org>
Co-authored-by: Jungtaek Lim <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
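As a rough illustration of what the merged feature looks like from the reader side, here is a minimal batch-read sketch. It assumes the `includeHeaders` source option described in the Spark 3.0 Kafka integration guide; the broker address `localhost:9092` and the topic `events` are hypothetical, and the job needs the `spark-sql-kafka-0-10` module on the classpath.

```scala
import org.apache.spark.sql.SparkSession

object KafkaHeadersReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("kafka-headers-read-sketch")
      .getOrCreate()

    // Batch read with default starting and ending offsets.
    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed address
      .option("subscribe", "events")                       // assumed topic
      .option("includeHeaders", "true") // adds the headers column to the output
      .load()

    // headers is an array of struct<key:string,value:binary>.
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
      .show(truncate = false)

    spark.stop()
  }
}
```

Leaving `includeHeaders` unset keeps the source schema as it was before this change, which is one way a query can avoid depending on header support.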
Just gone through, good job @dongjinleekr