[Graph view] After clearing the task (and its downstream tasks) in a task group the task group becomes disconnected from the dag #26059

@milczar

Apache Airflow version

2.3.4

What happened

In the graph view of the dag, after clearing a task (and its downstream tasks) in a task group and refreshing the page in the browser, the task group becomes disconnected from the dag. See the attached gif.
[Attached gif: airflow_2_3_4_task_group_bug]
The issue is intermittent: the task group only becomes disconnected on some refreshes, as the attached gif shows.

What you think should happen instead

The graph should be rendered properly and consistently.
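
For anyone triaging, one way to make "consistently" checkable is to dump the task-level dependency edges of the dag and compare them with what the graph view draws after each refresh. The sketch below is an assumption added for illustration, not part of the original report: `dag_edges` is a hypothetical helper, and it relies only on the `task_dict` mapping of the dag and the `downstream_task_ids` attribute of each task.

```python
from typing import List, Tuple


def dag_edges(dag) -> List[Tuple[str, str]]:
    """Return sorted (upstream_task_id, downstream_task_id) pairs.

    Hypothetical helper: assumes `dag.task_dict` maps task_id -> task and
    that each task exposes `downstream_task_ids`.
    """
    edges = []
    for task_id, task in dag.task_dict.items():
        for downstream_id in task.downstream_task_ids:
            edges.append((task_id, downstream_id))
    # Sort so that two dumps can be compared directly.
    return sorted(edges)
```

If `dag_edges(dag)` returns the same list before and after clearing, the dependencies themselves are presumably intact and the disconnect is limited to how the graph view renders them.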

How to reproduce

  1. Add the following dag to the dag folder:
import logging
import time
from typing import List

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup


def log_function(message: str, **kwargs):
    logging.info(message)
    time.sleep(3)


def create_file_handling_task_group(supplier):
    with TaskGroup(group_id=f"file_handlig_task_group_{supplier}", ui_color='#666666') as file_handlig_task_group:
        entry = PythonOperator(
            task_id='entry',
            python_callable=log_function,
            op_kwargs={'message': 'create_file_handlig_task_group-Entry-task'}
        )
        with TaskGroup(group_id=f"file_handling_task_sub_group-{supplier}",
                       ui_color='#666666') as file_handlig_task_sub_group:
            sub_group_submit = PythonOperator(
                task_id='sub_group_submit',
                python_callable=log_function,
                op_kwargs={'message': 'create_file_handlig_sub_group_submit'}
            )
            sub_group_monitor = PythonOperator(
                task_id='sub_group_monitor',
                python_callable=log_function,
                op_kwargs={'message': 'create_file_handlig_sub_group_monitor'}
            )
            sub_group_submit >> sub_group_monitor
        entry >> file_handlig_task_sub_group
    return file_handlig_task_group


def get_stage_1_taskgroups(supplierlist: List) -> List[TaskGroup]:
    return [create_file_handling_task_group(supplier) for supplier in supplierlist]


def connect_stage1_to_stage2(self, stage1_tasks: List[TaskGroup], stage2_tasks: List[TaskGroup]) -> None:
    if stage2_tasks:
        for stage1_task in stage1_tasks:
            supplier_code: str = self.get_supplier_code(stage1_task)
            stage2_task = self.get_suppliers_tasks(supplier_code, stage2_tasks)
            stage1_task >> stage2_task


def get_stage_2_taskgroup(taskgroup_id: str):
    with TaskGroup(group_id=taskgroup_id, ui_color='#666666') as stage_2_taskgroup:
        sub_group_submit = PythonOperator(
            task_id='sub_group_submit',
            python_callable=log_function,
            op_kwargs={'message': 'create_file_handlig_sub_group_submit'}
        )
        sub_group_monitor = PythonOperator(
            task_id='sub_group_monitor',
            python_callable=log_function,
            op_kwargs={'message': 'create_file_handlig_sub_group_monitor'}
        )
        sub_group_submit >> sub_group_monitor
    return stage_2_taskgroup


def create_dag():
    with DAG(
            dag_id="horizon-task-group-bug",
            start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
            catchup=False,
            description="description"
    ) as dag:
        start = PythonOperator(
            task_id='start_main',
            python_callable=log_function,
            op_kwargs={'message': 'Entry-task'}
        )
        end = PythonOperator(
            task_id='end_main',
            python_callable=log_function,
            op_kwargs={'message': 'End-task'}
        )
        with TaskGroup(group_id="main_file_task_group", ui_color='#666666') as main_file_task_group:
            end_main_file_task_stage_1 = PythonOperator(
                task_id='end_main_file_task_stage_1',
                python_callable=log_function,
                op_kwargs={'message': 'end_main_file_task_stage_1'}
            )
            first_stage = get_stage_1_taskgroups(['9001', '9002'])
            first_stage >> get_stage_2_taskgroup("stage_2_1_taskgroup")
            first_stage >> get_stage_2_taskgroup("stage_2_2_taskgroup")
            first_stage >> end_main_file_task_stage_1
        start >> main_file_task_group >> end
        return dag


dag = create_dag()

  2. Go to the graph view of the dag.
  3. Run the dag.
  4. After the dag run has finished, clear the "sub_group_submit" task within the "stage_2_1_taskgroup" together with its downstream tasks.
  5. Refresh the page multiple times and notice how, from time to time, the "stage_2_1_taskgroup" becomes disconnected from the dag.
  6. Clear the "sub_group_submit" task within the "stage_2_2_taskgroup" together with its downstream tasks.
  7. Refresh the page multiple times and notice how, from time to time, the "stage_2_2_taskgroup" becomes disconnected from the dag.
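
To pin down what "disconnected" means in the steps above, here is a minimal sketch, assuming a simplified group-level view of the dag. The node labels `stage_1_9001` and `stage_1_9002` are shorthand for the two file handling task groups, not real task ids, and `unreachable_from` is a hypothetical helper rather than anything Airflow provides.

```python
from collections import deque
from typing import Dict, Iterable, List, Set, Tuple


def unreachable_from(root: str, nodes: Iterable[str],
                     edges: Iterable[Tuple[str, str]]) -> Set[str]:
    """Return the nodes that cannot be reached from `root` by following edges downstream."""
    children: Dict[str, List[str]] = {}
    for upstream, downstream in edges:
        children.setdefault(upstream, []).append(downstream)

    # Breadth-first walk from the root over downstream edges.
    seen = {root}
    queue = deque([root])
    while queue:
        current = queue.popleft()
        for child in children.get(current, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return set(nodes) - seen


# Simplified, group-level edges of the reproduction dag (labels are shorthand).
EXPECTED_EDGES = [
    ("start_main", "stage_1_9001"),
    ("start_main", "stage_1_9002"),
    ("stage_1_9001", "stage_2_1_taskgroup"),
    ("stage_1_9002", "stage_2_1_taskgroup"),
    ("stage_1_9001", "stage_2_2_taskgroup"),
    ("stage_1_9002", "stage_2_2_taskgroup"),
    ("stage_1_9001", "end_main_file_task_stage_1"),
    ("stage_1_9002", "end_main_file_task_stage_1"),
    ("stage_2_1_taskgroup", "end_main"),
    ("stage_2_2_taskgroup", "end_main"),
    ("end_main_file_task_stage_1", "end_main"),
]
NODES = {node for edge in EXPECTED_EDGES for node in edge}
```

With the expected edges, `unreachable_from("start_main", NODES, EXPECTED_EDGES)` is empty. Dropping the edges that lead into "stage_2_1_taskgroup" makes that group the only unreachable node, which matches what the graph view shows on the broken refreshes.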

Operating System

Mac OS, Linux

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

Custom docker image based on apache/airflow:2.3.4-python3.10

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
