Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions airflow/www/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -133,27 +133,27 @@ <h2>{{ page_title }}</h2>
<div class="dags-table-header">
<div class="form-group btn-group">
<a
href="{{ url_for('Airflow.index', status='all', search=request.args.get('search', None), tags=request.args.getlist('tags', None)) }}"
href="{{ url_for('Airflow.index', status='all', lastrun=request.args.get('lastrun', None), search=request.args.get('search', None), tags=request.args.getlist('tags', None)) }}"
class="btn {{ 'btn-primary' if status_filter == 'all' else 'btn-default' }}"
title="Show active and paused DAGs">All <span class="badge">{{ "{:,}".format(status_count_all) }}</span></a>
<a
href="{{ url_for('Airflow.index', status='active', search=request.args.get('search', None), tags=request.args.getlist('tags', None)) }}"
href="{{ url_for('Airflow.index', status='active', lastrun=request.args.get('lastrun', None), search=request.args.get('search', None), tags=request.args.getlist('tags', None)) }}"
class="btn {{ 'btn-primary' if status_filter == 'active' else 'btn-default' }}"
title="Show only active DAGs">Active <span class="badge">{{ "{:,}".format(status_count_active) }}</span></a>
<a
href="{{ url_for('Airflow.index', status='paused', search=request.args.get('search', None), tags=request.args.getlist('tags', None)) }}"
href="{{ url_for('Airflow.index', status='paused', lastrun=request.args.get('lastrun', None), search=request.args.get('search', None), tags=request.args.getlist('tags', None)) }}"
class="btn {{ 'btn-primary' if status_filter == 'paused' else 'btn-default' }}"
title="Show only paused DAGs">Paused <span class="badge">{{ "{:,}".format(status_count_paused) }}</span></a>
</div>
<div class="form-group btn-group p-2">
<a
href="{{ url_for('Airflow.index', status='running', search=request.args.get('search', None), tags=request.args.getlist('tags', None)) }}"
class="btn {{ 'btn-primary' if status_filter == 'running' else 'btn-default' }}"
title="Show currently running DAG runs">Running <span class="badge">{{ "{:,}".format(status_count_running) }}</span></a>
href="{{ url_for('Airflow.index', lastrun='reset_filter' if lastrun_filter == 'running' else 'running', status=request.args.get('status', None), search=request.args.get('search', None), tags=request.args.getlist('tags', None)) }}"
class="btn {{ 'btn-primary' if lastrun_filter == 'running' else 'btn-default' }}"
title="Show currently running DAG runs">Running <span class="badge">{{ "{:,}".format(lastrun_count_running) }}</span></a>
<a
href="{{ url_for('Airflow.index', status='failed', search=request.args.get('search', None), tags=request.args.getlist('tags', None)) }}"
class="btn {{ 'btn-primary' if status_filter == 'failed' else 'btn-default' }}"
title="Show DAGs with failed latest DAG run">Failed <span class="badge">{{ "{:,}".format(status_count_failed) }}</span></a>
href="{{ url_for('Airflow.index', lastrun='reset_filter' if lastrun_filter == 'failed' else 'failed', status=request.args.get('status', None), search=request.args.get('search', None), tags=request.args.getlist('tags', None)) }}"
class="btn {{ 'btn-primary' if lastrun_filter == 'failed' else 'btn-default' }}"
title="Show DAGs with failed latest DAG run">Failed <span class="badge">{{ "{:,}".format(lastrun_count_failed) }}</span></a>
</div>
<div style="min-width: 200px; padding: 0 10px;">
<form id="tags_form">
Expand Down
85 changes: 60 additions & 25 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@
PAGE_SIZE = conf.getint("webserver", "page_size")
FILTER_TAGS_COOKIE = "tags_filter"
FILTER_STATUS_COOKIE = "dag_status_filter"
FILTER_LASTRUN_COOKIE = "last_run_filter"
LINECHART_X_AXIS_TICKFORMAT = (
"function (d, i) { let xLabel;"
"if (i === undefined) {xLabel = d3.time.format('%H:%M, %d %b %Y')(new Date(parseInt(d)));"
Expand Down Expand Up @@ -780,6 +781,7 @@ def index(self):
arg_search_query = request.args.get("search")
arg_tags_filter = request.args.getlist("tags")
arg_status_filter = request.args.get("status")
arg_lastrun_filter = request.args.get("lastrun")
arg_sorting_key = request.args.get("sorting_key", "dag_id")
arg_sorting_direction = request.args.get("sorting_direction", default="asc")

Expand All @@ -788,17 +790,29 @@ def index(self):
# Remove the reset_tags=reset from the URL
return redirect(url_for("Airflow.index"))

cookie_val = flask_session.get(FILTER_TAGS_COOKIE)
if arg_lastrun_filter == "reset_filter":
flask_session[FILTER_LASTRUN_COOKIE] = None
return redirect(url_for("Airflow.index"))

filter_tags_cookie_val = flask_session.get(FILTER_TAGS_COOKIE)
if arg_tags_filter:
flask_session[FILTER_TAGS_COOKIE] = ",".join(arg_tags_filter)
elif cookie_val:
elif filter_tags_cookie_val:
# If tags exist in cookie, but not URL, add them to the URL
return redirect(url_for("Airflow.index", tags=cookie_val.split(",")))
return redirect(url_for("Airflow.index", tags=filter_tags_cookie_val.split(",")))

filter_lastrun_cookie_val = flask_session.get(FILTER_LASTRUN_COOKIE)
if arg_lastrun_filter:
arg_lastrun_filter = arg_lastrun_filter.strip().lower()
flask_session[FILTER_LASTRUN_COOKIE] = arg_lastrun_filter
elif filter_lastrun_cookie_val:
# If tags exist in cookie, but not URL, add them to the URL
return redirect(url_for("Airflow.index", lastrun=filter_lastrun_cookie_val))

if arg_status_filter is None:
cookie_val = flask_session.get(FILTER_STATUS_COOKIE)
if cookie_val:
arg_status_filter = cookie_val
filter_status_cookie_val = flask_session.get(FILTER_STATUS_COOKIE)
if filter_status_cookie_val:
arg_status_filter = filter_status_cookie_val
else:
arg_status_filter = "active" if hide_paused_dags_by_default else "all"
flask_session[FILTER_STATUS_COOKIE] = arg_status_filter
Expand Down Expand Up @@ -842,14 +856,19 @@ def index(self):
flask_session[FILTER_TAGS_COOKIE] = None
return redirect(url_for("Airflow.index"))

all_dags = dags_query
active_dags = dags_query.where(~DagModel.is_paused)
paused_dags = dags_query.where(DagModel.is_paused)

# find DAGs which have a RUNNING DagRun
running_dags = dags_query.join(DagRun, DagModel.dag_id == DagRun.dag_id).where(
DagRun.state == DagRunState.RUNNING
(DagRun.state == DagRunState.RUNNING) | (DagRun.state == DagRunState.QUEUED)
)

lastrun_running_is_paused = session.execute(
running_dags.with_only_columns(DagModel.dag_id, DagModel.is_paused).distinct(DagModel.dag_id)
).all()

lastrun_running_count_active = len(
list(filter(lambda x: not x.is_paused, lastrun_running_is_paused))
)
lastrun_running_count_paused = len(list(filter(lambda x: x.is_paused, lastrun_running_is_paused)))

# find DAGs for which the latest DagRun is FAILED
subq_all = (
Expand All @@ -876,34 +895,49 @@ def index(self):
)
failed_dags = dags_query.join(subq_join, DagModel.dag_id == subq_join.c.dag_id)

is_paused_count = dict(
lastrun_failed_is_paused_count = dict(
session.execute(
all_dags.with_only_columns(DagModel.is_paused, func.count()).group_by(DagModel.is_paused)
failed_dags.with_only_columns(DagModel.is_paused, func.count()).group_by(
DagModel.is_paused
)
).all()
)

status_count_active = is_paused_count.get(False, 0)
status_count_paused = is_paused_count.get(True, 0)
lastrun_failed_count_active = lastrun_failed_is_paused_count.get(False, 0)
lastrun_failed_count_paused = lastrun_failed_is_paused_count.get(True, 0)

if arg_lastrun_filter == "running":
dags_query = running_dags
elif arg_lastrun_filter == "failed":
dags_query = failed_dags

all_dags = dags_query
active_dags = dags_query.where(~DagModel.is_paused)
paused_dags = dags_query.where(DagModel.is_paused)

status_count_running = get_query_count(running_dags, session=session)
status_count_failed = get_query_count(failed_dags, session=session)
status_is_paused = session.execute(
all_dags.with_only_columns(DagModel.dag_id, DagModel.is_paused).distinct(DagModel.dag_id)
).all()

status_count_active = len(list(filter(lambda x: not x.is_paused, status_is_paused)))
status_count_paused = len(list(filter(lambda x: x.is_paused, status_is_paused)))
all_dags_count = status_count_active + status_count_paused

if arg_status_filter == "active":
current_dags = active_dags
num_of_all_dags = status_count_active
lastrun_count_running = lastrun_running_count_active
lastrun_count_failed = lastrun_failed_count_active
elif arg_status_filter == "paused":
current_dags = paused_dags
num_of_all_dags = status_count_paused
elif arg_status_filter == "running":
current_dags = running_dags
num_of_all_dags = status_count_running
elif arg_status_filter == "failed":
current_dags = failed_dags
num_of_all_dags = status_count_failed
lastrun_count_running = lastrun_running_count_paused
lastrun_count_failed = lastrun_failed_count_paused
else:
current_dags = all_dags
num_of_all_dags = all_dags_count
lastrun_count_running = lastrun_running_count_active + lastrun_running_count_paused
lastrun_count_failed = lastrun_failed_count_active + lastrun_failed_count_paused

if arg_sorting_key == "dag_id":
if arg_sorting_direction == "desc":
Expand Down Expand Up @@ -1105,8 +1139,9 @@ def _iter_parsed_moved_data_table_names():
status_count_all=all_dags_count,
status_count_active=status_count_active,
status_count_paused=status_count_paused,
status_count_running=status_count_running,
status_count_failed=status_count_failed,
lastrun_filter=arg_lastrun_filter,
lastrun_count_running=lastrun_count_running,
lastrun_count_failed=lastrun_count_failed,
tags_filter=arg_tags_filter,
sorting_key=arg_sorting_key,
sorting_direction=arg_sorting_direction,
Expand Down
2 changes: 1 addition & 1 deletion tests/www/views/test_views_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def test_index_redirect(admin_client):


def test_homepage_query_count(admin_client):
with assert_queries_count(17):
with assert_queries_count(20):
resp = admin_client.get("/home")
check_content_in_response("DAGs", resp)

Expand Down
22 changes: 13 additions & 9 deletions tests/www/views/test_views_home.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.www.utils import UIAlert
from airflow.www.views import FILTER_STATUS_COOKIE, FILTER_TAGS_COOKIE
from airflow.www.views import FILTER_LASTRUN_COOKIE, FILTER_STATUS_COOKIE, FILTER_TAGS_COOKIE
from tests.test_utils.api_connexion_utils import create_user
from tests.test_utils.db import clear_db_dags, clear_db_import_errors, clear_db_serialized_dags
from tests.test_utils.www import check_content_in_response, check_content_not_in_response, client_with_login
Expand Down Expand Up @@ -101,15 +101,18 @@ def test_home_status_filter_cookie(admin_client):
admin_client.get("home?status=paused", follow_redirects=True)
assert "paused" == flask.session[FILTER_STATUS_COOKIE]

admin_client.get("home?status=running", follow_redirects=True)
assert "running" == flask.session[FILTER_STATUS_COOKIE]

admin_client.get("home?status=failed", follow_redirects=True)
assert "failed" == flask.session[FILTER_STATUS_COOKIE]

admin_client.get("home?status=all", follow_redirects=True)
assert "all" == flask.session[FILTER_STATUS_COOKIE]

admin_client.get("home?lastrun=running", follow_redirects=True)
assert "running" == flask.session[FILTER_LASTRUN_COOKIE]

admin_client.get("home?lastrun=failed", follow_redirects=True)
assert "failed" == flask.session[FILTER_LASTRUN_COOKIE]

admin_client.get("home?lastrun=all_states", follow_redirects=True)
assert "all_states" == flask.session[FILTER_LASTRUN_COOKIE]


@pytest.fixture(scope="module")
def user_no_importerror(app):
Expand Down Expand Up @@ -305,8 +308,9 @@ def test_home_no_importerrors_perm(broken_dags, client_no_importerror):
"home?status=all",
"home?status=active",
"home?status=paused",
"home?status=running",
"home?status=failed",
"home?lastrun=running",
"home?lastrun=failed",
"home?lastrun=all_states",
],
)
def test_home_importerrors_filtered_singledag_user(broken_dags_with_read_perm, client_single_dag, page):
Expand Down