Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
get_dag_documentation,
get_dag_parent_run_facet,
get_job_name,
get_root_information_from_dagrun_conf,
get_task_documentation,
get_task_parent_run_facet,
get_user_provided_run_facets,
Expand Down Expand Up @@ -229,7 +228,7 @@ def on_running():
**get_task_parent_run_facet(
parent_run_id=parent_run_id,
parent_job_name=dag.dag_id,
**get_root_information_from_dagrun_conf(getattr(dagrun, "conf", {})),
dr_conf=getattr(dagrun, "conf", {}),
),
**get_airflow_mapped_task_facet(task_instance),
**get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
Expand Down Expand Up @@ -360,7 +359,7 @@ def on_success():
**get_task_parent_run_facet(
parent_run_id=parent_run_id,
parent_job_name=dag.dag_id,
**get_root_information_from_dagrun_conf(getattr(dagrun, "conf", {})),
dr_conf=getattr(dagrun, "conf", {}),
),
**get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
**get_airflow_debug_facet(),
Expand Down Expand Up @@ -502,7 +501,7 @@ def on_failure():
**get_task_parent_run_facet(
parent_run_id=parent_run_id,
parent_job_name=dag.dag_id,
**get_root_information_from_dagrun_conf(getattr(dagrun, "conf", {})),
dr_conf=getattr(dagrun, "conf", {}),
),
**get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
**get_airflow_debug_facet(),
Expand Down Expand Up @@ -557,7 +556,7 @@ def on_state_change():
**get_task_parent_run_facet(
parent_run_id=parent_run_id,
parent_job_name=ti.dag_id,
**get_root_information_from_dagrun_conf(getattr(dagrun, "conf", {})),
dr_conf=getattr(dagrun, "conf", {}),
),
**get_airflow_debug_facet(),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,35 @@ def get_task_parent_run_facet(
root_parent_run_id: str | None = None,
root_parent_job_name: str | None = None,
root_parent_job_namespace: str | None = None,
dr_conf: dict | None = None,
) -> dict[str, Any]:
"""Retrieve the parent run facet."""
all_root_info = (root_parent_run_id, root_parent_job_namespace, root_parent_job_name)
# If not all root identifiers are provided explicitly, try to get them from dagrun conf.
if not all(all_root_info):
# If some but not all root identifiers are provided warn and do not use any of them, we need all.
if any(all_root_info):
log.warning(
"Incomplete root OpenLineage information provided. "
"No root information will be used. Found values: "
"root_parent_run_id='%s', root_parent_job_namespace='%s', root_parent_job_name='%s'.",
root_parent_run_id,
root_parent_job_namespace,
root_parent_job_name,
)
root_parent_run_id, root_parent_job_namespace, root_parent_job_name = None, None, None
elif dr_conf:
# Check for root identifiers in dagrun conf
if root_info := get_root_information_from_dagrun_conf(dr_conf):
root_parent_run_id = root_info["root_parent_run_id"]
root_parent_job_namespace = root_info["root_parent_job_namespace"]
root_parent_job_name = root_info["root_parent_job_name"]
# If not present, check for parent identifiers in dagrun conf and use them as root
elif parent_info := get_parent_information_from_dagrun_conf(dr_conf):
root_parent_run_id = parent_info["parent_run_id"]
root_parent_job_namespace = parent_info["parent_job_namespace"]
root_parent_job_name = parent_info["parent_job_name"]

return _get_parent_run_facet(
parent_run_id=parent_run_id,
parent_job_namespace=parent_job_namespace,
Expand All @@ -205,50 +232,50 @@ def get_root_information_from_dagrun_conf(dr_conf: dict | None) -> dict[str, str
log.debug("No 'openlineage' data found in DAG run config.")
return {}

root_run_id = ol_data.get("rootParentRunId", "")
root_namespace = ol_data.get("rootParentJobNamespace", "")
root_name = ol_data.get("rootParentJobName", "")
root_parent_run_id = ol_data.get("rootParentRunId", "")
root_parent_job_namespace = ol_data.get("rootParentJobNamespace", "")
root_parent_job_name = ol_data.get("rootParentJobName", "")

all_root_info = (root_run_id, root_namespace, root_name)
all_root_info = (root_parent_run_id, root_parent_job_namespace, root_parent_job_name)
if not all(all_root_info):
if any(all_root_info):
log.warning(
"Incomplete root OpenLineage information in DAG run config. "
"No root information will be used. Found values: "
"rootParentRunId='%s', rootParentJobNamespace='%s', rootParentJobName='%s'.",
root_run_id,
root_namespace,
root_name,
root_parent_run_id,
root_parent_job_namespace,
root_parent_job_name,
)
else:
log.debug("No 'openlineage' root information found in DAG run config.")
return {}

try: # Validate that runId is correct UUID
parent_run.RootRun(runId=root_run_id)
parent_run.RootRun(runId=root_parent_run_id)
except ValueError:
log.warning(
"Invalid OpenLineage rootParentRunId '%s' in DAG run config - expected a valid UUID.",
root_run_id,
root_parent_run_id,
)
return {}

log.debug(
"Extracted valid root OpenLineage identifiers from DAG run config: "
"rootParentRunId='%s', rootParentJobNamespace='%s', rootParentJobName='%s'.",
root_run_id,
root_namespace,
root_name,
root_parent_run_id,
root_parent_job_namespace,
root_parent_job_name,
)
return {
"root_parent_run_id": root_run_id,
"root_parent_job_namespace": root_namespace,
"root_parent_job_name": root_name,
"root_parent_run_id": root_parent_run_id,
"root_parent_job_namespace": root_parent_job_namespace,
"root_parent_job_name": root_parent_job_name,
}


def get_dag_parent_run_facet(dr_conf: dict | None) -> dict[str, parent_run.ParentRunFacet]:
"""Build the OpenLineage parent run facet from a DAG run config."""
def get_parent_information_from_dagrun_conf(dr_conf: dict | None) -> dict[str, str]:
"""Extract parent run and job information from a DAG run config."""
ol_data = _get_openlineage_data_from_dagrun_conf(dr_conf)
if not ol_data:
log.debug("No 'openlineage' data found in DAG run config.")
Expand Down Expand Up @@ -290,8 +317,46 @@ def get_dag_parent_run_facet(dr_conf: dict | None) -> dict[str, parent_run.Paren
parent_job_name,
)

root_info = get_root_information_from_dagrun_conf(dr_conf)
if root_info and all(root_info.values()):
return {
"parent_run_id": parent_run_id,
"parent_job_namespace": parent_job_namespace,
"parent_job_name": parent_job_name,
}


def get_dag_parent_run_facet(dr_conf: dict | None) -> dict[str, parent_run.ParentRunFacet]:
"""
Build the OpenLineage parent run facet from a DAG run configuration.

This function extracts parent run identifiers - run ID, job namespace, and job name -
from the DAG run configuration to construct an OpenLineage `ParentRunFacet`. It requires
a complete set of parent identifiers to proceed; if some but not all are present, or if
the run ID is invalid, the function returns an empty dictionary.

When valid parent identifiers are found, it also attempts to retrieve corresponding
root identifiers using `get_root_information_from_dagrun_conf`, which may fall back
to the parent identifiers if no explicit root data is available. The resulting facet
links both the immediate parent and root lineage information for the run.

Args:
dr_conf: The DAG run configuration dictionary.

Returns:
A dictionary containing a single entry mapping the facet name to the constructed
`ParentRunFacet`. Returns an empty dictionary if the configuration does not contain
a complete or valid set of parent identifiers.
"""
ol_data = _get_openlineage_data_from_dagrun_conf(dr_conf)
if not ol_data:
log.debug("No 'openlineage' data found in DAG run config.")
return {}

parent_info = get_parent_information_from_dagrun_conf(dr_conf)
if not parent_info: # Validation and logging is done in the above function
return {}

root_parent_run_id, root_parent_job_namespace, root_parent_job_name = None, None, None
if root_info := get_root_information_from_dagrun_conf(dr_conf):
root_parent_run_id = root_info["root_parent_run_id"]
root_parent_job_namespace = root_info["root_parent_job_namespace"]
root_parent_job_name = root_info["root_parent_job_name"]
Expand All @@ -300,16 +365,12 @@ def get_dag_parent_run_facet(dr_conf: dict | None) -> dict[str, parent_run.Paren
"Missing OpenLineage root identifiers in DAG run config, "
"parent identifiers will be used as root instead."
)
root_parent_run_id, root_parent_job_namespace, root_parent_job_name = (
parent_run_id,
parent_job_namespace,
parent_job_name,
)

# Function already uses parent as root if root is missing, no need to explicitly pass it
return _get_parent_run_facet(
parent_run_id=parent_run_id,
parent_job_namespace=parent_job_namespace,
parent_job_name=parent_job_name,
parent_run_id=parent_info["parent_run_id"],
parent_job_namespace=parent_info["parent_job_namespace"],
parent_job_name=parent_info["parent_job_name"],
root_parent_run_id=root_parent_run_id,
root_parent_job_namespace=root_parent_job_namespace,
root_parent_job_name=root_parent_job_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1467,7 +1467,9 @@ def test_adapter_fail_task_is_called_with_proper_arguments_for_db_task_instance_

listener.on_task_instance_failed(previous_state=None, task_instance=task_instance, error=err)
mock_get_task_parent_run_facet.assert_called_once_with(
parent_run_id="2020-01-01T01:01:01+00:00.dag_id.0", parent_job_name=task_instance.dag_id
parent_run_id="2020-01-01T01:01:01+00:00.dag_id.0",
parent_job_name=task_instance.dag_id,
dr_conf={},
)
expected_args = dict(
end_time="2023-01-03T13:01:01+00:00",
Expand Down Expand Up @@ -1643,7 +1645,9 @@ def test_adapter_complete_task_is_called_with_proper_arguments_for_db_task_insta
calls = listener.adapter.complete_task.call_args_list
assert len(calls) == 1
mock_get_task_parent_run_facet.assert_called_once_with(
parent_run_id="2020-01-01T01:01:01+00:00.dag_id.0", parent_job_name=task_instance.dag_id
parent_run_id="2020-01-01T01:01:01+00:00.dag_id.0",
parent_job_name=task_instance.dag_id,
dr_conf={},
)
expected_args = dict(
end_time="2023-01-03T13:01:01+00:00",
Expand Down
Loading