Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def _on_task_instance_running(
"Skipping OpenLineage event emission for task `%s` "
"due to lack of explicit lineage enablement for task or DAG while "
"[openlineage] selective_enable is on.",
task.task_id,
task_instance.task_id,
)
return

Expand All @@ -170,14 +170,14 @@ def on_running():
clear_number = dagrun.clear_number

parent_run_id = self.adapter.build_dag_run_id(
dag_id=dag.dag_id,
dag_id=task_instance.dag_id,
logical_date=date,
clear_number=clear_number,
)

task_uuid = self.adapter.build_task_instance_run_id(
dag_id=dag.dag_id,
task_id=task.task_id,
dag_id=task_instance.dag_id,
task_id=task_instance.task_id,
try_number=task_instance.try_number,
logical_date=date,
map_index=task_instance.map_index,
Expand All @@ -199,7 +199,7 @@ def on_running():

redacted_event = self.adapter.start_task(
run_id=task_uuid,
job_name=get_job_name(task),
job_name=get_job_name(task_instance),
job_description=dag.description,
event_time=start_date.isoformat(),
nominal_start_time=data_interval_start,
Expand Down Expand Up @@ -278,7 +278,7 @@ def _on_task_instance_success(self, task_instance: RuntimeTaskInstance, dag, dag
"Skipping OpenLineage event emission for task `%s` "
"due to lack of explicit lineage enablement for task or DAG while "
"[openlineage] selective_enable is on.",
task.task_id,
task_instance.task_id,
)
return

Expand All @@ -289,14 +289,14 @@ def on_success():
date = dagrun.run_after

parent_run_id = self.adapter.build_dag_run_id(
dag_id=dag.dag_id,
dag_id=task_instance.dag_id,
logical_date=date,
clear_number=dagrun.clear_number,
)

task_uuid = self.adapter.build_task_instance_run_id(
dag_id=dag.dag_id,
task_id=task.task_id,
dag_id=task_instance.dag_id,
task_id=task_instance.task_id,
try_number=task_instance.try_number,
logical_date=date,
map_index=task_instance.map_index,
Expand All @@ -321,7 +321,7 @@ def on_success():

redacted_event = self.adapter.complete_task(
run_id=task_uuid,
job_name=get_job_name(task),
job_name=get_job_name(task_instance),
end_time=end_date.isoformat(),
task=task_metadata,
# If task owner is default ("airflow"), use DAG owner instead that may have more details
Expand Down Expand Up @@ -409,7 +409,7 @@ def _on_task_instance_failed(
"Skipping OpenLineage event emission for task `%s` "
"due to lack of explicit lineage enablement for task or DAG while "
"[openlineage] selective_enable is on.",
task.task_id,
task_instance.task_id,
)
return

Expand All @@ -420,14 +420,14 @@ def on_failure():
date = dagrun.run_after

parent_run_id = self.adapter.build_dag_run_id(
dag_id=dag.dag_id,
dag_id=task_instance.dag_id,
logical_date=date,
clear_number=dagrun.clear_number,
)

task_uuid = self.adapter.build_task_instance_run_id(
dag_id=dag.dag_id,
task_id=task.task_id,
dag_id=task_instance.dag_id,
task_id=task_instance.task_id,
try_number=task_instance.try_number,
logical_date=date,
map_index=task_instance.map_index,
Expand All @@ -452,7 +452,7 @@ def on_failure():

redacted_event = self.adapter.fail_task(
run_id=task_uuid,
job_name=get_job_name(task),
job_name=get_job_name(task_instance),
end_time=end_date.isoformat(),
task=task_metadata,
error=error,
Expand Down Expand Up @@ -489,13 +489,13 @@ def _on_task_instance_manual_state_change(
def on_state_change():
date = dagrun.logical_date or dagrun.run_after
parent_run_id = self.adapter.build_dag_run_id(
dag_id=dagrun.dag_id,
dag_id=ti.dag_id,
logical_date=date,
clear_number=dagrun.clear_number,
)

task_uuid = self.adapter.build_task_instance_run_id(
dag_id=dagrun.dag_id,
dag_id=ti.dag_id,
task_id=ti.task_id,
try_number=ti.try_number,
logical_date=date,
Expand All @@ -507,6 +507,10 @@ def on_state_change():
"job_name": get_job_name(ti),
"end_time": end_date.isoformat(),
"task": OperatorLineage(),
"nominal_start_time": None,
"nominal_end_time": None,
"tags": None,
"owners": None,
"run_facets": {
**get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=ti.dag_id),
**get_airflow_debug_facet(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
SecretsMasker,
should_hide_value_for_key,
)
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
from airflow.utils.state import DagRunState, TaskInstanceState
else:
try:
Expand Down Expand Up @@ -127,7 +128,7 @@ def get_operator_class(task: BaseOperator) -> type:
return task.__class__


def get_job_name(task: TaskInstance) -> str:
def get_job_name(task: TaskInstance | RuntimeTaskInstance) -> str:
return f"{task.dag_id}.{task.task_id}"


Expand Down
Loading