CloudComposerDAGRunSensor returns success when no Dag runs found in execution_range #57512

Description

Apache Airflow version

2.11.0

If "Other Airflow 2/3 version" selected, which one?

No response

What happened?

Summary:
CloudComposerDAGRunSensor incorrectly returns True (success) when no DAG runs exist within the specified execution_range, potentially causing downstream tasks to execute without the expected upstream dependencies being met.

What you think should happen instead?

The sensor should only return True when:

  1. At least one DAG run exists within the execution_range, AND
  2. All DAG runs within that range are in allowed_states

If no runs exist in the time window, the sensor should return False (continue waiting).

Actual Behavior
The sensor returns True even when no DAG runs exist within the execution_range, causing downstream tasks to proceed without the expected dependency being met.

Impact
This bug can cause serious production issues:

  1. Silent Data Pipeline Failures: Downstream tasks execute without required upstream data
  2. Data Corruption: Tasks process incomplete or missing datasets
  3. No Error Alerts: The sensor succeeds silently, providing no indication of the missing dependency
  4. Difficult to Debug: The issue only manifests as data inconsistencies, not task failures

Real-World Scenario

A daily ETL pipeline on Astronomer depends on a Composer DAG:

wait_for_composer = CloudComposerDAGRunSensor(
    task_id="wait_for_composer",
    project_id="my-project",
    region="us-east1",
    environment_id="my-composer-env",
    composer_dag_id="daily_data_ingestion",
    execution_range=[yesterday_9am, yesterday_5pm],  # Yesterday's business hours
)
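
For reference, the yesterday_9am / yesterday_5pm boundaries above could be computed as follows (an illustrative sketch only; the variable names are not part of the provider API):

from datetime import datetime, timedelta, timezone

# Illustrative only: yesterday's 09:00-17:00 UTC window
yesterday = datetime.now(timezone.utc) - timedelta(days=1)
yesterday_9am = yesterday.replace(hour=9, minute=0, second=0, microsecond=0)
yesterday_5pm = yesterday.replace(hour=17, minute=0, second=0, microsecond=0)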

If the Composer DAG daily_data_ingestion never ran yesterday:

  • ❌ Buggy Behavior: Sensor returns True → Downstream ETL proceeds with no data → Data corruption
  • ✅ Expected Behavior: Sensor returns False → Keeps waiting → Eventually times out with clear error

How to reproduce

Steps to Reproduce
1. Create Test DAG

from airflow.decorators import dag
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor
from datetime import datetime, timedelta, timezone
from airflow.utils.dates import days_ago

@dag(schedule_interval=None, start_date=days_ago(1), catchup=False)
def test_composer_sensor_bug():

    # Set up time window where NO runs exist
    yesterday = datetime.now(timezone.utc) - timedelta(days=1)
    start_time = datetime(yesterday.year, yesterday.month, yesterday.day, 11, 0, 0, tzinfo=timezone.utc)
    end_time = datetime(yesterday.year, yesterday.month, yesterday.day, 14, 0, 0, tzinfo=timezone.utc)

    # Monitor a DAG that has runs, but NONE in the time window
    sensor = CloudComposerDAGRunSensor(
        task_id="test_sensor",
        project_id="my-project",
        region="us-east1",
        environment_id="my-composer-env",
        composer_dag_id="target_dag",  # This DAG exists but didn't run in the window
        execution_range=[start_time, end_time],
        poke_interval=10,
        timeout=60,
    )

test_composer_sensor_bug()

2. Prerequisites

  • A Composer environment with a DAG that has historical runs
  • Ensure the target DAG has NO runs within the specified time window
  • Example: If checking yesterday 11:00-14:00, ensure the DAG didn't run during that time

3. Execute DAG

airflow dags trigger test_composer_sensor_bug

4. Observe Buggy Behavior

  • Expected: Sensor times out after 60 seconds (no runs in window)
  • Actual: Sensor immediately succeeds (returns True)

Root Cause Analysis

Buggy Code

Location: airflow/providers/google/cloud/sensors/cloud_composer.py

def _check_dag_runs_states(
    self,
    dag_runs: list[dict],
    start_date: datetime,
    end_date: datetime,
) -> bool:
    for dag_run in dag_runs:
        if (
            start_date.timestamp()
            < parser.parse(
                dag_run["execution_date" if self._composer_airflow_version < 3 else "logical_date"]
            ).timestamp()
            < end_date.timestamp()
        ) and dag_run["state"] not in self.allowed_states:
            return False
    return True  # ❌ BUG: Returns True even if no runs found in window!
Why It Fails

Scenario: no DAG runs fall inside the time window

  • dag_runs contains runs, but all of them fall outside the time window
  • The loop iterates through every run
  • For each run, the time-window check start_date < execution_date < end_date evaluates to False
  • The and operator short-circuits, so dag_run["state"] not in self.allowed_states is never evaluated
  • The loop therefore never returns False
  • The method falls through to return True at the end ❌

The same failure occurs when dag_runs is empty: the loop body never executes and the method still returns True.
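
The faulty behavior can be reproduced outside Airflow with a minimal stand-in for the current logic (a sketch; buggy_check is a hypothetical helper that mirrors _check_dag_runs_states using the Airflow 2 key names):

from datetime import datetime, timezone
from dateutil import parser

ALLOWED_STATES = ["success"]

def buggy_check(dag_runs, start_date, end_date):
    # Mirrors the current _check_dag_runs_states logic (Airflow 2 key names)
    for dag_run in dag_runs:
        if (
            start_date.timestamp()
            < parser.parse(dag_run["execution_date"]).timestamp()
            < end_date.timestamp()
        ) and dag_run["state"] not in ALLOWED_STATES:
            return False
    return True

window_start = datetime(2024, 1, 1, 11, 0, tzinfo=timezone.utc)
window_end = datetime(2024, 1, 1, 14, 0, tzinfo=timezone.utc)

# One failed run exists, but it sits OUTSIDE the checked window
runs = [{"execution_date": "2024-01-01T08:00:00+00:00", "state": "failed"}]

print(buggy_check(runs, window_start, window_end))  # True -> sensor would succeed despite no runs in window
print(buggy_check([], window_start, window_end))    # True -> even an empty run list succeeds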

Proposed Fix
Fixed Code

def _check_dag_runs_states(
    self,
    dag_runs: list[dict],
    start_date: datetime,
    end_date: datetime,
) -> bool:
    found_runs_in_window = False  # Track if we found any runs

    for dag_run in dag_runs:
        execution_date = parser.parse(
            dag_run["execution_date" if self._composer_airflow_version < 3 else "logical_date"]
        )

        # Check if run is within time window
        if start_date.timestamp() < execution_date.timestamp() < end_date.timestamp():
            found_runs_in_window = True  # Mark that we found at least one run

            # If any run in window is not in allowed states, return False immediately
            if dag_run["state"] not in self.allowed_states:
                return False

    # ✅ FIX: Only return True if we found at least one run in the window
    return found_runs_in_window

Key Changes

  1. Added found_runs_in_window flag: Tracks whether at least one run exists in the time window
  2. Separated conditions: Split the compound condition to set the flag independently
  3. Conditional return: Only return True if runs were found AND all are in allowed states
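
A quick sanity check of the fixed logic as a standalone function (a sketch only; fixed_check is a hypothetical helper mirroring the proposed _check_dag_runs_states, not the provider's test suite):

from datetime import datetime, timezone
from dateutil import parser

ALLOWED_STATES = ["success"]

def fixed_check(dag_runs, start_date, end_date):
    # Mirrors the proposed fix (Airflow 2 key names)
    found_runs_in_window = False
    for dag_run in dag_runs:
        execution_date = parser.parse(dag_run["execution_date"])
        if start_date.timestamp() < execution_date.timestamp() < end_date.timestamp():
            found_runs_in_window = True
            if dag_run["state"] not in ALLOWED_STATES:
                return False
    return found_runs_in_window

start = datetime(2024, 1, 1, 11, 0, tzinfo=timezone.utc)
end = datetime(2024, 1, 1, 14, 0, tzinfo=timezone.utc)

assert fixed_check([], start, end) is False  # no runs at all -> keep waiting
assert fixed_check([{"execution_date": "2024-01-01T08:00:00+00:00", "state": "success"}], start, end) is False  # run outside window -> keep waiting
assert fixed_check([{"execution_date": "2024-01-01T12:00:00+00:00", "state": "success"}], start, end) is True   # successful run in window -> done
assert fixed_check([{"execution_date": "2024-01-01T12:00:00+00:00", "state": "failed"}], start, end) is False   # failed run in window -> not done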

Operating System

/usr/local/airflow$ cat /etc/os-release
PRETTY_NAME="Debian GNU/Linux 12 (bookworm)"
NAME="Debian GNU/Linux"
VERSION_ID="12"
VERSION="12 (bookworm)"
VERSION_CODENAME=bookworm
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"

Versions of Apache Airflow Providers

apache-airflow-providers-google==15.1.0

Deployment

Astronomer

Deployment details

Astro Runtime 13.2.0 (based on Airflow 2.11.0)

Anything else?

diff --git a/airflow/providers/google/cloud/sensors/cloud_composer.py b/airflow/providers/google/cloud/sensors/cloud_composer.py
index abc123..def456 100644
--- a/airflow/providers/google/cloud/sensors/cloud_composer.py
+++ b/airflow/providers/google/cloud/sensors/cloud_composer.py
@@ -XXX,XX +XXX,XX @@ class CloudComposerDAGRunSensor(BaseSensorOperator):
     def _check_dag_runs_states(
         self,
         dag_runs: list[dict],
         start_date: datetime,
         end_date: datetime,
     ) -> bool:
+        found_runs_in_window = False
+
         for dag_run in dag_runs:
-            if (
-                start_date.timestamp()
-                < parser.parse(
-                    dag_run["execution_date" if self._composer_airflow_version < 3 else "logical_date"]
-                ).timestamp()
-                < end_date.timestamp()
-            ) and dag_run["state"] not in self.allowed_states:
-                return False
-        return True
+            execution_date = parser.parse(
+                dag_run["execution_date" if self._composer_airflow_version < 3 else "logical_date"]
+            )
+
+            if start_date.timestamp() < execution_date.timestamp() < end_date.timestamp():
+                found_runs_in_window = True
+
+                if dag_run["state"] not in self.allowed_states:
+                    return False
+
+        return found_runs_in_window

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
