-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Description
Apache Airflow Provider(s)
Versions of Apache Airflow Providers
apache-airflow==2.5.3
apache-airflow-providers-cncf-kubernetes==7.10.0
apache-airflow-providers-common-sql==1.8.1
apache-airflow-providers-ftp==3.6.1
apache-airflow-providers-google==10.12.0
apache-airflow-providers-http==4.7.0
apache-airflow-providers-imap==3.4.0
apache-airflow-providers-pagerduty==2.1.2
apache-airflow-providers-slack==8.4.0
apache-airflow-providers-sqlite==3.5.0
google-cloud-pubsub==2.18.4
Apache Airflow version
v2.5.3+composer
Operating System
Google Cloud Composer (unsure)
Deployment
Google Cloud Composer
Deployment details
No response
What happened
When attempting to use the Sensor PubSubPullSensor in specifically deferrable mode with a non-default gcp connection, we encounter a lack of permissions despite relevant service account having proper permissions (verified via gcloud command line).
airflow-triggerer , poke_interval=10.0, gcp_conn_id=cre-raw-data-ingest-prod-service-account, impersonation_chain=None> (ID 7752) fired: TriggerEvent<{'status': 'error', 'message': "('Error pulling messages from subscription projects/xp-raw-data-ingest-staging/subscriptions/ili-subscription', PermissionDenied('User not authorized to perform this action.'))"}>
With the same arguments and connection, in non-deferrable mode, we observe successful sensor operation (ability to wait, pull, and ack messages).
It appears the underlying trigger does not pass along connection id parameters to the hook
In the non-deferrable pathway, we can see the conn_id and impersonation chain passed to the hook
In our triggerer logs, we see related evidence of use of the default gcp conn id.
2024-09-11 09:59:49.344 PDT
airflow-triggerer Using connection ID 'google_cloud_default' for task execution.
2024-09-11 09:59:49.346 PDT
airflow-triggerer Getting connection using `google.auth.default()` since no explicit credentials are provided.
2024-09-11 09:59:49.361 PDT
airflow-triggerer Pulling max 1 messages from subscription (path) projects/xp-raw-data-ingest-staging/subscriptions/ili-subscription
2024-09-11 09:59:49.433 PDT
airflow-triggerer Trigger <airflow.providers.google.cloud.triggers.pubsub.PubsubPullTrigger project_id=xp-raw-data-ingest-staging, subscription=ili-subscription, max_messages=1, ack_messages=True, messages_callback=def _default_message_callback(
2024-09-11 09:59:49.434 PDT
airflow-triggerer pulled_messages: List[ReceivedMessage],
How to reproduce
- Create a GCP Pubsub Topic + Pull Subscription
- Grant
roles/pubsub.adminto a service account on the project or created topic - Create dag w/ PubSubPullSensor task, with a conn_id referencing the gcp service account which was granted permissions, and toggle between deferrable/non-deferrable mode.
pull_messages = PubSubPullSensor(
task_id="pull_messages",
ack_messages=True,
project_id="the_project",
gcp_conn_id="the_conn_id",
subscription="the-subscription",
deferrable=True, # or False
max_messages=1,
)Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct