Closed
Labels
- affected_version:3.0 (Issues Reported for 3.0)
- area:core
- area:upgrade (Facilitating migration to a newer version of Airflow)
- kind:bug (This is a clearly a bug)
- priority:high (High priority bug that should be patched quickly but does not require immediate new release)
Description
Apache Airflow version
3.0.0
If "Other Airflow 2 version" selected, which one?
No response
What happened?
Some DAGs are stuck in the scheduled state after migrating from 2.10.5 to 3.0.0, and the DAG code is not visible for them.
What you think should happen instead?
No response
How to reproduce
- Add the DAG below in AF2
- Migrate from AF2 to AF3
- Try to check the source_code of the DAG
DAG
from airflow.models import DAG
from airflow.operators.datetime import BranchDateTimeOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.dates import days_ago
from airflow_dag_introspection import assert_the_task_states
from datetime import datetime, time
import time as Time

docs = """
####Purpose
This dag tests the BranchDateTimeOperator to ensure that tasks within the timeframe set run and tasks outside of that timeframe are skipped.\n
It acheives this test by setting the start date of the tasks that are skipped to start one day earlier than set in the dag parameter 'start_date' so that the skipped tasks are always outside of the time range.\n
####Expected Behavior
This dag has 6 tasks 4 of the tasks are expected to succeed while 2 of the tasks are expected to be skipped.\n
This dag should pass.
"""

with DAG(
    dag_id="branch_date_time_operator",
    start_date=days_ago(2),
    schedule_interval=None,
    concurrency=1,
    doc_md=docs,
    tags=['core']
) as dag:
    d0 = DummyOperator(task_id="dummy0", start_date=days_ago(3))
    d1 = DummyOperator(task_id="dummy1", start_date=days_ago(3))
    d2 = DummyOperator(task_id="dummy2")
    d3 = DummyOperator(task_id="dummy3")

    bdto = BranchDateTimeOperator(
        task_id="branch_datetime",
        follow_task_ids_if_true=["dummy0", "dummy1"],
        follow_task_ids_if_false=["dummy2", "dummy3"],
        # target_upper and target_lower are time ranges for tasks to be in
        target_upper=time(hour=2, minute=33),
        target_lower=time(hour=0, minute=0, second=22),
    )

    py0 = PythonOperator(
        task_id="assert_the_states",
        python_callable=assert_the_task_states,
        op_args=[{
            "branch_datetime": "success",
            "dummy0": "skipped",
            "dummy1": "skipped",
            "dummy2": "success",
            "dummy3": "success",
        }],
        trigger_rule=TriggerRule.ALL_DONE,
    )

    bdto >> [d0, d1, d2, d3] >> py0
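One possible contributing factor, not confirmed anywhere in this report, is that the DAG above relies on Airflow 2 constructs (`days_ago`, `schedule_interval`, `concurrency`, `airflow.operators.dummy`) that, to the best of my knowledge, are no longer available in Airflow 3. The following is a minimal, stdlib-only sketch for spotting such usages in a DAG file before migrating. The function name `find_af2_only_usage` and both lookup tables are hypothetical helpers written for this illustration; they are not part of Airflow, and the lists are not exhaustive, so check them against the Airflow 3 release notes.

```python
import ast

# ASSUMPTION: names believed to be unavailable in Airflow 3 (non-exhaustive).
# Keys are (module, imported name); values are a suggested alternative.
REMOVED_IMPORTS = {
    ("airflow.utils.dates", "days_ago"): "an explicit datetime for start_date",
    ("airflow.operators.dummy", "DummyOperator"): "EmptyOperator",
}

# ASSUMPTION: DAG keyword arguments believed to be removed in Airflow 3.
REMOVED_DAG_KWARGS = {
    "schedule_interval": "schedule",
    "concurrency": "max_active_tasks",
}


def find_af2_only_usage(source: str) -> list[str]:
    """Return human-readable findings, ordered by line number."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom):
            # Flag `from <module> import <name>` pairs listed above.
            for alias in node.names:
                hint = REMOVED_IMPORTS.get((node.module, alias.name))
                if hint:
                    findings.append(
                        (node.lineno, f"{node.module}.{alias.name}: consider {hint}")
                    )
        elif isinstance(node, ast.Call) and getattr(node.func, "id", None) == "DAG":
            # Flag keyword arguments passed directly to DAG(...).
            for kw in node.keywords:
                if kw.arg in REMOVED_DAG_KWARGS:
                    findings.append(
                        (node.lineno, f"DAG({kw.arg}=...): consider {REMOVED_DAG_KWARGS[kw.arg]}")
                    )
    return [f"line {lineno}: {msg}" for lineno, msg in sorted(findings)]
```

Calling `find_af2_only_usage(open("branch_date_time_operator.py").read())` (the file name is illustrative) returns one entry per flagged import or keyword argument. This only inspects the source text; it does not explain why the task instances stay in the scheduled state.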
Operating System
Linux
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct