Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ def on_task_instance_success(
self.log.debug("OpenLineage listener got notification about task instance success")

if isinstance(task_instance, TaskInstance):
# On AF3 we still get DB TaskInstance model when task instance state is changed manually
# (via UI or API). The listener is called on API server so we do not have task and dag models.
self._on_task_instance_manual_state_change(
ti=task_instance,
dagrun=task_instance.dag_run,
Expand Down Expand Up @@ -382,6 +384,12 @@ def on_task_instance_failed(
self.log.debug("OpenLineage listener got notification about task instance failure")

if isinstance(task_instance, TaskInstance):
# There are two cases where on AF3 we still get DB TaskInstance model:
# 1. when task instance state is changed manually (via UI or API, models.patch_task_instance
# endpoint). The listener is called on API server so we do not have task and dag models.
# 2. `process_executor_events` method on scheduler, where the external state change is handled
# https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
# In second case, we still should not run user code, but at least we have access to operator
self._on_task_instance_manual_state_change(
ti=task_instance,
dagrun=task_instance.dag_run,
Expand Down Expand Up @@ -645,6 +653,24 @@ def _on_task_instance_manual_state_change(
self.log.debug("`_on_task_instance_manual_state_change` was called with state: `%s`.", ti_state)
end_date = timezone.utcnow()

task = getattr(ti, "task") # on scheduler, we should have access to task
if task and is_operator_disabled(task):
self.log.debug(
"Skipping OpenLineage event emission for operator `%s` "
"due to its presence in [openlineage] disabled_for_operators.",
task.task_type,
)
return

if task and not is_selective_lineage_enabled(task):
self.log.debug(
"Skipping OpenLineage event emission for task `%s` "
"due to lack of explicit lineage enablement for task or DAG while "
"[openlineage] selective_enable is on.",
ti.task_id,
)
return

@print_warning(self.log)
def on_state_change():
date = dagrun.logical_date or dagrun.run_after
Expand All @@ -662,23 +688,45 @@ def on_state_change():
map_index=ti.map_index,
)

data_interval_start = dagrun.data_interval_start
if isinstance(data_interval_start, datetime):
data_interval_start = data_interval_start.isoformat()
data_interval_end = dagrun.data_interval_end
if isinstance(data_interval_end, datetime):
data_interval_end = data_interval_end.isoformat()

dag_tags, owners, doc, doc_type = None, None, None, None
airflow_run_facet = {}
if task: # on scheduler, we should have access to task
doc, doc_type = get_task_documentation(task)
dag = getattr(task, "dag")
if dag:
if not doc:
doc, doc_type = get_dag_documentation(dag)

dag_tags = dag.tags
owners = [x.strip() for x in (task if task.owner != "airflow" else dag).owner.split(",")]

airflow_run_facet = get_airflow_run_facet(dagrun, dag, ti, task, task_uuid)

adapter_kwargs = {
"run_id": task_uuid,
"job_name": get_job_name(ti),
"end_time": end_date.isoformat(),
"task": OperatorLineage(),
"nominal_start_time": None,
"nominal_end_time": None,
"tags": None,
"owners": None,
"job_description": None,
"job_description_type": None,
"nominal_start_time": data_interval_start,
"nominal_end_time": data_interval_end,
"tags": dag_tags,
"owners": owners,
"job_description": doc,
"job_description_type": doc_type,
"run_facets": {
**get_task_parent_run_facet(
parent_run_id=parent_run_id,
parent_job_name=ti.dag_id,
dr_conf=getattr(dagrun, "conf", {}),
),
**airflow_run_facet,
**get_airflow_debug_facet(),
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,7 @@ def _create_listener_and_task_instance(
listener, task_instance = _create_listener_and_task_instance()
# Now you can use listener and task_instance in your tests to simulate their interaction.
"""
from airflow.sdk.definitions.dag import DAG

if not runtime_ti:
# TaskInstance is used when on API server (when listener gets called about manual state change)
Expand All @@ -1097,15 +1098,27 @@ def _create_listener_and_task_instance(
)
else:
task_instance = TaskInstance(task=MagicMock(), dag_version_id=uuid7())

dag = DAG(
dag_id="dag_id_from_dag_not_ti",
description="Test DAG Description",
tags=["tag1", "tag2"],
)
task = EmptyOperator(
task_id="task_id_from_task_not_ti", dag=dag, owner="task_owner", doc_md="TASK Description"
)

task_instance.dag_run = DagRun()
task_instance.dag_run.dag_id = "dag_id_from_dagrun_and_not_ti"
task_instance.dag_run.run_id = "dag_run_run_id"
task_instance.dag_run.clear_number = 0
task_instance.dag_run.logical_date = timezone.datetime(2020, 1, 1, 1, 1, 1)
task_instance.dag_run.run_after = timezone.datetime(2020, 1, 1, 1, 1, 1)
task_instance.dag_run.data_interval_start = timezone.datetime(2020, 1, 1, 1, 1, 1)
task_instance.dag_run.data_interval_end = timezone.datetime(2020, 1, 1, 1, 1, 1)
task_instance.dag_run.state = DagRunState.RUNNING
task_instance.task = None
task_instance.dag = None
task_instance.task = task # type: ignore[assignment] # For testing we'll avoid serialization
task_instance.dag = dag
task_instance.task_id = "task_id"
task_instance.dag_id = "dag_id"
task_instance.try_number = 1
Expand All @@ -1119,7 +1132,6 @@ def _create_listener_and_task_instance(
TaskInstance as SdkTaskInstance,
TIRunContext,
)
from airflow.sdk.definitions.dag import DAG
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance

dag = DAG(
Expand Down Expand Up @@ -1447,13 +1459,15 @@ def test_adapter_fail_task_is_called_with_dag_description_when_task_doc_is_empty
@mock.patch("airflow.providers.openlineage.plugins.adapter.OpenLineageAdapter.emit")
@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
@mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_debug_facet")
@mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet")
@mock.patch("airflow.providers.openlineage.plugins.listener.get_task_parent_run_facet")
@mock.patch(
"airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call
)
def test_adapter_fail_task_is_called_with_proper_arguments_for_db_task_instance_model(
self,
mock_get_task_parent_run_facet,
mock_get_airflow_run_facet,
mock_debug_facet,
mock_debug_mode,
mock_emit,
Expand All @@ -1467,6 +1481,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments_for_db_task_instance_
time_machine.move_to(timezone.datetime(2023, 1, 3, 13, 1, 1), tick=False)

listener, task_instance = self._create_listener_and_task_instance(runtime_ti=False)
mock_get_airflow_run_facet.return_value = {"airflow": 3}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}

Expand All @@ -1483,14 +1498,15 @@ def test_adapter_fail_task_is_called_with_proper_arguments_for_db_task_instance_
job_name="dag_id.task_id",
run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1",
task=OperatorLineage(),
nominal_start_time=None,
nominal_end_time=None,
tags=None,
owners=None,
job_description=None,
job_description_type=None,
nominal_start_time="2020-01-01T01:01:01+00:00",
nominal_end_time="2020-01-01T01:01:01+00:00",
tags={"tag1", "tag2"},
owners=["task_owner"],
job_description="TASK Description",
job_description_type="text/markdown",
run_facets={
"parent": 4,
"airflow": 3,
"debug": "packages",
},
error=err,
Expand Down Expand Up @@ -1645,6 +1661,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments_for_db_task_insta
time_machine.move_to(timezone.datetime(2023, 1, 3, 13, 1, 1), tick=False)

listener, task_instance = self._create_listener_and_task_instance(runtime_ti=False)
delattr(task_instance, "task") # Test api server path, where task is not available
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}

Expand All @@ -1661,8 +1678,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments_for_db_task_insta
job_name="dag_id.task_id",
run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1",
task=OperatorLineage(),
nominal_start_time=None,
nominal_end_time=None,
nominal_start_time="2020-01-01T01:01:01+00:00",
nominal_end_time="2020-01-01T01:01:01+00:00",
tags=None,
owners=None,
job_description=None,
Expand Down Expand Up @@ -1851,6 +1868,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments_for_db_task_insta
time_machine.move_to(timezone.datetime(2023, 1, 3, 13, 1, 1), tick=False)

listener, task_instance = self._create_listener_and_task_instance(runtime_ti=False)
delattr(task_instance, "task") # Test api server path, where task is not available
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}

Expand All @@ -1867,8 +1885,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments_for_db_task_insta
job_name="dag_id.task_id",
run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1",
task=OperatorLineage(),
nominal_start_time=None,
nominal_end_time=None,
nominal_start_time="2020-01-01T01:01:01+00:00",
nominal_end_time="2020-01-01T01:01:01+00:00",
tags=None,
owners=None,
job_description=None,
Expand Down
Loading