-
Notifications
You must be signed in to change notification settings - Fork 120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Behavior: Get column info from information_schema Part I #808
Changes from all commits
3ca3a51
14675fd
39c59e4
f2e5235
a046461
6598360
a73a39d
c395c0a
e680e9e
880ec3a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
from abc import ABC, abstractmethod | ||
from typing import List | ||
from dbt.adapters.sql import SQLAdapter | ||
from dbt.adapters.databricks.column import DatabricksColumn | ||
from dbt.adapters.databricks.relation import DatabricksRelation | ||
from dbt.adapters.databricks.utils import handle_missing_objects | ||
from dbt_common.utils.dict import AttrDict | ||
|
||
GET_COLUMNS_COMMENTS_MACRO_NAME = "get_columns_comments" | ||
|
||
|
||
class GetColumnsBehavior(ABC):
    """Strategy interface for retrieving column metadata for a relation.

    Concrete subclasses decide *how* the columns are fetched (e.g. via
    DESCRIBE output or via information_schema); callers only invoke
    ``get_columns_in_relation``.
    """

    @classmethod
    @abstractmethod
    def get_columns_in_relation(
        cls, adapter: SQLAdapter, relation: DatabricksRelation
    ) -> List[DatabricksColumn]:
        """Return the columns of *relation*, including their comments."""
        pass

    @staticmethod
    def _get_columns_with_comments(
        adapter: SQLAdapter, relation: DatabricksRelation, macro_name: str
    ) -> List[AttrDict]:
        """Execute *macro_name* for *relation* and return its rows.

        Missing tables/schemas are tolerated: ``handle_missing_objects``
        substitutes an empty result instead of raising.
        """

        def run_macro():
            return adapter.execute_macro(macro_name, kwargs={"relation": relation})

        rows = handle_missing_objects(run_macro, AttrDict())
        return list(rows)
|
||
|
||
class GetColumnsByDescribe(GetColumnsBehavior):
    """Fetch columns with the DESCRIBE-style ``get_columns_comments`` macro."""

    @classmethod
    def get_columns_in_relation(
        cls, adapter: SQLAdapter, relation: DatabricksRelation
    ) -> List[DatabricksColumn]:
        """Return the columns of *relation* parsed from DESCRIBE-style output."""
        # Use the module-level constant rather than repeating the literal,
        # so the macro name is defined in exactly one place.
        rows = cls._get_columns_with_comments(adapter, relation, GET_COLUMNS_COMMENTS_MACRO_NAME)
        return cls._parse_columns(rows)

    @classmethod
    def _parse_columns(cls, rows: List[AttrDict]) -> List[DatabricksColumn]:
        """Convert DESCRIBE rows into ``DatabricksColumn`` objects.

        DESCRIBE output appends metadata sections (partitions, detailed table
        information) whose ``col_name`` starts with ``#``; everything from the
        first such row onward is not a real column, so parsing stops there.
        """
        columns = []

        for row in rows:
            if row["col_name"].startswith("#"):
                break
            columns.append(
                DatabricksColumn(
                    column=row["col_name"], dtype=row["data_type"], comment=row["comment"]
                )
            )

        return columns
