Skip to content

[SPARK-23539][SS] Add support for Kafka headers in Structured Streaming #22282

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from

Conversation

dongjinleekr
Copy link
Contributor

What changes were proposed in this pull request?

This update adds support for Kafka Headers functionality in Structured Streaming.

How was this patch tested?

With following unit tests:

  • KafkaRelationSuite: "default starting and ending offsets with headers" (new)
  • KafkaSinkSuite: "batch - write to kafka" (updated)

@dongjinleekr
Copy link
Contributor Author

As you can see, this PR consists of 3 parts:

  1. Extend UnsafeArrayData, UnsafeMapData (commit 1~6)
  2. Implement Kafka Headers functionality (commit 7, 10)
  3. Update unit tests (commit 8, 9)

I have the following questions:

  1. Should I separate group 1 as a separated issue?
  2. I found that KafkaSourceSuiteBase's 'Kafka column types' test is missing a select expression. The weird thing is that it works before the update but does not work after the update. (It is why the last commit was added - without specification, this test does not pass.) Is this intended one? Or, Do I misunderstanding something?

Please have a look when you are free. Thanks.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change mostly looks good. Left some comments to point some possible minors as well as style nits.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@dongjinleekr

This comment has been minimized.

@SparkQA

This comment has been minimized.

asfgit pushed a commit that referenced this pull request Sep 7, 2018
## What changes were proposed in this pull request?

Fix unused imports & outdated comments on `kafka-0-10-sql` module. (Found while I was working on [SPARK-23539](#22282))

## How was this patch tested?

Existing unit tests.

Closes #22342 from dongjinleekr/feature/fix-kafka-sql-trivials.

Authored-by: Lee Dongjin <dongjin@apache.org>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
(cherry picked from commit 458f501)
Signed-off-by: Sean Owen <sean.owen@databricks.com>
asfgit pushed a commit that referenced this pull request Sep 7, 2018
## What changes were proposed in this pull request?

Fix unused imports & outdated comments on `kafka-0-10-sql` module. (Found while I was working on [SPARK-23539](#22282))

## How was this patch tested?

Existing unit tests.

Closes #22342 from dongjinleekr/feature/fix-kafka-sql-trivials.

Authored-by: Lee Dongjin <dongjin@apache.org>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
@dongjinleekr
Copy link
Contributor Author

retest this please.

@SparkQA
Copy link

SparkQA commented Sep 11, 2018

Test build #95906 has finished for PR 22282 at commit 220bd0a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjinleekr
Copy link
Contributor Author

cc/ @zsxwing @tdas @dongjoon-hyun @srowen Rebased onto the latest master. Please have a look when you are free. Thanks in advance.

@SparkQA
Copy link

SparkQA commented Nov 12, 2018

Test build #98726 has finished for PR 22282 at commit 2c48aae.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@srowen srowen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor feedback to start

@SparkQA
Copy link

SparkQA commented Feb 10, 2019

Test build #102141 has finished for PR 22282 at commit 9fad0ca.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjinleekr
Copy link
Contributor Author

retest this please.

@HyukjinKwon Could you have a look when you are free? cc/ @kiszk @zsxwing @tdas

@HyukjinKwon
Copy link
Member

retest this please

@SparkQA
Copy link

SparkQA commented Feb 11, 2019

Test build #102174 has finished for PR 22282 at commit 9fad0ca.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Copy link
Member

zsxwing commented Feb 11, 2019

If a user uses a Kafka cluster which runs using an old version that doesn't support Kafka headers, will their query fail?

@HeartSaVioR
Copy link
Contributor

@zsxwing @srowen
Could you please take another round of review? The patch now looks to address all of review comments and rebased with master.

Copy link
Member

@srowen srowen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few small style items

@@ -131,9 +145,26 @@ private[kafka010] abstract class KafkaRowWriter(
throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
s"attribute unsupported type ${t.catalogString}")
}
val headersExpression = inputSchema
.find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
Literal(CatalystTypeConverters.convertToCatalyst(null),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be indented further?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried, but the formatter reverts the indention to the current status.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The style checker or something else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code formatter of the IDE. Also, it passes the style checker of mvn.

@dongjinleekr
Copy link
Contributor Author

@srowen Fixed, with removing redundant new in KafkaDataConsumerSuite.

@SparkQA
Copy link

SparkQA commented Sep 11, 2019

Test build #110456 has finished for PR 22282 at commit 243809f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Copy link
Member

srowen commented Sep 12, 2019

I think it's OK @dongjinleekr , just needs a rebase now.

@dongjinleekr
Copy link
Contributor Author

@srowen Here is the rebase. Thanks for reviewing! 😄

@SparkQA
Copy link

SparkQA commented Sep 13, 2019

Test build #110550 has finished for PR 22282 at commit de02de4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen srowen closed this in 1675d51 Sep 13, 2019
@srowen
Copy link
Member

srowen commented Sep 13, 2019

Merged to master

@dongjoon-hyun
Copy link
Member

Great! @dongjinleekr , @srowen , @HeartSaVioR , @zsxwing !

@HeartSaVioR
Copy link
Contributor

Finally! Well done, @dongjinleekr !

PavithraRamachandran pushed a commit to PavithraRamachandran/spark that referenced this pull request Sep 15, 2019
## What changes were proposed in this pull request?

This update adds support for Kafka Headers functionality in Structured Streaming.

## How was this patch tested?

With following unit tests:

- KafkaRelationSuite: "default starting and ending offsets with headers" (new)
- KafkaSinkSuite: "batch - write to kafka" (updated)

Closes apache#22282 from dongjinleekr/feature/SPARK-23539.

Lead-authored-by: Lee Dongjin <dongjin@apache.org>
Co-authored-by: Jungtaek Lim <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
@gaborgsomogyi
Copy link
Contributor

Just gone through, good job @dongjinleekr

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

10 participants