Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions airflow/providers/openlineage/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ def disabled_operators() -> set[str]:
return set(operator.strip() for operator in option.split(";") if operator.strip())


@cache
def selective_enable() -> bool:
return conf.getboolean(_CONFIG_SECTION, "selective_enable", fallback=False)


@cache
def custom_extractors() -> set[str]:
"""[openlineage] extractors."""
Expand Down
16 changes: 16 additions & 0 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
get_custom_facets,
get_job_name,
is_operator_disabled,
is_selective_lineage_enabled,
print_warning,
)
from airflow.stats import Stats
Expand Down Expand Up @@ -83,6 +84,9 @@ def on_task_instance_running(
)
return None

if not is_selective_lineage_enabled(task):
return

@print_warning(self.log)
def on_running():
# that's a workaround to detect task running from deferred state
Expand Down Expand Up @@ -150,6 +154,9 @@ def on_task_instance_success(self, previous_state, task_instance: TaskInstance,
)
return None

if not is_selective_lineage_enabled(task):
return

@print_warning(self.log)
def on_success():
parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id, dagrun.run_id)
Expand Down Expand Up @@ -202,6 +209,9 @@ def on_task_instance_failed(self, previous_state, task_instance: TaskInstance, s
)
return None

if not is_selective_lineage_enabled(task):
return

@print_warning(self.log)
def on_failure():
parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id, dagrun.run_id)
Expand Down Expand Up @@ -255,6 +265,8 @@ def before_stopping(self, component):

@hookimpl
def on_dag_run_running(self, dag_run: DagRun, msg: str):
if not is_selective_lineage_enabled(dag_run.dag):
return
data_interval_start = dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None
data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None
self.executor.submit(
Expand All @@ -267,13 +279,17 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str):

@hookimpl
def on_dag_run_success(self, dag_run: DagRun, msg: str):
if not is_selective_lineage_enabled(dag_run.dag):
return
if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_success`")
return
self.executor.submit(self.adapter.dag_success, dag_run=dag_run, msg=msg)

@hookimpl
def on_dag_run_failed(self, dag_run: DagRun, msg: str):
if not is_selective_lineage_enabled(dag_run.dag):
return
if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_failed`")
return
Expand Down
8 changes: 8 additions & 0 deletions airflow/providers/openlineage/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ config:
example: "airflow.operators.bash.BashOperator;airflow.operators.python.PythonOperator"
default: ""
version_added: 1.1.0
selective_enable:
description: |
If this setting is enabled, OpenLineage integration won't collect and emit metadata,
unless you explicitly enable it per `DAG` or `Task` using `enable_lineage` method.
type: boolean
default: "False"
example: ~
version_added: 1.7.0
namespace:
description: |
Set namespace that the lineage data belongs to, so that if you use multiple OpenLineage producers,
Expand Down
87 changes: 87 additions & 0 deletions airflow/providers/openlineage/utils/selective_enable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import logging
from typing import TypeVar

from airflow.models import DAG, Operator, Param
from airflow.models.xcom_arg import XComArg

ENABLE_OL_PARAM_NAME = "_selective_enable_ol"
ENABLE_OL_PARAM = Param(True, const=True)
DISABLE_OL_PARAM = Param(False, const=False)
T = TypeVar("T", bound="DAG | Operator")

log = logging.getLogger(__name__)


def enable_lineage(obj: T) -> T:
"""Set selective enable OpenLineage parameter to True.

The method also propagates param to tasks if the object is DAG.
"""
if isinstance(obj, XComArg):
enable_lineage(obj.operator)
return obj
# propagate param to tasks
if isinstance(obj, DAG):
for task in obj.task_dict.values():
enable_lineage(task)
obj.params[ENABLE_OL_PARAM_NAME] = ENABLE_OL_PARAM
return obj


def disable_lineage(obj: T) -> T:
"""Set selective enable OpenLineage parameter to False.

The method also propagates param to tasks if the object is DAG.
"""
if isinstance(obj, XComArg):
disable_lineage(obj.operator)
return obj
# propagate param to tasks
if isinstance(obj, DAG):
for task in obj.task_dict.values():
disable_lineage(task)
obj.params[ENABLE_OL_PARAM_NAME] = DISABLE_OL_PARAM
return obj


def is_task_lineage_enabled(task: Operator) -> bool:
"""Check if selective enable OpenLineage parameter is set to True on task level."""
if task.params.get(ENABLE_OL_PARAM_NAME) is False:
log.debug(
"OpenLineage event emission suppressed. Task for this functionality is selectively disabled."
)
return task.params.get(ENABLE_OL_PARAM_NAME) is True


