Skip to content

Commit

Permalink
Add AirflowRun on COMPLETE/FAIL events (#40996)
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
  • Loading branch information
mobuchowski authored Jul 28, 2024
1 parent a482d0f commit 4ab0183
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 11 deletions.
9 changes: 5 additions & 4 deletions airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def start_task(
nominal_end_time: str | None,
owners: list[str],
task: OperatorLineage | None,
run_facets: dict[str, RunFacet] | None = None, # Custom run facets
run_facets: dict[str, RunFacet] | None = None,
) -> RunEvent:
"""
Emit openlineage event of type START.
Expand Down Expand Up @@ -243,7 +243,7 @@ def complete_task(
parent_run_id: str | None,
end_time: str,
task: OperatorLineage,
run_facets: dict[str, RunFacet] | None = None, # Custom run facets
run_facets: dict[str, RunFacet] | None = None,
) -> RunEvent:
"""
Emit openlineage event of type COMPLETE.
Expand All @@ -255,7 +255,7 @@ def complete_task(
:param parent_run_id: identifier of job spawning this task
:param end_time: time of task completion
:param task: metadata container with information extracted from operator
:param run_facets: custom run facets
:param run_facets: additional run facets
"""
run_facets = run_facets or {}
if task:
Expand Down Expand Up @@ -285,8 +285,8 @@ def fail_task(
parent_run_id: str | None,
end_time: str,
task: OperatorLineage,
run_facets: dict[str, RunFacet] | None = None, # Custom run facets
error: str | BaseException | None = None,
run_facets: dict[str, RunFacet] | None = None,
) -> RunEvent:
"""
Emit openlineage event of type FAIL.
Expand All @@ -300,6 +300,7 @@ def fail_task(
:param task: metadata container with information extracted from operator
:param error: error to report with the FAIL event, given as a message string or an exception instance; may be None
:param run_facets: additional run facets
"""
run_facets = run_facets or {}
if task:
Expand Down
10 changes: 8 additions & 2 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,10 @@ def on_success():
parent_run_id=parent_run_id,
end_time=end_date.isoformat(),
task=task_metadata,
run_facets=get_user_provided_run_facets(task_instance, TaskInstanceState.SUCCESS),
run_facets={
**get_user_provided_run_facets(task_instance, TaskInstanceState.SUCCESS),
**get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
},
)
Stats.gauge(
f"ol.event.size.{event_type}.{operator_name}",
Expand Down Expand Up @@ -330,8 +333,11 @@ def on_failure():
parent_run_id=parent_run_id,
end_time=end_date.isoformat(),
task=task_metadata,
run_facets=get_user_provided_run_facets(task_instance, TaskInstanceState.FAILED),
error=error,
run_facets={
**get_user_provided_run_facets(task_instance, TaskInstanceState.FAILED),
**get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
},
)
Stats.gauge(
f"ol.event.size.{event_type}.{operator_name}",
Expand Down
5 changes: 4 additions & 1 deletion tests/providers/openlineage/plugins/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@
from airflow.providers.openlineage.conf import namespace
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.plugins.adapter import _PRODUCER, OpenLineageAdapter
from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowStateRunFacet
from airflow.providers.openlineage.plugins.facets import (
AirflowDagRunFacet,
AirflowStateRunFacet,
)
from airflow.providers.openlineage.utils.utils import get_airflow_job_facet
from airflow.utils.task_group import TaskGroup
from tests.test_utils.config import conf_vars
Expand Down
20 changes: 16 additions & 4 deletions tests/providers/openlineage/plugins/test_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,16 @@ def test_adapter_start_task_is_called_with_proper_arguments(

@mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled")
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter")
@mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet")
@mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets")
@mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name")
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call)
def test_adapter_fail_task_is_called_with_proper_arguments(
mock_get_job_name, mock_get_user_provided_run_facets, mocked_adapter, mock_disabled
mock_get_job_name,
mock_get_user_provided_run_facets,
mock_get_airflow_run_facet,
mocked_adapter,
mock_disabled,
):
"""Tests that the 'fail_task' method of the OpenLineageAdapter is invoked with the correct arguments.
Expand All @@ -289,6 +294,7 @@ def mock_task_id(dag_id, task_id, try_number, execution_date):
mocked_adapter.build_dag_run_id.side_effect = mock_dag_id
mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}}
mock_disabled.return_value = False

err = ValueError("test")
Expand All @@ -305,18 +311,23 @@ def mock_task_id(dag_id, task_id, try_number, execution_date):
parent_run_id="execution_date.dag_id",
run_id="execution_date.dag_id.task_id.1",
task=listener.extractor_manager.extract_metadata(),
run_facets={"custom_user_facet": 2},
run_facets={"custom_user_facet": 2, "airflow": {"task": "..."}},
**expected_err_kwargs,
)


@mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled")
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter")
@mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet")
@mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets")
@mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name")
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call)
def test_adapter_complete_task_is_called_with_proper_arguments(
mock_get_job_name, mock_get_user_provided_run_facets, mocked_adapter, mock_disabled
mock_get_job_name,
mock_get_user_provided_run_facets,
mock_get_airflow_run_facet,
mocked_adapter,
mock_disabled,
):
"""Tests that the 'complete_task' method of the OpenLineageAdapter is called with the correct arguments.
Expand All @@ -338,6 +349,7 @@ def mock_task_id(dag_id, task_id, try_number, execution_date):
mocked_adapter.build_dag_run_id.side_effect = mock_dag_id
mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}}
mock_disabled.return_value = False

listener.on_task_instance_success(None, task_instance, None)
Expand All @@ -352,7 +364,7 @@ def mock_task_id(dag_id, task_id, try_number, execution_date):
parent_run_id="execution_date.dag_id",
run_id=f"execution_date.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}",
task=listener.extractor_manager.extract_metadata(),
run_facets={"custom_user_facet": 2},
run_facets={"custom_user_facet": 2, "airflow": {"task": "..."}},
)


Expand Down

0 comments on commit 4ab0183

Please sign in to comment.