Skip to content

Reduce per-DAG queries during DAG serialization with bulk prefetch#64929

Merged
ephraimbuddy merged 3 commits into
apache:mainfrom
astronomer:optimize-dag-serialization
Apr 13, 2026
Merged

Reduce per-DAG queries during DAG serialization with bulk prefetch#64929
ephraimbuddy merged 3 commits into
apache:mainfrom
astronomer:optimize-dag-serialization

Conversation

@ephraimbuddy
Copy link
Copy Markdown
Contributor

Replaces 3 SELECTs per DAG in write_dag (update interval check, hash comparison, version fetch) with 2 bulk queries via a new _prefetch_dag_write_metadata classmethod. Also fixes DagCode.update_source_code to reuse the caller's session and eagerly loads dag_owner_links to prevent N+1 queries.


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)
    Claude Opus 4.6

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR optimizes DAG serialization writes by reducing per-DAG database round trips during SerializedDagModel.write_dag, primarily targeting the DAG processing bulk sync path. It introduces a bulk prefetch of write-time metadata (latest serialized DAG timestamp/hash and latest DagVersion) and threads that prefetched metadata through the DAG parsing/serialization workflow.

Changes:

  • Add SerializedDagModel._prefetch_dag_write_metadata() and DagWriteMetadata to bulk-load write-time metadata in two queries and reuse it in write_dag.
  • Update DAG parsing DB sync (dag_processing.collection) to prefetch metadata once per batch and pass it into per-DAG serialization calls; also pass the caller session into DagCode.update_source_code.
  • Add unit tests for the new prefetch helper and adjust an existing dag-processing retry test for the new write_dag(..., _prefetched=...) call shape.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.

File Description
airflow-core/src/airflow/models/serialized_dag.py Introduces DagWriteMetadata, implements bulk metadata prefetch, and updates write_dag to reuse prefetched values.
airflow-core/src/airflow/dag_processing/collection.py Bulk-prefetches metadata during parsing DB sync, passes _prefetched into serialization, reuses the session for DagCode.update_source_code, and eager-loads dag_owner_links.
airflow-core/tests/unit/models/test_serialized_dag.py Adds unit tests validating prefetch behavior across multiple DAGs and ensuring the latest DagVersion is returned.
airflow-core/tests/unit/dag_processing/test_collection.py Updates retry test expectations to include the new _prefetched parameter passed to SerializedDagModel.write_dag.

Comment thread airflow-core/src/airflow/dag_processing/collection.py Outdated
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

Comment thread airflow-core/src/airflow/dag_processing/collection.py Outdated
Replaces 3 SELECTs per DAG in write_dag (update interval check, hash
comparison, version fetch) with 2 bulk queries via a new
_prefetch_dag_write_metadata classmethod. Also fixes DagCode.update_source_code
to reuse the caller's session and eagerly loads dag_owner_links to prevent
N+1 queries.
@ephraimbuddy ephraimbuddy force-pushed the optimize-dag-serialization branch from 6e9e308 to afc1c7c Compare April 12, 2026 18:39
@ephraimbuddy ephraimbuddy merged commit ef00040 into apache:main Apr 13, 2026
80 checks passed
@ephraimbuddy ephraimbuddy deleted the optimize-dag-serialization branch April 13, 2026 07:19
@ephraimbuddy ephraimbuddy added this to the Airflow 3.2.1 milestone Apr 13, 2026
ephraimbuddy added a commit that referenced this pull request Apr 14, 2026
…64929)

* Reduce per-DAG queries during DAG serialization with bulk prefetch

Replaces 3 SELECTs per DAG in write_dag (update interval check, hash
comparison, version fetch) with 2 bulk queries via a new
_prefetch_dag_write_metadata classmethod. Also fixes DagCode.update_source_code
to reuse the caller's session and eagerly loads dag_owner_links to prevent
N+1 queries.

* fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch

* fixup! fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch

(cherry picked from commit ef00040)
ephraimbuddy added a commit that referenced this pull request Apr 14, 2026
…64929) (#65208)

* Reduce per-DAG queries during DAG serialization with bulk prefetch

Replaces 3 SELECTs per DAG in write_dag (update interval check, hash
comparison, version fetch) with 2 bulk queries via a new
_prefetch_dag_write_metadata classmethod. Also fixes DagCode.update_source_code
to reuse the caller's session and eagerly loads dag_owner_links to prevent
N+1 queries.

* fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch

* fixup! fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch

(cherry picked from commit ef00040)
vatsrahul1001 pushed a commit that referenced this pull request Apr 15, 2026
…64929) (#65208)

* Reduce per-DAG queries during DAG serialization with bulk prefetch

Replaces 3 SELECTs per DAG in write_dag (update interval check, hash
comparison, version fetch) with 2 bulk queries via a new
_prefetch_dag_write_metadata classmethod. Also fixes DagCode.update_source_code
to reuse the caller's session and eagerly loads dag_owner_links to prevent
N+1 queries.

* fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch

* fixup! fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch

(cherry picked from commit ef00040)
vatsrahul1001 pushed a commit that referenced this pull request Apr 15, 2026
…64929) (#65208)

* Reduce per-DAG queries during DAG serialization with bulk prefetch

Replaces 3 SELECTs per DAG in write_dag (update interval check, hash
comparison, version fetch) with 2 bulk queries via a new
_prefetch_dag_write_metadata classmethod. Also fixes DagCode.update_source_code
to reuse the caller's session and eagerly loads dag_owner_links to prevent
N+1 queries.

* fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch

* fixup! fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch

(cherry picked from commit ef00040)
vatsrahul1001 pushed a commit that referenced this pull request Apr 15, 2026
…64929) (#65208)

* Reduce per-DAG queries during DAG serialization with bulk prefetch

Replaces 3 SELECTs per DAG in write_dag (update interval check, hash
comparison, version fetch) with 2 bulk queries via a new
_prefetch_dag_write_metadata classmethod. Also fixes DagCode.update_source_code
to reuse the caller's session and eagerly loads dag_owner_links to prevent
N+1 queries.

* fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch

* fixup! fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch

(cherry picked from commit ef00040)
vatsrahul1001 pushed a commit that referenced this pull request Apr 15, 2026
…64929) (#65208)

* Reduce per-DAG queries during DAG serialization with bulk prefetch

Replaces 3 SELECTs per DAG in write_dag (update interval check, hash
comparison, version fetch) with 2 bulk queries via a new
_prefetch_dag_write_metadata classmethod. Also fixes DagCode.update_source_code
to reuse the caller's session and eagerly loads dag_owner_links to prevent
N+1 queries.

* fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch

* fixup! fixup! Reduce per-DAG queries during DAG serialization with bulk prefetch

(cherry picked from commit ef00040)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants