Skip to content

Commit

Permalink
openlineage: add some debug logging around sql parser call sites (apache#40200)
Browse files Browse the repository at this point in the history

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
  • Loading branch information
mobuchowski authored and jannisko committed Jun 15, 2024
1 parent 9b7e163 commit 6b66f82
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 7 deletions.
16 changes: 12 additions & 4 deletions airflow/providers/openlineage/sqlparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
get_table_schemas,
)
from airflow.typing_compat import TypedDict
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
from sqlalchemy.engine import Engine
Expand Down Expand Up @@ -116,19 +117,26 @@ def from_table_meta(
return Dataset(namespace=namespace, name=name if not is_uppercase else name.upper())


class SQLParser:
class SQLParser(LoggingMixin):
"""Interface for openlineage-sql.
:param dialect: dialect specific to the database
:param default_schema: schema applied to each table with no schema parsed
"""

def __init__(self, dialect: str | None = None, default_schema: str | None = None) -> None:
    """Initialise the base class and store the parser configuration.

    :param dialect: dialect specific to the database
    :param default_schema: schema applied to each table with no schema parsed
    """
    # The base initialiser runs first, before any attribute is assigned.
    super().__init__()
    self.dialect, self.default_schema = dialect, default_schema

def parse(self, sql: list[str] | str) -> SqlMeta | None:
    """Parse one SQL statement, or a list of them, with the configured settings.

    Emits a debug log line with the SQL, dialect and default schema, then
    hands the call to the module-level ``parse`` callable.

    :param sql: a single SQL string or a list of SQL strings
    :return: whatever the module-level ``parse`` returns for this input
    """
    dialect, default_schema = self.dialect, self.default_schema
    self.log.debug(
        "OpenLineage calling SQL parser with SQL %s dialect %s schema %s",
        sql,
        dialect,
        default_schema,
    )
    # Inside the method body, ``parse`` resolves to the module-level name,
    # not to this method, so this is delegation rather than recursion.
    return parse(sql=sql, dialect=dialect, default_schema=default_schema)

def parse_table_schemas(
Expand All @@ -151,6 +159,7 @@ def parse_table_schemas(
"database": database or database_info.database,
"use_flat_cross_db_query": database_info.use_flat_cross_db_query,
}
self.log.info("PRE getting schemas for input and output tables")
return get_table_schemas(
hook,
namespace,
Expand Down Expand Up @@ -335,9 +344,8 @@ def split_statement(sql: str) -> list[str]:
return split_statement(sql)
return [obj for stmt in sql for obj in cls.split_sql_string(stmt) if obj != ""]

@classmethod
def create_information_schema_query(
cls,
self,
tables: list[DbTableMeta],
normalize_name: Callable[[str], str],
is_cross_db: bool,
Expand All @@ -349,7 +357,7 @@ def create_information_schema_query(
sqlalchemy_engine: Engine | None = None,
) -> str:
"""Create SELECT statement to query information schema table."""
tables_hierarchy = cls._get_tables_hierarchy(
tables_hierarchy = self._get_tables_hierarchy(
tables,
normalize_name=normalize_name,
database=database,
Expand Down
6 changes: 6 additions & 0 deletions airflow/providers/openlineage/utils/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import logging
from collections import defaultdict
from contextlib import closing
from enum import IntEnum
Expand All @@ -33,6 +34,9 @@
from airflow.hooks.base import BaseHook


log = logging.getLogger(__name__)


class ColumnIndex(IntEnum):
"""Enumerates the indices of columns in information schema view."""

Expand Down Expand Up @@ -90,6 +94,7 @@ def get_table_schemas(
if not in_query and not out_query:
return [], []

log.debug("Starting to query database for table schemas")
with closing(hook.get_conn()) as conn, closing(conn.cursor()) as cursor:
if in_query:
cursor.execute(in_query)
Expand All @@ -101,6 +106,7 @@ def get_table_schemas(
out_datasets = [x.to_dataset(namespace, database, schema) for x in parse_query_result(cursor)]
else:
out_datasets = []
log.debug("Got table schema query result from database.")
return in_datasets, out_datasets


Expand Down
6 changes: 3 additions & 3 deletions airflow/providers/snowflake/hooks/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,10 +473,10 @@ def get_openlineage_database_specific_lineage(self, _) -> OperatorLineage | None
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.sqlparser import SQLParser

connection = self.get_connection(getattr(self, self.conn_name_attr))
namespace = SQLParser.create_namespace(self.get_openlineage_database_info(connection))

if self.query_ids:
self.log.debug("openlineage: getting connection to get database info")
connection = self.get_connection(getattr(self, self.conn_name_attr))
namespace = SQLParser.create_namespace(self.get_openlineage_database_info(connection))
return OperatorLineage(
run_facets={
"externalQuery": ExternalQueryRunFacet(
Expand Down

0 comments on commit 6b66f82

Please sign in to comment.