@dejii
Contributor

@dejii dejii commented Aug 14, 2025

This PR implements PubsubMessageQueueProvider as part of the AIP-82 #52712 initiative to expand Event-Driven Scheduling, enabling Google Cloud Pub/Sub subscriptions as message queues in Airflow's common messaging framework.


@boring-cyborg boring-cyborg bot added the area:providers, kind:documentation, and provider:google [Google (including GCP) related issues] labels Aug 14, 2025
@potiuk
Member

potiuk commented Aug 15, 2025

checks failing :(

@jason810496 jason810496 self-requested a review August 17, 2025 06:29
Member

@jason810496 jason810496 left a comment

Nice! Thanks for the PR!

However, we may need to hold off on merging this until Refactor Common Queue Interface #54651 is merged.

As we discussed in #53556 (comment), there are some drawbacks to using a Queue URI, and we should replace the Queue URI with just a "scheme".

In the meantime, it would be nice to take 000b6c3 as a guide for replacing the "Queue" logic with "Scheme" logic.

Thanks!

@dejii
Contributor Author

dejii commented Aug 19, 2025

Thanks! I just went through the scheme logic and it looks solid. One question I had - do we have a standard for generating scheme names? Or is it decided ad hoc based on the queue type, as long as we make sure it doesn’t collide with existing ones?

If not, inspired by the redis+pubsub scheme, would it make sense to adopt a consistent pattern like <provider>+<queue>? For this PR, that would mean using google+pubsub as the scheme.

@jason810496
Member

Or is it decided ad hoc based on the queue type, as long as we make sure it doesn’t collide with existing ones?

Yes, as long as the scheme identifier is different from the existing ones, the naming should be fine.

If not, inspired by the redis+pubsub scheme, would it make sense to adopt a consistent pattern like <provider>+<queue>? For this PR, that would mean using google+pubsub as the scheme.

Sure. I named the Redis one redis+pubsub because there are multiple ways to use Redis as a message queue (List, Stream, and Redis Pub/Sub, which is what the current Redis provider implementation uses). It is therefore better to stay flexible by using +pubsub as a suffix in the Redis case.

There might also be other services in GCP that provide message queue-like functionality, so naming it google+pubsub works well for me.
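The naming rule discussed above (any identifier is acceptable as long as it does not collide with an existing one, with <provider>+<queue> as a suggested pattern) could be sketched roughly as follows. This is only an illustration: `SchemeRegistry`, `register`, `follows_convention`, and `SCHEME_PATTERN` are hypothetical names and are not part of Airflow or the common.messaging provider.

```python
import re

# Hypothetical pattern for "<provider>+<queue>", e.g. "google+pubsub" or "redis+pubsub".
SCHEME_PATTERN = re.compile(r"^[a-z][a-z0-9]*\+[a-z][a-z0-9]*$")


def follows_convention(scheme: str) -> bool:
    """Return True if the scheme looks like "<provider>+<queue>" (optional convention)."""
    return SCHEME_PATTERN.match(scheme) is not None


class SchemeRegistry:
    """Toy registry that rejects duplicate scheme identifiers (illustration only)."""

    def __init__(self) -> None:
        self._schemes = set()

    def register(self, scheme: str) -> None:
        # Uniqueness is the only hard requirement mentioned in the discussion.
        if scheme in self._schemes:
            raise ValueError(f"scheme {scheme!r} is already registered")
        self._schemes.add(scheme)


registry = SchemeRegistry()
registry.register("redis+pubsub")
registry.register("google+pubsub")
```

Registering "google+pubsub" a second time would raise `ValueError` in this sketch, which mirrors the "must not collide" requirement.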

Thanks!

@VladaZakharova
Contributor

Hi there :)
Yes, there is this PR, which is in draft: #54684
and maybe it makes sense to close it to avoid duplication...
I also have some questions: are we sure we want to keep this name? It can be confusing for users that we have a queue (as a separate service in the Google provider) while inside there are still Pub/Sub operators.
And are you planning to add system tests for this? If yes, can you please explain what should be configured in order to run them? In general, in the Google provider we try to keep every test atomic, so all the configuration and connections used inside a system test should be created and then deleted within that system test.

@dejii
Contributor Author

dejii commented Aug 20, 2025

Hi :)

I also have some questions: are we sure we want to keep this name? It can be confusing for users that we have a queue (as a separate service in the Google provider) while inside there are still Pub/Sub operators.

This structure comes from the implementation details of AIP-82 - it’s how the common.messaging provider discovers the available queues.

Reference:

def _discover_queues(self) -> None:
    """Retrieve all queues defined in the providers."""
    for provider_package, provider in self._provider_dict.items():
        if provider.data.get("queues"):
            for queue_class_name in provider.data["queues"]:
                if _correctness_check(provider_package, queue_class_name, provider):
                    self._queue_class_name_set.add(queue_class_name)

MESSAGE_QUEUE_PROVIDERS = [create_class_by_name(name)() for name in providers_manager.queue_class_names]
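
To make the discovery flow above easier to follow, here is a minimal, self-contained sketch of the same idea outside Airflow. Everything in it is an assumption for illustration: `FakeProviderInfo`, `discover_queue_class_names`, the package names, and the dotted class path are hypothetical, and the real `_correctness_check` is replaced by a simple prefix check.

```python
from dataclasses import dataclass, field


@dataclass
class FakeProviderInfo:
    """Stand-in for a provider's metadata (hypothetical, for illustration)."""

    data: dict = field(default_factory=dict)


def discover_queue_class_names(provider_dict):
    """Collect every class path listed under a provider's "queues" key."""
    found = set()
    for provider in provider_dict.values():
        for queue_class_name in provider.data.get("queues") or []:
            # Simplified stand-in for the real correctness check: accept only
            # class paths that live under the "airflow.providers." namespace.
            if queue_class_name.startswith("airflow.providers."):
                found.add(queue_class_name)
    return found


# Assumed example metadata; the class path below is illustrative only.
providers = {
    "apache-airflow-providers-google": FakeProviderInfo(
        data={"queues": ["airflow.providers.google.cloud.queues.pubsub.PubsubMessageQueueProvider"]}
    ),
    "apache-airflow-providers-example": FakeProviderInfo(data={}),
}
print(discover_queue_class_names(providers))
```

A provider that declares no "queues" entry contributes nothing, which is why the queue class has to be listed in the provider's metadata to be picked up.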

And are you planning to add system tests for this? If yes, can you please explain what should be configured in order to run them? In general, in the Google provider we try to keep every test atomic, so all the configuration and connections used inside a system test should be created and then deleted within that system test.

This can be looked into. A Pub/Sub subscription (and implicitly a topic) would be required, but given the way event-driven scheduling works, I’m not sure it can be created and torn down entirely within the scope of a system test as the subscription is what triggers the DAG.
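
As a rough illustration of the "create, use, then delete" pattern requested for atomic system tests, the sketch below wraps setup and teardown in a context manager. It deliberately uses a hypothetical in-memory stand-in (`InMemoryPubSub`) rather than the real Pub/Sub client, and `temporary_subscription` is an invented helper; it does not address the open question of whether a subscription that triggers the DAG can live entirely inside one test.

```python
from contextlib import contextmanager


class InMemoryPubSub:
    """Hypothetical in-memory stand-in for a Pub/Sub client (not the real API)."""

    def __init__(self) -> None:
        self.topics = set()
        self.subscriptions = {}  # subscription name -> topic name

    def create_topic(self, topic: str) -> None:
        self.topics.add(topic)

    def create_subscription(self, subscription: str, topic: str) -> None:
        if topic not in self.topics:
            raise KeyError(f"unknown topic {topic!r}")
        self.subscriptions[subscription] = topic

    def delete_subscription(self, subscription: str) -> None:
        self.subscriptions.pop(subscription, None)

    def delete_topic(self, topic: str) -> None:
        self.topics.discard(topic)


@contextmanager
def temporary_subscription(client, topic, subscription):
    """Create a topic and subscription, then always delete both on exit."""
    client.create_topic(topic)
    try:
        client.create_subscription(subscription, topic)
        yield subscription
    finally:
        # Teardown runs even if the test body raises.
        client.delete_subscription(subscription)
        client.delete_topic(topic)


client = InMemoryPubSub()
with temporary_subscription(client, "demo-topic", "demo-sub") as sub:
    print("running test body against", sub)
```

After the `with` block exits, the stand-in client holds no topics or subscriptions, which is the atomicity property described above.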

Member

@jason810496 jason810496 left a comment

Yes, there is this PR, which is in draft: #54684
and maybe it makes sense to close it to avoid duplication...

Would it be better to leave the change to PubsubPullTrigger in #54684 instead of closing it outright?
Then this PR can focus on adding PubsubMessageQueueProvider, and #54684 can proceed independently.

I also have some questions: are we sure we want to keep this name? It can be confusing for users that we have a queue (as a separate service in the Google provider) while inside there are still Pub/Sub operators.

The term "Queue" in the class naming is primarily due to the AIP-82 Common Message Queue interface. To distinguish between different underlying implementations, the current PubSub version will use the scheme google+pubsub, while a future GCP Queue service can be designated as google+queue. The "Queue URI" referenced in this PR will be replaced by "scheme" after #54651.
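
The idea of distinguishing implementations by scheme could look roughly like the sketch below. It is not the interface from #54651: `BaseQueueProvider`, `scheme_matches`, `find_provider`, and both provider classes are hypothetical names used only to show how a scheme string might select one implementation.

```python
class BaseQueueProvider:
    """Hypothetical base class: each provider declares one scheme string."""

    scheme = ""

    def scheme_matches(self, scheme: str) -> bool:
        return scheme == self.scheme


class PubSubQueueProvider(BaseQueueProvider):
    scheme = "google+pubsub"


class FutureGcpQueueProvider(BaseQueueProvider):
    # Placeholder for a possible future GCP queue service.
    scheme = "google+queue"


def find_provider(scheme, providers):
    """Return the single provider whose scheme matches, or raise ValueError."""
    matches = [p for p in providers if p.scheme_matches(scheme)]
    if len(matches) != 1:
        raise ValueError(f"expected exactly one provider for {scheme!r}, found {len(matches)}")
    return matches[0]


all_providers = [PubSubQueueProvider(), FutureGcpQueueProvider()]
selected = find_provider("google+pubsub", all_providers)
print(type(selected).__name__)
```

Because the lookup requires exactly one match, two providers sharing a scheme would fail loudly in this sketch instead of silently picking one.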

And are you planning to add system tests for this? If yes, can you please explain what should be configured in order to run them? In general, in the Google provider we try to keep every test atomic, so all the configuration and connections used inside a system test should be created and then deleted within that system test.

If I understand @VladaZakharova correctly, we should add a system test under https://github.com/apache/airflow/blob/main/providers/google/tests/system/google/cloud/pubsub .

@jroachgolf84
Collaborator

I also had a draft PR open for this quite a while ago. Please feel free to use this as reference if needed: #54684

Member

@jason810496 jason810496 left a comment

Thanks for the update! The change for common message queue LGTM.

It would be nice to split the fix for PubSubTrigger into another PR, so that we can separate the new feature from the fix. Thanks!

@VladaZakharova
Contributor

yes, please 😿

@jason810496
Member

Sorry, that means @dejii might need to open the PR again. Thanks!

abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 7, 2025
* AIP-82: implement Google Pub/Sub message queue provider

* fix: mypy checks

* fix: spell checks

* fix: tests

* switch to scheme based configuration

* add system tests, update docs