
Manual DAG runs cause prev_data_interval_start_success and prev_data_interval_end_success to behave unexpectedly #28016

Open
matthewblock opened this issue Nov 30, 2022 · 2 comments
Labels: `airflow3.0:breaking` (candidates for Airflow 3.0 that contain breaking changes), `airflow3.0:candidate` (potential candidates for Airflow 3.0), `area:core`, `kind:feature` (feature requests)

Comments


matthewblock commented Nov 30, 2022

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

Using Airflow Version 2.4.2

Manually triggered DAG runs interspersed between scheduled DAG runs cause the scheduled DAG runs' template variables — in our specific use case, prev_data_interval_start_success and prev_data_interval_end_success — to no longer behave as expected. This appears to be due to how Airflow determines a previous DAG run.

When we have mixed manually triggered and scheduled DAG runs, the logical_date values are not comparable: for manually triggered DAG runs, the logical_date is the exact timestamp the DAG was triggered.

The following table shows mixed manually triggered and scheduled DAG runs, and some of the templated timestamps. Notice that this table is ordered by the Actual execution date, which is the exact timestamp the DAG executed, and that the logical_date is out of order for manually triggered DAGs.

Bold italics mark the template variables that IMO do not behave as predicted:

| DAG Run | Triggered | logical_date | Actual execution date | data_interval_start | data_interval_end | prev_data_interval_start_success | prev_data_interval_end_success |
|---|---|---|---|---|---|---|---|
| 1 | Scheduled | 1:05:00 | 1:10:29 | 1:05:00 | 1:10:00 | None | None |
| 2 | Manual | 1:11:26 | 1:11:26 | 1:05:00 | 1:10:00 | 1:05:00 | 1:10:00 |
| 3 | Scheduled | 1:10:00 | 1:15:00 | 1:10:00 | 1:15:00 | 1:05:00 | 1:10:00 |
| 4 | Manual | 1:17:51 | 1:17:52 | 1:10:00 | 1:15:00 | 1:05:00 | 1:10:00 |
| 5 | Scheduled | 1:15:00 | 1:20:01 | 1:15:00 | 1:20:00 | ***1:05:00*** | ***1:10:00*** |
| 6 | Scheduled | 1:20:00 | 1:25:00 | 1:20:00 | 1:25:00 | ***1:10:00*** | ***1:15:00*** |
| 7 | Scheduled | 1:25:00 | 1:30:01 | 1:25:00 | 1:30:00 | 1:20:00 | 1:25:00 |

For the bolded scheduled DAG runs, note that the prev_data_interval_start_success and prev_data_interval_end_success both point to the data interval that is technically earlier than the most recent successful DAG run's data interval.

Causes

I believe this is due to the way Airflow calculates the previous DAG run. If we order by logical_date, this function will prioritize manually triggered DAG runs, because:

  • A manually triggered DAG run's logical_date (== execution_date) is the trigger timestamp itself, which falls at or after its data_interval_end
  • A scheduled DAG run's logical_date equals its data_interval_start, which is one interval earlier for an equivalent "trigger" timestamp

See `get_previous_dagrun` in `airflow/models/dagrun.py`:

```python
@provide_session
def get_previous_dagrun(
    self, state: DagRunState | None = None, session: Session = NEW_SESSION
) -> DagRun | None:
    """The previous DagRun, if there is one."""
    filters = [
        DagRun.dag_id == self.dag_id,
        DagRun.execution_date < self.execution_date,  # <-- compares manual and scheduled logical dates directly
    ]
    if state is not None:
        filters.append(DagRun.state == state)
    return session.query(DagRun).filter(*filters).order_by(DagRun.execution_date.desc()).first()
```
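To see the effect of that comparison in isolation, here is a minimal, Airflow-free simulation of the same selection logic, using a hypothetical `Run` dataclass to stand in for the `DagRun` rows from the table above:

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class Run:
    id: int
    kind: str  # "scheduled" or "manual"
    logical_date: datetime
    interval: tuple  # (data_interval_start, data_interval_end)


def t(h, m, s=0):
    return datetime(2022, 11, 30, h, m, s)


# Successful runs 1-5 from the table above
runs = [
    Run(1, "scheduled", t(1, 5), (t(1, 5), t(1, 10))),
    Run(2, "manual", t(1, 11, 26), (t(1, 5), t(1, 10))),
    Run(3, "scheduled", t(1, 10), (t(1, 10), t(1, 15))),
    Run(4, "manual", t(1, 17, 51), (t(1, 10), t(1, 15))),
    Run(5, "scheduled", t(1, 15), (t(1, 15), t(1, 20))),
]


def get_previous_dagrun(current_logical_date):
    # Mirrors the query: filter logical_date < current, order desc, take first
    earlier = [r for r in runs if r.logical_date < current_logical_date]
    return max(earlier, key=lambda r: r.logical_date, default=None)


# Scheduled run 6 has logical_date 1:20:00
prev = get_previous_dagrun(t(1, 20))
print(prev.id, prev.kind, prev.interval)
# Run 4 (manual, logical_date 1:17:51) wins over run 5 (logical_date 1:15:00),
# so the "previous" data interval is 1:10-1:15, not 1:15-1:20.
```

Run 4's trigger-time logical_date sorts after run 5's interval-start logical_date, which is exactly the behaviour shown in rows 5 and 6 of the table.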

What you think should happen instead

IMO this is a bug - DAG Run 6 in the table above should identify DAG Run 5 as the most recent successful DAG run, and the template variables should reflect this accordingly.

There is a deeper issue in how Airflow draws the line between scheduled DAG runs and manually triggered DAG runs.

There have been discussions about keeping manually triggered DAGs and scheduled DAGs entirely separate, the best practice being to never manually trigger scheduled DAGs.

In our use case, we want to be able to easily trigger a DAG run when there is a gap in execution, and that DAG run should have a data interval that covers any gaps.

Possible Solutions

  • For DAGs that have a time-based schedule (not null and not a Dataset), manually triggered DAG runs set logical_date to data_interval_start
  • Enhance backfill feature by enabling it via UI, not just CLI
    • Backfill should be much more accessible to Airflow users
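The first idea could be sketched as follows. The helper name `aligned_logical_date` is hypothetical, and a fixed-interval schedule stands in for a full cron parse:

```python
from datetime import datetime, timedelta


def aligned_logical_date(trigger_time: datetime, freq: timedelta) -> datetime:
    """Floor a manual trigger time to the schedule grid and return the
    data_interval_start of the most recent *completed* interval, i.e. the
    logical_date a scheduled run covering that slot would have had."""
    epoch = datetime(1970, 1, 1)
    slots = (trigger_time - epoch) // freq  # whole intervals elapsed since epoch
    return epoch + slots * freq - freq


# A manual trigger at 1:11:26 on a */5 schedule would get logical_date 1:05:00,
# matching the data_interval 1:05-1:10 shown for run 2 in the table above,
# instead of the raw trigger timestamp 1:11:26.
print(aligned_logical_date(datetime(2022, 11, 30, 1, 11, 26), timedelta(minutes=5)))
```

With logical_date pinned to data_interval_start, manual and scheduled runs would sort consistently in `get_previous_dagrun`.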

How to reproduce

Sample DAG that prints template variables every 5 minutes.

To reproduce, manually trigger the DAG between scheduled DAG runs and note the lag in prev_data_interval_start_success and prev_data_interval_end_success.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 1, 1),
    "retries": 0,
    "retry_delay": timedelta(minutes=5),
}


def print_data_intervals(
    data_interval_start: datetime,
    data_interval_end: datetime,
    prev_data_interval_start_success: datetime,
    prev_data_interval_end_success: datetime,
    logical_date: datetime,
):
    print(f"{data_interval_start=}")
    print(f"{data_interval_end=}")
    print(f"{prev_data_interval_start_success=}")
    print(f"{prev_data_interval_end_success=}")
    print(f"{logical_date=}")


with DAG(
    dag_id="test_data_intervals",
    default_args=default_args,
    schedule="*/5 * * * *",
    catchup=False,
):
    PythonOperator(
        task_id="print_data_intervals",
        python_callable=print_data_intervals,
    )
```

Operating System

debian

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@matthewblock matthewblock added area:core kind:bug This is a clearly a bug labels Nov 30, 2022
uranusjr (Member) commented Dec 1, 2022

Unfortunately we can’t change how manual runs’ logical dates are calculated without breaking compatibility. Enhancing backfilling is a possible approach (and I believe has been raised previously).

A solution would be to introduce a new timetable that does what you want, and use that instead of the cron expression, which I think is the most reasonable. (The backfill thing is worth doing regardless of this addition.)
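A custom timetable's main lever here would be its `infer_manual_data_interval` hook, which Airflow calls to assign a data interval to a manually triggered run. The following is a standalone sketch of the gap-covering interval math such a timetable might implement; the function name, the `last_automated_end` parameter, and the fixed-interval schedule are illustrative assumptions, not Airflow's actual `Timetable` API (the real hook receives only `run_after`):

```python
from datetime import datetime, timedelta


def infer_manual_data_interval(
    run_after: datetime, last_automated_end: datetime, freq: timedelta
) -> tuple:
    """Give a manual run one interval stretching from the end of the last
    scheduled run to the most recent schedule boundary, so the manual run
    covers any gap instead of just the single latest slot."""
    epoch = datetime(1970, 1, 1)
    end = epoch + ((run_after - epoch) // freq) * freq  # floor to schedule grid
    start = min(last_automated_end, end)  # reach back to cover the gap
    return (start, end)


# Last scheduled interval ended at 1:10; a manual trigger at 1:27:30 on a
# 5-minute schedule yields one interval covering 1:10 through 1:25.
print(
    infer_manual_data_interval(
        datetime(2022, 11, 30, 1, 27, 30),
        datetime(2022, 11, 30, 1, 10),
        timedelta(minutes=5),
    )
)
```

This matches the "data interval that covers any gaps" behaviour the issue asks for, without changing how logical dates are assigned globally.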

@uranusjr uranusjr added kind:feature Feature Requests and removed kind:bug This is a clearly a bug labels Dec 1, 2022
eladkal (Contributor) commented Dec 1, 2022

> There have been discussions (#22232) on keeping manually triggered DAGs and scheduled DAGs entirely separate - best practice being don't ever manually trigger scheduled DAG runs.

I also don't mix scheduled with manual runs. Moreover, when I need to rerun something I do it with clear - I don't trigger a manual DagRun to compensate. It just makes things harder to track.

> In our use case, we want to be able to easily trigger a DAG run when there is a gap in execution, and that DAG run should have a data interval that covers any gaps.

As @uranusjr mentioned, this is a feature we can implement only in Airflow 3, since it is a breaking change (and Airflow 3 is far, far in the future).
I suggest following the advice given by @uranusjr to work around this problem for your case.

If there are no other thoughts, I will convert this into a GitHub discussion, since this feature request is not actionable.

@uranusjr uranusjr added airflow3.0:candidate Potential candidates for Airflow 3.0 airflow3.0:breaking Candidates for Airflow 3.0 that contain breaking changes and removed involves core breaking change labels Aug 15, 2024