diff --git a/airflow/providers/openlineage/sqlparser.py b/airflow/providers/openlineage/sqlparser.py index f181ff8ccea019..470b93d3cb9e05 100644 --- a/airflow/providers/openlineage/sqlparser.py +++ b/airflow/providers/openlineage/sqlparser.py @@ -39,6 +39,7 @@ get_table_schemas, ) from airflow.typing_compat import TypedDict +from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: from sqlalchemy.engine import Engine @@ -116,7 +117,7 @@ def from_table_meta( return Dataset(namespace=namespace, name=name if not is_uppercase else name.upper()) -class SQLParser: +class SQLParser(LoggingMixin): """Interface for openlineage-sql. :param dialect: dialect specific to the database @@ -124,11 +125,18 @@ class SQLParser: """ def __init__(self, dialect: str | None = None, default_schema: str | None = None) -> None: + super().__init__() self.dialect = dialect self.default_schema = default_schema def parse(self, sql: list[str] | str) -> SqlMeta | None: """Parse a single or a list of SQL statements.""" + self.log.debug( + "OpenLineage calling SQL parser with SQL %s dialect %s schema %s", + sql, + self.dialect, + self.default_schema, + ) return parse(sql=sql, dialect=self.dialect, default_schema=self.default_schema) def parse_table_schemas( @@ -151,6 +159,7 @@ def parse_table_schemas( "database": database or database_info.database, "use_flat_cross_db_query": database_info.use_flat_cross_db_query, } + self.log.info("PRE getting schemas for input and output tables") return get_table_schemas( hook, namespace, @@ -335,9 +344,8 @@ def split_statement(sql: str) -> list[str]: return split_statement(sql) return [obj for stmt in sql for obj in cls.split_sql_string(stmt) if obj != ""] - @classmethod def create_information_schema_query( - cls, + self, tables: list[DbTableMeta], normalize_name: Callable[[str], str], is_cross_db: bool, @@ -349,7 +357,7 @@ def create_information_schema_query( sqlalchemy_engine: Engine | None = None, ) -> str: """Create SELECT statement to query information schema table.""" - tables_hierarchy = cls._get_tables_hierarchy( + tables_hierarchy = self._get_tables_hierarchy( tables, normalize_name=normalize_name, database=database, diff --git a/airflow/providers/openlineage/utils/sql.py b/airflow/providers/openlineage/utils/sql.py index f959745b9361b5..f5d083b4e46905 100644 --- a/airflow/providers/openlineage/utils/sql.py +++ b/airflow/providers/openlineage/utils/sql.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import logging from collections import defaultdict from contextlib import closing from enum import IntEnum @@ -33,6 +34,9 @@ from airflow.hooks.base import BaseHook +log = logging.getLogger(__name__) + + class ColumnIndex(IntEnum): """Enumerates the indices of columns in information schema view.""" @@ -90,6 +94,7 @@ def get_table_schemas( if not in_query and not out_query: return [], [] + log.debug("Starting to query database for table schemas") with closing(hook.get_conn()) as conn, closing(conn.cursor()) as cursor: if in_query: cursor.execute(in_query) @@ -101,6 +106,7 @@ def get_table_schemas( out_datasets = [x.to_dataset(namespace, database, schema) for x in parse_query_result(cursor)] else: out_datasets = [] + log.debug("Got table schema query result from database.") return in_datasets, out_datasets diff --git a/airflow/providers/snowflake/hooks/snowflake.py b/airflow/providers/snowflake/hooks/snowflake.py index 978bcf75e1c566..e2a4a453fbb075 100644 --- a/airflow/providers/snowflake/hooks/snowflake.py +++ b/airflow/providers/snowflake/hooks/snowflake.py @@ -473,10 +473,10 @@ def get_openlineage_database_specific_lineage(self, _) -> OperatorLineage | None from airflow.providers.openlineage.extractors import OperatorLineage from airflow.providers.openlineage.sqlparser import SQLParser - connection = self.get_connection(getattr(self, self.conn_name_attr)) - namespace = SQLParser.create_namespace(self.get_openlineage_database_info(connection)) - if self.query_ids: + self.log.debug("openlineage: getting connection to get database info") + connection = self.get_connection(getattr(self, self.conn_name_attr)) + namespace = SQLParser.create_namespace(self.get_openlineage_database_info(connection)) return OperatorLineage( run_facets={ "externalQuery": ExternalQueryRunFacet(