Skip to content

Commit

Permalink
Flush dataset events before queuing dagruns (#26276)
Browse files Browse the repository at this point in the history
When we go to schedule dagruns from the dataset dagrun queue, we assume
the dataset events are written to the database (and thus timestamped)
before the queue records, which isn't guaranteed unless we explicitly
flush the session first. This ensures that dagruns are properly related
to their upstream dataset events.
  • Loading branch information
jedcunningham authored Sep 9, 2022
1 parent eb03959 commit 954349a
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow/datasets/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def register_dataset_change(
extra=extra,
)
)
session.flush()
if dataset_model.consuming_dags:
self._queue_dagruns(dataset_model, session)
session.flush()
Expand Down
1 change: 1 addition & 0 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,7 @@ def _create_dag_runs_dataset_triggered(
]
if previous_dag_run:
dataset_event_filters.append(DatasetEvent.timestamp > previous_dag_run.execution_date)

dataset_events = (
session.query(DatasetEvent)
.join(
Expand Down
6 changes: 6 additions & 0 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,12 @@ def test_outlet_datasets(self, create_task_instance):
DatasetEvent.source_task_instance == ti
).one() == ('s3://dag1/output_1.txt',)

# check that the dataset event has an earlier timestamp than the DDRQ's
ddrq_timestamps = (
session.query(DatasetDagRunQueue.created_at).filter_by(dataset_id=event.dataset.id).all()
)
assert all([event.timestamp < ddrq_timestamp for (ddrq_timestamp,) in ddrq_timestamps])

def test_outlet_datasets_failed(self, create_task_instance):
"""
Verify that when we have an outlet dataset on a task, and the task
Expand Down

0 comments on commit 954349a

Please sign in to comment.