Skip to content

Commit

Permalink
Rename last_scheduler_run into last_parsed_time, and ensure it's upda…
Browse files Browse the repository at this point in the history
…ted in DB (#14581)

- Fix functionality
  Updating last_scheduler_run was missed in the process of
  migrating from sync_to_db/bulk_sync_to_db to bulk_write_to_db.

  This issue causes the DAG.deactivate_stale_dags() method to fail,
  and prevents users from checking the last schedule time of each DAG in the DB

- Change name last_scheduler_run to last_parsed_time,
  to better reflect what it does now.
  Migration script is added, and codebase is updated

- To ensure the migration scripts can work,
  we have to limit the columns needed in create_dag_specific_permissions(),
  so migration 2c6edca13270 can work with the renamed column.

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
  • Loading branch information
XD-DENG and kaxil authored Mar 5, 2021
1 parent d8bd107 commit c2a0cb9
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""rename last_scheduler_run column
Revision ID: 2e42bb497a22
Revises: 8646922c8a04
Create Date: 2021-03-04 19:50:38.880942
"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import mssql

# revision identifiers, used by Alembic.
revision = '2e42bb497a22'
down_revision = '8646922c8a04'
branch_labels = None
depends_on = None


def upgrade():
    """Rename ``dag.last_scheduler_run`` to ``dag.last_parsed_time``.

    The column type passed along with the rename depends on the dialect of
    the bound connection: ``DATETIME2(precision=6)`` on MSSQL, and a
    timezone-aware ``TIMESTAMP`` on every other dialect.
    """
    on_mssql = op.get_bind().dialect.name == "mssql"
    # Only the column type differs between dialects; the rename is the same.
    column_type = mssql.DATETIME2(precision=6) if on_mssql else sa.TIMESTAMP(timezone=True)

    with op.batch_alter_table('dag') as batch_op:
        batch_op.alter_column(
            'last_scheduler_run',
            new_column_name='last_parsed_time',
            type_=column_type,
        )


def downgrade():
    """Rename ``dag.last_parsed_time`` back to ``dag.last_scheduler_run``.

    The column type passed along with the rename depends on the dialect of
    the bound connection: ``DATETIME2(precision=6)`` on MSSQL, and a
    timezone-aware ``TIMESTAMP`` on every other dialect.
    """
    on_mssql = op.get_bind().dialect.name == "mssql"
    # Only the column type differs between dialects; the rename is the same.
    column_type = mssql.DATETIME2(precision=6) if on_mssql else sa.TIMESTAMP(timezone=True)

    with op.batch_alter_table('dag') as batch_op:
        batch_op.alter_column(
            'last_parsed_time',
            new_column_name='last_scheduler_run',
            type_=column_type,
        )
7 changes: 4 additions & 3 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1876,6 +1876,7 @@ def bulk_write_to_db(cls, dags: Collection["DAG"], session=None):
orm_dag.fileloc = dag.fileloc
orm_dag.owners = dag.owner
orm_dag.is_active = True
orm_dag.last_parsed_time = timezone.utcnow()
orm_dag.default_view = dag.default_view
orm_dag.description = dag.description
orm_dag.schedule_interval = dag.schedule_interval
Expand Down Expand Up @@ -1960,13 +1961,13 @@ def deactivate_stale_dags(expiration_date, session=None):
"""
for dag in (
session.query(DagModel)
.filter(DagModel.last_scheduler_run < expiration_date, DagModel.is_active)
.filter(DagModel.last_parsed_time < expiration_date, DagModel.is_active)
.all()
):
log.info(
"Deactivating DAG ID %s since it was last touched by the scheduler at %s",
dag.dag_id,
dag.last_scheduler_run.isoformat(),
dag.last_parsed_time.isoformat(),
)
dag.is_active = False
session.merge(dag)
Expand Down Expand Up @@ -2069,7 +2070,7 @@ class DagModel(Base):
# Whether that DAG was seen on the last DagBag load
is_active = Column(Boolean, default=False)
# Last time this DAG was parsed and written to the DB (set in DAG.bulk_write_to_db)
last_scheduler_run = Column(UtcDateTime)
last_parsed_time = Column(UtcDateTime)
# Last time this DAG was pickled
last_pickled = Column(UtcDateTime)
# Time when the DAG last received a refresh signal
Expand Down
9 changes: 5 additions & 4 deletions airflow/www/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,15 +474,16 @@ def create_dag_specific_permissions(self, session=None):
:return: None.
"""
perms = self.get_all_permissions()
dag_models = (
session.query(models.DagModel)
rows = (
session.query(models.DagModel.dag_id)
.filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
.all()
)

for dag in dag_models:
for row in rows:
dag_id = row[0]
for perm_name in self.DAG_PERMS:
dag_resource_name = self.prefixed_dag_id(dag.dag_id)
dag_resource_name = self.prefixed_dag_id(dag_id)
if dag_resource_name and perm_name and (dag_resource_name, perm_name) not in perms:
self._merge_perm(perm_name, dag_resource_name)

Expand Down
2 changes: 1 addition & 1 deletion airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3740,7 +3740,7 @@ class DagModelView(AirflowModelView):
list_columns = [
'dag_id',
'is_paused',
'last_scheduler_run',
'last_parsed_time',
'last_expired',
'scheduler_lock',
'fileloc',
Expand Down
15 changes: 11 additions & 4 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,15 +646,19 @@ def test_bulk_write_to_db(self):
('dag-bulk-sync-2', 'test-dag'),
('dag-bulk-sync-3', 'test-dag'),
} == set(session.query(DagTag.dag_id, DagTag.name).all())

for row in session.query(DagModel.last_parsed_time).all():
assert row[0] is not None

# Re-sync should do fewer queries
with assert_queries_count(3):
with assert_queries_count(4):
DAG.bulk_write_to_db(dags)
with assert_queries_count(3):
with assert_queries_count(4):
DAG.bulk_write_to_db(dags)
# Adding tags
for dag in dags:
dag.tags.append("test-dag2")
with assert_queries_count(4):
with assert_queries_count(5):
DAG.bulk_write_to_db(dags)
with create_session() as session:
assert {'dag-bulk-sync-0', 'dag-bulk-sync-1', 'dag-bulk-sync-2', 'dag-bulk-sync-3'} == {
Expand All @@ -673,7 +677,7 @@ def test_bulk_write_to_db(self):
# Removing tags
for dag in dags:
dag.tags.remove("test-dag")
with assert_queries_count(4):
with assert_queries_count(5):
DAG.bulk_write_to_db(dags)
with create_session() as session:
assert {'dag-bulk-sync-0', 'dag-bulk-sync-1', 'dag-bulk-sync-2', 'dag-bulk-sync-3'} == {
Expand All @@ -686,6 +690,9 @@ def test_bulk_write_to_db(self):
('dag-bulk-sync-3', 'test-dag2'),
} == set(session.query(DagTag.dag_id, DagTag.name).all())

for row in session.query(DagModel.last_parsed_time).all():
assert row[0] is not None

def test_bulk_write_to_db_max_active_runs(self):
"""
Test that DagModel.next_dagrun_create_after is set to NULL when the dag cannot be created due to max
Expand Down

0 comments on commit c2a0cb9

Please sign in to comment.