Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
)
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.xcom import BaseXCom
from airflow.models.xcom_arg import SchedulerXComArg, deserialize_xcom_arg
from airflow.providers_manager import ProvidersManager
from airflow.sdk.definitions.asset import (
Expand All @@ -63,7 +64,6 @@
BaseAsset,
)
from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator
from airflow.sdk.definitions.baseoperatorlink import XComOperatorLink
from airflow.sdk.definitions.mappedoperator import MappedOperator
from airflow.sdk.definitions.param import Param, ParamsDict
from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
Expand All @@ -88,6 +88,7 @@
)
from airflow.utils.db import LazySelectSequence
from airflow.utils.docs import get_docs_url
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string, qualname
from airflow.utils.operator_resources import Resources
from airflow.utils.timezone import from_timestamp, parse_timezone
Expand Down Expand Up @@ -1991,3 +1992,38 @@ def get_run_data_interval(self, run: DagRun) -> DataInterval:
access_control: Mapping[str, Mapping[str, Collection[str]] | Collection[str]] | None = pydantic.Field(
init=False, default=None
)


@attrs.define()
class XComOperatorLink(LoggingMixin):
"""A generic operator link class that can retrieve link only using XCOMs. Used while deserializing operators."""

name: str
xcom_key: str

def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
"""
Retrieve the link from the XComs.

:param operator: The Airflow operator object this link is associated to.
:param ti_key: TaskInstance ID to return link for.
:return: link to external system, but by pulling it from XComs
"""
self.log.info(
"Attempting to retrieve link from XComs with key: %s for task id: %s", self.xcom_key, ti_key
)
value = BaseXCom.get_one(
key=self.xcom_key,
run_id=ti_key.run_id,
dag_id=ti_key.dag_id,
task_id=ti_key.task_id,
map_index=ti_key.map_index,
)
if not value:
self.log.debug(
"No link with name: %s present in XCom as key: %s, returning empty link",
self.name,
self.xcom_key,
)
return ""
return value
38 changes: 0 additions & 38 deletions task-sdk/src/airflow/sdk/definitions/baseoperatorlink.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,49 +22,11 @@

import attrs

from airflow.models.xcom import BaseXCom
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstancekey import TaskInstanceKey


@attrs.define()
class XComOperatorLink(LoggingMixin):
"""A generic operator link class that can retrieve link only using XCOMs. Used while deserializing operators."""

name: str
xcom_key: str

def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
"""
Retrieve the link from the XComs.

:param operator: The Airflow operator object this link is associated to.
:param ti_key: TaskInstance ID to return link for.
:return: link to external system, but by pulling it from XComs
"""
self.log.info(
"Attempting to retrieve link from XComs with key: %s for task id: %s", self.xcom_key, ti_key
)
value = BaseXCom.get_one(
key=self.xcom_key,
run_id=ti_key.run_id,
dag_id=ti_key.dag_id,
task_id=ti_key.task_id,
map_index=ti_key.map_index,
)
if not value:
self.log.debug(
"No link with name: %s present in XCom as key: %s, returning empty link",
self.name,
self.xcom_key,
)
return ""
return value


@attrs.define()
class BaseOperatorLink(metaclass=ABCMeta):
"""Abstract base class that defines how we get an operator link."""
Expand Down
2 changes: 1 addition & 1 deletion tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.sensors.bash import BashSensor
from airflow.sdk.definitions.asset import Asset
from airflow.sdk.definitions.baseoperatorlink import XComOperatorLink
from airflow.sdk.definitions.param import Param, ParamsDict
from airflow.security import permissions
from airflow.serialization.enums import Encoding
Expand All @@ -76,6 +75,7 @@
BaseSerialization,
SerializedBaseOperator,
SerializedDAG,
XComOperatorLink,
)
from airflow.task.priority_strategy import _DownstreamPriorityWeightStrategy
from airflow.timetables.simple import NullTimetable, OnceTimetable
Expand Down
Loading