diff --git a/airflow/migrations/versions/2e42bb497a22_rename_last_scheduler_run_column.py b/airflow/migrations/versions/2e42bb497a22_rename_last_scheduler_run_column.py new file mode 100644 index 0000000000000..97d8ff6211afc --- /dev/null +++ b/airflow/migrations/versions/2e42bb497a22_rename_last_scheduler_run_column.py @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""rename last_scheduler_run column + +Revision ID: 2e42bb497a22 +Revises: 8646922c8a04 +Create Date: 2021-03-04 19:50:38.880942 + +""" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import mssql + +# revision identifiers, used by Alembic. +revision = '2e42bb497a22' +down_revision = '8646922c8a04' +branch_labels = None +depends_on = None + + +def upgrade(): + """Apply rename last_scheduler_run column""" + conn = op.get_bind() + if conn.dialect.name == "mssql": + with op.batch_alter_table('dag') as batch_op: + batch_op.alter_column( + 'last_scheduler_run', new_column_name='last_parsed_time', type_=mssql.DATETIME2(precision=6) + ) + else: + with op.batch_alter_table('dag') as batch_op: + batch_op.alter_column( + 'last_scheduler_run', new_column_name='last_parsed_time', type_=sa.TIMESTAMP(timezone=True) + ) + + +def downgrade(): + """Unapply rename last_scheduler_run column""" + conn = op.get_bind() + if conn.dialect.name == "mssql": + with op.batch_alter_table('dag') as batch_op: + batch_op.alter_column( + 'last_parsed_time', new_column_name='last_scheduler_run', type_=mssql.DATETIME2(precision=6) + ) + else: + with op.batch_alter_table('dag') as batch_op: + batch_op.alter_column( + 'last_parsed_time', new_column_name='last_scheduler_run', type_=sa.TIMESTAMP(timezone=True) + ) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 4af6ec791a11f..fa33ee5fa73ee 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1876,6 +1876,7 @@ def bulk_write_to_db(cls, dags: Collection["DAG"], session=None): orm_dag.fileloc = dag.fileloc orm_dag.owners = dag.owner orm_dag.is_active = True + orm_dag.last_parsed_time = timezone.utcnow() orm_dag.default_view = dag.default_view orm_dag.description = dag.description orm_dag.schedule_interval = dag.schedule_interval @@ -1960,13 +1961,13 @@ def deactivate_stale_dags(expiration_date, session=None): """ for dag in ( session.query(DagModel) - .filter(DagModel.last_scheduler_run < expiration_date, DagModel.is_active) + .filter(DagModel.last_parsed_time < expiration_date, DagModel.is_active) .all() ): log.info( "Deactivating DAG ID %s since it was last touched by the scheduler at %s", dag.dag_id, - dag.last_scheduler_run.isoformat(), + dag.last_parsed_time.isoformat(), ) dag.is_active = False session.merge(dag) @@ -2069,7 +2070,7 @@ class DagModel(Base): # Whether that DAG was seen on the last DagBag load is_active = Column(Boolean, default=False) # Last time the scheduler started - last_scheduler_run = Column(UtcDateTime) + last_parsed_time = Column(UtcDateTime) # Last time this DAG was pickled last_pickled = Column(UtcDateTime) # Time when the DAG last received a refresh signal diff --git a/airflow/www/security.py b/airflow/www/security.py index 443afc69921fe..499465700602e 100644 --- a/airflow/www/security.py +++ b/airflow/www/security.py @@ -474,15 +474,16 @@ def create_dag_specific_permissions(self, session=None): :return: None. """ perms = self.get_all_permissions() - dag_models = ( - session.query(models.DagModel) + rows = ( + session.query(models.DagModel.dag_id) .filter(or_(models.DagModel.is_active, models.DagModel.is_paused)) .all() ) - for dag in dag_models: + for row in rows: + dag_id = row[0] for perm_name in self.DAG_PERMS: - dag_resource_name = self.prefixed_dag_id(dag.dag_id) + dag_resource_name = self.prefixed_dag_id(dag_id) if dag_resource_name and perm_name and (dag_resource_name, perm_name) not in perms: self._merge_perm(perm_name, dag_resource_name) diff --git a/airflow/www/views.py b/airflow/www/views.py index 521aea2bd8cf0..f75960490c793 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -3740,7 +3740,7 @@ class DagModelView(AirflowModelView): list_columns = [ 'dag_id', 'is_paused', - 'last_scheduler_run', + 'last_parsed_time', 'last_expired', 'scheduler_lock', 'fileloc', diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 2e3b4d15799c5..3345da7ab6875 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -646,15 +646,19 @@ def test_bulk_write_to_db(self): ('dag-bulk-sync-2', 'test-dag'), ('dag-bulk-sync-3', 'test-dag'), } == set(session.query(DagTag.dag_id, DagTag.name).all()) + + for row in session.query(DagModel.last_parsed_time).all(): + assert row[0] is not None + # Re-sync should do fewer queries - with assert_queries_count(3): + with assert_queries_count(4): DAG.bulk_write_to_db(dags) - with assert_queries_count(3): + with assert_queries_count(4): DAG.bulk_write_to_db(dags) # Adding tags for dag in dags: dag.tags.append("test-dag2") - with assert_queries_count(4): + with assert_queries_count(5): DAG.bulk_write_to_db(dags) with create_session() as session: assert {'dag-bulk-sync-0', 'dag-bulk-sync-1', 'dag-bulk-sync-2', 'dag-bulk-sync-3'} == { @@ -673,7 +677,7 @@ def test_bulk_write_to_db(self): # Removing tags for dag in dags: dag.tags.remove("test-dag") - with assert_queries_count(4): + with assert_queries_count(5): DAG.bulk_write_to_db(dags) with create_session() as session: assert {'dag-bulk-sync-0', 'dag-bulk-sync-1', 'dag-bulk-sync-2', 'dag-bulk-sync-3'} == { @@ -686,6 +690,9 @@ def test_bulk_write_to_db(self): ('dag-bulk-sync-3', 'test-dag2'), } == set(session.query(DagTag.dag_id, DagTag.name).all()) + for row in session.query(DagModel.last_parsed_time).all(): + assert row[0] is not None + def test_bulk_write_to_db_max_active_runs(self): """ Test that DagModel.next_dagrun_create_after is set to NULL when the dag cannot be created due to max