Skip to content

Commit

Permalink
feat: add debug facet to all OpenLineage events (#41217)
Browse files Browse the repository at this point in the history
Signed-off-by: Kacper Muda <mudakacper@gmail.com>
  • Loading branch information
kacpermuda authored Aug 12, 2024
1 parent bf64cb6 commit d12eb43
Show file tree
Hide file tree
Showing 13 changed files with 232 additions and 16 deletions.
6 changes: 6 additions & 0 deletions airflow/providers/openlineage/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,9 @@ def execution_timeout() -> int:
def include_full_task_info() -> bool:
"""[openlineage] include_full_task_info."""
return conf.getboolean(_CONFIG_SECTION, "include_full_task_info", fallback="False")


@cache
def debug_mode() -> bool:
"""[openlineage] debug_mode."""
return conf.getboolean(_CONFIG_SECTION, "debug_mode", fallback="False")
30 changes: 30 additions & 0 deletions airflow/providers/openlineage/facets/AirflowDebugRunFacet.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$defs": {
"AirflowDebugRunFacet": {
"allOf": [
{
"$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet"
},
{
"type": "object",
"properties": {
"packages": {
"description": "The names and versions of all installed Python packages.",
"type": "object",
"additionalProperties": true
}
},
"required": ["packages"]
}
],
"type": "object"
}
},
"type": "object",
"properties": {
"debug": {
"$ref": "#/$defs/AirflowDebugRunFacet"
}
}
}
6 changes: 4 additions & 2 deletions airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from airflow.providers.openlineage.utils.utils import (
OpenLineageRedactor,
get_airflow_dag_run_facet,
get_airflow_debug_facet,
get_airflow_state_run_facet,
)
from airflow.stats import Stats
Expand Down Expand Up @@ -361,7 +362,7 @@ def dag_started(
job_name=dag_run.dag_id,
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
run_facets=get_airflow_dag_run_facet(dag_run),
run_facets={**get_airflow_dag_run_facet(dag_run), **get_airflow_debug_facet()},
),
inputs=[],
outputs=[],
Expand All @@ -385,7 +386,7 @@ def dag_success(self, dag_run: DagRun, msg: str):
dag_id=dag_run.dag_id,
execution_date=dag_run.execution_date,
),
facets={**get_airflow_state_run_facet(dag_run)},
facets={**get_airflow_state_run_facet(dag_run), **get_airflow_debug_facet()},
),
inputs=[],
outputs=[],
Expand Down Expand Up @@ -414,6 +415,7 @@ def dag_failed(self, dag_run: DagRun, msg: str):
message=msg, programmingLanguage="python"
),
**get_airflow_state_run_facet(dag_run),
**get_airflow_debug_facet(),
},
),
inputs=[],
Expand Down
7 changes: 7 additions & 0 deletions airflow/providers/openlineage/plugins/facets.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ class AirflowDagRunFacet(RunFacet):
dagRun: dict


@define
class AirflowDebugRunFacet(RunFacet):
"""Airflow Debug run facet."""

packages: dict