def is_dag_lineage_enabled(dag: DAG) -> bool:
"""Check if DAG is selectively enabled to emit OpenLineage events.

The method also checks if selective enable parameter is set to True
or if any of the tasks in DAG is selectively enabled.
"""
if dag.params.get(ENABLE_OL_PARAM_NAME) is False:
log.debug(
"OpenLineage event emission suppressed. DAG for this functionality is selectively disabled."
)
return dag.params.get(ENABLE_OL_PARAM_NAME) is True or any(
is_task_lineage_enabled(task) for task in dag.tasks
)
19 changes: 18 additions & 1 deletion airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,21 @@
# TODO: move this maybe to Airflow's logic?
from openlineage.client.utils import RedactMixin

from airflow.models import DAG, BaseOperator, MappedOperator
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.facets import (
AirflowMappedTaskRunFacet,
AirflowRunFacet,
)
from airflow.providers.openlineage.utils.selective_enable import (
is_dag_lineage_enabled,
is_task_lineage_enabled,
)
from airflow.utils.context import AirflowContextDeprecationWarning
from airflow.utils.log.secrets_masker import Redactable, Redacted, SecretsMasker, should_hide_value_for_key

if TYPE_CHECKING:
from airflow.models import DAG, BaseOperator, DagRun, MappedOperator, TaskInstance
from airflow.models import DagRun, TaskInstance


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -73,6 +78,18 @@ def is_operator_disabled(operator: BaseOperator | MappedOperator) -> bool:
return get_fully_qualified_class_name(operator) in conf.disabled_operators()


def is_selective_lineage_enabled(obj: DAG | BaseOperator | MappedOperator) -> bool:
"""If selective enable is active check if DAG or Task is enabled to emit events."""
if not conf.selective_enable():
return True
if isinstance(obj, DAG):
return is_dag_lineage_enabled(obj)
elif isinstance(obj, (BaseOperator, MappedOperator)):
return is_task_lineage_enabled(obj)
else:
raise TypeError("is_selective_lineage_enabled can only be used on DAG or Operator objects")


class InfoJsonEncodable(dict):
"""
Airflow objects might not be json-encodable overall.
Expand Down
56 changes: 56 additions & 0 deletions docs/apache-airflow-providers-openlineage/guides/user.rst
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ If not set, it's using ``default`` namespace. Provide the name of the namespace

AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'

.. _options:disable:

Disable
^^^^^^^
Expand Down Expand Up @@ -263,6 +264,61 @@ a string of semicolon separated Airflow Operators full import paths to ``extract

AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'

Enabling OpenLineage on DAG/task level
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

One can selectively enable OpenLineage for specific DAGs and tasks by using the ``selective_enable`` policy.
To enable this policy, set the ``selective_enable`` option to True in the [openlineage] section of your Airflow configuration file:

.. code-block:: ini

[openlineage]
selective_enable = True


While ``selective_enable`` enables selective control, the ``disabled`` :ref:`option <options:disable>` still has precedence.
If you set ``disabled`` to True in the configuration, OpenLineage will be disabled for all DAGs and tasks regardless of the ``selective_enable`` setting.

Once the ``selective_enable`` policy is enabled, you can choose to enable OpenLineage
for individual DAGs and tasks using the ``enable_lineage`` and ``disable_lineage`` functions.

1. Enabling Lineage on a DAG:

.. code-block:: python

from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage

with enable_lineage(DAG(...)):
# Tasks within this DAG will have lineage tracking enabled
MyOperator(...)

AnotherOperator(...)

2. Enabling Lineage on a Task:

While enabling lineage on a DAG implicitly enables it for all tasks within that DAG, you can still selectively disable it for specific tasks:

.. code-block:: python

from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage

with DAG(...) as dag:
t1 = MyOperator(...)
t2 = AnotherOperator(...)

# Enable lineage for the entire DAG
enable_lineage(dag)

# Disable lineage for task t1
disable_lineage(t1)

Enabling lineage on the DAG level automatically enables it for all tasks within that DAG unless explicitly disabled per task.

Enabling lineage on the task level implicitly enables lineage on its DAG.
This is because each emitting task sends a `ParentRunFacet <https://openlineage.io/docs/spec/facets/run-facets/parent_run>`_,
which requires the DAG-level lineage to be enabled in some OpenLineage backend systems.
Disabling DAG-level lineage while enabling task-level lineage might cause errors or inconsistencies.


Troubleshooting
===============
Expand Down
Loading