Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 28 additions & 28 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1552,24 +1552,25 @@ repos:

# TODO: These files need to be refactored to remove SDK coupling
^airflow-core/src/airflow/__init__\.py$|
^airflow-core/src/airflow/models/__init__\.py$|
^airflow-core/src/airflow/api/common/mark_tasks\.py$|
^airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets\.py$|
^airflow-core/src/airflow/api_fastapi/core_api/datamodels/connections\.py$|
^airflow-core/src/airflow/api_fastapi/core_api/services/public/connections\.py$|
^airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl\.py$|
^airflow-core/src/airflow/api_fastapi/core_api/datamodels/variables\.py$|
^airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid\.py$|
^airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure\.py$|
^airflow-core/src/airflow/api_fastapi/core_api/services/public/connections\.py$|
^airflow-core/src/airflow/api_fastapi/core_api/services/ui/connections\.py$|
^airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid\.py$|
^airflow-core/src/airflow/api_fastapi/core_api/services/ui/task_group.py$|
^airflow-core/src/airflow/api_fastapi/execution_api/routes/hitl\.py$|
^airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances\.py$|
^airflow-core/src/airflow/api_fastapi/logging/decorators\.py$|
^airflow-core/src/airflow/assets/evaluation\.py$|
^airflow-core/src/airflow/assets/manager\.py$|
^airflow-core/src/airflow/cli/commands/connection_command\.py$|
^airflow-core/src/airflow/cli/commands/task_command\.py$|
^airflow-core/src/airflow/cli/commands/triggerer_command.py$|
^airflow-core/src/airflow/configuration\.py$|
^airflow-core/src/airflow/dag_processing/collection\.py$|
^airflow-core/src/airflow/dag_processing/manager\.py$|
Expand All @@ -1582,58 +1583,57 @@ repos:
^airflow-core/src/airflow/listeners/spec/asset\.py$|
^airflow-core/src/airflow/listeners/spec/taskinstance\.py$|
^airflow-core/src/airflow/logging/remote\.py$|
^airflow-core/src/airflow/models/__init__\.py$|
^airflow-core/src/airflow/models/asset\.py$|
^airflow-core/src/airflow/models/baseoperator\.py$|
^airflow-core/src/airflow/models/connection\.py$|
^airflow-core/src/airflow/models/dag\.py$|
^airflow-core/src/airflow/models/deadline\.py$|
^airflow-core/src/airflow/models/dagbag\.py$|
^airflow-core/src/airflow/models/dagrun\.py$|
^airflow-core/src/airflow/models/deadline\.py$|
^airflow-core/src/airflow/models/expandinput\.py$|
^airflow-core/src/airflow/models/mappedoperator\.py$|
^airflow-core/src/airflow/models/operator\.py$|
^airflow-core/src/airflow/models/param\.py$|
^airflow-core/src/airflow/models/renderedtifields\.py$|
^airflow-core/src/airflow/models/serialized_dag\.py$|
^airflow-core/src/airflow/models/taskinstance\.py$|
^airflow-core/src/airflow/models/taskinstancekey\.py$|
^airflow-core/src/airflow/models/taskmap\.py$|
^airflow-core/src/airflow/models/taskmixin\.py$|
^airflow-core/src/airflow/models/taskreschedule\.py$|
^airflow-core/src/airflow/models/variable\.py$|
^airflow-core/src/airflow/models/xcom\.py$|
^airflow-core/src/airflow/models/xcom_arg\.py$|
^airflow-core/src/airflow/operators/subdag\.py$|
^airflow-core/src/airflow/plugins_manager\.py$|
^airflow-core/src/airflow/providers_manager\.py$|
^airflow-core/src/airflow/serialization/dag\.py$|
^airflow-core/src/airflow/serialization/enums\.py$|
^airflow-core/src/airflow/serialization/helpers\.py$|
^airflow-core/src/airflow/serialization/serialized_objects\.py$|
^airflow-core/src/airflow/settings\.py$|
^airflow-core/src/airflow/task/task_runner/bash_task_runner\.py$|
^airflow-core/src/airflow/task/task_runner/standard_task_runner\.py$|
^airflow-core/src/airflow/ti_deps/deps/mapped_task_upstream_dep\.py$|
^airflow-core/src/airflow/ti_deps/deps/prev_dagrun_dep\.py$|
^airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep\.py$|
^airflow-core/src/airflow/timetables/assets\.py$|
^airflow-core/src/airflow/timetables/base\.py$|
^airflow-core/src/airflow/timetables/simple\.py$|
^airflow-core/src/airflow/utils/cli\.py$|
^airflow-core/src/airflow/utils/context\.py$|
^airflow-core/src/airflow/utils/dag_cycle_tester\.py$|
^airflow-core/src/airflow/utils/dag_edges\.py$|
^airflow-core/src/airflow/utils/dag_parsing_context\.py$|
^airflow-core/src/airflow/utils/decorators\.py$|
^airflow-core/src/airflow/utils/dot_renderer\.py$|
^airflow-core/src/airflow/utils/edgemodifier\.py$|
^airflow-core/src/airflow/utils/email\.py$|
^airflow-core/src/airflow/utils/helpers\.py$|
^airflow-core/src/airflow/utils/operator_helpers\.py$|
^airflow-core/src/airflow/utils/session\.py$|
^airflow-core/src/airflow/utils/task_group\.py$|
^airflow-core/src/airflow/utils/trigger_rule\.py$|
^airflow-core/src/airflow/utils/xcom\.py$|
^airflow-core/src/airflow/providers_manager\.py$|
^airflow-core/src/airflow/timetables/assets\.py$|
^airflow-core/src/airflow/ti_deps/deps/prev_dagrun_dep\.py$|
^airflow-core/src/airflow/utils/context\.py$|
^airflow-core/src/airflow/models/taskmixin\.py$|
^airflow-core/src/airflow/utils/edgemodifier\.py$|
^airflow-core/src/airflow/utils/email\.py$|
^airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep\.py$|
^airflow-core/src/airflow/utils/helpers\.py$|
^airflow-core/src/airflow/ti_deps/deps/mapped_task_upstream_dep\.py$|
^airflow-core/src/airflow/utils/types\.py$|
^airflow-core/src/airflow/utils/dag_edges\.py$|
^airflow-core/src/airflow/utils/cli\.py$|
^airflow-core/src/airflow/timetables/base\.py$|
^airflow-core/src/airflow/utils/dot_renderer\.py$|
^airflow-core/src/airflow/models/xcom_arg\.py$|
^airflow-core/src/airflow/plugins_manager\.py$|
^airflow-core/src/airflow/models/xcom\.py$|
^airflow-core/src/airflow/timetables/simple\.py$|
^airflow-core/src/airflow/settings\.py$|
^airflow-core/src/airflow/models/renderedtifields\.py$|
^airflow-core/src/airflow/serialization/helpers\.py$|
^airflow-core/src/airflow/models/expandinput\.py$|
^airflow-core/src/airflow/cli/commands/triggerer_command.py$
^airflow-core/src/airflow/utils/types\.py$
## ONLY ADD PREK HOOKS HERE THAT REQUIRE CI IMAGE
14 changes: 14 additions & 0 deletions airflow-core/newsfragments/54857.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
Remove ``get_task_group_children_getter`` and ``task_group_to_dict`` from task-sdk

The ``get_task_group_children_getter`` and ``task_group_to_dict`` functions have been removed from the task-sdk (``airflow.sdk.definitions.taskgroup``) and moved to server-side API services. These functions are now internal to Airflow's API layer and should not be imported directly by users.

* Types of change

* [ ] Dag changes
* [ ] Config changes
* [ ] API changes
* [ ] CLI changes
* [ ] Behaviour changes
* [ ] Plugin changes
* [ ] Dependency changes
* [x] Code interface changes
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@
_find_aggregates,
_merge_node_dicts,
)
from airflow.api_fastapi.core_api.services.ui.task_group import (
get_task_group_children_getter,
task_group_to_dict_grid,
)
from airflow.models.dag_version import DagVersion
from airflow.models.dagrun import DagRun
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance
from airflow.sdk.definitions.taskgroup import (
get_task_group_children_getter,
task_group_to_dict_grid,
)

log = structlog.get_logger(logger_name=__name__)
grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
bind_output_assets_to_tasks,
get_upstream_assets,
)
from airflow.api_fastapi.core_api.services.ui.task_group import task_group_to_dict
from airflow.models.dag_version import DagVersion
from airflow.models.serialized_dag import SerializedDagModel
from airflow.sdk.definitions.taskgroup import task_group_to_dict
from airflow.utils.dag_edges import dag_edges

