Handle DAG disappearing mid-flight when dag verification happens #27720
@@ -1315,7 +1315,9 @@ def _schedule_dag_run(
             self.log.error("Execution date is in future: %s", dag_run.execution_date)
             return callback

-        self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session)
+        if not self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session):
+            self.log.warning("The DAG disappeared before verifying integrity: %s. Skipping.", dag_run.dag_id)
+            return callback
         # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
         schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
         if dag_run.state in State.finished:
@@ -1331,20 +1333,27 @@ def _schedule_dag_run(

         return callback_to_run

-    def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) -> None:
-        """Only run DagRun.verify integrity if Serialized DAG has changed since it is slow"""
+    def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) -> bool:
+        """
+        Only run DagRun.verify integrity if Serialized DAG has changed since it is slow.
+
+        Return True if we determine that DAG still exists.
+        """
         latest_version = SerializedDagModel.get_latest_version_hash(dag_run.dag_id, session=session)
         if dag_run.dag_hash == latest_version:
             self.log.debug("DAG %s not changed structure, skipping dagrun.verify_integrity", dag_run.dag_id)
-            return
+            return True

         dag_run.dag_hash = latest_version

         # Refresh the DAG
         dag_run.dag = self.dagbag.get_dag(dag_id=dag_run.dag_id, session=session)
+        if not dag_run.dag:
+            return False

         # Verify integrity also takes care of session.flush
         dag_run.verify_integrity(session=session)
+        return True

     def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackRequest | None = None) -> None:
         self._send_sla_callbacks_to_processor(dag)
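For readers who want to see the control flow of this change in isolation, here is a minimal sketch of the same pattern: the verification helper returns a bool, and the caller skips the run when the DAG can no longer be loaded. `FakeDagRun`, `FakeScheduler`, and the `"skipped"`/`"scheduled"` return values are hypothetical stand-ins invented for illustration; they are not Airflow classes, and the real method also takes a `session`.

```python
from __future__ import annotations

import logging
from dataclasses import dataclass, field

log = logging.getLogger("scheduler_sketch")


@dataclass
class FakeDagRun:
    """Hypothetical stand-in for DagRun; only the fields this sketch needs."""

    dag_id: str
    dag_hash: str | None = None
    dag: object | None = None
    verified: int = 0  # counts how often verify_integrity ran

    def verify_integrity(self) -> None:
        self.verified += 1


@dataclass
class FakeScheduler:
    """Hypothetical stand-in for the scheduler job."""

    latest_hashes: dict[str, str] = field(default_factory=dict)
    dags: dict[str, object] = field(default_factory=dict)

    def verify_integrity_if_dag_changed(self, dag_run: FakeDagRun) -> bool:
        """Return True if the DAG still exists, False if it disappeared."""
        latest = self.latest_hashes.get(dag_run.dag_id)
        if dag_run.dag_hash == latest:
            return True  # structure unchanged, skip the slow check

        dag_run.dag_hash = latest
        dag_run.dag = self.dags.get(dag_run.dag_id)  # refresh the DAG
        if not dag_run.dag:
            return False  # DAG vanished between the hash lookup and the load

        dag_run.verify_integrity()
        return True

    def schedule_dag_run(self, dag_run: FakeDagRun) -> str:
        if not self.verify_integrity_if_dag_changed(dag_run):
            log.warning(
                "The DAG disappeared before verifying integrity: %s. Skipping.",
                dag_run.dag_id,
            )
            return "skipped"
        return "scheduled"
```

The point of the bool return is that the caller, not the helper, decides what "DAG is gone" means for the current scheduling pass.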
I'm wondering if we should log this at debug level instead. For example, maybe the DAG disappeared during one scheduler loop but reappears on the next. The user doesn't have control over it (I think).
Same, this shouldn’t be a warning since the user can’t really do anything about it.
I think it is fine to show the warning. This is a really, really rare and really intermittent race condition, and the warning can at least give our users an indication that something is wrong.
If we make it debug, we will never see it. And when a user is asked for logs, there will never be anything suspicious in them.
I think it should be a warning, just to let us know something strange is happening.
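To make the visibility argument concrete, here is a small sketch using only the standard `logging` module. It assumes the scheduler logger runs at INFO level, which is an assumption about the deployment and not something stated in this thread; `ListHandler`, `capture`, and `example_dag` are hypothetical names used only for this illustration.

```python
import logging

MESSAGE = "The DAG disappeared before verifying integrity: %s. Skipping."


class ListHandler(logging.Handler):
    """Collects (level name, rendered message) pairs instead of printing them."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append((record.levelname, record.getMessage()))


def capture(level: int) -> list:
    """Emit the message at DEBUG and at WARNING; return what the logger kept."""
    logger = logging.getLogger("visibility_sketch")
    logger.setLevel(level)
    logger.propagate = False  # keep the sketch isolated from the root logger
    handler = ListHandler()
    logger.addHandler(handler)
    try:
        logger.debug(MESSAGE, "example_dag")
        logger.warning(MESSAGE, "example_dag")
    finally:
        logger.removeHandler(handler)
    return handler.messages
```

Under that assumption, `capture(logging.INFO)` keeps only the WARNING record, so a debug-level message would be absent from logs a user sends in unless they had already lowered the level before the race occurred.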