PubsubPullTrigger does not pass gcp_conn_id to underlying hook #42160

@nickmarx12345678

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow==2.5.3
apache-airflow-providers-cncf-kubernetes==7.10.0
apache-airflow-providers-common-sql==1.8.1
apache-airflow-providers-ftp==3.6.1
apache-airflow-providers-google==10.12.0
apache-airflow-providers-http==4.7.0
apache-airflow-providers-imap==3.4.0
apache-airflow-providers-pagerduty==2.1.2
apache-airflow-providers-slack==8.4.0
apache-airflow-providers-sqlite==3.5.0
google-cloud-pubsub==2.18.4

Apache Airflow version

v2.5.3+composer

Operating System

Google Cloud Composer (unsure)

Deployment

Google Cloud Composer

Deployment details

No response

What happened

When using PubSubPullSensor in deferrable mode with a non-default GCP connection, the sensor fails with a permission error even though the relevant service account has the required permissions (verified via the gcloud command line).

airflow-triggerer , poke_interval=10.0, gcp_conn_id=cre-raw-data-ingest-prod-service-account, impersonation_chain=None> (ID 7752) fired: TriggerEvent<{'status': 'error', 'message': "('Error pulling messages from subscription projects/xp-raw-data-ingest-staging/subscriptions/ili-subscription', PermissionDenied('User not authorized to perform this action.'))"}> 

With the same arguments and connection in non-deferrable mode, the sensor works as expected: it is able to wait for, pull, and ack messages.

It appears the underlying trigger does not pass the connection ID parameters along to the hook.

In the non-deferrable pathway, we can see the conn_id and impersonation chain being passed to the hook.

Our triggerer logs also show evidence that the default GCP connection ID is being used:

2024-09-11 09:59:49.344 PDT
airflow-triggerer Using connection ID 'google_cloud_default' for task execution. 
2024-09-11 09:59:49.346 PDT
airflow-triggerer Getting connection using `google.auth.default()` since no explicit credentials are provided. 
2024-09-11 09:59:49.361 PDT
airflow-triggerer Pulling max 1 messages from subscription (path) projects/xp-raw-data-ingest-staging/subscriptions/ili-subscription 
2024-09-11 09:59:49.433 PDT
airflow-triggerer Trigger <airflow.providers.google.cloud.triggers.pubsub.PubsubPullTrigger project_id=xp-raw-data-ingest-staging, subscription=ili-subscription, max_messages=1, ack_messages=True, messages_callback=def _default_message_callback( 
2024-09-11 09:59:49.434 PDT
airflow-triggerer     pulled_messages: List[ReceivedMessage], 
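
The suspected failure mode can be sketched in isolation. This is a minimal illustration, not the provider's actual source: StubHook, BuggyTrigger, and FixedTrigger are hypothetical stand-ins, and the only assumption taken from the logs above is that a hook created without an explicit connection falls back to 'google_cloud_default'.

```python
from dataclasses import dataclass
from typing import Optional, Sequence

# Default connection ID reported in the triggerer logs.
DEFAULT_CONN_ID = "google_cloud_default"


@dataclass
class StubHook:
    """Hypothetical stand-in for a GCP hook; it only records its connection settings."""

    gcp_conn_id: str = DEFAULT_CONN_ID
    impersonation_chain: Optional[Sequence[str]] = None


class BuggyTrigger:
    """Hypothetical trigger that stores the connection settings but never forwards them."""

    def __init__(self, gcp_conn_id=DEFAULT_CONN_ID, impersonation_chain=None):
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain

    def get_hook(self):
        # Suspected bug: the hook is built with its defaults, ignoring self.gcp_conn_id.
        return StubHook()


class FixedTrigger(BuggyTrigger):
    """Same trigger, but the stored settings are passed through to the hook."""

    def get_hook(self):
        return StubHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
        )


buggy_hook = BuggyTrigger(gcp_conn_id="the_conn_id").get_hook()
fixed_hook = FixedTrigger(gcp_conn_id="the_conn_id").get_hook()

print(buggy_hook.gcp_conn_id)  # falls back to the default connection
print(fixed_hook.gcp_conn_id)  # uses the connection the user configured
```

If the real trigger behaves like BuggyTrigger, that would explain why the triggerer authenticates with the default credentials and receives PermissionDenied, while the non-deferrable sensor succeeds with the same arguments.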

How to reproduce

  • Create a GCP Pub/Sub topic and a pull subscription.
  • Grant roles/pubsub.admin to a service account on the project or on the created topic.
  • Create a DAG with a PubSubPullSensor task whose gcp_conn_id references the connection for the service account that was granted permissions, then toggle between deferrable and non-deferrable mode:
    from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor

    pull_messages = PubSubPullSensor(
        task_id="pull_messages",
        ack_messages=True,
        project_id="the_project",
        gcp_conn_id="the_conn_id",  # non-default connection
        subscription="the-subscription",
        deferrable=True,  # fails with PermissionDenied; works with False
        max_messages=1,
    )

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Labels

  • area:providers
  • kind:bug (This is a clearly a bug)
  • needs-triage (label for new issues that we didn't triage yet)
  • provider:google (Google (including GCP) related issues)