-
Notifications
You must be signed in to change notification settings - Fork 2.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NIFI-6819: Add KafkaRecordSink implementations for Kafka 1 and 2 #3854
Conversation
@mattyb149 reviewing |
@mattyb149 Happened to run across an interesting error (which I think is unrelated to PR and may be an knwon issue with Kafka api). My test cluster was initially misconfigured where I had a replication factor of 3 set for the internal consumer offset topic (via offsets.topic.replication.factor) yet only had one broker configured. The Kafka Sink appeared to send data, according to debug, however Consume Kafka Record and the command line consumer could not retrieve any messages. Looking into the error log I saw the issue, however I was surprised that this didn't appear in NiFi logs upon sending of the record: This error can be ignored if the cluster is starting up and not all brokers are up yet. There is a JIRA I believe is related here: https://issues.apache.org/jira/browse/KAFKA-5666 but just want to confirm if this is a potential issue that can be ignored or documented? |
@mattyb149 I was able to verify for secure and non-secure kafka cluster(TLS) both Kafka Record Sink versions 1.0 and 2.0. Records appeared on topics as expected. If we confirm the issue I posted earlier is specific to the Kafka client (and not related to this feature) I'm fine with merging and documenting. |
@mattyb149 I tried this as well with Kerberos and all looks good. |
12ec5f5
to
e94ed24
Compare
I updated the versions to 1.11 and found a bug where I wasn't closing the producer when the CS was disabled so I added some cleanup code. I also found some references to "Flow Files" when I'd copied some properties from the PublishKafkaRecord processor(s) so I corrected those. I was able to verify that when the offsets replication factor is higher than the number of brokers, the data doesn't get published but no error or information is returned to the client. I agree with Yolanda that this is https://issues.apache.org/jira/browse/KAFKA-5666 and unfortunately there is no workaround at the client side. |
@mattyb149 thanks Matt for confirming that error. I was able to revalidate this with your updates and confirmed when disabling the producer did not continue submitting messages. Thanks for adding this feature! +1 Will merge shortly |
…che#3854) * NIFI-6819: Add KafkaRecordSink implementations for Kafka 1 and 2 * Updated version to 1.11, corrected doc, added cleanup code
Thank you for submitting a contribution to Apache NiFi.
Please provide a short description of the PR here:
Description of PR
Added RecordSinkService implementations in the Kafka 1 and Kafka 2 bundles. Also refactored some utils for accessibility and removed the local MockRecordWriter impls in favor of the standard one in mock-record-utils.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
Has your PR been rebased against the latest commit within the target branch (typically
master
)?Is your initial contribution a single, squashed commit? Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not
squash
or use--force
when pushing to allow for clean monitoring of changes.For code changes:
mvn -Pcontrib-check clean install
at the rootnifi
folder?LICENSE
file, including the mainLICENSE
file undernifi-assembly
?NOTICE
file, including the mainNOTICE
file found undernifi-assembly
?.displayName
in addition to .name (programmatic access) for each of the new properties?For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.