Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions airflow/providers/dbt/cloud/utils/openlineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ def generate_openlineage_events_from_dbt_cloud_run(
"""
from openlineage.common.provider.dbt import DbtCloudArtifactProcessor, ParentRunMetadata

from airflow.providers.openlineage.conf import namespace
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.plugins.adapter import (
_DAG_NAMESPACE,
_PRODUCER,
OpenLineageAdapter,
)
Expand Down Expand Up @@ -110,7 +110,7 @@ async def get_artifacts_for_steps(steps, artifacts):

processor = DbtCloudArtifactProcessor(
producer=_PRODUCER,
job_namespace=_DAG_NAMESPACE,
job_namespace=namespace(),
skip_errors=False,
logger=operator.log,
manifest=manifest,
Expand All @@ -130,7 +130,7 @@ async def get_artifacts_for_steps(steps, artifacts):
parent_job = ParentRunMetadata(
run_id=parent_run_id,
job_name=f"{task_instance.dag_id}.{task_instance.task_id}",
job_namespace=_DAG_NAMESPACE,
job_namespace=namespace(),
)
processor.dbt_run_metadata = parent_job

Expand Down
98 changes: 98 additions & 0 deletions airflow/providers/openlineage/conf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import os
from typing import Any

from airflow.compat.functools import cache
from airflow.configuration import conf

# Name of the Airflow configuration section that every option lookup in this
# module passes as the first argument to ``conf.get`` / ``conf.getjson``.
_CONFIG_SECTION = "openlineage"


@cache
def config_path(check_legacy_env_var: bool = True) -> str:
    """Return the ``[openlineage] config_path`` option.

    :param check_legacy_env_var: when true and the Airflow option is empty,
        fall back to the ``OPENLINEAGE_CONFIG`` environment variable.
    :return: the configured path, or an empty string when nothing is set.
    """
    configured = conf.get(_CONFIG_SECTION, "config_path", fallback="")
    # The Airflow option wins whenever it is set; the environment variable is
    # only consulted when the caller explicitly allows the legacy fallback.
    if configured or not check_legacy_env_var:
        return configured
    return os.getenv("OPENLINEAGE_CONFIG", "")


@cache
def is_source_enabled() -> bool:
    """Return whether source code may be included, per ``[openlineage] disable_source_code``.

    The underlying option is a *disable* switch, so the result is inverted:
    ``False`` is returned when the option (or, if the option is empty, the
    ``OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE`` environment variable) holds one
    of ``true`` / ``1`` / ``t`` (case-insensitive); ``True`` otherwise.
    """
    option = conf.get(_CONFIG_SECTION, "disable_source_code", fallback="")
    if not option:
        option = os.getenv("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "")
    # Strip surrounding whitespace so a padded value such as " true " is still
    # honoured, using the same normalisation as the truthy check in is_disabled().
    return option.lower().strip() not in ("true", "1", "t")


@cache
def disabled_operators() -> set[str]:
    """Return the entries of ``[openlineage] disabled_for_operators``.

    The option is read as a semicolon-separated list; each entry is stripped
    of surrounding whitespace and empty entries are dropped.
    """
    raw_value = conf.get(_CONFIG_SECTION, "disabled_for_operators", fallback="")
    trimmed = (chunk.strip() for chunk in raw_value.split(";"))
    return {name for name in trimmed if name}


@cache
def custom_extractors() -> set[str]:
    """Return the entries of ``[openlineage] extractors``.

    When the Airflow option is empty, the ``OPENLINEAGE_EXTRACTORS``
    environment variable is used instead. The value is read as a
    semicolon-separated list; each entry is stripped of surrounding
    whitespace and empty entries are dropped.
    """
    raw_value = conf.get(_CONFIG_SECTION, "extractors", fallback="") or os.getenv(
        "OPENLINEAGE_EXTRACTORS", ""
    )
    trimmed = (chunk.strip() for chunk in raw_value.split(";"))
    return {path for path in trimmed if path}


@cache
def namespace() -> str:
    """Return the ``[openlineage] namespace`` option.

    When the Airflow option is empty, the ``OPENLINEAGE_NAMESPACE``
    environment variable is used, and ``"default"`` when that variable is
    not set at all.
    """
    configured = conf.get(_CONFIG_SECTION, "namespace", fallback="")
    return configured or os.getenv("OPENLINEAGE_NAMESPACE", "default")


@cache
def transport() -> dict[str, Any]:
    """Return the ``[openlineage] transport`` option parsed as JSON.

    :return: the parsed transport configuration, or an empty dict when the
        option is not set.
    :raises ValueError: if the parsed value is not a dict.
    """
    parsed = conf.getjson(_CONFIG_SECTION, "transport", fallback={})
    if isinstance(parsed, dict):
        return parsed
    raise ValueError(f"OpenLineage transport `{parsed}` is not a dict")


@cache
def is_disabled() -> bool:
    """Return whether OpenLineage should be treated as disabled.

    It is disabled when ``[openlineage] disabled`` or the
    ``OPENLINEAGE_DISABLED`` environment variable holds ``true`` / ``1`` /
    ``t`` (case-insensitive, surrounding whitespace ignored), or when no
    transport, no config path and no legacy ``OPENLINEAGE_URL`` is configured.
    """
    truthy_values = ("true", "1", "t")

    def _flag_is_set(raw):
        return str(raw).lower().strip() in truthy_values

    # Explicit switches: the Airflow option is checked before the env var.
    if _flag_is_set(conf.get(_CONFIG_SECTION, "disabled", fallback="")):
        return True
    if _flag_is_set(os.getenv("OPENLINEAGE_DISABLED", "")):
        return True

    # No explicit switch: the integration still counts as disabled when
    # neither 'transport' nor 'config_path' is present and the legacy
    # 'OPENLINEAGE_URL' environment variable is not set either.
    if transport() != {}:
        return False
    if config_path(True) != "":
        return False
    return os.getenv("OPENLINEAGE_URL", "") == ""
30 changes: 0 additions & 30 deletions airflow/providers/openlineage/extractors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from functools import cached_property
from typing import TYPE_CHECKING

from attrs import Factory, define

from airflow.configuration import conf
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState

Expand Down Expand Up @@ -64,31 +62,10 @@ def get_operator_classnames(cls) -> list[str]:
"""
raise NotImplementedError()

@cached_property
def disabled_operators(self) -> set[str]:
return set(
operator.strip()
for operator in conf.get("openlineage", "disabled_for_operators", fallback="").split(";")
)

@cached_property
def _is_operator_disabled(self) -> bool:
fully_qualified_class_name = (
self.operator.__class__.__module__ + "." + self.operator.__class__.__name__
)
return fully_qualified_class_name in self.disabled_operators

@abstractmethod
def _execute_extraction(self) -> OperatorLineage | None: ...

def extract(self) -> OperatorLineage | None:
if self._is_operator_disabled:
self.log.debug(
"Skipping extraction for operator %s "
"due to its presence in [openlineage] openlineage_disabled_for_operators.",
self.operator.task_type,
)
return None
return self._execute_extraction()

def extract_on_complete(self, task_instance) -> OperatorLineage | None:
Expand Down Expand Up @@ -125,13 +102,6 @@ def _execute_extraction(self) -> OperatorLineage | None:
return None

def extract_on_complete(self, task_instance) -> OperatorLineage | None:
if self._is_operator_disabled:
self.log.debug(
"Skipping extraction for operator %s "
"due to its presence in [openlineage] openlineage_disabled_for_operators.",
self.operator.task_type,
)
return None
if task_instance.state == TaskInstanceState.FAILED:
on_failed = getattr(self.operator, "get_openlineage_facets_on_failure", None)
if on_failed and callable(on_failed):
Expand Down
8 changes: 3 additions & 5 deletions airflow/providers/openlineage/extractors/bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@

from openlineage.client.facet import SourceCodeJobFacet

from airflow.providers.openlineage import conf
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.openlineage.plugins.facets import (
UnknownOperatorAttributeRunFacet,
UnknownOperatorInstance,
)
from airflow.providers.openlineage.utils.utils import (
get_filtered_unknown_operator_keys,
is_source_enabled,
)
from airflow.providers.openlineage.utils.utils import get_filtered_unknown_operator_keys

"""
:meta private:
Expand All @@ -51,7 +49,7 @@ def get_operator_classnames(cls) -> list[str]:

def _execute_extraction(self) -> OperatorLineage | None:
job_facets: dict = {}
if is_source_enabled():
if conf.is_source_enabled():
job_facets = {
"sourceCode": SourceCodeJobFacet(
language="bash",
Expand Down
30 changes: 12 additions & 18 deletions airflow/providers/openlineage/extractors/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
# under the License.
from __future__ import annotations

import os
from contextlib import suppress
from typing import TYPE_CHECKING, Iterator

from airflow.configuration import conf
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.extractors import BaseExtractor, OperatorLineage
from airflow.providers.openlineage.extractors.base import DefaultExtractor
from airflow.providers.openlineage.extractors.bash import BashExtractor
Expand Down Expand Up @@ -65,22 +64,17 @@ def __init__(self):
for operator_class in extractor.get_operator_classnames():
self.extractors[operator_class] = extractor

# Semicolon-separated extractors in Airflow configuration or OPENLINEAGE_EXTRACTORS variable.
# Extractors should implement BaseExtractor
env_extractors = conf.get("openlineage", "extractors", fallback=os.getenv("OPENLINEAGE_EXTRACTORS"))
# skip either when it's empty string or None
if env_extractors:
for extractor in env_extractors.split(";"):
extractor: type[BaseExtractor] = try_import_from_string(extractor.strip())
for operator_class in extractor.get_operator_classnames():
if operator_class in self.extractors:
self.log.debug(
"Duplicate extractor found for `%s`. `%s` will be used instead of `%s`",
operator_class,
extractor,
self.extractors[operator_class],
)
self.extractors[operator_class] = extractor
for extractor_path in conf.custom_extractors():
extractor: type[BaseExtractor] = try_import_from_string(extractor_path)
for operator_class in extractor.get_operator_classnames():
if operator_class in self.extractors:
self.log.debug(
"Duplicate extractor found for `%s`. `%s` will be used instead of `%s`",
operator_class,
extractor_path,
self.extractors[operator_class],
)
self.extractors[operator_class] = extractor

def add_extractor(self, operator_class: str, extractor: type[BaseExtractor]):
self.extractors[operator_class] = extractor
Expand Down
8 changes: 3 additions & 5 deletions airflow/providers/openlineage/extractors/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@

from openlineage.client.facet import SourceCodeJobFacet

from airflow.providers.openlineage import conf
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.openlineage.plugins.facets import (
UnknownOperatorAttributeRunFacet,
UnknownOperatorInstance,
)
from airflow.providers.openlineage.utils.utils import (
get_filtered_unknown_operator_keys,
is_source_enabled,
)
from airflow.providers.openlineage.utils.utils import get_filtered_unknown_operator_keys

"""
:meta private:
Expand All @@ -55,7 +53,7 @@ def get_operator_classnames(cls) -> list[str]:
def _execute_extraction(self) -> OperatorLineage | None:
source_code = self.get_source_code(self.operator.python_callable)
job_facet: dict = {}
if is_source_enabled() and source_code:
if conf.is_source_enabled() and source_code:
job_facet = {
"sourceCode": SourceCodeJobFacet(
language="python",
Expand Down
28 changes: 9 additions & 19 deletions airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
# under the License.
from __future__ import annotations

import os
import uuid
from contextlib import ExitStack
from typing import TYPE_CHECKING
Expand All @@ -37,8 +36,7 @@
)
from openlineage.client.run import Job, Run, RunEvent, RunState

from airflow.configuration import conf
from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION
from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
from airflow.providers.openlineage.utils.utils import OpenLineageRedactor
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
Expand All @@ -48,12 +46,6 @@
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.utils.log.secrets_masker import SecretsMasker

_DAG_DEFAULT_NAMESPACE = "default"

_DAG_NAMESPACE = conf.get(
"openlineage", "namespace", fallback=os.getenv("OPENLINEAGE_NAMESPACE", _DAG_DEFAULT_NAMESPACE)
)

_PRODUCER = f"https://github.com/apache/airflow/tree/providers-openlineage/{OPENLINEAGE_PROVIDER_VERSION}"

set_producer(_PRODUCER)
Expand Down Expand Up @@ -88,33 +80,31 @@ def get_or_create_openlineage_client(self) -> OpenLineageClient:

def get_openlineage_config(self) -> dict | None:
# First, try to read from YAML file
openlineage_config_path = conf.get("openlineage", "config_path", fallback="")
openlineage_config_path = conf.config_path(check_legacy_env_var=False)
if openlineage_config_path:
config = self._read_yaml_config(openlineage_config_path)
if config:
return config.get("transport", None)
# Second, try to get transport config
transport = conf.getjson("openlineage", "transport", fallback="")
if not transport:
transport_config = conf.transport()
if not transport_config:
return None
elif not isinstance(transport, dict):
raise ValueError(f"{transport} is not a dict")
return transport
return transport_config

def _read_yaml_config(self, path: str) -> dict | None:
with open(path) as config_file:
return yaml.safe_load(config_file)

@staticmethod
def build_dag_run_id(dag_id, dag_run_id):
return str(uuid.uuid3(uuid.NAMESPACE_URL, f"{_DAG_NAMESPACE}.{dag_id}.{dag_run_id}"))
return str(uuid.uuid3(uuid.NAMESPACE_URL, f"{conf.namespace()}.{dag_id}.{dag_run_id}"))

@staticmethod
def build_task_instance_run_id(dag_id, task_id, execution_date, try_number):
return str(
uuid.uuid3(
uuid.NAMESPACE_URL,
f"{_DAG_NAMESPACE}.{dag_id}.{task_id}.{execution_date}.{try_number}",
f"{conf.namespace()}.{dag_id}.{task_id}.{execution_date}.{try_number}",
)
)

Expand Down Expand Up @@ -353,7 +343,7 @@ def _build_run(
if parent_run_id:
parent_run_facet = ParentRunFacet.create(
runId=parent_run_id,
namespace=_DAG_NAMESPACE,
namespace=conf.namespace(),
name=parent_job_name or job_name,
)
facets.update(
Expand Down Expand Up @@ -396,4 +386,4 @@ def _build_job(

facets.update({"jobType": job_type})

return Job(_DAG_NAMESPACE, job_name, facets)
return Job(conf.namespace(), job_name, facets)
Loading