Behavior: Get column info from information_schema Part I #808

Merged · 10 commits · Sep 27, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@
- custom aliases for source and target tables can be specified and used in condition clauses;
- `matched` and `not matched` steps can now be skipped;
- Allow for the use of custom constraints, using the `custom` constraint type with an `expression` as the constraint (thanks @roydobbe). ([792](https://github.com/databricks/dbt-databricks/pull/792))
- Add "use_info_schema_for_columns" behavior flag to turn on use of information_schema to get column info where possible. This may have more latency but will not truncate complex data types the way that 'describe' can. ([808](https://github.com/databricks/dbt-databricks/pull/808))

### Under the Hood

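The flag is off by default. A minimal sketch of opting in, assuming the standard dbt 1.8+ project-level `flags:` mechanism (only the flag name comes from this PR):

```yaml
# dbt_project.yml (sketch; the flags mechanism is standard dbt 1.8+,
# only the flag name is defined by this PR)
flags:
  use_info_schema_for_columns: true
```
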
71 changes: 71 additions & 0 deletions dbt/adapters/databricks/behaviors/columns.py
@@ -0,0 +1,71 @@
from abc import ABC, abstractmethod
from typing import List
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.databricks.column import DatabricksColumn
from dbt.adapters.databricks.relation import DatabricksRelation
from dbt.adapters.databricks.utils import handle_missing_objects
from dbt_common.utils.dict import AttrDict

GET_COLUMNS_COMMENTS_MACRO_NAME = "get_columns_comments"


class GetColumnsBehavior(ABC):
    @classmethod
    @abstractmethod
    def get_columns_in_relation(
        cls, adapter: SQLAdapter, relation: DatabricksRelation
    ) -> List[DatabricksColumn]:
        pass

    @staticmethod
    def _get_columns_with_comments(
        adapter: SQLAdapter, relation: DatabricksRelation, macro_name: str
    ) -> List[AttrDict]:
        return list(
            handle_missing_objects(
                lambda: adapter.execute_macro(macro_name, kwargs={"relation": relation}),
                AttrDict(),
            )
        )


class GetColumnsByDescribe(GetColumnsBehavior):
    @classmethod
    def get_columns_in_relation(
        cls, adapter: SQLAdapter, relation: DatabricksRelation
    ) -> List[DatabricksColumn]:
        rows = cls._get_columns_with_comments(adapter, relation, GET_COLUMNS_COMMENTS_MACRO_NAME)
        return cls._parse_columns(rows)

    @classmethod
    def _parse_columns(cls, rows: List[AttrDict]) -> List[DatabricksColumn]:
        columns = []

        for row in rows:
            # `describe` output ends the column listing with a metadata
            # section whose rows start with '#'.
            if row["col_name"].startswith("#"):
                break
            columns.append(
                DatabricksColumn(
                    column=row["col_name"], dtype=row["data_type"], comment=row["comment"]
                )
            )

        return columns


class GetColumnsByInformationSchema(GetColumnsByDescribe):
    @classmethod
    def get_columns_in_relation(
        cls, adapter: SQLAdapter, relation: DatabricksRelation
    ) -> List[DatabricksColumn]:
        # Fall back to `describe` for Hive metastore relations and views.
        if relation.is_hive_metastore() or relation.type == DatabricksRelation.View:
            return super().get_columns_in_relation(adapter, relation)

        rows = cls._get_columns_with_comments(
            adapter, relation, "get_columns_comments_via_information_schema"
        )
        return cls._parse_columns(rows)

    @classmethod
    def _parse_columns(cls, rows: List[AttrDict]) -> List[DatabricksColumn]:
        return [DatabricksColumn(column=row[0], dtype=row[1], comment=row[2]) for row in rows]
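
The two parsers expect different row shapes. A hypothetical illustration (values invented), inferred from the dict-style access in `GetColumnsByDescribe` and the positional access in `GetColumnsByInformationSchema`:

```python
# Rows from the describe-based macro are dict-like; the column listing is
# followed by a metadata section whose rows start with '#'.
describe_rows = [
    {"col_name": "struct_col", "data_type": "struct<a:int,b:string>", "comment": None},
    {"col_name": "# Detailed Table Information", "data_type": "", "comment": ""},
]

# Rows from information_schema are positional:
# (column_name, full_data_type, comment).
info_schema_rows = [
    ("struct_col", "struct<a: int, b: string>", None),
    ("str_col", "string", None),
]
```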
76 changes: 31 additions & 45 deletions dbt/adapters/databricks/impl.py
@@ -1,3 +1,4 @@
from multiprocessing.context import SpawnContext
import os
import re
from abc import ABC
@@ -7,7 +8,6 @@
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any
from typing import Callable
from typing import cast
from typing import ClassVar
from typing import Dict
@@ -21,7 +21,6 @@
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
from typing import Union

from dbt.adapters.base import AdapterConfig
@@ -37,6 +36,11 @@
from dbt.adapters.contracts.connection import Connection
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.contracts.relation import RelationType
from dbt.adapters.databricks.behaviors.columns import (
    GetColumnsBehavior,
    GetColumnsByDescribe,
    GetColumnsByInformationSchema,
)
from dbt.adapters.databricks.column import DatabricksColumn
from dbt.adapters.databricks.connections import DatabricksConnectionManager
from dbt.adapters.databricks.connections import DatabricksDBTConnection
@@ -63,7 +67,7 @@
    StreamingTableConfig,
)
from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig
from dbt.adapters.databricks.utils import get_first_row
from dbt.adapters.databricks.utils import get_first_row, handle_missing_objects
from dbt.adapters.databricks.utils import redact_credentials
from dbt.adapters.databricks.utils import undefined_proof
from dbt.adapters.relation_configs import RelationResults
@@ -73,8 +77,7 @@
from dbt.adapters.spark.impl import KEY_TABLE_STATISTICS
from dbt.adapters.spark.impl import LIST_SCHEMAS_MACRO_NAME
from dbt.adapters.spark.impl import SparkAdapter
from dbt.adapters.spark.impl import TABLE_OR_VIEW_NOT_FOUND_MESSAGES
from dbt_common.exceptions import DbtRuntimeError
from dbt_common.behavior_flags import BehaviorFlag
from dbt_common.utils import executor
from dbt_common.utils.dict import AttrDict

@@ -90,6 +93,15 @@
SHOW_VIEWS_MACRO_NAME = "show_views"
GET_COLUMNS_COMMENTS_MACRO_NAME = "get_columns_comments"

USE_INFO_SCHEMA_FOR_COLUMNS = BehaviorFlag(
    name="use_info_schema_for_columns",
    default=False,
    description=(
        "Use info schema to gather column information to ensure complex types are not truncated."
        " Incurs some overhead, so disabled by default."
    ),
)  # type: ignore[typeddict-item]


@dataclass
class DatabricksConfig(AdapterConfig):
@@ -116,26 +128,6 @@ class DatabricksConfig(AdapterConfig):
    merge_with_schema_evolution: Optional[bool] = None


def check_not_found_error(errmsg: str) -> bool:

Review comment (Collaborator Author): Moved to prevent circular dependency.

    new_error = "[SCHEMA_NOT_FOUND]" in errmsg
    old_error = re.match(r".*(Database).*(not found).*", errmsg, re.DOTALL)
    found_msgs = (msg in errmsg for msg in TABLE_OR_VIEW_NOT_FOUND_MESSAGES)
    return new_error or old_error is not None or any(found_msgs)


T = TypeVar("T")


def handle_missing_objects(exec: Callable[[], T], default: T) -> T:
    try:
        return exec()
    except DbtRuntimeError as e:
        errmsg = getattr(e, "msg", "")
        if check_not_found_error(errmsg):
            return default
        raise e


def get_identifier_list_string(table_names: Set[str]) -> str:
"""Returns `"|".join(table_names)` by default.

@@ -175,6 +167,19 @@ class DatabricksAdapter(SparkAdapter):
        }
    )

    get_column_behavior: GetColumnsBehavior

    def __init__(self, config: Any, mp_context: SpawnContext) -> None:

Review comment (Collaborator Author): Although I don't control the invocation of __init__, and thus can't pass the behaviors in, doing this during init ensures we only check the behavior flag once. I tried to make this a little more functional, but couldn't figure out how to override an existing function definition inherited from a parent, hence the goofy class-based strategy.

        super().__init__(config, mp_context)
        if self.behavior.use_info_schema_for_columns.no_warn:  # type: ignore[attr-defined]
            self.get_column_behavior = GetColumnsByInformationSchema()
        else:
            self.get_column_behavior = GetColumnsByDescribe()

    @property
    def _behavior_flags(self) -> List[BehaviorFlag]:
        return [USE_INFO_SCHEMA_FOR_COLUMNS]

    # override/overload
    def acquire_connection(
        self, name: Optional[str] = None, query_header_context: Any = None
@@ -388,26 +393,7 @@ def parse_describe_extended(  # type: ignore[override]
    def get_columns_in_relation(  # type: ignore[override]
        self, relation: DatabricksRelation
    ) -> List[DatabricksColumn]:
        rows = list(
            handle_missing_objects(
                lambda: self.execute_macro(
                    GET_COLUMNS_COMMENTS_MACRO_NAME, kwargs={"relation": relation}
                ),
                AttrDict(),
            )
        )

        columns = []
        for row in rows:
            if row["col_name"].startswith("#"):
                break
            columns.append(
                DatabricksColumn(
                    column=row["col_name"], dtype=row["data_type"], comment=row["comment"]
                )
            )

        return columns
        return self.get_column_behavior.get_columns_in_relation(self, relation)

    def _get_updated_relation(
        self, relation: DatabricksRelation
22 changes: 22 additions & 0 deletions dbt/adapters/databricks/utils.py
@@ -8,6 +8,8 @@
from typing import TypeVar

from dbt.adapters.base import BaseAdapter
from dbt.adapters.spark.impl import TABLE_OR_VIEW_NOT_FOUND_MESSAGES
from dbt_common.exceptions import DbtRuntimeError
from jinja2 import Undefined

if TYPE_CHECKING:
@@ -92,3 +94,23 @@ def get_first_row(results: "Table") -> "Row":

        return Row(values=set())
    return results.rows[0]


def check_not_found_error(errmsg: str) -> bool:
    new_error = "[SCHEMA_NOT_FOUND]" in errmsg
    old_error = re.match(r".*(Database).*(not found).*", errmsg, re.DOTALL)
    found_msgs = (msg in errmsg for msg in TABLE_OR_VIEW_NOT_FOUND_MESSAGES)
    return new_error or old_error is not None or any(found_msgs)


T = TypeVar("T")


def handle_missing_objects(exec: Callable[[], T], default: T) -> T:
    try:
        return exec()
    except DbtRuntimeError as e:
        errmsg = getattr(e, "msg", "")
        if check_not_found_error(errmsg):
            return default
        raise e
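
A minimal usage sketch, mirroring how `behaviors/columns.py` wraps a macro call so a missing relation yields an empty result instead of an error (`adapter` and `relation` are assumed to be in scope):

```python
from dbt_common.utils.dict import AttrDict

# Default AttrDict() is returned when the table/view/schema is not found;
# any other DbtRuntimeError is re-raised.
rows = list(
    handle_missing_objects(
        lambda: adapter.execute_macro("get_columns_comments", kwargs={"relation": relation}),
        AttrDict(),
    )
)
```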
19 changes: 19 additions & 0 deletions dbt/include/databricks/macros/adapters/persist_docs.sql
@@ -28,6 +28,25 @@
  {% do return(load_result('get_columns_comments').table) %}
{% endmacro %}

{% macro get_columns_comments_via_information_schema(relation) -%}
  {% call statement('repair_table', fetch_result=False) -%}
    REPAIR TABLE {{ relation|lower }} SYNC METADATA

Review comment (Collaborator Author): Ensure information_schema is up to date prior to using this method.

  {% endcall %}
  {% call statement('get_columns_comments_via_information_schema', fetch_result=True) -%}
    select
      column_name,
      full_data_type,
      comment
    from `system`.`information_schema`.`columns`
    where
      table_catalog = '{{ relation.database|lower }}' and
      table_schema = '{{ relation.schema|lower }}' and
      table_name = '{{ relation.identifier|lower }}'
  {% endcall %}

  {% do return(load_result('get_columns_comments_via_information_schema').table) %}
{% endmacro %}
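
For a hypothetical relation `main.analytics.orders`, the macro would render roughly the following statements (identifiers lowercased by the `|lower` filters):

```sql
-- sketch of the rendered output for a hypothetical relation
REPAIR TABLE main.analytics.orders SYNC METADATA;

select
  column_name,
  full_data_type,
  comment
from `system`.`information_schema`.`columns`
where
  table_catalog = 'main' and
  table_schema = 'analytics' and
  table_name = 'orders';
```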

{% macro databricks__persist_docs(relation, model, for_relation, for_columns) -%}
  {%- if for_relation and config.persist_relation_docs() and model.description %}
    {% do alter_table_comment(relation, model) %}
2 changes: 1 addition & 1 deletion dev-requirements.txt
@@ -15,4 +15,4 @@ types-requests
types-mock
pre-commit

dbt-tests-adapter~=1.8.0
dbt-tests-adapter>=1.8.0, <2.0
5 changes: 3 additions & 2 deletions requirements.txt
@@ -1,7 +1,8 @@
databricks-sql-connector>=3.4.0, <3.5.0
dbt-spark~=1.8.0
dbt-core>=1.8.0, <2.0
dbt-adapters>=1.3.0, <2.0
dbt-core>=1.8.7, <2.0
dbt-common>=1.10.0, <2.0
dbt-adapters>=1.7.0, <2.0
databricks-sdk==0.17.0
keyring>=23.13.0
protobuf<5.0.0
7 changes: 4 additions & 3 deletions setup.py
@@ -55,9 +55,10 @@ def _get_plugin_version() -> str:
    include_package_data=True,
    install_requires=[
        "dbt-spark>=1.8.0, <2.0",
        "dbt-core>=1.8.0, <2.0",
        "dbt-adapters>=1.3.0, <2.0",
        "databricks-sql-connector>=3.2.0, <3.3.0",
        "dbt-core>=1.8.7, <2.0",
        "dbt-adapters>=1.7.0, <2.0",
        "dbt-common>=1.10.0, <2.0",
        "databricks-sql-connector>=3.4.0, <3.5.0",
        "databricks-sdk==0.17.0",
        "keyring>=23.13.0",
        "pandas<2.2.0",
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -7,7 +7,7 @@


def pytest_addoption(parser):
    parser.addoption("--profile", action="store", default="databricks_uc_sql_endpoint", type=str)
    parser.addoption("--profile", action="store", default="databricks_uc_cluster", type=str)


# Using @pytest.mark.skip_profile('databricks_cluster') uses the 'skip_by_adapter_type'
25 changes: 25 additions & 0 deletions tests/functional/adapter/columns/fixtures.py
@@ -0,0 +1,25 @@
base_model = """
select struct('a', 1, 'b', 'b', 'c', ARRAY(1,2,3)) as struct_col, 'hello' as str_col
"""

schema = """
version: 2
models:
- name: base_model
config:
materialized: table
columns:
- name: struct_col
- name: str_col
"""

view_schema = """
version: 2
models:
- name: base_model
config:
materialized: view
columns:
- name: struct_col
- name: str_col
"""