Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,166 @@
# under the License.
from __future__ import annotations

from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Any

def _get_asset_compat_hook_lineage_collector():
from airflow.lineage.hook import get_hook_lineage_collector
from airflow.lineage.hook import LineageContext

collector = get_hook_lineage_collector()

if all(
getattr(collector, asset_method_name, None)
for asset_method_name in ("add_input_asset", "add_output_asset", "collected_assets")
def _lacks_asset_methods(collector):
"""Return True if the collector is missing any asset-related methods or properties."""
if ( # lazy evaluation, early return
hasattr(collector, "add_input_asset") # method
and hasattr(collector, "add_output_asset") # method
and hasattr(collector, "create_asset") # method
# If below we called hasattr(collector, "collected_assets") we'd call the property unnecessarily
and hasattr(type(collector), "collected_assets") # property
):
return collector
return False

# dataset is renamed as asset in Airflow 3.0
return True


def _lacks_add_extra_method(collector):
"""Return True if the collector does not define an 'add_extra' method."""
# Method may be on class and attribute may be dynamically set on instance
if hasattr(collector, "add_extra") and hasattr(collector, "_extra"):
return False
return True


def _add_extra_polyfill(collector):
"""
Add support for `add_extra` method to collector that may be lacking it (e.g., Airflow versions < 3.2.).

This polyfill adds the `add_extra` method to a class, modifies `collected_assets` and `has_collected`
properties and sets `_extra` and `_extra_counts` attributes on instance if not already there.

This function should be called after renaming on collectors that have `collected_assets` method,
so f.e. for Airflow 2 it should happen after renaming from dataset to asset.
"""
import hashlib
import json
from collections import defaultdict

import attr

from airflow.lineage.hook import HookLineage as _BaseHookLineage

# Add `extra` to HookLineage returned by `collected_assets` property
@attr.define
class ExtraLineageInfo:
"""
Holds lineage information for arbitrary non-asset metadata.

This class represents additional lineage context captured during a hook execution that is not
associated with a specific asset. It includes the metadata payload itself, the count of
how many times it has been encountered, and the context in which it was encountered.
"""

key: str
value: Any
count: int
context: LineageContext

@attr.define
class HookLineage(_BaseHookLineage):
# mypy is not happy, as base class is using other ExtraLineageInfo, but this code will never
# run on AF3.2, where this other one is used, so this is fine - we can ignore.
extra: list[ExtraLineageInfo] = attr.field(factory=list) # type: ignore[assignment]

# Initialize extra tracking attributes on this collector instance
collector._extra = {}
collector._extra_counts = defaultdict(int)

# Overwrite the `collected_assets` property on a class
_original_collected_assets = collector.__class__.collected_assets

def _compat_collected_assets(self) -> HookLineage:
"""Get the collected hook lineage information."""
# Defensive check since we patch the class property, but initialized _extra only on this instance.
if not hasattr(self, "_extra"):
self._extra = {}
if not hasattr(self, "_extra_counts"):
self._extra_counts = defaultdict(int)

# call the original `collected_assets` getter
lineage = _original_collected_assets.fget(self)
extra_list = [
ExtraLineageInfo(
key=key,
value=value,
count=self._extra_counts[count_key],
context=context,
)
for count_key, (key, value, context) in self._extra.items()
]
return HookLineage(
inputs=lineage.inputs,
outputs=lineage.outputs,
extra=extra_list,
)

type(collector).collected_assets = property(_compat_collected_assets)

# Overwrite the `has_collected` property on a class
_original_has_collected = collector.__class__.has_collected

def _compat_has_collected(self) -> bool:
# Defensive check since we patch the class property, but initialized _extra only on this instance.
if not hasattr(self, "_extra"):
self._extra = {}
# call the original `has_collected` getter
has_collected = _original_has_collected.fget(self)
return bool(has_collected or self._extra)

type(collector).has_collected = property(_compat_has_collected)

# Add `add_extra` method on the class
def _compat_add_extra(self, context, key, value):
"""Add extra information for older Airflow versions."""
_max_collected_extra = 200

if len(self._extra) >= _max_collected_extra:
if hasattr(self, "log"):
self.log.debug("Maximum number of extra exceeded. Skipping.")
return

if not key or not value:
if hasattr(self, "log"):
self.log.debug("Missing required parameter: both 'key' and 'value' must be provided.")
return

# Defensive check since we patch the class property, but initialized _extra only on this instance.
if not hasattr(self, "_extra"):
self._extra = {}
if not hasattr(self, "_extra_counts"):
self._extra_counts = defaultdict(int)

extra_str = json.dumps(value, sort_keys=True, default=str)
value_hash = hashlib.md5(extra_str.encode()).hexdigest()
entry_id = f"{key}_{value_hash}_{id(context)}"
if entry_id not in self._extra:
self._extra[entry_id] = (key, value, context)
self._extra_counts[entry_id] += 1

if len(self._extra) == _max_collected_extra:
if hasattr(self, "log"):
self.log.warning("Maximum number of extra exceeded. Skipping subsequent inputs.")

type(collector).add_extra = _compat_add_extra
return collector


def _add_asset_naming_compatibility_layer(collector):
"""
Handle AF 2.x compatibility for dataset -> asset terminology rename.

This is only called for AF 2.x where we need to provide asset-named methods
that wrap the underlying dataset methods.
"""
from functools import wraps

from airflow.lineage.hook import DatasetLineageInfo, HookLineage
Expand All @@ -55,9 +199,9 @@ def wrapper(*args, **kwargs):
collector.add_input_asset = rename_asset_kwargs_to_dataset_kwargs(collector.add_input_dataset)
collector.add_output_asset = rename_asset_kwargs_to_dataset_kwargs(collector.add_output_dataset)

def collected_assets_compat(collector) -> HookLineage:
def _compat_collected_assets(self) -> HookLineage:
"""Get the collected hook lineage information."""
lineage = collector.collected_datasets
lineage = self.collected_datasets
return HookLineage(
[
DatasetLineageInfo(dataset=item.dataset, count=item.count, context=item.context)
Expand All @@ -69,20 +213,30 @@ def collected_assets_compat(collector) -> HookLineage:
],
)

setattr(
collector.__class__,
"collected_assets",
property(lambda collector: collected_assets_compat(collector)),
)

type(collector).collected_assets = property(_compat_collected_assets)
return collector


def get_hook_lineage_collector():
# Dataset has been renamed as Asset in 3.0
if AIRFLOW_V_3_0_PLUS:
from airflow.lineage.hook import get_hook_lineage_collector
"""
Return a hook lineage collector with all required compatibility layers applied.

Compatibility is determined by inspecting the collector's available methods and
properties (duck typing), rather than relying on the Airflow version number.

Behavior by example:
Airflow 2: Collector is missing asset-based methods and `add_extra` - apply both layers.
Airflow 3.0–3.1: Collector has asset-based methods but lacks `add_extra` - apply single layer.
Airflow 3.2+: Collector has asset-based methods and `add_extra` support - no action required.
"""
from airflow.lineage.hook import get_hook_lineage_collector as get_global_collector

global_collector = get_global_collector()

if _lacks_asset_methods(global_collector):
global_collector = _add_asset_naming_compatibility_layer(global_collector)

return get_hook_lineage_collector()
if _lacks_add_extra_method(global_collector):
global_collector = _add_extra_polyfill(global_collector)

return _get_asset_compat_hook_lineage_collector()
return global_collector
Loading
Loading