Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,22 @@ def lineage_root_run_id(task_instance: TaskInstance):
return OpenLineageAdapter.build_dag_run_id(
dag_id=task_instance.dag_id,
logical_date=_get_logical_date(task_instance),
clear_number=task_instance.dag_run.clear_number,
clear_number=_get_dag_run_clear_number(task_instance),
)

def _get_dag_run_clear_number(task_instance: TaskInstance):
# todo: remove when min airflow version >= 3.0
if AIRFLOW_V_3_0_PLUS:
context = task_instance.get_template_context()
if hasattr(task_instance, "dag_run"):
dag_run = task_instance.dag_run
else:
dag_run = context["dag_run"]
return dag_run.clear_number
return task_instance.dag_run.clear_number




def _get_logical_date(task_instance):
# todo: remove when min airflow version >= 3.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from datetime import datetime, timezone
from unittest import mock
from unittest.mock import patch

import pytest

Expand All @@ -27,6 +28,9 @@
lineage_job_name,
lineage_job_namespace,
lineage_parent_id,
lineage_root_job_name,
lineage_root_parent_id,
lineage_root_run_id,
lineage_run_id,
)

Expand Down Expand Up @@ -108,3 +112,24 @@ def test_lineage_parent_id(mock_run_id):
actual = lineage_parent_id(task_instance)
expected = f"{_DAG_NAMESPACE}/dag_id.task_id/run_id"
assert actual == expected


@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 3.0+")
def test_lineage_root_run_id_with_runtime_task_instance(create_runtime_ti):
"""Test lineage_root_run_id with real RuntimeTaskInstance object doesn't throw AttributeError."""
from airflow.sdk.bases.operator import BaseOperator

task = BaseOperator(task_id="test_task")

runtime_ti = create_runtime_ti(
task=task,
dag_id="test_dag",
run_id="test_run_id",
)

# Explicitly test that the function doesn't throw an AttributeError
# This was the original issue: AttributeError: 'RuntimeTaskInstance' object has no attribute 'dag_run'
try:
assert lineage_root_run_id(runtime_ti) is not None
except AttributeError as e:
pytest.fail(f"lineage_root_run_id should not throw AttributeError with RuntimeTaskInstance: {e}")