Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1315,7 +1315,9 @@ def _schedule_dag_run(
self.log.error("Execution date is in future: %s", dag_run.execution_date)
return callback

self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session)
if not self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session):
self.log.warning("The DAG disappeared before verifying integrity: %s. Skipping.", dag_run.dag_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we should make this a debug level logging. For example, maybe the dag disappeared during the scheduler loop but on the next loop, it appears. The user doesn't have control over it (I think)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, this shouldn’t be a warning since the user can’t really do anything about it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine to show the warning. This is a really, really rare race condition. - and realy intermittent and it can at least give our user an indication and notice that something is wrong.

If we make it ito debug - we will never see it. And when user is asked for logs, there wil never be any suspicious thing there.

I think it should be a warning. just to let us know somthing strange is happening.

return callback
# TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
if dag_run.state in State.finished:
Expand All @@ -1331,20 +1333,27 @@ def _schedule_dag_run(

return callback_to_run

def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) -> None:
"""Only run DagRun.verify integrity if Serialized DAG has changed since it is slow"""
def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) -> bool:
"""
Only run DagRun.verify integrity if Serialized DAG has changed since it is slow.

Return True if we determine that DAG still exists.
"""
latest_version = SerializedDagModel.get_latest_version_hash(dag_run.dag_id, session=session)
if dag_run.dag_hash == latest_version:
self.log.debug("DAG %s not changed structure, skipping dagrun.verify_integrity", dag_run.dag_id)
return
return True

dag_run.dag_hash = latest_version

# Refresh the DAG
dag_run.dag = self.dagbag.get_dag(dag_id=dag_run.dag_id, session=session)
if not dag_run.dag:
return False
Comment on lines +1351 to +1352
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be an exception (caught outside) instead; returning True and False doesn’t feel natural.

Or maybe we should refactor the code around this to be

# This returns
# a. DAG is it changed.
# b. None if not changed.
# c. None if the DAG is not found in DagBag, with a log message.
refreshed_dag = self._get_changed_dag_()
if refreshed_dag is not None:
    dag_run.dag = refreshed_dag
    dag_run.verify_integrity(session=session)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's pretty goot to be like it is. Exceptions are fine, but sometimes True/False is fine - especially if the method is named as "verify..".


# Verify integrity also takes care of session.flush
dag_run.verify_integrity(session=session)
return True

def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackRequest | None = None) -> None:
self._send_sla_callbacks_to_processor(dag)
Expand Down
44 changes: 44 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2699,6 +2699,50 @@ def test_verify_integrity_if_dag_changed(self, dag_maker):
session.rollback()
session.close()

def test_verify_integrity_if_dag_disappeared(self, dag_maker, caplog):
# CleanUp
with create_session() as session:
session.query(SerializedDagModel).filter(
SerializedDagModel.dag_id == "test_verify_integrity_if_dag_disappeared"
).delete(synchronize_session=False)

with dag_maker(dag_id="test_verify_integrity_if_dag_disappeared") as dag:
BashOperator(task_id="dummy", bash_command="echo hi")

self.scheduler_job = SchedulerJob(subdir=os.devnull)

session = settings.Session()
orm_dag = dag_maker.dag_model
assert orm_dag is not None

self.scheduler_job = SchedulerJob(subdir=os.devnull)
self.scheduler_job.processor_agent = mock.MagicMock()
dag = self.scheduler_job.dagbag.get_dag("test_verify_integrity_if_dag_disappeared", session=session)
self.scheduler_job._create_dag_runs([orm_dag], session)
dag_id = dag.dag_id
drs = DagRun.find(dag_id=dag_id, session=session)
assert len(drs) == 1
dr = drs[0]

dag_version_1 = SerializedDagModel.get_latest_version_hash(dag_id, session=session)
assert dr.dag_hash == dag_version_1
assert self.scheduler_job.dagbag.dags == {"test_verify_integrity_if_dag_disappeared": dag}
assert len(self.scheduler_job.dagbag.dags.get("test_verify_integrity_if_dag_disappeared").tasks) == 1

SerializedDagModel.remove_dag(dag_id=dag_id)
dag = self.scheduler_job.dagbag.dags[dag_id]
self.scheduler_job.dagbag.dags = MagicMock()
self.scheduler_job.dagbag.dags.get.side_effect = [dag, None]
session.flush()
with caplog.at_level(logging.WARNING):
callback = self.scheduler_job._schedule_dag_run(dr, session)
assert "The DAG disappeared before verifying integrity" in caplog.text

assert callback is None

session.rollback()
session.close()

@pytest.mark.need_serialized_dag
def test_retry_still_in_executor(self, dag_maker):
"""
Expand Down