7 changes: 7 additions & 0 deletions airflow/config_templates/config.yml
@@ -866,6 +866,13 @@
type: string
example: ~
default: "0"
- name: session_lifetime_days
description: |
The UI cookie lifetime in days
version_added: ~
type: string
example: ~
default: "30"

- name: email
description: ~
3 changes: 3 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -411,6 +411,9 @@ update_fab_perms = True
# 0 means never get forcibly logged out
force_log_out_after = 0

# The UI cookie lifetime in days
session_lifetime_days = 30

[email]
email_backend = airflow.utils.email.send_email_smtp

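The new option sits in the `[webserver]` section, so it can be set in `airflow.cfg` or through the standard `AIRFLOW__WEBSERVER__SESSION_LIFETIME_DAYS` environment variable. A minimal sketch of reading it back the way the webserver does later in this diff (the 7-day override is illustrative, not part of this PR):

```python
# Minimal sketch, assuming a standard Airflow install; the 7-day override and
# the environment-variable form are illustrative only.
import os
from datetime import timedelta

os.environ['AIRFLOW__WEBSERVER__SESSION_LIFETIME_DAYS'] = '7'

from airflow.configuration import conf

lifetime_days = conf.getint('webserver', 'session_lifetime_days', fallback=30)
print(timedelta(days=lifetime_days))  # -> 7 days, 0:00:00
```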
1 change: 1 addition & 0 deletions airflow/example_dags/example_bash_operator.py
@@ -36,6 +36,7 @@
default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60),
tags=['example']
)

run_this_last = DummyOperator(
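The example-DAG changes below all repeat this same one-line addition. As a consolidated illustration, a minimal standalone DAG using the new argument might look like the sketch below (the dag_id, schedule and tag names are made up):

```python
# Minimal sketch of the new `tags` argument; names and values are illustrative.
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id='tags_demo',
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['example', 'team_a'],  # rendered as filter options in the DAG list view
)

DummyOperator(task_id='noop', dag=dag)
```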
1 change: 1 addition & 0 deletions airflow/example_dags/example_branch_operator.py
@@ -35,6 +35,7 @@
dag_id='example_branch_operator',
default_args=args,
schedule_interval="@daily",
tags=['example']
)

run_this_first = DummyOperator(
@@ -37,6 +37,7 @@
dag_id='example_branch_dop_operator_v3',
schedule_interval='*/1 * * * *',
default_args=args,
tags=['example']
)


2 changes: 1 addition & 1 deletion airflow/example_dags/example_gcs_to_bq.py
@@ -31,7 +31,7 @@

dag = models.DAG(
dag_id='example_gcs_to_bq_operator', default_args=args,
schedule_interval=None)
schedule_interval=None, tags=['example'])

create_test_dataset = bash_operator.BashOperator(
task_id='create_airflow_test_dataset',
2 changes: 1 addition & 1 deletion airflow/example_dags/example_gcs_to_gcs.py
@@ -39,7 +39,7 @@


with models.DAG(
"example_gcs_to_gcs", default_args=default_args, schedule_interval=None
"example_gcs_to_gcs", default_args=default_args, schedule_interval=None, tags=['example']
) as dag:
sync_full_bucket = GoogleCloudStorageSynchronizeBuckets(
task_id="sync-full-bucket",
2 changes: 1 addition & 1 deletion airflow/example_dags/example_gcs_to_sftp.py
@@ -37,7 +37,7 @@


with models.DAG(
"example_gcs_to_sftp", default_args=default_args, schedule_interval=None
"example_gcs_to_sftp", default_args=default_args, schedule_interval=None, tags=['example']
) as dag:
# [START howto_operator_gcs_to_sftp_copy_single_file]
copy_file_from_gcs_to_sftp = GoogleCloudStorageToSFTPOperator(
2 changes: 1 addition & 1 deletion airflow/example_dags/example_http_operator.py
@@ -38,7 +38,7 @@
'retry_delay': timedelta(minutes=5),
}

dag = DAG('example_http_operator', default_args=default_args)
dag = DAG('example_http_operator', default_args=default_args, tags=['example'])

dag.doc_md = __doc__

1 change: 1 addition & 0 deletions airflow/example_dags/example_latest_only.py
@@ -30,6 +30,7 @@
dag_id='latest_only',
schedule_interval=dt.timedelta(hours=4),
start_date=days_ago(2),
tags=['example']
)

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
1 change: 1 addition & 0 deletions airflow/example_dags/example_latest_only_with_trigger.py
@@ -31,6 +31,7 @@
dag_id='latest_only_with_trigger',
schedule_interval=dt.timedelta(hours=4),
start_date=days_ago(2),
tags=['example']
)

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
@@ -34,6 +34,7 @@
},
schedule_interval='*/1 * * * *',
dagrun_timeout=timedelta(minutes=4),
tags=['example']
)


1 change: 1 addition & 0 deletions airflow/example_dags/example_pig_operator.py
@@ -32,6 +32,7 @@
dag_id='example_pig_operator',
default_args=args,
schedule_interval=None,
tags=['example']
)

run_this = PigOperator(
1 change: 1 addition & 0 deletions airflow/example_dags/example_python_operator.py
@@ -35,6 +35,7 @@
dag_id='example_python_operator',
default_args=args,
schedule_interval=None,
tags=['example']
)


2 changes: 1 addition & 1 deletion airflow/example_dags/example_short_circuit_operator.py
@@ -29,7 +29,7 @@
'start_date': dates.days_ago(2),
}

dag = DAG(dag_id='example_short_circuit_operator', default_args=args)
dag = DAG(dag_id='example_short_circuit_operator', default_args=args, tags=['example'])

cond_true = ShortCircuitOperator(
task_id='condition_is_True',
2 changes: 1 addition & 1 deletion airflow/example_dags/example_skip_dag.py
@@ -58,6 +58,6 @@ def create_test_pipeline(suffix, trigger_rule, dag_):
join >> final


dag = DAG(dag_id='example_skip_dag', default_args=args)
dag = DAG(dag_id='example_skip_dag', default_args=args, tags=['example'])
create_test_pipeline('1', 'all_success', dag)
create_test_pipeline('2', 'one_success', dag)
1 change: 1 addition & 0 deletions airflow/example_dags/example_subdag_operator.py
@@ -36,6 +36,7 @@
dag_id=DAG_NAME,
default_args=args,
schedule_interval="@once",
tags=['example']
)

start = DummyOperator(
1 change: 1 addition & 0 deletions airflow/example_dags/example_trigger_controller_dag.py
@@ -31,6 +31,7 @@
dag_id="example_trigger_controller_dag",
default_args={"owner": "airflow", "start_date": days_ago(2)},
schedule_interval="@once",
tags=['example']
)

trigger = TriggerDagRunOperator(
1 change: 1 addition & 0 deletions airflow/example_dags/example_trigger_target_dag.py
@@ -32,6 +32,7 @@
dag_id="example_trigger_target_dag",
default_args={"start_date": days_ago(2), "owner": "airflow"},
schedule_interval=None,
tags=['example']
)


2 changes: 1 addition & 1 deletion airflow/example_dags/example_xcom.py
@@ -28,7 +28,7 @@
'start_date': days_ago(2),
}

dag = DAG('example_xcom', schedule_interval="@once", default_args=args)
dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])

value_1 = [1, 2, 3]
value_2 = {'a': 'b'}
2 changes: 1 addition & 1 deletion airflow/example_dags/test_utils.py
@@ -21,7 +21,7 @@
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

dag = DAG(dag_id='test_utils', schedule_interval=None)
dag = DAG(dag_id='test_utils', schedule_interval=None, tags=['example'])

task = BashOperator(
task_id='sleeps_forever',
50 changes: 50 additions & 0 deletions airflow/migrations/versions/7939bcff74ba_add_dagtags_table.py
@@ -0,0 +1,50 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add DagTags table

Revision ID: 7939bcff74ba
Revises: fe461863935f
Create Date: 2020-01-07 19:39:01.247442

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = '7939bcff74ba'
down_revision = 'fe461863935f'
branch_labels = None
depends_on = None


def upgrade():
"""Apply Add DagTags table"""
op.create_table(
'dag_tag',
sa.Column('name', sa.String(length=100), nullable=False),
sa.Column('dag_id', sa.String(length=250), nullable=False),
sa.ForeignKeyConstraint(['dag_id'], ['dag.dag_id'], ),
sa.PrimaryKeyConstraint('name', 'dag_id')
)


def downgrade():
"""Unapply Add DagTags table"""
op.drop_table('dag_tag')
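Applying this revision through the usual metadata-database upgrade command for the installed Airflow version creates the dag_tag table. A hedged check that the upgrade took effect (the SQLite URL is a placeholder for the configured sql_alchemy_conn):

```python
# Hedged sketch: verify the table added by this migration exists after the
# database upgrade. Table and column names are taken from the migration above.
from sqlalchemy import create_engine, inspect

engine = create_engine('sqlite:////tmp/airflow.db')  # placeholder connection string
inspector = inspect(engine)

assert 'dag_tag' in inspector.get_table_names()
print([col['name'] for col in inspector.get_columns('dag_tag')])  # ['name', 'dag_id']
```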
2 changes: 1 addition & 1 deletion airflow/models/__init__.py
@@ -20,7 +20,7 @@
from airflow.models.base import ID_LEN, Base # noqa: F401
from airflow.models.baseoperator import BaseOperator, BaseOperatorLink # noqa: F401
from airflow.models.connection import Connection # noqa: F401
from airflow.models.dag import DAG, DagModel # noqa: F401
from airflow.models.dag import DAG, DagModel, DagTag # noqa: F401
from airflow.models.dagbag import DagBag # noqa: F401
from airflow.models.dagpickle import DagPickle # noqa: F401
from airflow.models.dagrun import DagRun # noqa: F401
46 changes: 43 additions & 3 deletions airflow/models/dag.py
@@ -32,7 +32,8 @@
import pendulum
from croniter import croniter
from dateutil.relativedelta import relativedelta
from sqlalchemy import Boolean, Column, Index, Integer, String, Text, func, or_
from sqlalchemy import Boolean, Column, ForeignKey, Index, Integer, String, Text, func, or_
from sqlalchemy.orm import backref, relationship

from airflow import settings, utils
from airflow.configuration import conf
@@ -181,6 +182,8 @@ class DAG(BaseDag, LoggingMixin):
<https://jinja.palletsprojects.com/en/master/api/#jinja2.Environment>`_

:type jinja_environment_kwargs: dict
:param tags: List of tags to help filter DAGs in the UI.
:type tags: List[str]
"""
_comps = {
'dag_id',
@@ -221,7 +224,8 @@ def __init__(
params: Optional[Dict] = None,
access_control: Optional[Dict] = None,
is_paused_upon_creation: Optional[bool] = None,
jinja_environment_kwargs: Optional[Dict] = None
jinja_environment_kwargs: Optional[Dict] = None,
tags: Optional[List[str]] = None
):
self.user_defined_macros = user_defined_macros
self.user_defined_filters = user_defined_filters
@@ -310,6 +314,7 @@ def __init__(
self.is_paused_upon_creation = is_paused_upon_creation

self.jinja_environment_kwargs = jinja_environment_kwargs
self.tags = tags

def __repr__(self):
return "<DAG: {self.dag_id}>".format(self=self)
@@ -1366,6 +1371,7 @@ def sync_to_db(self, owner=None, sync_time=None, session=None):
if self.is_paused_upon_creation is not None:
orm_dag.is_paused = self.is_paused_upon_creation
self.log.info("Creating ORM DAG for %s", self.dag_id)
session.add(orm_dag)
if self.is_subdag:
orm_dag.is_subdag = True
orm_dag.fileloc = self.parent_dag.fileloc
@@ -1379,7 +1385,8 @@ def sync_to_db(self, owner=None, sync_time=None, session=None):
orm_dag.default_view = self._default_view
orm_dag.description = self.description
orm_dag.schedule_interval = self.schedule_interval
session.merge(orm_dag)
orm_dag.tags = self.get_dagtags(session=session)

session.commit()

for subdag in self.subdags:
Expand All @@ -1395,6 +1402,28 @@ def sync_to_db(self, owner=None, sync_time=None, session=None):
session=session
)

@provide_session
def get_dagtags(self, session=None):
"""
Create a list of DagTag objects for this DAG; any tag missing from the DB is inserted.

:return: The DagTag list.
:rtype: list
"""
tags = []
if not self.tags:
return tags

for name in set(self.tags):
tag = session.query(
DagTag).filter(DagTag.name == name).filter(DagTag.dag_id == self.dag_id).first()
if not tag:
tag = DagTag(name=name, dag_id=self.dag_id)
session.add(tag)
tags.append(tag)
session.commit()
return tags

@staticmethod
@provide_session
def deactivate_unknown_dags(active_dag_ids, session=None):
@@ -1529,6 +1558,15 @@ def get_serialized_fields(cls):
return cls.__serialized_fields


class DagTag(Base):
"""
A tag name per dag, to allow quick filtering in the DAG view.
"""
__tablename__ = "dag_tag"
name = Column(String(100), primary_key=True)
dag_id = Column(String(ID_LEN), ForeignKey('dag.dag_id'), primary_key=True)


class DagModel(Base):

__tablename__ = "dag"
@@ -1571,6 +1609,8 @@ class DagModel(Base):
default_view = Column(String(25))
# Schedule interval
schedule_interval = Column(Interval)
# Tags for view filter
tags = relationship('DagTag', cascade='all,delete-orphan', backref=backref('dag'))

__table_args__ = (
Index('idx_root_dag_id', root_dag_id, unique=False),
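With the DagTag model and the DagModel.tags relationship in place, tagged DAGs can be looked up straight from the metadata database. A hedged sketch, assuming an initialised DB and using 'example' as an illustrative tag name (session handling simplified):

```python
# Hedged sketch using the new DagTag model and the DagModel.tags relationship;
# assumes the metadata database is initialised and DAGs have been synced.
from airflow.models import DagModel, DagTag
from airflow.settings import Session

session = Session()
rows = (
    session.query(DagModel.dag_id)
    .filter(DagModel.tags.any(DagTag.name == 'example'))
    .all()
)
print([dag_id for (dag_id,) in rows])
session.close()
```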
3 changes: 2 additions & 1 deletion airflow/serialization/schema.json
@@ -95,7 +95,8 @@
"doc_md": { "type" : "string"},
"_default_view": { "type" : "string"},
"_access_control": {"$ref": "#/definitions/dict" },
"is_paused_upon_creation": { "type": "boolean" }
"is_paused_upon_creation": { "type": "boolean" },
"tags": { "type": "array" }
},
"required": [
"params",
10 changes: 9 additions & 1 deletion airflow/www/app.py
@@ -20,12 +20,13 @@
import datetime
import logging
import socket
from datetime import timedelta
from typing import Any, Optional
from urllib.parse import urlparse

import flask
import flask_login
from flask import Flask
from flask import Flask, session as flask_session
from flask_appbuilder import SQLA, AppBuilder
from flask_caching import Cache
from flask_wtf.csrf import CSRFProtect
@@ -60,6 +61,9 @@ def create_app(config=None, session=None, testing=False, app_name="Airflow"):
)
app.secret_key = conf.get('webserver', 'SECRET_KEY')

session_lifetime_days = conf.getint('webserver', 'SESSION_LIFETIME_DAYS', fallback=30)
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=session_lifetime_days)

app.config.from_pyfile(settings.WEBSERVER_CONFIG, silent=True)
app.config['APP_NAME'] = app_name
app.config['TESTING'] = testing
@@ -257,6 +261,10 @@ def apply_caching(response):
def shutdown_session(exception=None): # pylint: disable=unused-variable
settings.Session.remove()

@app.before_request
def make_session_permanent():
flask_session.permanent = True

return app, appbuilder


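The two webserver changes work together: Flask's PERMANENT_SESSION_LIFETIME only applies to sessions marked permanent, which is exactly what the before_request hook does. A minimal standalone Flask sketch of the same pattern (not Airflow code; the secret key and lifetime are placeholders):

```python
# Standalone Flask sketch of the pattern used above; values are placeholders.
from datetime import timedelta

from flask import Flask, session

app = Flask(__name__)
app.secret_key = 'replace-me'
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=30)


@app.before_request
def make_session_permanent():
    # Without this, Flask issues browser-session cookies that ignore
    # PERMANENT_SESSION_LIFETIME and expire when the browser closes.
    session.permanent = True
```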