"execute cannot be called outside TaskInstance" warning fired despite being in a TaskInstance #41470

@SKalide

Description

Apache Airflow version

2.9.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

BigQueryTableExistenceSensor fires an "execute cannot be called outside TaskInstance" warning on execute.

My observation: the ExecutorSafeguard check runs twice for a single sensor execution.

The reason is that the first call arrives through the TaskInstance's _execute_callable (as intended). The decorator checks whether it is executing within a TaskInstance, then calls the "real" execute function. That function is also decorated and therefore performs the check again, but this time without a sentinel, so the check fails and a warning fires (or an exception is raised if allow_nested_operators is set to False).
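To illustrate the mechanism, here is a minimal, self-contained sketch of how a sentinel-based safeguard double-fires when a decorated subclass `execute` delegates to a decorated base `execute`. The class and attribute names (`ExecutorSafeguard`, `__sentinel`) are modeled loosely on Airflow's baseoperator module; this is a simplified stand-in, not the real implementation:

```python
import functools
import warnings


class ExecutorSafeguard:
    """Simplified stand-in for Airflow's ExecutorSafeguard (assumed names)."""

    _sentinel = object()  # token that only the TaskInstance call path supplies

    @classmethod
    def decorator(cls, func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            # The TaskInstance passes the sentinel; a nested call does not.
            sentinel = kwargs.pop("__sentinel", None)
            if sentinel is not cls._sentinel:
                warnings.warn(
                    f"{func.__qualname__} cannot be called outside TaskInstance"
                )
            return func(self, *args, **kwargs)

        return wrapper


class BaseOperator:
    @ExecutorSafeguard.decorator
    def execute(self, context):
        return "base"


class MySensor(BaseOperator):
    @ExecutorSafeguard.decorator  # the override is decorated as well
    def execute(self, context):
        # super().execute() re-enters the decorated base method
        # *without* the sentinel -> second check fails -> spurious warning
        return super().execute(context)


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    # Simulate the TaskInstance call path, which supplies the sentinel.
    MySensor().execute({}, __sentinel=ExecutorSafeguard._sentinel)

print(len(caught))  # -> 1: the nested, sentinel-less call fired one warning
```

The outer call passes the check because the (simulated) TaskInstance supplied the sentinel; the inner `super().execute(context)` has no sentinel to forward, which is exactly the double-check described above.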

See also: #41426

An issue describing the same problem was already opened but never resolved, see #39413

What you think should happen instead?

The decorator should not fire a warning. It could, for example, track whether the safeguard has already run for the current call and skip the repeated check.
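One possible shape for such a fix, sketched on the same simplified stand-in (again, the names are assumptions, and the `_safeguard_active` flag is purely hypothetical, not Airflow's actual code): the wrapper remembers that the check already ran for this call chain, so nested decorated calls skip it.

```python
import functools
import warnings


class ExecutorSafeguard:
    """Hypothetical fix sketch (not Airflow's actual code): skip the check
    when the safeguard has already run higher up the same call chain."""

    _sentinel = object()

    @classmethod
    def decorator(cls, func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            sentinel = kwargs.pop("__sentinel", None)
            already_checked = getattr(self, "_safeguard_active", False)
            if sentinel is not cls._sentinel and not already_checked:
                warnings.warn(
                    f"{func.__qualname__} cannot be called outside TaskInstance"
                )
            self._safeguard_active = True
            try:
                return func(self, *args, **kwargs)
            finally:
                # restore, so a later top-level call is checked again
                self._safeguard_active = already_checked

        return wrapper


class BaseOperator:
    @ExecutorSafeguard.decorator
    def execute(self, context):
        return "base"


class MySensor(BaseOperator):
    @ExecutorSafeguard.decorator
    def execute(self, context):
        return super().execute(context)


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    MySensor().execute({}, __sentinel=ExecutorSafeguard._sentinel)

print(len(caught))  # -> 0: the nested call sees the flag and skips the check
```

Restoring the flag in `finally` keeps a subsequent genuinely-external call subject to the check; a thread-local or context variable would be needed for full correctness under concurrent execution.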

How to reproduce

from airflow import DAG
from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor
from airflow.utils.dates import days_ago
from airflow.models.baseoperator import BaseOperator


class LoggingBaseOperator(BaseOperator):
    def execute(self, context):
        self.log.info(f"Executing {self.__class__.__name__}")
        return super().execute(context)


class LoggingBigQueryTableExistenceSensor(
    BigQueryTableExistenceSensor, LoggingBaseOperator
):
    def poke(self, context):
        self.log.info(f"Poking {self.__class__.__name__}")
        return True  # always succeed so the sensor finishes on the first poke


dag = DAG(
    "test_bigquery_sensor_double_execution",
    default_args={
        "start_date": days_ago(1),
    },
    description="A simple DAG to test BigQueryTableExistenceSensor execution",
    schedule_interval=None,
)

sensor_task = LoggingBigQueryTableExistenceSensor(
    task_id="test_sensor",
    project_id="your-project-id",
    dataset_id="your-dataset-id",
    table_id="your-table-id",
    poke_interval=60,
    dag=dag,
)

log_bg.txt

Operating System

Ubuntu 22.04.4 LTS

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

Docker image: apache/airflow:2.9.3-python3.11

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
