Skip to content

Commit

Permalink
streamlining
Browse files Browse the repository at this point in the history
  • Loading branch information
benc-db committed Oct 2, 2024
1 parent 3c0e253 commit 42e82d3
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 60 deletions.
21 changes: 14 additions & 7 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
from dbt_common.utils import executor
from dbt_common.utils.dict import AttrDict
from dbt_common.exceptions import DbtConfigError
from dbt_common.contracts.config.base import BaseConfig

if TYPE_CHECKING:
from agate import Row
Expand Down Expand Up @@ -184,18 +185,24 @@ def _behavior_flags(self) -> List[BehaviorFlag]:
return [USE_INFO_SCHEMA_FOR_COLUMNS]

@available.parse(lambda *a, **k: 0)
def check_iceberg(self, table_format: TableFormat, file_format: str, materialized: str) -> bool:
if table_format == TableFormat.ICEBERG:
def update_tblproperties_for_iceberg(
self, config: BaseConfig, tblproperties: Optional[Dict[str, str]] = None
) -> Dict[str, str]:
result = tblproperties or config.get("tblproperties", {})
if config.get("table_format") == TableFormat.ICEBERG:
if self.compare_dbr_version(14, 3) < 0:
raise DbtConfigError("Iceberg support requires Databricks Runtime 14.3 or later.")
if file_format and file_format != "delta":
raise DbtConfigError("When table_format is 'iceberg', cannot set file_format.")
if materialized not in ("incremental", "table"):
if config.get("file_format", "delta") != "delta":
raise DbtConfigError(
"When table_format is 'iceberg', cannot set file_format to other than delta."
)
if config.get("materialized") not in ("incremental", "table"):
raise DbtConfigError(
"When table_format is 'iceberg', materialized must be 'incremental' or 'table'."
)
return True
return False
result["delta.enableIcebergCompatV2"] = "true"
result["delta.universalFormat.enabledFormats"] = "iceberg"
return result

# override/overload
def acquire_connection(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
{% macro get_create_sql_tblproperties(tblproperties) %}
{%- set _ = adapter.check_iceberg(config.get('table_format'), config.get('file_format'), model.config.materialized) -%}
{%- if tblproperties and tblproperties|length>0 -%}
TBLPROPERTIES (
{%- for prop in tblproperties -%}
'{{ prop }}' = '{{ tblproperties[prop] }}'{%- if not loop.last -%}, {% endif -%}
{% endfor -%}
)
{%- endif -%}
{{ databricks__tblproperties_clause(tblproperties)}}
{% endmacro %}
21 changes: 5 additions & 16 deletions dbt/include/databricks/macros/relations/tblproperties.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@
{{ return(adapter.dispatch('tblproperties_clause', 'dbt')()) }}
{%- endmacro -%}

{% macro databricks__tblproperties_clause() -%}
{%- set tblproperties = config.get('tblproperties', {}) -%}
{%- set is_iceberg = adapter.check_iceberg(config.get('table_format'), config.get('file_format'), model.config.materialized) -%}
{%- if is_iceberg -%}
{%- set _ = tblproperties.update({'delta.enableIcebergCompatV2': 'true', 'delta.universalFormat.enabledFormats': 'iceberg'}) -%}
{%- endif -%}
{% macro databricks__tblproperties_clause(tblproperties=None) -%}
{%- set tblproperties = adapter.update_tblproperties_for_iceberg(config, tblproperties) -%}
{%- if tblproperties != {} %}
tblproperties (
{%- for prop in tblproperties -%}
Expand All @@ -18,17 +14,10 @@
{%- endmacro -%}

{% macro apply_tblproperties(relation, tblproperties) -%}
{% if tblproperties %}
{%- set is_iceberg = adapter.check_iceberg(config.get('table_format'), config.get('file_format'), model.config.materialized) -%}
{%- if is_iceberg -%}
{%- set _ = tblproperties.update({'delta.enableIcebergCompatV2': 'true', 'delta.universalFormat.enabledFormats': 'iceberg'}) -%}
{%- endif -%}
{% set tblproperty_statment = databricks__tblproperties_clause(tblproperties) %}
{% if tblproperty_statment %}
{%- call statement('apply_tblproperties') -%}
ALTER {{ relation.type }} {{ relation }} SET TBLPROPERTIES (
{% for tblproperty in tblproperties -%}
'{{ tblproperty }}' = '{{ tblproperties[tblproperty] }}' {%- if not loop.last %}, {% endif -%}
{%- endfor %}
)
ALTER {{ relation.type }} {{ relation }} SET {{ tblproperty_statment}}
{%- endcall -%}
{% endif %}
{%- endmacro -%}
21 changes: 21 additions & 0 deletions tests/functional/adapter/iceberg/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,24 @@
}}
select 1 as id
"""

invalid_iceberg_view = """
{{
config(
materialized = "view",
table_format = "iceberg",
)
}}
select 1 as id
"""

invalid_iceberg_format = """
{{
config(
materialized = "table",
table_format = "iceberg",
file_format = "parquet",
)
}}
select 1 as id
"""
21 changes: 21 additions & 0 deletions tests/functional/adapter/iceberg/test_iceberg_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from tests.functional.adapter.iceberg import fixtures
from dbt.tests import util
from dbt.artifacts.schemas.results import RunStatus


@pytest.mark.skip_profile("databricks_cluster")
Expand Down Expand Up @@ -33,3 +34,23 @@ def test_iceberg_swaps(self, project):
util.write_file(fixtures.basic_incremental_swap, "models", "first_model.sql")
run_results = util.run_dbt()
assert len(run_results) == 1


class InvalidIcebergConfig:
def test_iceberg_failures(self, project):
results = util.run_dbt(expect_pass=False)
assert results.results[0].status == RunStatus.Error


@pytest.mark.skip_profile("databricks_cluster")
class TestIcebergView(InvalidIcebergConfig):
@pytest.fixture(scope="class")
def models(self):
return {"first_model.sql": fixtures.invalid_iceberg_view}


@pytest.mark.skip_profile("databricks_cluster")
class TestIcebergWithParquet(InvalidIcebergConfig):
@pytest.fixture(scope="class")
def models(self):
return {"first_model.sql": fixtures.invalid_iceberg_format}
40 changes: 11 additions & 29 deletions tests/unit/macros/relations/test_table_macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def context(self, template) -> dict:
mocking macros.
If you need to mock a macro, see the use of is_incremental in default_context.
"""
template.globals["adapter"].check_iceberg.return_value = False
template.globals["adapter"].update_tblproperties_for_iceberg.return_value = {}
return template.globals

