Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions providers/common/sql/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ dependencies = [
"polars" = [
"polars>=1.26.0"
]
"sqlalchemy" = [
"sqlalchemy>=1.4.49",
]

[dependency-groups]
dev = [
Expand Down
45 changes: 34 additions & 11 deletions providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,18 @@
from deprecated import deprecated
from methodtools import lru_cache
from more_itertools import chunked
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine import make_url
from sqlalchemy.exc import ArgumentError, NoSuchModuleError

try:
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine import make_url
from sqlalchemy.exc import ArgumentError, NoSuchModuleError
except ImportError:
create_engine = None
inspect = None
make_url = None
ArgumentError = Exception
NoSuchModuleError = Exception


from airflow.configuration import conf
from airflow.exceptions import AirflowOptionalProviderFeatureException, AirflowProviderDeprecationWarning
Expand Down Expand Up @@ -304,6 +313,12 @@ def get_sqlalchemy_engine(self, engine_kwargs=None) -> Engine:
:param engine_kwargs: Kwargs used in :func:`~sqlalchemy.create_engine`.
:return: the created engine.
"""
if create_engine is None:
raise AirflowOptionalProviderFeatureException(
"SQLAlchemy is required to generate the connection URI. "
"Install it with: pip install 'apache-airflow-providers-common-sql[sqlalchemy]'"
)

if engine_kwargs is None:
engine_kwargs = {}

Expand All @@ -318,18 +333,26 @@ def get_sqlalchemy_engine(self, engine_kwargs=None) -> Engine:

@property
def inspector(self) -> Inspector:
if inspect is None:
raise AirflowOptionalProviderFeatureException(
"SQLAlchemy is required for database inspection. "
"Install it with: pip install 'apache-airflow-providers-common-sql[sqlalchemy]'"
)
return inspect(self.get_sqlalchemy_engine())

@cached_property
def dialect_name(self) -> str:
try:
return make_url(self.get_uri()).get_dialect().name
except (ArgumentError, NoSuchModuleError, ValueError):
config = self.connection_extra
sqlalchemy_scheme = config.get("sqlalchemy_scheme")
if sqlalchemy_scheme:
return sqlalchemy_scheme.split("+")[0] if "+" in sqlalchemy_scheme else sqlalchemy_scheme
return config.get("dialect", "default")
if make_url is not None:
try:
return make_url(self.get_uri()).get_dialect().name
except (ArgumentError, NoSuchModuleError, ValueError):
pass

config = self.connection_extra
sqlalchemy_scheme = config.get("sqlalchemy_scheme")
if sqlalchemy_scheme:
return sqlalchemy_scheme.split("+")[0] if "+" in sqlalchemy_scheme else sqlalchemy_scheme
return config.get("dialect", "default")

@cached_property
def dialect(self) -> Dialect:
Expand Down