Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow-core/docs/administration-and-deployment/lineage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ This functionality helps you understand how data flows throughout your Airflow p
A global instance of ``HookLineageCollector`` serves as the central hub for collecting lineage information.
Hooks can send details about assets they interact with to this collector.
The collector then uses this data to construct AIP-60 compliant Assets, a standard format for describing assets.
Hooks can also send arbitrary non-asset related data to this collector as shown in the example below.

.. code-block:: python

Expand All @@ -40,6 +41,7 @@ The collector then uses this data to construct AIP-60 compliant Assets, a standa
collector = get_hook_lineage_collector()
collector.add_input_asset(self, asset_kwargs={"scheme": "file", "path": "/tmp/in"})
collector.add_output_asset(self, asset_kwargs={"scheme": "file", "path": "/tmp/out"})
collector.add_extra(self, key="external_system_job_id", value="some_id_123")

Lineage data collected by the ``HookLineageCollector`` can be accessed using an instance of ``HookLineageReader``,
which is registered in an Airflow plugin.
Expand Down
107 changes: 90 additions & 17 deletions airflow-core/src/airflow/lineage/hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import hashlib
import json
from collections import defaultdict
from typing import TYPE_CHECKING, TypeAlias
from typing import TYPE_CHECKING, Any, TypeAlias

import attr

Expand All @@ -43,6 +43,9 @@
# Input assets and output assets are collected separately.
MAX_COLLECTED_ASSETS = 100

# Maximum number of extra metadata that can be collected in a single hook execution.
MAX_COLLECTED_EXTRA = 200


@attr.define
class AssetLineageInfo:
Expand All @@ -58,6 +61,22 @@ class AssetLineageInfo:
context: LineageContext


@attr.define
class ExtraLineageInfo:
"""
Holds lineage information for arbitrary non-asset metadata.

This class represents additional lineage context captured during a hook execution that is not
associated with a specific asset. It includes the metadata payload itself, the count of
how many times it has been encountered, and the context in which it was encountered.
"""

key: str
value: Any
count: int
context: LineageContext


@attr.define
class HookLineage:
"""
Expand All @@ -70,6 +89,7 @@ class HookLineage:

inputs: list[AssetLineageInfo] = attr.ib(factory=list)
outputs: list[AssetLineageInfo] = attr.ib(factory=list)
extra: list[ExtraLineageInfo] = attr.ib(factory=list)


class HookLineageCollector(LoggingMixin):
Expand All @@ -88,19 +108,44 @@ def __init__(self, **kwargs):
self._input_counts: dict[str, int] = defaultdict(int)
self._output_counts: dict[str, int] = defaultdict(int)
self._asset_factories = ProvidersManager().asset_factories
self._extra_counts: dict[str, int] = defaultdict(int)
self._extra: dict[str, tuple[str, Any, LineageContext]] = {}

@staticmethod
def _generate_hash(value: Any) -> str:
"""
Generate a deterministic MD5 hash for the given value.

If the value is dictionary it's JSON-serialized with `sort_keys=True`, and unsupported types
are converted to strings (`default=str`) to favor producing a hash rather than raising an error,
even if that means a less precise encoding.
"""
value_str = json.dumps(value, sort_keys=True, default=str)
value_hash = hashlib.md5(value_str.encode()).hexdigest()
return value_hash

def _generate_key(self, asset: Asset, context: LineageContext) -> str:
def _generate_asset_entry_id(self, asset: Asset, context: LineageContext) -> str:
"""
Generate a unique key for the given asset and context.

This method creates a unique key by combining the asset URI, the MD5 hash of the asset's extra
dictionary, and the LineageContext's unique identifier. This ensures that the generated key is
dictionary, and the LineageContext's unique identifier. This ensures that the generated entry_id is
unique for each combination of asset and context.
"""
extra_str = json.dumps(asset.extra, sort_keys=True)
extra_hash = hashlib.md5(extra_str.encode()).hexdigest()
extra_hash = self._generate_hash(value=asset.extra)
return f"{asset.uri}_{extra_hash}_{id(context)}"

def _generate_extra_entry_id(self, key: str, value: Any, context: LineageContext) -> str:
"""
Generate a unique key for the given extra lineage information and context.