structure_router = AirflowRouter(tags=["Structure"], prefix="/structure")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import structlog

from airflow.api_fastapi.common.parameters import state_priority
from airflow.api_fastapi.core_api.services.ui.task_group import get_task_group_children_getter
from airflow.models.mappedoperator import MappedOperator
from airflow.models.taskmap import TaskMap
from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup, get_task_group_children_getter
from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
from airflow.serialization.serialized_objects import SerializedBaseOperator

log = structlog.get_logger(logger_name=__name__)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Task group utilities for UI API services."""

from __future__ import annotations

from collections.abc import Callable
from functools import cache
from operator import methodcaller

from airflow.configuration import conf
from airflow.models.mappedoperator import MappedOperator
from airflow.sdk.definitions.taskgroup import MappedTaskGroup
from airflow.serialization.serialized_objects import SerializedBaseOperator


@cache
def get_task_group_children_getter() -> Callable:
"""Get the Task Group Children Getter for the DAG."""
sort_order = conf.get("api", "grid_view_sorting_order")
if sort_order == "topological":
return methodcaller("topological_sort")
return methodcaller("hierarchical_alphabetical_sort")


def task_group_to_dict(task_item_or_group, parent_group_is_mapped=False):
"""Create a nested dict representation of this TaskGroup and its children used to construct the Graph."""
if isinstance(task := task_item_or_group, (SerializedBaseOperator, MappedOperator)):
node_operator = {
"id": task.task_id,
"label": task.label,
"operator": task.operator_name,
"type": "task",
}
if task.is_setup:
node_operator["setup_teardown_type"] = "setup"
elif task.is_teardown:
node_operator["setup_teardown_type"] = "teardown"
if isinstance(task, MappedOperator) or parent_group_is_mapped:
node_operator["is_mapped"] = True
return node_operator

task_group = task_item_or_group
is_mapped = isinstance(task_group, MappedTaskGroup)
children = [
task_group_to_dict(child, parent_group_is_mapped=parent_group_is_mapped or is_mapped)
for child in get_task_group_children_getter()(task_group)
]

if task_group.upstream_group_ids or task_group.upstream_task_ids:
# This is the join node used to reduce the number of edges between two TaskGroup.
children.append({"id": task_group.upstream_join_id, "label": "", "type": "join"})

if task_group.downstream_group_ids or task_group.downstream_task_ids:
# This is the join node used to reduce the number of edges between two TaskGroup.
children.append({"id": task_group.downstream_join_id, "label": "", "type": "join"})

return {
"id": task_group.group_id,
"label": task_group.label,
"tooltip": task_group.tooltip,
"is_mapped": is_mapped,
"children": children,
"type": "task",
}


def task_group_to_dict_grid(task_item_or_group, parent_group_is_mapped=False):
"""Create a nested dict representation of this TaskGroup and its children used to construct the Grid."""
if isinstance(task := task_item_or_group, (MappedOperator, SerializedBaseOperator)):
is_mapped = None
if task.is_mapped or parent_group_is_mapped:
is_mapped = True
setup_teardown_type = None
if task.is_setup is True:
setup_teardown_type = "setup"
elif task.is_teardown is True:
setup_teardown_type = "teardown"
return {
"id": task.task_id,
"label": task.label,
"is_mapped": is_mapped,
"children": None,
"setup_teardown_type": setup_teardown_type,
}

task_group = task_item_or_group
task_group_sort = get_task_group_children_getter()
is_mapped_group = isinstance(task_group, MappedTaskGroup)
children = [
task_group_to_dict_grid(x, parent_group_is_mapped=parent_group_is_mapped or is_mapped_group)
for x in task_group_sort(task_group)
]

return {
"id": task_group.group_id,
"label": task_group.label,
"is_mapped": is_mapped_group or None,
"children": children or None,
}
2 changes: 0 additions & 2 deletions airflow-core/src/airflow/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
},
"task_group": {
"TaskGroup": "airflow.sdk.TaskGroup",
"get_task_group_children_getter": "airflow.sdk.definitions.taskgroup.get_task_group_children_getter",
"task_group_to_dict": "airflow.sdk.definitions.taskgroup.task_group_to_dict",
},
"timezone": {
# Since we have corrected all uses inside core to use the internal version, anything hitting this
Expand Down
Loading