def render_create_table_as(self, template_bundle, temporary=False, sql="select 1"):
Expand All @@ -41,7 +41,10 @@ def test_macros_create_table_as(self, template_bundle):
assert sql == f"create or replace table {template_bundle.relation} using delta as select 1"

def test_macros_create_table_as_with_iceberg(self, template_bundle):
template_bundle.context["adapter"].check_iceberg.return_value = True
template_bundle.context["adapter"].update_tblproperties_for_iceberg.return_value = {
"delta.enableIcebergCompatV2": "true",
"delta.universalFormat.enabledFormats": "iceberg",
}
sql = self.render_create_table_as(template_bundle)
assert sql == (
f"create or replace table {template_bundle.relation} using delta"
Expand Down Expand Up @@ -187,39 +190,16 @@ def test_macros_create_table_as_comment(self, config, template_bundle):

assert sql == expected

def test_macros_create_table_as_tblproperties(self, config, template_bundle):
config["tblproperties"] = {"delta.appendOnly": "true"}
sql = self.render_create_table_as(template_bundle)

expected = (
f"create or replace table {template_bundle.relation} "
"using delta tblproperties ('delta.appendOnly' = 'true' ) as select 1"
)

assert sql == expected

def test_macros_create_table_as_tblproperties_with_iceberg(self, config, template_bundle):
config["tblproperties"] = {"delta.appendOnly": "true"}
template_bundle.context["adapter"].check_iceberg.return_value = True
sql = self.render_create_table_as(template_bundle)

expected = (
f"create or replace table {template_bundle.relation} "
"using delta tblproperties ('delta.appendOnly' = 'true' , "
"'delta.enableIcebergCompatV2' = 'true' , "
"'delta.universalFormat.enabledFormats' = 'iceberg' ) as select 1"
)

assert sql == expected

def test_macros_create_table_as_all_delta(self, config, template_bundle):
config["location_root"] = "/mnt/root"
config["partition_by"] = ["partition_1", "partition_2"]
config["liquid_clustered_by"] = ["cluster_1", "cluster_2"]
config["clustered_by"] = ["cluster_1", "cluster_2"]
config["buckets"] = "1"
config["persist_docs"] = {"relation": True}
config["tblproperties"] = {"delta.appendOnly": "true"}
template_bundle.context["adapter"].update_tblproperties_for_iceberg.return_value = {
"delta.appendOnly": "true"
}
template_bundle.context["model"].description = "Description Test"

config["file_format"] = "delta"
Expand All @@ -245,7 +225,9 @@ def test_macros_create_table_as_all_hudi(self, config, template_bundle):
config["clustered_by"] = ["cluster_1", "cluster_2"]
config["buckets"] = "1"
config["persist_docs"] = {"relation": True}
config["tblproperties"] = {"delta.appendOnly": "true"}
template_bundle.context["adapter"].update_tblproperties_for_iceberg.return_value = {
"delta.appendOnly": "true"
}
template_bundle.context["model"].description = "Description Test"

config["file_format"] = "hudi"
Expand Down

0 comments on commit 42e82d3

Please sign in to comment.