Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exclude missing tasks from the gantt view #23627

Merged
merged 2 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3323,6 +3323,8 @@ def gantt(self, dag_id, session=None):

tasks = []
for ti in tis:
if not dag.has_task(ti.task_id):
continue
# prev_attempted_tries will reflect the currently running try_number
# or the try_number of the last complete run
# https://issues.apache.org/jira/browse/AIRFLOW-2143
Expand All @@ -3339,6 +3341,8 @@ def gantt(self, dag_id, session=None):
try_count = 1
prev_task_id = ""
for failed_task_instance in ti_fails:
if not dag.has_task(failed_task_instance.task_id):
continue
if tf_count != 0 and failed_task_instance.task_id == prev_task_id:
try_count += 1
else:
Expand Down
54 changes: 51 additions & 3 deletions tests/www/views/test_views_graph_gantt.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
import pytest

from airflow.configuration import conf
from airflow.models import DAG
from airflow.models import DAG, DagRun
from airflow.models.baseoperator import BaseOperator
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.state import State
from airflow.utils.session import create_session, provide_session
from airflow.utils.state import State, TaskInstanceState

DAG_ID = "dag_for_testing_dt_nr_dr_form"
DEFAULT_DATE = timezone.datetime(2017, 9, 1)
Expand Down Expand Up @@ -297,3 +298,50 @@ def test_uses_base_date_if_changed_away_from_execution_date(admin_client, very_c
_assert_run_is_not_in_dropdown(very_close_dagruns[1], data)
_assert_run_is_in_dropdown_not_selected(very_close_dagruns[2], data)
_assert_run_is_selected(very_close_dagruns[3], data)


@pytest.mark.parametrize("endpoint", ENDPOINTS)
def test_view_works_with_deleted_tasks(request, admin_client, app, endpoint):
task_to_state = {
"existing-task": TaskInstanceState.SUCCESS,
"deleted-task-success": TaskInstanceState.SUCCESS,
"deleted-task-failed": TaskInstanceState.FAILED,
}
dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
for task_id in task_to_state.keys():
BaseOperator(task_id=task_id, dag=dag)

execution_date = timezone.datetime(2022, 3, 14)
dag_run_id = "test-deleted-tasks-dag-run"
with create_session() as session:
dag_run = dag.create_dagrun(
run_id=dag_run_id,
execution_date=execution_date,
data_interval=(execution_date, execution_date + timedelta(minutes=5)),
state=State.SUCCESS,
external_trigger=True,
session=session,
)
for ti in dag_run.task_instances:
ti.refresh_from_task(dag.get_task(ti.task_id))
ti.state = task_to_state[ti.task_id]
ti.start_date = execution_date
ti.end_date = execution_date + timedelta(minutes=5)
session.merge(ti)

def cleanup_database():
with create_session() as session:
session.query(DagRun).filter_by(run_id=dag_run_id).delete()

request.addfinalizer(cleanup_database)

dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
BaseOperator(task_id="existing-task", dag=dag)
app.dag_bag.bag_dag(dag=dag, root_dag=dag)

response = admin_client.get(
f'{endpoint}&execution_date={execution_date.isoformat()}',
data={"username": "test", "password": "test"},
follow_redirects=True,
)
assert response.status_code == 200