Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions providers/src/airflow/providers/openlineage/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@
Changelog
---------

main
.....

.. warning::
All deprecated classes, parameters and features have been removed from the OpenLineage provider package.
The following breaking changes were introduced:

* Utils

* Removed ``normalize_sql`` function from ``openlineage.utils`` module.


1.14.0
......

Expand Down
19 changes: 0 additions & 19 deletions providers/src/airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,17 @@
import datetime
import json
import logging
from collections.abc import Iterable
from contextlib import suppress
from functools import wraps
from importlib import metadata
from typing import TYPE_CHECKING, Any, Callable

import attrs
from deprecated import deprecated
from openlineage.client.utils import RedactMixin
from packaging.version import Version
from sqlalchemy import exists

from airflow import __version__ as AIRFLOW_VERSION
from airflow.exceptions import (
AirflowProviderDeprecationWarning,
)

# TODO: move this maybe to Airflow's logic?
from airflow.models import DAG, BaseOperator, DagRun, MappedOperator, TaskReschedule
Expand Down Expand Up @@ -719,20 +714,6 @@ def get_filtered_unknown_operator_keys(operator: BaseOperator) -> dict:
return {attr: value for attr, value in operator.__dict__.items() if attr not in not_required_keys}


@deprecated(
reason=(
"`airflow.providers.openlineage.utils.utils.normalize_sql` "
"has been deprecated and will be removed in future"
),
category=AirflowProviderDeprecationWarning,
)
def normalize_sql(sql: str | Iterable[str]):
if isinstance(sql, str):
sql = [stmt for stmt in sql.split(";") if stmt != ""]
sql = [obj for stmt in sql for obj in stmt.split(";") if obj != ""]
return ";\n".join(sql)


def should_use_external_connection(hook) -> bool:
# If we're at Airflow 2.10, the execution is process-isolated, so we can safely run those again.
if not IS_AIRFLOW_2_10_OR_HIGHER:
Expand Down