Skip to content

Some DAGS stuck in schedule state after migrating from 2.10.5 to 3.0.0 and also DAG code is not visible for them #49618

@vatsrahul1001

Description

@vatsrahul1001

Apache Airflow version

3.0.0

If "Other Airflow 2 version" selected, which one?

No response

What happened?

Some DAGS stuck in the schedule state after migrating from 2.10.5 to 3.0.0 and also DAG code is not visible for them

Image Image

What you think should happen instead?

No response

How to reproduce

  1. ADD below DAG in AF2
  2. Migrate from AF2 to AF3
  3. Try to check soruce_code

DAG

from airflow.models import DAG
from airflow.operators.datetime import BranchDateTimeOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.dates import days_ago

from airflow_dag_introspection import assert_the_task_states

from datetime import datetime, time
import time as Time

docs = """
####Purpose
This dag tests the BranchDateTimeOperator to ensure that tasks within the timeframe set run and tasks outside of that timeframe are skipped.\n
It acheives this test by setting the start date of the tasks that are skipped to start one day earlier than set in the dag parameter 'start_date' so that the skipped tasks are always outside of the time range.\n
####Expected Behavior
This dag has 6 tasks 4 of the tasks are expected to succeed while 2 of the tasks are expected to be skipped.\n
This dag should pass.
"""

with DAG(
    dag_id="branch_date_time_operator",
    start_date=days_ago(2),
    schedule_interval=None,
    concurrency=1,
    doc_md=docs,
    tags=['core']
) as dag:

    d0 = DummyOperator(task_id="dummy0", start_date=days_ago(3))
    d1 = DummyOperator(task_id="dummy1", start_date=days_ago(3))

    d2 = DummyOperator(task_id="dummy2")
    d3 = DummyOperator(task_id="dummy3")

    bdto = BranchDateTimeOperator(
        task_id="branch_datetime",
        follow_task_ids_if_true=["dummy0", "dummy1"],
        follow_task_ids_if_false=["dummy2", "dummy3"],
        # target_upper and target_lower are time ranges for tasks to be in
        target_upper=time(hour=2, minute=33),
        target_lower=time(hour=0, minute=0, second=22),
    )

    py0 = PythonOperator(
        task_id="assert_the_states",
        python_callable=assert_the_task_states,
        op_args=[{
            "branch_datetime": "success",
            "dummy0": "skipped",
            "dummy1": "skipped",
            "dummy2": "success",
            "dummy3": "success",
            }],
        trigger_rule=TriggerRule.ALL_DONE,
        
    )

bdto >> [d0, d1, d2, d3] >> py0

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

Labels

affected_version:3.0Issues Reported for 3.0area:corearea:upgradeFacilitating migration to a newer version of Airflowkind:bugThis is a clearly a bugpriority:highHigh priority bug that should be patched quickly but does not require immediate new release

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions