Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@

from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
from airflow.providers.openlineage.utils.utils import get_job_name, get_root_information_from_dagrun_conf
from airflow.providers.openlineage.utils.utils import (
get_job_name,
get_parent_information_from_dagrun_conf,
get_root_information_from_dagrun_conf,
)
from airflow.providers.openlineage.version_compat import AIRFLOW_V_3_0_PLUS

if TYPE_CHECKING:
Expand Down Expand Up @@ -136,7 +140,13 @@ def lineage_root_job_namespace(task_instance: TaskInstance):

def _get_ol_root_id(id_key: str, task_instance: TaskInstance) -> str | None:
dr_conf = _get_dag_run_conf(task_instance=task_instance)
# Check DagRun conf for root info
ol_root_info = get_root_information_from_dagrun_conf(dr_conf=dr_conf)
if ol_root_info and ol_root_info.get(id_key):
return ol_root_info[id_key]
# Then check DagRun conf for parent into that is used as root in case explicit root is missing
id_key = id_key.replace("root_", "")
ol_root_info = get_parent_information_from_dagrun_conf(dr_conf=dr_conf)
if ol_root_info and ol_root_info.get(id_key):
return ol_root_info[id_key]
return None
Expand Down
306 changes: 170 additions & 136 deletions providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,30 @@ def test_lineage_root_run_id_with_runtime_task_instance(create_runtime_ti):
pytest.fail(f"lineage_root_run_id should not throw AttributeError with RuntimeTaskInstance: {e}")


@pytest.mark.parametrize(
"conf",
(
{
"openlineage": {
"rootParentRunId": "22222222-2222-2222-2222-222222222222",
"rootParentJobNamespace": "rootns",
"rootParentJobName": "rootjob",
}
},
{
"openlineage": {
"rootParentRunId": "22222222-2222-2222-2222-222222222222",
"rootParentJobNamespace": "rootns",
"rootParentJobName": "rootjob",
"parentRunId": "33333333-3333-3333-3333-333333333333",
"parentJobNamespace": "parentns",
"parentJobName": "parentjob",
}
},
),
)
@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 3.0+")
def test_lineage_root_run_id_no_conf_af3(create_runtime_ti):
def test_lineage_root_macros_use_root_from_conf_af3(create_runtime_ti, conf):
from airflow.providers.common.compat.sdk import BaseOperator

task = BaseOperator(task_id="test_task")
Expand All @@ -143,15 +165,19 @@ def test_lineage_root_run_id_no_conf_af3(create_runtime_ti):
task=task,
dag_id="test_dag",
run_id="test_run_id",
conf=None,
conf=conf,
)

result = lineage_root_run_id(runtime_ti)
assert result == "01937fbb-4680-70b3-b49b-1de6b041527a"
root_run_id = lineage_root_run_id(runtime_ti)
root_job_name = lineage_root_job_name(runtime_ti)
root_job_namespace = lineage_root_job_namespace(runtime_ti)
assert root_run_id == "22222222-2222-2222-2222-222222222222"
assert root_job_name == "rootjob"
assert root_job_namespace == "rootns"


@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 3.0+")
def test_lineage_root_run_id_with_conf_af3(create_runtime_ti):
def test_lineage_root_macros_use_parent_from_conf_when_root_missing_af3(create_runtime_ti):
from airflow.providers.common.compat.sdk import BaseOperator

task = BaseOperator(task_id="test_task")
Expand All @@ -162,60 +188,57 @@ def test_lineage_root_run_id_with_conf_af3(create_runtime_ti):
run_id="test_run_id",
conf={
"openlineage": {
"rootParentRunId": "22222222-2222-2222-2222-222222222222",
"rootParentJobNamespace": "rootns",
"rootParentJobName": "rootjob",
"parentRunId": "33333333-3333-3333-3333-333333333333",
"parentJobNamespace": "parentns",
"parentJobName": "parentjob",
}
},
)

result = lineage_root_run_id(runtime_ti)
assert result == "22222222-2222-2222-2222-222222222222"


def test_lineage_root_run_id_without_conf_af2():
date = datetime(2020, 1, 1, 1, 1, 1, 0, tzinfo=timezone.utc)
conf = {}
dag_run = mock.MagicMock(run_id="run_id", conf=conf)
dag_run.logical_date = date
dag_run.clear_number = 1
task_instance = mock.MagicMock(
dag_id="dag_id",
task_id="task_id",
dag_run=dag_run,
logical_date=date,
try_number=1,
)

call_result1 = lineage_root_run_id(task_instance)
call_result2 = lineage_root_run_id(task_instance)

# random part value does not matter, it just has to be the same for the same TaskInstance
assert call_result1 == call_result2
assert call_result1 == "016f5e9e-c4c8-7c30-9eda-d9c646d633ea"


def test_lineage_root_run_id_with_conf_af2():
conf = {
"openlineage": {
"rootParentRunId": "22222222-2222-2222-2222-222222222222",
"rootParentJobNamespace": "rootns",
"rootParentJobName": "rootjob",
}
}
task_instance = mock.MagicMock(
dag_run=mock.MagicMock(conf=conf),
)

call_result1 = lineage_root_run_id(task_instance)
call_result2 = lineage_root_run_id(task_instance)

assert call_result1 == call_result2
assert call_result1 == "22222222-2222-2222-2222-222222222222"


root_run_id = lineage_root_run_id(runtime_ti)
root_job_name = lineage_root_job_name(runtime_ti)
root_job_namespace = lineage_root_job_namespace(runtime_ti)
assert root_run_id == "33333333-3333-3333-3333-333333333333"
assert root_job_name == "parentjob"
assert root_job_namespace == "parentns"


@pytest.mark.parametrize(
"conf",
(
{},
None,
{"some": "other"},
{"openlineage": {}},
{"openlineage": "some"},
{"openlineage": {"rootParentRunId": "22222222-2222-2222-2222-222222222222"}},
{
"openlineage": {
"rootParentRunId": "22222222-2222-2222-2222-222222222222",
"rootParentJobName": "rootjob",
}
},
{
"openlineage": {
"parentRunId": "33333333-3333-3333-3333-333333333333",
}
},
{
"openlineage": {
"parentRunId": "33333333-3333-3333-3333-333333333333",
"parentJobName": "parentjob",
}
},
{
"openlineage": {
"rootParentRunId": "22222222-2222-2222-2222-222222222222",
"parentRunId": "33333333-3333-3333-3333-333333333333",
}
},
),
)
@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 3.0+")
def test_lineage_root_job_name_no_conf_af3(create_runtime_ti):
def test_lineage_root_macros_use_dagrun_info_when_missing_or_invalid_conf_af3(create_runtime_ti, conf):
from airflow.providers.common.compat.sdk import BaseOperator

task = BaseOperator(task_id="test_task")
Expand All @@ -224,111 +247,122 @@ def test_lineage_root_job_name_no_conf_af3(create_runtime_ti):
task=task,
dag_id="test_dag",
run_id="test_run_id",
conf=None,
conf=conf,
)

result = lineage_root_job_name(runtime_ti)
assert result == "test_dag"
root_run_id = lineage_root_run_id(runtime_ti)
root_job_name = lineage_root_job_name(runtime_ti)
root_job_namespace = lineage_root_job_namespace(runtime_ti)
assert root_run_id == "01937fbb-4680-70b3-b49b-1de6b041527a"
assert root_job_name == "test_dag"
assert root_job_namespace == _DAG_NAMESPACE


@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 3.0+")
def test_lineage_root_job_name_with_conf_af3(create_runtime_ti):
from airflow.providers.common.compat.sdk import BaseOperator

task = BaseOperator(task_id="test_task")

runtime_ti = create_runtime_ti(
task=task,
dag_id="test_dag",
run_id="test_run_id",
conf={
@pytest.mark.parametrize(
"conf",
(
{
"openlineage": {
"rootParentRunId": "22222222-2222-2222-2222-222222222222",
"rootParentJobNamespace": "rootns",
"rootParentJobName": "rootjob",
}
},
)

result = lineage_root_job_name(runtime_ti)
assert result == "rootjob"


def test_lineage_root_job_name_without_conf_af2():
task_instance = mock.MagicMock(
dag_id="dag_id",
dag_run=mock.MagicMock(conf={}),
)
{
"openlineage": {
"rootParentRunId": "22222222-2222-2222-2222-222222222222",
"rootParentJobNamespace": "rootns",
"rootParentJobName": "rootjob",
"parentRunId": "33333333-3333-3333-3333-333333333333",
"parentJobNamespace": "parentns",
"parentJobName": "parentjob",
}
},
),
)
@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 2")
def test_lineage_root_macros_use_root_from_conf_af2(conf):
task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf=conf))