@define
class UnknownOperatorInstance(RedactMixin):
"""
Expand Down
7 changes: 7 additions & 0 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState
from airflow.providers.openlineage.utils.utils import (
IS_AIRFLOW_2_10_OR_HIGHER,
get_airflow_debug_facet,
get_airflow_job_facet,
get_airflow_mapped_task_facet,
get_airflow_run_facet,
Expand Down Expand Up @@ -122,6 +123,9 @@ def on_task_instance_running(
)
return

# Needs to be calculated outside of inner method so that it gets cached for usage in fork processes
debug_facet = get_airflow_debug_facet()

@print_warning(self.log)
def on_running():
# that's a workaround to detect task running from deferred state
Expand Down Expand Up @@ -166,6 +170,7 @@ def on_running():
**get_user_provided_run_facets(task_instance, TaskInstanceState.RUNNING),
**get_airflow_mapped_task_facet(task_instance),
**get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
**debug_facet,
},
)
Stats.gauge(
Expand Down Expand Up @@ -237,6 +242,7 @@ def on_success():
run_facets={
**get_user_provided_run_facets(task_instance, TaskInstanceState.SUCCESS),
**get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
**get_airflow_debug_facet(),
},
)
Stats.gauge(
Expand Down Expand Up @@ -336,6 +342,7 @@ def on_failure():
run_facets={
**get_user_provided_run_facets(task_instance, TaskInstanceState.FAILED),
**get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
**get_airflow_debug_facet(),
},
)
Stats.gauge(
Expand Down
8 changes: 8 additions & 0 deletions airflow/providers/openlineage/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,11 @@ config:
example: ~
type: boolean
version_added: 1.10.0
debug_mode:
description: |
If true, OpenLineage events will include information useful for debugging - potentially
containing large fields e.g. all installed packages and their versions.
default: "False"
example: ~
type: boolean
version_added: 1.11.0
24 changes: 24 additions & 0 deletions airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import re
from contextlib import redirect_stdout, suppress
from functools import wraps
from importlib import metadata
from io import StringIO
from typing import TYPE_CHECKING, Any, Callable, Iterable

Expand All @@ -38,6 +39,7 @@
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.facets import (
AirflowDagRunFacet,
AirflowDebugRunFacet,
AirflowJobFacet,
AirflowMappedTaskRunFacet,
AirflowRunFacet,
Expand Down Expand Up @@ -378,6 +380,28 @@ def get_airflow_dag_run_facet(dag_run: DagRun) -> dict[str, RunFacet]:
}


@conf.cache
def _get_all_packages_installed() -> dict[str, str]:
"""
Retrieve a dictionary of all installed packages and their versions.
This operation involves scanning the system's installed packages, which can be a heavy operation.
It is recommended to cache the result to avoid repeated, expensive lookups.
"""
return {dist.metadata["Name"]: dist.version for dist in metadata.distributions()}


def get_airflow_debug_facet() -> dict[str, AirflowDebugRunFacet]:
if not conf.debug_mode():
return {}
log.warning("OpenLineage debug_mode is enabled. Be aware that this may log and emit extensive details.")
return {
"debug": AirflowDebugRunFacet(
packages=_get_all_packages_installed(),
)
}


def get_airflow_run_facet(
dag_run: DagRun,
dag: DAG,
Expand Down
21 changes: 17 additions & 4 deletions docs/apache-airflow-providers-openlineage/guides/developer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ Authors of tests need to remember the condition of calling different OL methods
``get_openlineage_facets_on_start`` is called before ``execute``, and as such, must not depend on values
that are set there.

See :ref:`local_troubleshooting:openlineage` for details on how to troubleshoot OpenLineage locally.
See :ref:`troubleshooting:openlineage` for details on how to troubleshoot OpenLineage locally.

There is no existing framework for system testing OpenLineage integration, but the easiest way it can be achieved is
by comparing emitted events (f.e. with ``FileTransport``) against expected ones.
Expand Down Expand Up @@ -299,7 +299,7 @@ creating a gap in pipeline observability.
Even with unit tests, an Extractor may still not be operating as expected.
The easiest way to tell if data isn't coming through correctly is if the UI elements are not showing up correctly in the Lineage tab.

See :ref:`local_troubleshooting:openlineage` for details on how to troubleshoot OpenLineage locally.
See :ref:`troubleshooting:openlineage` for details on how to troubleshoot OpenLineage locally.

Example
^^^^^^^
Expand Down Expand Up @@ -573,9 +573,9 @@ OpenLineage reflects this structure in its Job Hierarchy model.

TaskInstance events' ParentRunFacet references the originating DAG run.

.. _local_troubleshooting:openlineage:
.. _troubleshooting:openlineage:

Local troubleshooting
Troubleshooting
=====================

When testing code locally, `Marquez <https://marquezproject.ai/docs/quickstart>`_ can be used to inspect the data being emitted—or not being emitted.
Expand All @@ -585,6 +585,19 @@ then the Extractor is fine and an issue should be opened up in OpenLineage. Howe
it is likely that more unit tests are needed to cover Extractor behavior.
Marquez can help you pinpoint which facets are not being formed properly so you know where to add test coverage.

Debug settings
^^^^^^^^^^^^^^
For debugging purposes, ensure that the `Airflow logging level <https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#logging-level>`_
is set to ``DEBUG`` and that the :ref:`debug_mode <options:debug_mode>` is enabled for OpenLineage integration.
This will increase the detail in Airflow logs and include additional environmental information in OpenLineage events.

When seeking help with debugging, always try to provide the following:

- Airflow scheduler logs with the logging level set to DEBUG
- Airflow worker logs (task logs) with the logging level set to DEBUG
- OpenLineage events with debug_mode enabled


Where can I learn more?
=======================

Expand Down
40 changes: 37 additions & 3 deletions docs/apache-airflow-providers-openlineage/guides/user.rst
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,6 @@ If not set, it's using ``default`` namespace. Provide the name of the namespace
AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
.. _options:disable:

Timeout
^^^^^^^

Expand All @@ -210,6 +208,7 @@ The code runs with default timeout of 10 seconds. You can increase this by setti
AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT=60
.. _options:disable:

Disable
^^^^^^^
Expand Down Expand Up @@ -283,6 +282,10 @@ serializing only a few known attributes, we exclude certain non-serializable ele
``AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO`` environment variable is an equivalent.

.. code-block:: ini
AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO=true
.. warning::

By setting this variable to true, OpenLineage integration does not control the size of event you sent. It can potentially include elements that are megabytes in size or larger, depending on the size of data you pass to the task.
Expand Down Expand Up @@ -324,6 +327,31 @@ a string of semicolon separated full import paths to ``custom_run_facets`` optio
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
.. _options:debug_mode:

Debug Mode
^^^^^^^^^^

You can enable sending additional information in OpenLineage events that can be useful for debugging and
reproducing your environment setup by setting ``debug_mode`` option to ``true`` in Airflow configuration.

.. code-block:: ini
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
debug_mode = true
``AIRFLOW__OPENLINEAGE__DEBUG_MODE`` environment variable is an equivalent.

.. code-block:: ini
AIRFLOW__OPENLINEAGE__DEBUG_MODE=true
.. warning::

By setting this variable to true, OpenLineage integration may log and emit extensive details. It should only be enabled temporary for debugging purposes.


Enabling OpenLineage on DAG/task level
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand All @@ -335,6 +363,12 @@ To enable this policy, set the ``selective_enable`` option to True in the [openl
[openlineage]
selective_enable = True
``AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE`` environment variable is an equivalent.

.. code-block:: ini
AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE=true
While ``selective_enable`` enables selective control, the ``disabled`` :ref:`option <options:disable>` still has precedence.
If you set ``disabled`` to True in the configuration, OpenLineage will be disabled for all DAGs and tasks regardless of the ``selective_enable`` setting.
Expand Down Expand Up @@ -383,7 +417,7 @@ Disabling DAG-level lineage while enabling task-level lineage might cause errors
Troubleshooting
===============

See :ref:`local_troubleshooting:openlineage` for details on how to troubleshoot OpenLineage locally.
See :ref:`troubleshooting:openlineage` for details on how to troubleshoot OpenLineage.


Adding support for custom Operators
Expand Down
19 changes: 15 additions & 4 deletions tests/providers/openlineage/plugins/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from airflow.providers.openlineage.plugins.adapter import _PRODUCER, OpenLineageAdapter
from airflow.providers.openlineage.plugins.facets import (
AirflowDagRunFacet,
AirflowDebugRunFacet,
AirflowStateRunFacet,
)
from airflow.providers.openlineage.utils.utils import get_airflow_job_facet
Expand Down Expand Up @@ -527,10 +528,11 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta
mock_stats_timer.assert_called_with("ol.emit.attempts")


@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_static_uuid):
def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_static_uuid, mock_debug_mode):
random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
client = MagicMock()
adapter = OpenLineageAdapter(client)
Expand Down Expand Up @@ -600,6 +602,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
"start_date": event_time.isoformat(),
},
),
"debug": AirflowDebugRunFacet(packages=ANY),
},
),
job=Job(
Expand Down Expand Up @@ -630,11 +633,14 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
mock_stats_timer.assert_called_with("ol.emit.attempts")


@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
@mock.patch.object(DagRun, "get_task_instances")
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_get_tasks):
def test_emit_dag_complete_event(
mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_get_tasks, mock_debug_mode
):
random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
client = MagicMock()
adapter = OpenLineageAdapter(client)
Expand Down Expand Up @@ -684,7 +690,8 @@ def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, generate_sta
task_1.task_id: TaskInstanceState.SKIPPED,
task_2.task_id: TaskInstanceState.FAILED,
},
)
),
"debug": AirflowDebugRunFacet(packages=ANY),
},
),
job=Job(
Expand All @@ -708,11 +715,14 @@ def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, generate_sta
mock_stats_timer.assert_called_with("ol.emit.attempts")


@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
@mock.patch.object(DagRun, "get_task_instances")
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_get_tasks):
def test_emit_dag_failed_event(
mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_get_tasks, mock_debug_mode
):
random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
client = MagicMock()
adapter = OpenLineageAdapter(client)
Expand Down Expand Up @@ -764,6 +774,7 @@ def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, generate_stati
task_2.task_id: TaskInstanceState.FAILED,
},
),
"debug": AirflowDebugRunFacet(packages=ANY),
},
),
job=Job(
Expand Down
Loading

0 comments on commit d12eb43

Please sign in to comment.