-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Create operators for working with Consumer Groups for GCP Apache Kafka #47056
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create operators for working with Consumer Groups for GCP Apache Kafka #47056
Conversation
7d4472d to
735b394
Compare
735b394 to
9befc60
Compare
potiuk
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIce - more reuse between providers :)
| from confluent_kafka.admin import AdminClient | ||
|
|
||
| from airflow.hooks.base import BaseHook | ||
| from airflow.providers.google.cloud.hooks.managed_kafka import ManagedKafkaHook |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we import Google hook in kafka provider?
This doesn't feel right.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KafkaBaseHook is used in many places this import force to install google provider to use the Kafka provider.
Am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah indeed. sorry I missed that one. Good you noticed it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@MaksYermak -> can you please fix that ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@eladkal @potiuk I am ready to prepare a fix ASAP. But I have a question. In case when we can't use a google Hook inside Kafka provider what is a better way to use code from the google provider? Could I make a conditional import here as a solution?
The reason for using Google Hook was that for the Kafka cluster managed by Google for establishing connection we need to pass a function which generates a token for Confluent with google's credentials. And a function which gets a google's credential exists only inside the Google provider.
In this PR I have created operators for working with Consumer Groups for GCP Managed Kafka service. Also, I added documentation on how to interact with operators from the Apache Kafka provider.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in newsfragments.