Description
Apache Airflow version
2.11.0
If "Other Airflow 2/3 version" selected, which one?
No response
What happened?
Summary:
CloudComposerDAGRunSensor incorrectly returns True (success) when no DAG runs exist within the specified execution_range, potentially causing downstream tasks to execute without the expected upstream dependencies being met.
What you think should happen instead?
The sensor should only return True when:
- At least one DAG run exists within the execution_range, AND
- All DAG runs within that range are in allowed_states
If no runs exist in the time window, the sensor should return False (continue waiting).
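The two conditions above can be written as a small predicate. This is a minimal sketch, not the provider's code. It assumes each run is a dict with a `logical_date` ISO 8601 string (with a UTC offset) and a `state`. `window_satisfied` is a hypothetical name, and stdlib `datetime.fromisoformat` stands in for the `dateutil` parser the provider uses.

```python
from datetime import datetime, timezone


def window_satisfied(
    dag_runs: list[dict],
    start_date: datetime,
    end_date: datetime,
    allowed_states: list[str],
    date_key: str = "logical_date",  # assumption: Airflow 2 runs would use "execution_date"
) -> bool:
    # Keep only runs strictly inside (start_date, end_date), as the sensor does.
    in_window = [
        run
        for run in dag_runs
        if start_date < datetime.fromisoformat(run[date_key]) < end_date
    ]
    # bool(in_window): at least one run exists in the window.
    # all(...): every run in the window is in an allowed state.
    return bool(in_window) and all(run["state"] in allowed_states for run in in_window)


start = datetime(2024, 1, 1, 11, tzinfo=timezone.utc)
end = datetime(2024, 1, 1, 14, tzinfo=timezone.utc)
runs = [{"logical_date": "2024-01-01T08:00:00+00:00", "state": "success"}]
print(window_satisfied(runs, start, end, ["success"]))  # False: no run inside the window
```

Without the `bool(in_window)` guard, `all()` over an empty list is `True`, which is the same vacuous-success behavior described in this report.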
Actual Behavior
The sensor returns True even when no DAG runs exist within the execution_range, causing downstream tasks to proceed without the expected dependency being met.
Impact
This bug can cause serious production issues:
- Silent Data Pipeline Failures: Downstream tasks execute without required upstream data
- Data Corruption: Tasks process incomplete or missing datasets
- No Error Alerts: The sensor succeeds silently, providing no indication of the missing dependency
- Difficult to Debug: The issue only manifests as data inconsistencies, not task failures
Real-World Scenario
A daily ETL pipeline on Astronomer depends on a Composer DAG:
```python
wait_for_composer = CloudComposerDAGRunSensor(
    # task_id, project_id, region and environment_id omitted for brevity
    composer_dag_id="daily_data_ingestion",
    execution_range=[yesterday_9am, yesterday_5pm],  # Yesterday's business hours
)
```
If the Composer DAG daily_data_ingestion never ran yesterday:
- ❌ Buggy Behavior: Sensor returns True → Downstream ETL proceeds with no data → Data corruption
- ✅ Expected Behavior: Sensor returns False → Keeps waiting → Eventually times out with a clear error
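The scenario snippet leaves `yesterday_9am` and `yesterday_5pm` undefined. One way to build that window is sketched below. `yesterday_business_hours` is a hypothetical helper, and "business hours" are assumed to mean 09:00 to 17:00 UTC.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def yesterday_business_hours(now: Optional[datetime] = None) -> list[datetime]:
    """Return [09:00, 17:00] UTC on the calendar day before `now`."""
    # Hypothetical helper: 09:00-17:00 UTC is an assumed definition of business hours.
    now = now or datetime.now(timezone.utc)
    yesterday = (now - timedelta(days=1)).astimezone(timezone.utc)
    start = yesterday.replace(hour=9, minute=0, second=0, microsecond=0)
    end = yesterday.replace(hour=17, minute=0, second=0, microsecond=0)
    return [start, end]


yesterday_9am, yesterday_5pm = yesterday_business_hours()
```

Both datetimes are timezone-aware, which matters because the sensor compares them to the run dates via `.timestamp()`.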
How to reproduce
Steps to Reproduce
1. Create Test DAG
```python
from datetime import datetime, timedelta, timezone

from airflow.decorators import dag
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor


# `schedule` and a fixed start_date replace the deprecated `schedule_interval` and `days_ago`
@dag(schedule=None, start_date=datetime(2024, 1, 1, tzinfo=timezone.utc), catchup=False)
def test_composer_sensor_bug():
    # Set up a time window (yesterday 11:00-14:00 UTC) where NO runs exist
    yesterday = datetime.now(timezone.utc) - timedelta(days=1)
    start_time = datetime(yesterday.year, yesterday.month, yesterday.day, 11, 0, 0, tzinfo=timezone.utc)
    end_time = datetime(yesterday.year, yesterday.month, yesterday.day, 14, 0, 0, tzinfo=timezone.utc)

    # Monitor a DAG that has runs, but NONE in the time window
    sensor = CloudComposerDAGRunSensor(
        task_id="test_sensor",
        project_id="my-project",
        region="us-east1",
        environment_id="my-composer-env",
        composer_dag_id="target_dag",  # This DAG exists but didn't run in the window
        execution_range=[start_time, end_time],
        poke_interval=10,
        timeout=60,
    )


test_composer_sensor_bug()
```
2. Prerequisites
- A Composer environment with a DAG that has historical runs
- Ensure the target DAG has NO runs within the specified time window
- Example: if checking yesterday 11:00-14:00 UTC, ensure the DAG did not run during that time
3. Execute DAG
```shell
airflow dags trigger test_composer_sensor_bug
```
4. Observe Buggy Behavior
- Expected: Sensor times out after 60 seconds (no runs in window)
- Actual: Sensor immediately succeeds (returns True)
Root Cause Analysis
Buggy Code
Location: airflow/providers/google/cloud/sensors/cloud_composer.py
```python
def _check_dag_runs_states(
    self,
    dag_runs: list[dict],
    start_date: datetime,
    end_date: datetime,
) -> bool:
    for dag_run in dag_runs:
        if (
            start_date.timestamp()
            < parser.parse(
                dag_run["execution_date" if self._composer_airflow_version < 3 else "logical_date"]
            ).timestamp()
            < end_date.timestamp()
        ) and dag_run["state"] not in self.allowed_states:
            return False
    return True  # ❌ BUG: Returns True even if no runs found in window!
```
Why It Fails
Scenario: No DAG runs in time window
- dag_runs contains runs, but all are outside the time window
- Loop iterates through all runs
- For each run, the time window check start_date < execution_date < end_date evaluates to False
- The and operator short-circuits, so dag_run["state"] not in self.allowed_states is never evaluated
- Loop never returns False
- Method reaches return True at the end ❌
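The same control flow can be reproduced offline, without a Composer environment. This sketch copies the loop of the current method into a free function (`buggy_check` is a hypothetical name), assumes Airflow 3 style run dicts keyed by `logical_date`, and swaps the provider's `dateutil` parser for `datetime.fromisoformat`. Note that an empty `dag_runs` list also falls straight through to `return True`, because the loop body never runs.

```python
from datetime import datetime, timezone


def buggy_check(
    dag_runs: list[dict],
    start_date: datetime,
    end_date: datetime,
    allowed_states: list[str],
) -> bool:
    # Mirrors the loop in the current _check_dag_runs_states.
    for dag_run in dag_runs:
        if (
            start_date.timestamp()
            < datetime.fromisoformat(dag_run["logical_date"]).timestamp()
            < end_date.timestamp()
        ) and dag_run["state"] not in allowed_states:
            return False
    # Reached whenever no in-window run has a disallowed state,
    # including when no run is in the window at all.
    return True


window = (
    datetime(2024, 1, 1, 11, tzinfo=timezone.utc),
    datetime(2024, 1, 1, 14, tzinfo=timezone.utc),
)
outside_only = [{"logical_date": "2024-01-01T08:00:00+00:00", "state": "failed"}]
print(buggy_check(outside_only, *window, ["success"]))  # True, although no run is in the window
```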
Proposed Fix
Fixed Code
```python
def _check_dag_runs_states(
    self,
    dag_runs: list[dict],
    start_date: datetime,
    end_date: datetime,
) -> bool:
    found_runs_in_window = False  # Track if we found any runs
    for dag_run in dag_runs:
        execution_date = parser.parse(
            dag_run["execution_date" if self._composer_airflow_version < 3 else "logical_date"]
        )
        # Check if run is within time window
        if start_date.timestamp() < execution_date.timestamp() < end_date.timestamp():
            found_runs_in_window = True  # Mark that we found at least one run
            # If any run in window is not in allowed states, return False immediately
            if dag_run["state"] not in self.allowed_states:
                return False
    # ✅ FIX: Only return True if we found at least one run in the window
    return found_runs_in_window
```
Key Changes
- Added found_runs_in_window flag: Tracks whether at least one run exists in the time window
- Separated conditions: Split the compound condition to set the flag independently
- Conditional return: Only return True if runs were found AND all are in allowed states
Operating System
```
/usr/local/airflow$ cat /etc/os-release
PRETTY_NAME="Debian GNU/Linux 12 (bookworm)"
NAME="Debian GNU/Linux"
VERSION_ID="12"
VERSION="12 (bookworm)"
VERSION_CODENAME=bookworm
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"
```
Versions of Apache Airflow Providers
apache-airflow-providers-google==15.1.0
Deployment
Astronomer
Deployment details
Astro Runtime
13.2.0 (Based on Airflow 2.11.0)
Anything else?
```diff
diff --git a/airflow/providers/google/cloud/sensors/cloud_composer.py b/airflow/providers/google/cloud/sensors/cloud_composer.py
index abc123..def456 100644
--- a/airflow/providers/google/cloud/sensors/cloud_composer.py
+++ b/airflow/providers/google/cloud/sensors/cloud_composer.py
@@ -XXX,XX +XXX,XX @@ class CloudComposerDAGRunSensor(BaseSensorOperator):
     def _check_dag_runs_states(
         self,
         dag_runs: list[dict],
         start_date: datetime,
         end_date: datetime,
     ) -> bool:
+        found_runs_in_window = False
+
         for dag_run in dag_runs:
-            if (
-                start_date.timestamp()
-                < parser.parse(
-                    dag_run["execution_date" if self._composer_airflow_version < 3 else "logical_date"]
-                ).timestamp()
-                < end_date.timestamp()
-            ) and dag_run["state"] not in self.allowed_states:
-                return False
-        return True
+            execution_date = parser.parse(
+                dag_run["execution_date" if self._composer_airflow_version < 3 else "logical_date"]
+            )
+
+            if start_date.timestamp() < execution_date.timestamp() < end_date.timestamp():
+                found_runs_in_window = True
+
+                if dag_run["state"] not in self.allowed_states:
+                    return False
+
+        return found_runs_in_window
```
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct