Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
5bb169c46e4f09f7e2c78c1233dddb3e320d4a4aa5a202b9cadec87ff1da6ae4
42a5f9219ef3dac4f12b20c3e11557c2aba2602e8f41c5858501ad5e5f82df7e
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 4 additions & 1 deletion airflow-core/docs/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``15d84ca19038`` (head) | ``cc92b33c6709`` | ``3.2.0`` | replace asset_trigger table with asset_watcher. |
| ``ab6dc0c82d0e`` (head) | ``15d84ca19038`` | ``3.2.0`` | Change ``serialized_dag`` data column to JSONB for |
| | | | PostgreSQL. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``15d84ca19038`` | ``cc92b33c6709`` | ``3.2.0`` | replace asset_trigger table with asset_watcher. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``cc92b33c6709`` | ``eaf332f43c7c`` | ``3.1.0`` | Add backward compatibility for serialized DAG format v3 to |
| | | | v2. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def get_dag_structure(
DagRun.id.in_(run_ids),
SerializedDagModel.id != latest_serdag.id,
)
)
),
)
)
merged_nodes: list[GridNodeResponse] = []
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Change ``serialized_dag`` data column to JSONB for PostgreSQL.

Revision ID: ab6dc0c82d0e
Revises: 15d84ca19038
Create Date: 2025-09-23 12:00:00.000000

"""

from __future__ import annotations

from textwrap import dedent

from alembic import context, op

# revision identifiers, used by Alembic.
revision = "ab6dc0c82d0e"
down_revision = "15d84ca19038"
branch_labels = None
depends_on = None
airflow_version = "3.2.0"


def upgrade():
"""Apply Change serialized_dag data column to JSONB for PostgreSQL."""
conn = op.get_bind()
dialect = conn.dialect.name

if dialect == "postgresql":
if context.is_offline_mode():
print(
dedent("""
------------
-- WARNING: Converting JSON to JSONB in offline mode!
-- This migration converts the 'data' column from JSON to JSONB type.
-- Verify the generated SQL is correct for your use case.
------------
""")
)

# Convert the data column from JSON to JSONB
# This is safe because the column already contains JSON data
op.execute(
"""
ALTER TABLE serialized_dag
ALTER COLUMN data TYPE JSONB
USING data::JSONB
"""
)


def downgrade():
"""Unapply Change serialized_dag data column to JSONB for PostgreSQL."""
conn = op.get_bind()
dialect = conn.dialect.name

if dialect == "postgresql":
if context.is_offline_mode():
print(
dedent("""
------------
-- WARNING: Converting JSONB to JSON in offline mode!
-- This migration converts the 'data' column from JSONB to JSON type.
-- Verify the generated SQL is correct for your use case.
------------
""")
)

# Convert the data column from JSONB back to JSON
op.execute(
"""
ALTER TABLE serialized_dag
ALTER COLUMN data TYPE JSON
USING data::JSON
"""
)
10 changes: 9 additions & 1 deletion airflow-core/src/airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import sqlalchemy_jsonfield
import uuid6
from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select, tuple_
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import backref, foreign, relationship
from sqlalchemy.sql.expression import func, literal
from sqlalchemy_utils import UUIDType
Expand Down Expand Up @@ -285,7 +286,9 @@ class SerializedDagModel(Base):
__tablename__ = "serialized_dag"
id = Column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7)
dag_id = Column(String(ID_LEN), nullable=False)
_data = Column("data", sqlalchemy_jsonfield.JSONField(json=json), nullable=True)
_data = Column(
"data", sqlalchemy_jsonfield.JSONField(json=json).with_variant(JSONB, "postgresql"), nullable=True
)
_data_compressed = Column("data_compressed", LargeBinary, nullable=True)
created_at = Column(UtcDateTime, nullable=False, default=timezone.utcnow)
last_updated = Column(UtcDateTime, nullable=False, default=timezone.utcnow, onupdate=timezone.utcnow)
Expand Down Expand Up @@ -654,6 +657,11 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[

def load_json(deps_data):
return json.loads(deps_data) if deps_data else []
elif session.bind.dialect.name == "postgresql":
# Use #> operator which works for both JSON and JSONB types
# Returns the JSON sub-object at the specified path
data_col_to_select = cls._data.op("#>")(literal('{"dag","dag_dependencies"}'))
load_json = None
else:
data_col_to_select = func.json_extract_path(cls._data, "dag", "dag_dependencies")
load_json = None
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class MappedClassProtocol(Protocol):
"3.0.0": "29ce7909c52b",
"3.0.3": "fe199e1abd77",
"3.1.0": "cc92b33c6709",
"3.2.0": "15d84ca19038",
"3.2.0": "ab6dc0c82d0e",
}


Expand Down