result = lineage_root_job_name(task_instance)
assert result == "dag_id"
root_run_id = lineage_root_run_id(task_instance)
root_job_name = lineage_root_job_name(task_instance)
root_job_namespace = lineage_root_job_namespace(task_instance)
assert root_run_id == "22222222-2222-2222-2222-222222222222"
assert root_job_name == "rootjob"
assert root_job_namespace == "rootns"


def test_lineage_root_job_name_with_conf_af2():
@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 2")
def test_lineage_root_macros_use_parent_from_conf_when_root_missing_af2():
conf = {
"openlineage": {
"rootParentRunId": "22222222-2222-2222-2222-222222222222",
"rootParentJobNamespace": "rootns",
"rootParentJobName": "rootjob",
"parentRunId": "33333333-3333-3333-3333-333333333333",
"parentJobNamespace": "parentns",
"parentJobName": "parentjob",
}
}
task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf=conf))

result = lineage_root_job_name(task_instance)
assert result == "rootjob"


@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 3.0+")
def test_lineage_root_job_namespace_no_conf_af3(create_runtime_ti):
from airflow.providers.common.compat.sdk import BaseOperator

task = BaseOperator(task_id="test_task")

runtime_ti = create_runtime_ti(task=task, dag_id="test_dag", run_id="test_run_id", conf=None)

result = lineage_root_job_namespace(runtime_ti)
assert result == _DAG_NAMESPACE


@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 3.0+")
def test_lineage_root_job_namespace_with_conf_af3(create_runtime_ti):
from airflow.providers.common.compat.sdk import BaseOperator

task = BaseOperator(task_id="test_task")

runtime_ti = create_runtime_ti(
task=task,
dag_id="test_dag",
run_id="test_run_id",
conf={
root_run_id = lineage_root_run_id(task_instance)
root_job_name = lineage_root_job_name(task_instance)
root_job_namespace = lineage_root_job_namespace(task_instance)
assert root_run_id == "33333333-3333-3333-3333-333333333333"
assert root_job_name == "parentjob"
assert root_job_namespace == "parentns"


@pytest.mark.parametrize(
"conf",
(
{},
None,
{"some": "other"},
{"openlineage": {}},
{"openlineage": "some"},
{"openlineage": {"rootParentRunId": "22222222-2222-2222-2222-222222222222"}},
{
"openlineage": {
"rootParentRunId": "22222222-2222-2222-2222-222222222222",
"rootParentJobNamespace": "rootns",
"rootParentJobName": "rootjob",
}
},
{
"openlineage": {
"parentRunId": "33333333-3333-3333-3333-333333333333",
}
},
{
"openlineage": {
"parentRunId": "33333333-3333-3333-3333-333333333333",
"parentJobName": "parentjob",
}
},
{
"openlineage": {
"rootParentRunId": "22222222-2222-2222-2222-222222222222",
"parentRunId": "33333333-3333-3333-3333-333333333333",
}
},
),
)
@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 2")
def test_lineage_root_macros_use_dagrun_info_when_missing_or_invalid_conf_af2(conf):
date = datetime(2020, 1, 1, 1, 1, 1, 0, tzinfo=timezone.utc)
conf = {}
dag_run = mock.MagicMock(run_id="run_id", conf=conf)
dag_run.logical_date = date
dag_run.clear_number = 1
task_instance = mock.MagicMock(
dag_id="dag_id",
task_id="task_id",
dag_run=dag_run,
logical_date=date,
try_number=1,
)

result = lineage_root_job_namespace(runtime_ti)
assert result == "rootns"


def test_lineage_root_job_namespace_without_conf_af2():
task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf={}))

result = lineage_root_job_namespace(task_instance)
assert result == _DAG_NAMESPACE


def test_lineage_root_job_namespace_with_conf_af2():
conf = {
"openlineage": {
"rootParentRunId": "22222222-2222-2222-2222-222222222222",
"rootParentJobNamespace": "rootns",
"rootParentJobName": "rootjob",
}
}
task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf=conf))

result = lineage_root_job_namespace(task_instance)
assert result == "rootns"
root_run_id = lineage_root_run_id(task_instance)
root_job_name = lineage_root_job_name(task_instance)
root_job_namespace = lineage_root_job_namespace(task_instance)
assert root_run_id == "016f5e9e-c4c8-7c30-9eda-d9c646d633ea"
assert root_job_name == "dag_id"
assert root_job_namespace == _DAG_NAMESPACE