Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions providers/openlineage/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ dependencies = [
"openlineage-python>=1.41.0",
]

# The optional dependencies should be modified in place in the generated file.
# Any change to these dependencies is preserved when the file is regenerated.
[project.optional-dependencies]
"sqlalchemy" = [
"sqlalchemy>=1.4.49",
]

[dependency-groups]
dev = [
"apache-airflow",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
from attrs import define
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import schema_dataset
from sqlalchemy import Column, MetaData, Table, and_, or_, union_all

from airflow.exceptions import AirflowOptionalProviderFeatureException

if TYPE_CHECKING:
from sqlalchemy import Table
from sqlalchemy.engine import Engine
from sqlalchemy.sql.elements import ColumnElement

Expand Down Expand Up @@ -157,6 +159,13 @@ def create_information_schema_query(
sqlalchemy_engine: Engine | None = None,
) -> str:
"""Create query for getting table schemas from information schema."""
try:
from sqlalchemy import Column, MetaData, Table, union_all
except ImportError:
raise AirflowOptionalProviderFeatureException(
"sqlalchemy is required for SQL schema query generation. "
"Install it with: pip install 'apache-airflow-providers-openlineage[sqlalchemy]'"
)
metadata = MetaData()
select_statements = []
# Don't iterate over tables hierarchy, just pass it to query single information schema table
Expand Down Expand Up @@ -217,6 +226,13 @@ def create_filter_clauses(
therefore it is expected the table has them defined.
:param uppercase_names: if True use schema and table names uppercase
"""
try:
from sqlalchemy import and_, or_
except ImportError:
raise AirflowOptionalProviderFeatureException(
"sqlalchemy is required for SQL filter clause generation. "
"Install it with: pip install 'apache-airflow-providers-openlineage[sqlalchemy]'"
)
table_schema_column_name = information_schema_table.columns[ColumnIndex.SCHEMA].name
table_name_column_name = information_schema_table.columns[ColumnIndex.TABLE_NAME].name
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from openlineage.client.utils import RedactMixin

from airflow import __version__ as AIRFLOW_VERSION
from airflow.exceptions import AirflowOptionalProviderFeatureException

# TODO: move this maybe to Airflow's logic?
from airflow.models import DagRun, TaskInstance, TaskReschedule
Expand Down Expand Up @@ -537,24 +538,19 @@ def is_selective_lineage_enabled(obj: DAG | SerializedDAG | AnyOperator) -> bool

@provide_session
def is_ti_rescheduled_already(ti: TaskInstance, session=NEW_SESSION):
from sqlalchemy import exists, select
try:
from sqlalchemy import exists, select
except ImportError:
raise AirflowOptionalProviderFeatureException(
"sqlalchemy is required for checking task instance reschedule status. "
"Install it with: pip install 'apache-airflow-providers-openlineage[sqlalchemy]'"
)

if not isinstance(ti.task, BaseSensorOperator):
return False

if not ti.task.reschedule:
return False
if AIRFLOW_V_3_0_PLUS:
return (
session.scalar(
select(
exists().where(
TaskReschedule.ti_id == ti.id, TaskReschedule.try_number == ti.try_number
)
)
)
is True
)
return (
session.scalar(
select(
Expand Down