
NIFI-6819: Add KafkaRecordSink implementations for Kafka 1 and 2 #3854

Merged — 2 commits into apache:master on Nov 5, 2019

Conversation

mattyb149
Contributor

Thank you for submitting a contribution to Apache NiFi.

Please provide a short description of the PR here:

Description of PR

Added RecordSinkService implementations in the Kafka 1 and Kafka 2 bundles. Also refactored some utils for accessibility and removed the local MockRecordWriter impls in favor of the standard one in mock-record-utils.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically master)?

  • Is your initial contribution a single, squashed commit? Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not squash or use --force when pushing to allow for clean monitoring of changes.

For code changes:

  • Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
  • Have you written or updated unit tests to verify your changes?
  • Have you verified that the full build is successful on both JDK 8 and JDK 11?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
  • If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
  • If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check Travis CI for build issues and submit an update to your PR as soon as possible.

@YolandaMDavis
Contributor

@mattyb149 reviewing

@YolandaMDavis
Contributor

YolandaMDavis commented Oct 31, 2019

@mattyb149 I happened to run across an interesting error (which I think is unrelated to this PR and may be a known issue with the Kafka API). My test cluster was initially misconfigured: I had a replication factor of 3 set for the internal consumer offsets topic (via offsets.topic.replication.factor) yet only had one broker configured. The Kafka sink appeared to send data, according to debug logging, however ConsumeKafkaRecord and the command-line consumer could not retrieve any messages. Looking into the error log I saw the issue, but I was surprised that this didn't appear in the NiFi logs when the record was sent:

2019-10-31 14:16:45,149 ERROR kafka.server.KafkaApis: [KafkaApi-72] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet.

There is a JIRA I believe is related here: https://issues.apache.org/jira/browse/KAFKA-5666 but just want to confirm if this is a potential issue that can be ignored or documented?
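For reference, the broker-side mismatch described above comes from the internal-topic replication settings. On a single-broker test cluster they would need to be lowered to match the broker count; a server.properties sketch (values chosen for illustration, not taken from this PR):

```properties
# server.properties (broker side): with only one broker alive, the internal
# __consumer_offsets topic cannot satisfy a replication factor of 3
offsets.topic.replication.factor=1

# related internal-topic settings often lowered together on dev clusters
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
```

These are broker settings, so no client-side (NiFi) configuration can work around the mismatch, which matches the behavior reported below.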

@YolandaMDavis
Contributor

@mattyb149 I was able to verify both KafkaRecordSink versions (1.0 and 2.0) against secure (TLS) and non-secure Kafka clusters. Records appeared on the topics as expected. If we confirm the issue I posted earlier is specific to the Kafka client (and not related to this feature), I'm fine with merging and documenting it.

@YolandaMDavis
Contributor

@mattyb149 I tried this as well with Kerberos and all looks good.

@mattyb149
Contributor Author

I updated the versions to 1.11 and found a bug where I wasn't closing the producer when the controller service was disabled, so I added some cleanup code. I also found some references to "Flow Files" that I'd copied along with some properties from the PublishKafkaRecord processor(s), so I corrected those.
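The cleanup fix described above can be sketched as follows. This is a hypothetical, simplified illustration of the pattern (close the producer exactly once when the service is disabled), not the actual NiFi code: the `SinkLifecycle` and `Producer` names here are stand-ins for the real controller service and the Kafka producer.

```java
import java.util.concurrent.atomic.AtomicReference;

public class SinkLifecycleDemo {
    // Stand-in for org.apache.kafka.clients.producer.Producer
    interface Producer {
        void close();
    }

    static class SinkLifecycle {
        private final AtomicReference<Producer> producer = new AtomicReference<>();

        // Analogous to an @OnEnabled method that creates the producer
        void onEnabled(Producer p) {
            producer.set(p);
        }

        // Analogous to the @OnDisabled cleanup added in this PR:
        // release the producer, and make repeated calls a no-op
        void onDisabled() {
            Producer p = producer.getAndSet(null);
            if (p != null) {
                p.close();
            }
        }
    }

    public static void main(String[] args) {
        SinkLifecycle sink = new SinkLifecycle();
        final int[] closes = {0};
        sink.onEnabled(() -> closes[0]++);
        sink.onDisabled();
        sink.onDisabled(); // a second disable must not close twice
        System.out.println("closes=" + closes[0]);
    }
}
```

The `getAndSet(null)` swap makes the close idempotent, which matters for a controller service whose lifecycle methods can be invoked more than once.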

I was able to verify that when the offsets replication factor is higher than the number of brokers, the data doesn't get published, but no error or other indication is returned to the client. I agree with Yolanda that this is https://issues.apache.org/jira/browse/KAFKA-5666 and unfortunately there is no workaround on the client side.

@YolandaMDavis
Contributor

@mattyb149 thanks Matt for confirming that error. I was able to revalidate this with your updates and confirmed that, once the service is disabled, the producer does not continue submitting messages. Thanks for adding this feature!

+1

Will merge shortly

@YolandaMDavis YolandaMDavis merged commit 442a80b into apache:master Nov 5, 2019
patricker pushed a commit to patricker/nifi that referenced this pull request Jan 22, 2020
…che#3854)

* NIFI-6819: Add KafkaRecordSink implementations for Kafka 1 and 2

* Updated version to 1.11, corrected doc, added cleanup code