Reduce per-DAG queries during DAG serialization with bulk prefetch #64929
Merged
Pull request overview
This PR optimizes DAG serialization writes by reducing per-DAG database round trips during SerializedDagModel.write_dag, primarily targeting the DAG processing bulk sync path. It introduces a bulk prefetch of write-time metadata (latest serialized DAG timestamp/hash and latest DagVersion) and threads that prefetched metadata through the DAG parsing/serialization workflow.
Changes:
- Add `SerializedDagModel._prefetch_dag_write_metadata()` and `DagWriteMetadata` to bulk-load write-time metadata in two queries and reuse it in `write_dag`.
- Update DAG parsing DB sync (`dag_processing.collection`) to prefetch metadata once per batch and pass it into per-DAG serialization calls; also pass the caller session into `DagCode.update_source_code`.
- Add unit tests for the new prefetch helper and adjust an existing dag-processing retry test for the new `write_dag(..., _prefetched=...)` call shape.
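To make the two-query idea concrete, here is a minimal sketch using the standard-library `sqlite3` module. It is not the code from this PR: the table layout (`serialized_dag`, `dag_version`), the column names, the `WriteMetadata` dataclass, and the `prefetch_write_metadata` and `demo_connection` helpers are all hypothetical stand-ins chosen for illustration. It only shows how per-DAG lookups can be collapsed into one query per kind of metadata for a whole batch.

```python
import sqlite3
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class WriteMetadata:
    """Hypothetical stand-in for the metadata the PR prefetches per DAG."""

    last_updated: Optional[str]
    dag_hash: Optional[str]
    latest_version: Optional[int]


def prefetch_write_metadata(conn, dag_ids):
    """Load metadata for every DAG in the batch with two queries in total."""
    if not dag_ids:
        return {}
    marks = ",".join("?" * len(dag_ids))
    # Query 1: newest serialized row (timestamp and hash) for each DAG.
    serialized = {
        dag_id: (last_updated, dag_hash)
        for dag_id, last_updated, dag_hash in conn.execute(
            f"""
            SELECT s.dag_id, s.last_updated, s.dag_hash
            FROM serialized_dag AS s
            JOIN (SELECT dag_id, MAX(last_updated) AS newest
                  FROM serialized_dag
                  WHERE dag_id IN ({marks})
                  GROUP BY dag_id) AS m
              ON m.dag_id = s.dag_id AND m.newest = s.last_updated
            """,
            dag_ids,
        )
    }
    # Query 2: highest version number for each DAG.
    versions = dict(
        conn.execute(
            f"""
            SELECT dag_id, MAX(version_number)
            FROM dag_version
            WHERE dag_id IN ({marks})
            GROUP BY dag_id
            """,
            dag_ids,
        )
    )
    # DAGs with no rows yet still get an entry, filled with None values.
    return {
        dag_id: WriteMetadata(
            *serialized.get(dag_id, (None, None)), versions.get(dag_id)
        )
        for dag_id in dag_ids
    }


def demo_connection():
    """Build an in-memory database with a toy schema (assumed, not Airflow's)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE serialized_dag (dag_id TEXT, last_updated TEXT, dag_hash TEXT)"
    )
    conn.execute("CREATE TABLE dag_version (dag_id TEXT, version_number INTEGER)")
    conn.executemany(
        "INSERT INTO serialized_dag VALUES (?, ?, ?)",
        [("a", "2026-04-01", "h1"), ("a", "2026-04-02", "h2"), ("b", "2026-04-01", "h3")],
    )
    conn.executemany(
        "INSERT INTO dag_version VALUES (?, ?)", [("a", 1), ("a", 2), ("b", 1)]
    )
    return conn
```

Calling `prefetch_write_metadata(demo_connection(), ["a", "b", "missing"])` returns one entry per requested DAG, so a per-DAG write step can look up its metadata in the dictionary instead of issuing its own SELECTs.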
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| `airflow-core/src/airflow/models/serialized_dag.py` | Introduces `DagWriteMetadata`, implements bulk metadata prefetch, and updates `write_dag` to reuse prefetched values. |
| `airflow-core/src/airflow/dag_processing/collection.py` | Bulk-prefetches metadata during parsing DB sync, passes `_prefetched` into serialization, reuses the session for `DagCode.update_source_code`, and eager-loads `dag_owner_links`. |
| `airflow-core/tests/unit/models/test_serialized_dag.py` | Adds unit tests validating prefetch behavior across multiple DAGs and ensuring the latest `DagVersion` is returned. |
| `airflow-core/tests/unit/dag_processing/test_collection.py` | Updates retry test expectations to include the new `_prefetched` parameter passed to `SerializedDagModel.write_dag`. |
kaxil reviewed Apr 11, 2026
kaxil approved these changes Apr 11, 2026
d3b8755 to 6e9e308
Replaces 3 SELECTs per DAG in write_dag (update interval check, hash comparison, version fetch) with 2 bulk queries via a new _prefetch_dag_write_metadata classmethod. Also fixes DagCode.update_source_code to reuse the caller's session and eagerly loads dag_owner_links to prevent N+1 queries.
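As a rough illustration of how already-fetched metadata can stand in for the first two per-DAG checks (update interval and hash comparison), consider the simplified sketch below. It is an assumption-laden model, not the logic of `write_dag` itself: the `PrefetchedState` dataclass, the `needs_write` function, and the `min_update_interval` parameter are hypothetical names used only for this example.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass(frozen=True)
class PrefetchedState:
    """Hypothetical view of what was bulk-loaded for one DAG."""

    last_updated: Optional[datetime]
    dag_hash: Optional[str]


def needs_write(state, new_hash, now, min_update_interval=None):
    """Decide from prefetched values alone whether a new row is needed."""
    # No prior serialization known: always write.
    if state is None or state.last_updated is None:
        return True
    # Update interval check: skip if the last write is too recent.
    if min_update_interval is not None and now - state.last_updated < min_update_interval:
        return False
    # Hash comparison: write only when the serialized content changed.
    return state.dag_hash != new_hash
```

Because both checks read from values loaded up front, neither one needs its own database round trip for each DAG in the batch.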
6e9e308 to afc1c7c
ephraimbuddy added a commit that referenced this pull request Apr 14, 2026
…64929)
* Reduce per-DAG queries during DAG serialization with bulk prefetch
  Replaces 3 SELECTs per DAG in write_dag (update interval check, hash comparison, version fetch) with 2 bulk queries via a new _prefetch_dag_write_metadata classmethod. Also fixes DagCode.update_source_code to reuse the caller's session and eagerly loads dag_owner_links to prevent N+1 queries.
* fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch
* fixup! fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch
(cherry picked from commit ef00040)
ephraimbuddy added a commit that referenced this pull request Apr 14, 2026
…64929) (#65208)
* Reduce per-DAG queries during DAG serialization with bulk prefetch
  Replaces 3 SELECTs per DAG in write_dag (update interval check, hash comparison, version fetch) with 2 bulk queries via a new _prefetch_dag_write_metadata classmethod. Also fixes DagCode.update_source_code to reuse the caller's session and eagerly loads dag_owner_links to prevent N+1 queries.
* fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch
* fixup! fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch
(cherry picked from commit ef00040)
vatsrahul1001 pushed a commit that referenced this pull request Apr 15, 2026
…64929) (#65208)
* Reduce per-DAG queries during DAG serialization with bulk prefetch
  Replaces 3 SELECTs per DAG in write_dag (update interval check, hash comparison, version fetch) with 2 bulk queries via a new _prefetch_dag_write_metadata classmethod. Also fixes DagCode.update_source_code to reuse the caller's session and eagerly loads dag_owner_links to prevent N+1 queries.
* fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch
* fixup! fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch
(cherry picked from commit ef00040)
Was generative AI tooling used to co-author this PR?
Claude Opus 4.6