This method creates a unique key by combining the extra information key, an MD5 hash of the value,
and the LineageContext's unique identifier. This ensures that the generated entry_id is unique
for each combination of extra lineage information and context.
"""
value_hash = self._generate_hash(value=value)
return f"{key}_{value_hash}_{id(context)}"

def create_asset(
self,
*,
Expand Down Expand Up @@ -173,10 +218,10 @@ def add_input_asset(
scheme=scheme, uri=uri, name=name, group=group, asset_kwargs=asset_kwargs, asset_extra=asset_extra
)
if asset:
key = self._generate_key(asset, context)
if key not in self._inputs:
self._inputs[key] = (asset, context)
self._input_counts[key] += 1
entry_id = self._generate_asset_entry_id(asset, context)
if entry_id not in self._inputs:
self._inputs[entry_id] = (asset, context)
self._input_counts[entry_id] += 1
if len(self._inputs) == MAX_COLLECTED_ASSETS:
self.log.warning("Maximum number of asset inputs exceeded. Skipping subsequent inputs.")

Expand All @@ -198,31 +243,55 @@ def add_output_asset(
scheme=scheme, uri=uri, name=name, group=group, asset_kwargs=asset_kwargs, asset_extra=asset_extra
)
if asset:
key = self._generate_key(asset, context)
if key not in self._outputs:
self._outputs[key] = (asset, context)
self._output_counts[key] += 1
entry_id = self._generate_asset_entry_id(asset=asset, context=context)
if entry_id not in self._outputs:
self._outputs[entry_id] = (asset, context)
self._output_counts[entry_id] += 1
if len(self._outputs) == MAX_COLLECTED_ASSETS:
self.log.warning("Maximum number of asset outputs exceeded. Skipping subsequent inputs.")

def add_extra(
self,
context: LineageContext,
key: str,
value: Any,
):
"""Add the extra information and its corresponding hook execution context to the collector."""
if len(self._extra) >= MAX_COLLECTED_EXTRA:
self.log.debug("Maximum number of extra exceeded. Skipping.")
return
if not key or not value:
self.log.debug("Missing required parameter: both 'key' and 'value' must be provided.")
return
entry_id = self._generate_extra_entry_id(key=key, value=value, context=context)
if entry_id not in self._extra:
self._extra[entry_id] = (key, value, context)
self._extra_counts[entry_id] += 1
if len(self._extra) == MAX_COLLECTED_EXTRA:
self.log.warning("Maximum number of extra exceeded. Skipping subsequent inputs.")

@property
def collected_assets(self) -> HookLineage:
"""Get the collected hook lineage information."""
return HookLineage(
[
inputs=[
AssetLineageInfo(asset=asset, count=self._input_counts[key], context=context)
for key, (asset, context) in self._inputs.items()
],
[
outputs=[
AssetLineageInfo(asset=asset, count=self._output_counts[key], context=context)
for key, (asset, context) in self._outputs.items()
],
extra=[
ExtraLineageInfo(key=key, value=value, count=self._extra_counts[count_key], context=context)
for count_key, (key, value, context) in self._extra.items()
],
)

@property
def has_collected(self) -> bool:
"""Check if any assets have been collected."""
return len(self._inputs) != 0 or len(self._outputs) != 0
return bool(self._inputs or self._outputs or self._extra)


class NoOpCollector(HookLineageCollector):
Expand All @@ -238,20 +307,24 @@ def add_input_asset(self, *_, **__):
def add_output_asset(self, *_, **__):
pass

def add_extra(self, *_, **__):
pass

@property
def collected_assets(
self,
) -> HookLineage:
self.log.debug(
"Data lineage tracking is disabled. Register a hook lineage reader to start tracking hook lineage."
)
return HookLineage([], [])
return HookLineage([], [], [])


class HookLineageReader(LoggingMixin):
"""Class used to retrieve the hook lineage information collected by HookLineageCollector."""

def __init__(self, **kwargs):
super().__init__(**kwargs)
self.lineage_collector = get_hook_lineage_collector()

def retrieve_hook_lineage(self) -> HookLineage:
Expand Down
Loading