|
||
|
||
class GetColumnsByInformationSchema(GetColumnsByDescribe):
    """Fetch columns from ``system.information_schema.columns``.

    Falls back to the DESCRIBE-based parent behavior for hive metastore
    relations and views, which information_schema does not serve here.
    """

    @classmethod
    def get_columns_in_relation(
        cls, adapter: SQLAdapter, relation: DatabricksRelation
    ) -> List[DatabricksColumn]:
        # Guard: hive metastore objects and views go through DESCRIBE instead.
        needs_describe = relation.is_hive_metastore() or relation.type == DatabricksRelation.View
        if needs_describe:
            return super().get_columns_in_relation(adapter, relation)

        rows = cls._get_columns_with_comments(
            adapter, relation, "get_columns_comments_via_information_schema"
        )
        return cls._parse_columns(rows)

    @classmethod
    def _parse_columns(cls, rows: List[AttrDict]) -> List[DatabricksColumn]:
        # information_schema rows are positional: (column_name, full_data_type, comment).
        columns = []
        for record in rows:
            columns.append(
                DatabricksColumn(column=record[0], dtype=record[1], comment=record[2])
            )
        return columns
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
from multiprocessing.context import SpawnContext | ||
import os | ||
import re | ||
from abc import ABC | ||
|
@@ -7,7 +8,6 @@ | |
from contextlib import contextmanager | ||
from dataclasses import dataclass | ||
from typing import Any | ||
from typing import Callable | ||
from typing import cast | ||
from typing import ClassVar | ||
from typing import Dict | ||
|
@@ -21,7 +21,6 @@ | |
from typing import Tuple | ||
from typing import Type | ||
from typing import TYPE_CHECKING | ||
from typing import TypeVar | ||
from typing import Union | ||
|
||
from dbt.adapters.base import AdapterConfig | ||
|
@@ -37,6 +36,11 @@ | |
from dbt.adapters.contracts.connection import Connection | ||
from dbt.adapters.contracts.relation import RelationConfig | ||
from dbt.adapters.contracts.relation import RelationType | ||
from dbt.adapters.databricks.behaviors.columns import ( | ||
GetColumnsBehavior, | ||
GetColumnsByDescribe, | ||
GetColumnsByInformationSchema, | ||
) | ||
from dbt.adapters.databricks.column import DatabricksColumn | ||
from dbt.adapters.databricks.connections import DatabricksConnectionManager | ||
from dbt.adapters.databricks.connections import DatabricksDBTConnection | ||
|
@@ -63,7 +67,7 @@ | |
StreamingTableConfig, | ||
) | ||
from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig | ||
from dbt.adapters.databricks.utils import get_first_row | ||
from dbt.adapters.databricks.utils import get_first_row, handle_missing_objects | ||
from dbt.adapters.databricks.utils import redact_credentials | ||
from dbt.adapters.databricks.utils import undefined_proof | ||
from dbt.adapters.relation_configs import RelationResults | ||
|
@@ -73,8 +77,7 @@ | |
from dbt.adapters.spark.impl import KEY_TABLE_STATISTICS | ||
from dbt.adapters.spark.impl import LIST_SCHEMAS_MACRO_NAME | ||
from dbt.adapters.spark.impl import SparkAdapter | ||
from dbt.adapters.spark.impl import TABLE_OR_VIEW_NOT_FOUND_MESSAGES | ||
from dbt_common.exceptions import DbtRuntimeError | ||
from dbt_common.behavior_flags import BehaviorFlag | ||
from dbt_common.utils import executor | ||
from dbt_common.utils.dict import AttrDict | ||
|
||
|
@@ -90,6 +93,15 @@ | |
SHOW_VIEWS_MACRO_NAME = "show_views" | ||
GET_COLUMNS_COMMENTS_MACRO_NAME = "get_columns_comments" | ||
|
||
USE_INFO_SCHEMA_FOR_COLUMNS = BehaviorFlag( | ||
name="use_info_schema_for_columns", | ||
default=False, | ||
description=( | ||
"Use info schema to gather column information to ensure complex types are not truncated." | ||
" Incurs some overhead, so disabled by default." | ||
), | ||
) # type: ignore[typeddict-item] | ||
|
||
|
||
@dataclass | ||
class DatabricksConfig(AdapterConfig): | ||
|
@@ -116,26 +128,6 @@ class DatabricksConfig(AdapterConfig): | |
merge_with_schema_evolution: Optional[bool] = None | ||
|
||
|
||
def check_not_found_error(errmsg: str) -> bool: | ||
new_error = "[SCHEMA_NOT_FOUND]" in errmsg | ||
old_error = re.match(r".*(Database).*(not found).*", errmsg, re.DOTALL) | ||
found_msgs = (msg in errmsg for msg in TABLE_OR_VIEW_NOT_FOUND_MESSAGES) | ||
return new_error or old_error is not None or any(found_msgs) | ||
|
||
|
||
T = TypeVar("T") | ||
|
||
|
||
def handle_missing_objects(exec: Callable[[], T], default: T) -> T: | ||
try: | ||
return exec() | ||
except DbtRuntimeError as e: | ||
errmsg = getattr(e, "msg", "") | ||
if check_not_found_error(errmsg): | ||
return default | ||
raise e | ||
|
||
|
||
def get_identifier_list_string(table_names: Set[str]) -> str: | ||
"""Returns `"|".join(table_names)` by default. | ||
|
||
|
@@ -175,6 +167,19 @@ class DatabricksAdapter(SparkAdapter): | |
} | ||
) | ||
|
||
get_column_behavior: GetColumnsBehavior | ||
|
||
def __init__(self, config: Any, mp_context: SpawnContext) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Although I don't control the invocation of init, and thus can't pass the behaviors in, doing this during init ensures we only check the behavior flag once. I tried to make this a little more functional but couldn't figure out how to override an existing function definition that is inherited from a parent, hence the goofy class-based strategy. |
||
super().__init__(config, mp_context) | ||
if self.behavior.use_info_schema_for_columns.no_warn: # type: ignore[attr-defined] | ||
self.get_column_behavior = GetColumnsByInformationSchema() | ||
else: | ||
self.get_column_behavior = GetColumnsByDescribe() | ||
|
||
@property | ||
def _behavior_flags(self) -> List[BehaviorFlag]: | ||
return [USE_INFO_SCHEMA_FOR_COLUMNS] | ||
|
||
# override/overload | ||
def acquire_connection( | ||
self, name: Optional[str] = None, query_header_context: Any = None | ||
|
@@ -388,26 +393,7 @@ def parse_describe_extended( # type: ignore[override] | |
def get_columns_in_relation( # type: ignore[override] | ||
self, relation: DatabricksRelation | ||
) -> List[DatabricksColumn]: | ||
rows = list( | ||
handle_missing_objects( | ||
lambda: self.execute_macro( | ||
GET_COLUMNS_COMMENTS_MACRO_NAME, kwargs={"relation": relation} | ||
), | ||
AttrDict(), | ||
) | ||
) | ||
|
||
columns = [] | ||
for row in rows: | ||
if row["col_name"].startswith("#"): | ||
break | ||
columns.append( | ||
DatabricksColumn( | ||
column=row["col_name"], dtype=row["data_type"], comment=row["comment"] | ||
) | ||
) | ||
|
||
return columns | ||
return self.get_column_behavior.get_columns_in_relation(self, relation) | ||
|
||
def _get_updated_relation( | ||
self, relation: DatabricksRelation | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,25 @@ | |
{% do return(load_result('get_columns_comments').table) %} | ||
{% endmacro %} | ||
|
||
{% macro get_columns_comments_via_information_schema(relation) -%}
  {#- Repair first so information_schema reflects current column metadata
      before it is queried below. -#}
  {% call statement('repair_table', fetch_result=False) -%}
    REPAIR TABLE {{ relation|lower }} SYNC METADATA
  {% endcall %}
  {% call statement('get_columns_comments_via_information_schema', fetch_result=True) -%}
    select
      column_name,
      full_data_type,
      comment
    from `system`.`information_schema`.`columns`
    where
      table_catalog = '{{ relation.database|lower }}' and
      table_schema = '{{ relation.schema|lower }}' and
      table_name = '{{ relation.identifier|lower }}'
  {% endcall %}

  {% do return(load_result('get_columns_comments_via_information_schema').table) %}
{% endmacro %}
|
||
{% macro databricks__persist_docs(relation, model, for_relation, for_columns) -%} | ||
{%- if for_relation and config.persist_relation_docs() and model.description %} | ||
{% do alter_table_comment(relation, model) %} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,4 +15,4 @@ types-requests | |
types-mock | ||
pre-commit | ||
|
||
dbt-tests-adapter~=1.8.0 | ||
dbt-tests-adapter>=1.8.0, <2.0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,8 @@ | ||
databricks-sql-connector>=3.4.0, <3.5.0 | ||
dbt-spark~=1.8.0 | ||
dbt-core>=1.8.0, <2.0 | ||
dbt-adapters>=1.3.0, <2.0 | ||
dbt-core>=1.8.7, <2.0 | ||
dbt-common>=1.10.0, <2.0 | ||
dbt-adapters>=1.7.0, <2.0 | ||
databricks-sdk==0.17.0 | ||
keyring>=23.13.0 | ||
protobuf<5.0.0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
base_model = """ | ||
select struct('a', 1, 'b', 'b', 'c', ARRAY(1,2,3)) as struct_col, 'hello' as str_col | ||
""" | ||
|
||
schema = """ | ||
version: 2 | ||
models: | ||
- name: base_model | ||
config: | ||
materialized: table | ||
columns: | ||
- name: struct_col | ||
- name: str_col | ||
""" | ||
|
||
view_schema = """ | ||
version: 2 | ||
models: | ||
- name: base_model | ||
config: | ||
materialized: view | ||
columns: | ||
- name: struct_col | ||
- name: str_col | ||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to prevent circular dependency.