Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

openlineage: add some debug logging around sql parser call sites #40200

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions airflow/providers/openlineage/sqlparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
get_table_schemas,
)
from airflow.typing_compat import TypedDict
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
from sqlalchemy.engine import Engine
Expand Down Expand Up @@ -116,19 +117,26 @@ def from_table_meta(
return Dataset(namespace=namespace, name=name if not is_uppercase else name.upper())


class SQLParser:
class SQLParser(LoggingMixin):
"""Interface for openlineage-sql.
:param dialect: dialect specific to the database
:param default_schema: schema applied to each table with no schema parsed
"""

def __init__(self, dialect: str | None = None, default_schema: str | None = None) -> None:
super().__init__()
self.dialect = dialect
self.default_schema = default_schema

def parse(self, sql: list[str] | str) -> SqlMeta | None:
"""Parse a single or a list of SQL statements."""
self.log.debug(
"OpenLineage calling SQL parser with SQL %s dialect %s schema %s",
sql,
self.dialect,
self.default_schema,
)
return parse(sql=sql, dialect=self.dialect, default_schema=self.default_schema)

def parse_table_schemas(
Expand All @@ -151,6 +159,7 @@ def parse_table_schemas(
"database": database or database_info.database,
"use_flat_cross_db_query": database_info.use_flat_cross_db_query,
}
self.log.info("PRE getting schemas for input and output tables")
return get_table_schemas(
hook,
namespace,
Expand Down Expand Up @@ -335,9 +344,8 @@ def split_statement(sql: str) -> list[str]:
return split_statement(sql)
return [obj for stmt in sql for obj in cls.split_sql_string(stmt) if obj != ""]

@classmethod
def create_information_schema_query(
cls,
self,
tables: list[DbTableMeta],
normalize_name: Callable[[str], str],
is_cross_db: bool,
Expand All @@ -349,7 +357,7 @@ def create_information_schema_query(
sqlalchemy_engine: Engine | None = None,
) -> str:
"""Create SELECT statement to query information schema table."""
tables_hierarchy = cls._get_tables_hierarchy(
tables_hierarchy = self._get_tables_hierarchy(
tables,
normalize_name=normalize_name,
database=database,
Expand Down
6 changes: 6 additions & 0 deletions airflow/providers/openlineage/utils/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import logging
from collections import defaultdict
from contextlib import closing
from enum import IntEnum
Expand All @@ -33,6 +34,9 @@
from airflow.hooks.base import BaseHook


log = logging.getLogger(__name__)


class ColumnIndex(IntEnum):
"""Enumerates the indices of columns in information schema view."""

Expand Down Expand Up @@ -90,6 +94,7 @@ def get_table_schemas(
if not in_query and not out_query:
return [], []

log.debug("Starting to query database for table schemas")
with closing(hook.get_conn()) as conn, closing(conn.cursor()) as cursor:
if in_query:
cursor.execute(in_query)
Expand All @@ -101,6 +106,7 @@ def get_table_schemas(
out_datasets = [x.to_dataset(namespace, database, schema) for x in parse_query_result(cursor)]
else:
out_datasets = []
log.debug("Got table schema query result from database.")
return in_datasets, out_datasets


Expand Down
6 changes: 3 additions & 3 deletions airflow/providers/snowflake/hooks/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,10 +473,10 @@ def get_openlineage_database_specific_lineage(self, _) -> OperatorLineage | None
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.sqlparser import SQLParser

connection = self.get_connection(getattr(self, self.conn_name_attr))
namespace = SQLParser.create_namespace(self.get_openlineage_database_info(connection))

if self.query_ids:
self.log.debug("openlineage: getting connection to get database info")
connection = self.get_connection(getattr(self, self.conn_name_attr))
namespace = SQLParser.create_namespace(self.get_openlineage_database_info(connection))
return OperatorLineage(
run_facets={
"externalQuery": ExternalQueryRunFacet(
Expand Down