Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions airflow-core/src/airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

from airflow.exceptions import DagNotFound, DagRunAlreadyExists
from airflow.models import DagBag, DagModel, DagRun
from airflow.models.dag_version import DagVersion
from airflow.utils import timezone
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState
Expand Down Expand Up @@ -104,7 +103,6 @@ def _trigger_dag(
run_conf = None
if conf:
run_conf = conf if isinstance(conf, dict) else json.loads(conf)
dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
dag_run = dag.create_dagrun(
run_id=run_id,
logical_date=coerced_logical_date,
Expand All @@ -113,7 +111,6 @@ def _trigger_dag(
conf=run_conf,
run_type=DagRunType.MANUAL,
triggered_by=triggered_by,
dag_version=dag_version,
state=DagRunState.QUEUED,
session=session,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
TaskOutletAssetReference,
)
from airflow.models.dag import DAG
from airflow.models.dag_version import DagVersion
from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
Expand Down Expand Up @@ -331,7 +330,6 @@ def materialize_asset(
run_after=run_after,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
dag_version=DagVersion.get_latest_version(dag_id, session=session),
state=DagRunState.QUEUED,
session=session,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
from airflow.exceptions import ParamValidationError
from airflow.listeners.listener import get_listener_manager
from airflow.models import DAG, DagModel, DagRun
from airflow.models.dag_version import DagVersion
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

Expand Down Expand Up @@ -411,7 +410,6 @@ def trigger_dag_run(
conf=params["conf"],
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
dag_version=DagVersion.get_latest_version(dag.dag_id),
state=DagRunState.QUEUED,
session=session,
)
Expand Down
1 change: 0 additions & 1 deletion airflow-core/src/airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ def _get_dag_run(
run_after=run_after,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.CLI,
dag_version=None,
state=DagRunState.RUNNING,
session=session,
)
Expand Down
6 changes: 0 additions & 6 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1462,8 +1462,6 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
self.log.error("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
continue

latest_dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)

data_interval = dag.get_next_data_interval(dag_model)
# Explicitly check if the DagRun already exists. This is an edge case
# where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after`
Expand All @@ -1486,7 +1484,6 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
run_after=dag_model.next_dagrun_create_after,
run_type=DagRunType.SCHEDULED,
triggered_by=DagRunTriggeredByType.TIMETABLE,
dag_version=latest_dag_version,
state=DagRunState.QUEUED,
creating_job_id=self.job.id,
session=session,
Expand Down Expand Up @@ -1535,8 +1532,6 @@ def _create_dag_runs_asset_triggered(
)
continue

latest_dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)

triggered_date = triggered_dates[dag.dag_id]
cte = (
select(func.max(DagRun.run_after).label("previous_dag_run_run_after"))
Expand Down Expand Up @@ -1569,7 +1564,6 @@ def _create_dag_runs_asset_triggered(
run_after=triggered_date,
run_type=DagRunType.ASSET_TRIGGERED,
triggered_by=DagRunTriggeredByType.ASSET,
dag_version=latest_dag_version,
state=DagRunState.QUEUED,
creating_job_id=self.job.id,
session=session,
Expand Down
3 changes: 0 additions & 3 deletions airflow-core/src/airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@

from airflow.exceptions import AirflowException, DagNotFound
from airflow.models.base import Base, StringID
from airflow.models.dag_version import DagVersion
from airflow.settings import json
from airflow.utils import timezone
from airflow.utils.session import create_session
Expand Down Expand Up @@ -327,7 +326,6 @@ def _create_backfill_dag_run(
)
return

dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
try:
dr = dag.create_dagrun(
run_id=DagRun.generate_run_id(
Expand All @@ -339,7 +337,6 @@ def _create_backfill_dag_run(
conf=dag_run_conf,
run_type=DagRunType.BACKFILL_JOB,
triggered_by=DagRunTriggeredByType.BACKFILL,
dag_version=dag_version,
state=DagRunState.QUEUED,
start_date=timezone.utcnow(),
backfill_id=backfill_id,
Expand Down
7 changes: 1 addition & 6 deletions airflow-core/src/airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ def _create_orm_dagrun(
conf: Any,
state: DagRunState | None,
run_type: DagRunType,
dag_version: DagVersion | None,
creating_job_id: int | None,
backfill_id: int | None,
triggered_by: DagRunTriggeredByType,
Expand Down Expand Up @@ -290,8 +289,7 @@ def _create_orm_dagrun(
run.dag = dag
# create the associated task instances
# state is None at the moment of creation
if not dag_version:
dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
run.verify_integrity(session=session, dag_version_id=dag_version.id if dag_version else None)
return run

Expand Down Expand Up @@ -1793,7 +1791,6 @@ def create_dagrun(
conf: dict | None = None,
run_type: DagRunType,
triggered_by: DagRunTriggeredByType,
dag_version: DagVersion | None = None,
state: DagRunState,
start_date: datetime | None = None,
creating_job_id: int | None = None,
Expand Down Expand Up @@ -1867,7 +1864,6 @@ def create_dagrun(
conf=conf,
state=state,
run_type=run_type,
dag_version=dag_version,
creating_job_id=creating_job_id,
backfill_id=backfill_id,
triggered_by=triggered_by,
Expand Down Expand Up @@ -2622,7 +2618,6 @@ def _get_or_create_dagrun(
run_type=DagRunType.MANUAL,
state=DagRunState.RUNNING,
triggered_by=triggered_by,
dag_version=DagVersion.get_latest_version(dag.dag_id, session=session),
start_date=start_date or logical_date,
session=session,
)
Expand Down
14 changes: 10 additions & 4 deletions airflow-core/src/airflow/models/dag_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,26 +112,32 @@ def write_dag(
return dag_version

@classmethod
def _latest_version_select(cls, dag_id: str) -> Select:
def _latest_version_select(cls, dag_id: str, bundle_version: str | None = None) -> Select:
"""
Get the select object to get the latest version of the DAG.

:param dag_id: The DAG ID.
:return: The select object.
"""
return select(cls).where(cls.dag_id == dag_id).order_by(cls.created_at.desc()).limit(1)
query = select(cls).where(cls.dag_id == dag_id)
if bundle_version:
query = query.where(cls.bundle_version == bundle_version)
query = query.order_by(cls.created_at.desc()).limit(1)
return query

@classmethod
@provide_session
def get_latest_version(cls, dag_id: str, *, session: Session = NEW_SESSION) -> DagVersion | None:
def get_latest_version(
cls, dag_id: str, *, bundle_version: str | None = None, session: Session = NEW_SESSION
) -> DagVersion | None:
"""
Get the latest version of the DAG.

:param dag_id: The DAG ID.
:param session: The database session.
:return: The latest version of the DAG or None if not found.
"""
return session.scalar(cls._latest_version_select(dag_id))
return session.scalar(cls._latest_version_select(dag_id, bundle_version=bundle_version))

@classmethod
@provide_session
Expand Down
2 changes: 0 additions & 2 deletions airflow-core/tests/unit/api_fastapi/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from airflow.api_fastapi.app import create_app
from airflow.api_fastapi.auth.managers.simple.user import SimpleAuthManagerUser
from airflow.models import Connection
from airflow.models.dag_version import DagVersion
from airflow.models.serialized_dag import SerializedDagModel
from airflow.providers.standard.operators.empty import EmptyOperator

Expand Down Expand Up @@ -161,7 +160,6 @@ def make_dag_with_multiple_versions(dag_maker, configure_git_connection_for_dag_
dag_maker.create_dagrun(
run_id=f"run{version_number}",
logical_date=datetime.datetime(2020, 1, version_number, tzinfo=datetime.timezone.utc),
dag_version=DagVersion.get_version(dag_id=dag_id, version_number=version_number),
)
dag.sync_to_db()

Expand Down
3 changes: 0 additions & 3 deletions airflow-core/tests/unit/cli/commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ def setup_class(cls):
logical_date=DEFAULT_DATE,
data_interval=data_interval,
run_after=DEFAULT_DATE,
dag_version=None,
triggered_by=DagRunTriggeredByType.TEST,
)

Expand Down Expand Up @@ -353,7 +352,6 @@ def test_task_states_for_dag_run(self):
data_interval=data_interval,
run_after=default_date2,
run_type=DagRunType.MANUAL,
dag_version=None,
triggered_by=DagRunTriggeredByType.CLI,
)
ti2 = TaskInstance(task2, run_id=dagrun.run_id)
Expand Down Expand Up @@ -438,7 +436,6 @@ def setup_method(self) -> None:
start_date=timezone.utcnow(),
state=State.RUNNING,
run_type=DagRunType.MANUAL,
dag_version=None,
triggered_by=DagRunTriggeredByType.TEST,
)
self.tis = self.dr.get_task_instances()
Expand Down
Loading