Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add index to task_instance table

Revision ID: 937cbd173ca1
Revises: 98ae134e6fff
Create Date: 2023-05-03 11:31:32.527362

"""
from __future__ import annotations

from alembic import op

# Revision identifiers, used by Alembic.
# `revision` is this migration's own ID and `down_revision` is the migration it revises;
# both match the "Revision ID" / "Revises" entries in the module docstring.
revision = "937cbd173ca1"
down_revision = "98ae134e6fff"
# No branch labels or dependencies on other revisions are declared for this migration.
branch_labels = None
depends_on = None
# NOTE(review): presumably the Airflow release this migration ships with — confirm.
airflow_version = "2.7.0"


def upgrade():
    """Create the ``ti_state_incl_start_date`` index on the ``task_instance`` table."""
    index_name = "ti_state_incl_start_date"
    key_columns = ["dag_id", "task_id", "state"]
    # ``start_date`` is not one of the key columns; it is handed over through the
    # ``postgresql_include`` option instead.
    included_columns = ["start_date"]
    op.create_index(index_name, "task_instance", key_columns, postgresql_include=included_columns)


def downgrade():
    """Drop the ``ti_state_incl_start_date`` index from the ``task_instance`` table."""
    index_name = "ti_state_incl_start_date"
    op.drop_index(
        index_name,
        table_name="task_instance",
    )
9 changes: 9 additions & 0 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,15 @@ class TaskInstance(Base, LoggingMixin):
Index("ti_dag_run", dag_id, run_id),
Index("ti_state", state),
Index("ti_state_lkp", dag_id, task_id, run_id, state),
# The index below was added to improve performance on Postgres setups with tens of millions of
# task_instance rows. It is meant to speed up the following query (which can be used to find the
# last successful execution date of a task instance):
# SELECT start_date FROM task_instance WHERE dag_id = 'xx' AND task_id = 'yy' AND state = 'success'
# ORDER BY start_date DESC NULLS LAST LIMIT 1;
# The existing "ti_state_lkp" index is not enough for such a query when the table has millions of rows,
# because the rows themselves have to be fetched to retrieve the start_date column. With this index, an
# INDEX ONLY SCAN is performed and the query runs within milliseconds.
Index("ti_state_incl_start_date", dag_id, task_id, state, postgresql_include=["start_date"]),
Index("ti_pool", pool, state, priority_weight),
Index("ti_job_id", job_id),
Index("ti_trigger_id", trigger_id),
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
b409a0826f10dc5ef934616fe2d5e72fff322bb1aebe321aad475e4d72c25a4f
87f70d3f35c883e544ce7fe6337227c000a9cf995d148648ab19a931b490ecb8
Loading