Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
5695a7ac6b40a0756c266fd4908e85d1857d18946e5be9735bf329fffc4beeff
99114dbbb7c6e7ae792b0dd380c55901aea3185c7e8f969c2e44b8fa69e2a467
573 changes: 284 additions & 289 deletions airflow-core/docs/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 4 additions & 1 deletion airflow-core/docs/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``665854ef0536`` (head) | ``e812941398f4`` | ``3.2.0`` | Update ORM for asset partitioning. |
| ``b12d4f98a91e`` (head) | ``665854ef0536`` | ``3.1.0`` | Drop ``id`` column from ``team`` table and make ``name`` the |
| | | | primary key. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``665854ef0536`` | ``e812941398f4`` | ``3.2.0`` | Update ORM for asset partitioning. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``e812941398f4`` | ``b87d2135fa50`` | ``3.2.0`` | Replace deadline's inline callback fields with foreign key |
| | | | to callback table. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ def get_authorized_connections(
:param method: the method to filter on
:param session: the session
"""
stmt = select(Connection.conn_id, Team.name).join(Team, Connection.team_id == Team.id, isouter=True)
stmt = select(Connection.conn_id, Connection.team_name)
rows = session.execute(stmt).all()
connections_by_team: dict[str | None, set[str]] = defaultdict(set)
for conn_id, team_name in rows:
Expand Down Expand Up @@ -524,14 +524,13 @@ def get_authorized_dag_ids(
:param session: the session
"""
stmt = (
select(DagModel.dag_id, Team.name)
select(DagModel.dag_id, dag_bundle_team_association_table.c.team_name)
.join(DagBundleModel, DagModel.bundle_name == DagBundleModel.name)
.join(
dag_bundle_team_association_table,
DagBundleModel.name == dag_bundle_team_association_table.c.dag_bundle_name,
isouter=True,
)
.join(Team, Team.id == dag_bundle_team_association_table.c.team_id, isouter=True)
)
rows = session.execute(stmt).all()
dags_by_team: dict[str | None, set[str]] = defaultdict(set)
Expand Down Expand Up @@ -592,7 +591,7 @@ def get_authorized_pools(
:param method: the method to filter on
:param session: the session
"""
stmt = select(Pool.pool, Team.name).join(Team, Pool.team_id == Team.id, isouter=True)
stmt = select(Pool.pool, Pool.team_name)
rows = session.execute(stmt).all()
pools_by_team: dict[str | None, set[str]] = defaultdict(set)
for pool_name, team_name in rows:
Expand Down Expand Up @@ -652,8 +651,8 @@ def get_authorized_teams(
:param method: the method to filter on
:param session: the session
"""
teams = Team.get_all_teams_id_to_name_mapping(session=session)
return self.filter_authorized_teams(teams_names=set(teams.values()), user=user, method=method)
team_names = Team.get_all_team_names(session=session)
return self.filter_authorized_teams(teams_names=team_names, user=user, method=method)

def filter_authorized_teams(
self,
Expand Down Expand Up @@ -694,7 +693,7 @@ def get_authorized_variables(
:param method: the method to filter on
:param session: the session
"""
stmt = select(Variable.key, Team.name).join(Team, Variable.team_id == Team.id, isouter=True)
stmt = select(Variable.key, Variable.team_name)
rows = session.execute(stmt).all()
variables_by_team: dict[str | None, set[str]] = defaultdict(set)
for var_key, team_name in rows:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@
# under the License.
from __future__ import annotations

from uuid import UUID

from airflow.api_fastapi.core_api.base import BaseModel


class TeamResponse(BaseModel):
"""Base serializer for Team."""

id: UUID
name: str


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import json
from collections.abc import Iterable
from uuid import UUID

from pydantic import Field, JsonValue, model_validator

Expand All @@ -36,7 +35,7 @@ class VariableResponse(BaseModel):
val: str = Field(alias="value")
description: str | None
is_encrypted: bool
team_id: UUID | None
team_name: str | None

@model_validator(mode="after")
def redact_val(self) -> Self:
Expand All @@ -59,7 +58,7 @@ class VariableBody(StrictBaseModel):
key: str = Field(max_length=ID_LEN)
value: JsonValue = Field(serialization_alias="val")
description: str | None = Field(default=None)
team_id: UUID | None = Field(default=None)
team_name: str | None = Field(max_length=50, default=None)


class VariableCollectionResponse(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1075,12 +1075,12 @@ paths:
items:
type: string
description: 'Attributes to order by, multi criteria sort is supported.
Prefix with `-` for descending order. Supported attributes: `id`'
Prefix with `-` for descending order. Supported attributes: `name`'
default:
- id
- name
title: Order By
description: 'Attributes to order by, multi criteria sort is supported. Prefix
with `-` for descending order. Supported attributes: `id`'
with `-` for descending order. Supported attributes: `name`'
responses:
'200':
description: Successful Response
Expand Down Expand Up @@ -2642,16 +2642,11 @@ components:
description: Team collection serializer for responses.
TeamResponse:
properties:
id:
type: string
format: uuid
title: Id
name:
type: string
title: Name
type: object
required:
- id
- name
title: TeamResponse
description: Base serializer for Team.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13065,12 +13065,12 @@ components:
- type: string
- type: 'null'
title: Description
team_id:
team_name:
anyOf:
- type: string
format: uuid
maxLength: 50
- type: 'null'
title: Team Id
title: Team Name
additionalProperties: false
type: object
required:
Expand Down Expand Up @@ -13110,19 +13110,18 @@ components:
is_encrypted:
type: boolean
title: Is Encrypted
team_id:
team_name:
anyOf:
- type: string
format: uuid
- type: 'null'
title: Team Id
title: Team Name
type: object
required:
- key
- value
- description
- is_encrypted
- team_id
- team_name
title: VariableResponse
description: Variable serializer for responses.
VersionInfo:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def list_teams(
offset: QueryOffset,
order_by: Annotated[
SortParam,
Depends(SortParam(["id"], Team).dynamic_depends()),
Depends(SortParam(["name"], Team).dynamic_depends()),
],
readable_teams_filter: ReadableTeamsFilterDep,
session: SessionDep,
Expand Down
13 changes: 7 additions & 6 deletions airflow-core/src/airflow/cli/commands/team_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ def _show_teams(teams, output):
data=teams,
output=output,
mapper=lambda x: {
"id": str(x.id),
"name": x.name,
},
)
Expand Down Expand Up @@ -71,7 +70,7 @@ def team_create(args, session=NEW_SESSION):
try:
session.add(new_team)
session.commit()
print(f"Team '{team_name}' created successfully with ID: {new_team.id}")
print(f"Team '{team_name}' created successfully.")
except IntegrityError as e:
session.rollback()
raise SystemExit(f"Failed to create team '{team_name}': {e}")
Expand All @@ -96,23 +95,25 @@ def team_delete(args, session=NEW_SESSION):
dag_bundle_count = session.scalar(
select(func.count())
.select_from(dag_bundle_team_association_table)
.where(dag_bundle_team_association_table.c.team_id == team.id)
.where(dag_bundle_team_association_table.c.team_name == team.name)
)
if dag_bundle_count:
associations.append(f"{dag_bundle_count} DAG bundle(s)")

# Check connection associations
if connection_count := session.scalar(
select(func.count(Connection.id)).where(Connection.team_id == team.id)
select(func.count(Connection.id)).where(Connection.team_name == team.name)
):
associations.append(f"{connection_count} connection(s)")

# Check variable associations
if variable_count := session.scalar(select(func.count(Variable.id)).where(Variable.team_id == team.id)):
if variable_count := session.scalar(
select(func.count(Variable.id)).where(Variable.team_name == team.name)
):
associations.append(f"{variable_count} variable(s)")

# Check pool associations
if pool_count := session.scalar(select(func.count(Pool.id)).where(Pool.team_id == team.id)):
if pool_count := session.scalar(select(func.count(Pool.id)).where(Pool.team_name == team.name)):
associations.append(f"{pool_count} pool(s)")

# If there are associations, prevent deletion
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Drop ``id`` column from ``team`` table and make ``name`` the primary key.

Revision ID: b12d4f98a91e
Revises: 665854ef0536
Create Date: 2025-12-05
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# Alembic revision identifiers: this migration ("b12d4f98a91e") applies on
# top of revision "665854ef0536". No branch labels or cross-migration
# dependencies are declared.
revision = "b12d4f98a91e"
down_revision = "665854ef0536"
branch_labels = None
depends_on = None
# Airflow release in which this migration first ships.
airflow_version = "3.1.0"


def upgrade():
    """Make ``team.name`` the primary key and drop the surrogate ``team.id``.

    Order matters here: every foreign key (and index) that references
    ``team.id`` must be dropped first, then the referencing columns are
    renamed ``team_id`` -> ``team_name``, then the ``team`` primary key is
    rebuilt on ``name``, and only afterwards are the foreign keys recreated
    against ``team.name``.

    NOTE(review): no UPDATE converting the old ``team_id`` values (UUID
    strings) into team names is visible in this migration — confirm the
    conversion happens elsewhere or that these columns are empty at upgrade
    time, otherwise the new FKs to ``team.name`` would be violated.
    """
    # Step 1: drop all FKs (and the dag_bundle_team index) that point at
    # team.id, so the team primary key can be rebuilt below.
    # Drop team id references
    for table in ("connection", "variable", "slot_pool"):
        with op.batch_alter_table(table) as batch_op:
            # batch_op.f() applies the naming convention to the constraint name.
            batch_op.drop_constraint(batch_op.f(f"{table}_team_id_fkey"), type_="foreignkey")

    with op.batch_alter_table("dag_bundle_team") as batch_op:
        batch_op.drop_constraint("dag_bundle_team_team_id_fkey", type_="foreignkey")
        batch_op.drop_index("idx_dag_bundle_team_team_id")

    # Step 2: rename team_id -> team_name on the referencing tables and
    # shrink the type to String(50) (the length of team.name).
    for table in ("connection", "variable", "slot_pool"):
        with op.batch_alter_table(table) as batch_op:
            batch_op.alter_column(
                "team_id",
                new_column_name="team_name",
                type_=sa.String(50),
                existing_type=sa.String(),
                nullable=True,
            )

    # NOTE(review): unlike the loop above, this alter_column omits
    # existing_type — some dialects/batch mode need it; confirm this works on
    # all supported backends.
    with op.batch_alter_table("dag_bundle_team") as batch_op:
        batch_op.alter_column(
            "team_id",
            new_column_name="team_name",
            type_=sa.String(50),
            nullable=False,
        )

    # Step 3: rebuild the team table's primary key on name. The old unique
    # constraint on name becomes redundant once name is the PK.
    # Team table
    with op.batch_alter_table("team") as batch_op:
        batch_op.drop_constraint("team_pkey", type_="primary")
        batch_op.drop_constraint("team_name_uq", type_="unique")
        batch_op.drop_column("id")
        batch_op.create_primary_key("team_pkey", ["name"])

    # Step 4: recreate the association-table index and FK against team.name;
    # CASCADE so association rows disappear with their team.
    with op.batch_alter_table("dag_bundle_team") as batch_op:
        batch_op.create_index("idx_dag_bundle_team_team_name", ["team_name"])
        batch_op.create_foreign_key(
            "dag_bundle_team_team_name_fkey",
            "team",
            ["team_name"],
            ["name"],
            ondelete="CASCADE",
        )

    # Recreate foreign keys referencing team.name
    # SET NULL: deleting a team detaches (rather than deletes) its
    # connections, variables and pools.
    for table in ("connection", "variable", "slot_pool"):
        with op.batch_alter_table(table) as batch_op:
            batch_op.create_foreign_key(
                batch_op.f(f"{table}_team_name_fkey"),
                "team",
                local_cols=["team_name"],
                remote_cols=["name"],
                ondelete="SET NULL",
            )


def downgrade():
    """Restore the surrogate ``team.id`` primary key (reverse of upgrade).

    Mirrors :func:`upgrade` in reverse order: drop the name-based FKs,
    re-add ``team.id`` as the primary key, rename ``team_name`` columns back
    to ``team_id``, then recreate the id-based FKs.

    NOTE(review): ``add_column(..., nullable=False)`` with no server default
    will fail if the ``team`` table has rows at downgrade time, and the
    renamed ``team_id`` columns keep their team-name values (no conversion
    back to UUIDs is visible here) — confirm this downgrade is only expected
    to run on empty/converted data.
    """
    # Step 1: drop every FK (and the index) that references team.name.
    # Drop FKs pointing to name
    for table in ("connection", "variable", "slot_pool"):
        with op.batch_alter_table(table) as batch_op:
            batch_op.drop_constraint(batch_op.f(f"{table}_team_name_fkey"), type_="foreignkey")

    with op.batch_alter_table("dag_bundle_team") as batch_op:
        batch_op.drop_constraint("dag_bundle_team_team_name_fkey", type_="foreignkey")
        batch_op.drop_index("idx_dag_bundle_team_team_name")

    # Step 2: re-add the id column and make it the PK again; name goes back
    # to being merely unique. String(36) matches a canonical UUID's length.
    # Add back team.id
    with op.batch_alter_table("team") as batch_op:
        batch_op.drop_constraint("team_pkey", type_="primary")
        batch_op.add_column(sa.Column("id", sa.String(36), nullable=False))
        batch_op.create_unique_constraint("team_name_uq", ["name"])
        batch_op.create_primary_key("team_pkey", ["id"])

    # Step 3: rename the referencing columns back to team_id.
    # Rename team_name → team_id
    for table in ("connection", "variable", "slot_pool"):
        with op.batch_alter_table(table) as batch_op:
            batch_op.alter_column(
                "team_name",
                new_column_name="team_id",
                type_=sa.String(36),
                nullable=True,
            )

    with op.batch_alter_table("dag_bundle_team") as batch_op:
        batch_op.alter_column(
            "team_name",
            new_column_name="team_id",
            type_=sa.String(36),
            nullable=False,
        )

    # Step 4: recreate the id-based FKs. Note the plain-table FKs carry no
    # ondelete here (unlike the SET NULL used in upgrade's name-based FKs).
    # Re-create FK on old id
    for table in ("connection", "variable", "slot_pool"):
        with op.batch_alter_table(table) as batch_op:
            batch_op.create_foreign_key(
                batch_op.f(f"{table}_team_id_fkey"),
                "team",
                ["team_id"],
                ["id"],
            )

    with op.batch_alter_table("dag_bundle_team") as batch_op:
        batch_op.create_foreign_key(
            "dag_bundle_team_team_id_fkey",
            "team",
            ["team_id"],
            ["id"],
            ondelete="CASCADE",
        )
Loading
Loading