Sensor task is not skipped after ShortCircuitOperator task if direct child relation #52869

@Ferdinanddb

Apache Airflow Provider(s)

standard

Versions of Apache Airflow Providers

1.3.0

Apache Airflow version

3.0.2

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Official Apache Airflow Helm Chart

Deployment details

I built a custom container image based on the official docker.io/apache/airflow:3.0.2-python3.12 image.

In it I install the following requirements.txt file with this command:

RUN pip install \
    --no-cache-dir \
    -r /var/airflow/requirements.txt

The requirements.txt file contains:

apache-airflow[async]==3.0.2
astronomer-cosmos==1.10.1
apache-airflow-providers-amazon==9.9.0
apache-airflow-providers-google==16.0.0
apache-airflow-providers-cncf-kubernetes==10.6.0
apache-airflow-providers-celery==3.12.0
apache-airflow-providers-standard==1.3.0
apache-airflow-providers-postgres==6.2.0
apache-airflow-providers-slack==9.1.1
apache-airflow-providers-http==5.3.1
apache-airflow-providers-fab==2.2.1

I did not use pip's constraint flag, so that I could try the latest provider versions.

What happened

I am using the ShortCircuitOperator, and a sensor task placed directly after it is not skipped when the ShortCircuitOperator task evaluates to False (meaning all child tasks should be skipped). The sensor is a custom class I created to add a missing option to the official S3FileSensor.

Image

In the following screenshot showing the XComs, we can clearly see that the sensor task is not included in the value.

Image

The code of the first two tasks is:

    check_dis_is_mon_to_fri_not_holiday = ShortCircuitOperator(
        task_id="check_dis_is_mon_to_fri_not_holiday",
        python_callable=should_run_mon_to_fri,
        op_kwargs={
            "check_algoseek_holiday": True,
        },
    )

    wait_for_ticker_to_secid_lookup_s3_file = S3FileSensor(
        task_id="wait_for_ticker_to_secid_lookup_s3_file",
        bucket_name=S3_EQUITY_LOOKUP_BUCKET_NAME,
        object_key="ticker_to_secid_lookup.csv",
        poke_interval=600,
        timeout=timedelta(seconds=24 * 60 * 60 * 5),
        modified_after="{{ data_interval_end | ts }}",
        mode="poke",
        deferrable=True,
    )

Interestingly, if I add a dummy EmptyOperator task between the ShortCircuitOperator and the sensor, I get the expected behavior. This workaround is fine for me, but I still think something is off.

Image

What you think should happen instead

The ShortCircuitOperator should skip all the child tasks (Operators, Sensors, ...) when it evaluates to False and when the ignore_downstream_trigger_rules flag is set to True.
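To make the expectation concrete, here is a small plain-Python model of the documented semantics. It is only a sketch, not Airflow's implementation: the function name tasks_to_skip and the task id process_lookup are hypothetical and exist purely for illustration.

```python
from collections import deque


def tasks_to_skip(downstream, start, ignore_downstream_trigger_rules=True):
    """Return the task ids a falsy short-circuit at `start` is expected to skip.

    `downstream` maps each task id to the list of its direct children.
    """
    direct = list(downstream.get(start, []))
    if not ignore_downstream_trigger_rules:
        # Only direct children are skipped; trigger rules decide the rest.
        return set(direct)

    # Otherwise every transitive downstream task is skipped (breadth-first walk).
    skipped = set()
    queue = deque(direct)
    while queue:
        task_id = queue.popleft()
        if task_id in skipped:
            continue
        skipped.add(task_id)
        queue.extend(downstream.get(task_id, []))
    return skipped


# Graph shaped like the DAG in this report; "process_lookup" is a hypothetical
# task standing in for whatever runs after the sensor.
graph = {
    "check_dis_is_mon_to_fri_not_holiday": [
        "wait_for_ticker_to_secid_lookup_s3_file",
    ],
    "wait_for_ticker_to_secid_lookup_s3_file": ["process_lookup"],
}

expected = tasks_to_skip(graph, "check_dis_is_mon_to_fri_not_holiday")
```

In both modes the sensor is a direct child, so it should appear in the skipped set either way. That is why its absence from the XCom value looks like a bug.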

How to reproduce

I think it can be reproduced by trying to execute any Sensor task after a ShortCircuitOperator task evaluates to False.

Anything else

My workaround is to add a dummy EmptyOperator between the ShortCircuitOperator and the Sensor. With that in place everything works as expected, but in my opinion it also clearly indicates that something is off. The same code was working fine in Airflow 2.11.0.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
