Skip to content

Conversation

@MaksYermak
Copy link
Contributor

In this PR I have created operators for working with Consumer Groups for GCP Managed Kafka service. Also, I added documentation on how to interact with operators from the Apache Kafka provider.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added provider:apache-kafka provider:google Google (including GCP) related issues labels Feb 25, 2025
@MaksYermak MaksYermak force-pushed the apache-kafka-consumer-groups branch from 7d4472d to 735b394 Compare March 3, 2025 10:09
@MaksYermak MaksYermak force-pushed the apache-kafka-consumer-groups branch from 735b394 to 9befc60 Compare March 3, 2025 12:17
Copy link
Member

@potiuk potiuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIce - more reuse between providers :)

@potiuk potiuk merged commit 6008483 into apache:main Mar 3, 2025
147 checks passed
shahar1 pushed a commit to shahar1/airflow that referenced this pull request Mar 5, 2025
from confluent_kafka.admin import AdminClient

from airflow.hooks.base import BaseHook
from airflow.providers.google.cloud.hooks.managed_kafka import ManagedKafkaHook
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we import Google hook in kafka provider?
This doesn't feel right.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KafkaBaseHook is used in many places this import force to install google provider to use the Kafka provider.
Am I missing something?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah indeed. sorry I missed that one. Good you noticed it.

Copy link
Member

@potiuk potiuk Mar 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MaksYermak -> can you please fix that ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eladkal @potiuk I am ready to prepare a fix ASAP. But I have a question. In case when we can't use a google Hook inside Kafka provider what is a better way to use code from the google provider? Could I make a conditional import here as a solution?

The reason for using Google Hook was that for the Kafka cluster managed by Google for establishing connection we need to pass a function which generates a token for Confluent with google's credentials. And a function which gets a google's credential exists only inside the Google provider.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

provider:apache-kafka provider:google Google (including GCP) related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants