Commit 00dd9f8: Behavior for external path (#823)

benc-db authored Oct 11, 2024
1 parent 0e821b0 commit 00dd9f8
Showing 11 changed files with 39 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@
 - Allow for the use of custom constraints, using the `custom` constraint type with an `expression` as the constraint (thanks @roydobbe). ([792](https://github.com/databricks/dbt-databricks/pull/792))
 - Add "use_info_schema_for_columns" behavior flag to turn on use of information_schema to get column info where possible. This may have more latency but will not truncate complex data types the way that 'describe' can. ([808](https://github.com/databricks/dbt-databricks/pull/808))
 - Add support for table_format: iceberg. This uses UniForm under the hood to provide iceberg compatibility for tables or incrementals. ([815](https://github.com/databricks/dbt-databricks/pull/815))
+- Add `include_full_name_in_path` config boolean for external locations. This writes tables to `{location_root}/{catalog}/{schema}/{table}` (illustrated below). ([823](https://github.com/databricks/dbt-databricks/pull/823))
 - Add a new `workflow_job` submission method for python, which creates a long-lived Databricks Workflow instead of a one-time run (thanks @kdazzle!) ([762](https://github.com/databricks/dbt-databricks/pull/762))
 - Allow for additional options to be passed to the Databricks Job API when using other python submission methods. For example, enable email_notifications (thanks @kdazzle!) ([762](https://github.com/databricks/dbt-databricks/pull/762))
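
For illustration, the resulting layout under each setting; bucket, catalog, schema, and table names here are hypothetical, not taken from this PR:

```python
# Hypothetical example: location_root="s3://bucket/root", catalog="main",
# schema="silver", model alias="events".
#
# include_full_name_in_path: false (default, existing behavior)
#   s3://bucket/root/events
#
# include_full_name_in_path: true
#   s3://bucket/root/main/silver/events
```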

20 changes: 20 additions & 0 deletions dbt/adapters/databricks/impl.py
@@ -115,6 +115,7 @@ class DatabricksConfig(AdapterConfig):
     file_format: str = "delta"
     table_format: TableFormat = TableFormat.DEFAULT
     location_root: Optional[str] = None
+    include_full_name_in_path: bool = False
     partition_by: Optional[Union[List[str], str]] = None
     clustered_by: Optional[Union[List[str], str]] = None
     liquid_clustered_by: Optional[Union[List[str], str]] = None
@@ -209,6 +210,25 @@ def update_tblproperties_for_iceberg(
         result["delta.universalFormat.enabledFormats"] = "iceberg"
         return result

+    @available.parse(lambda *a, **k: 0)
+    def compute_external_path(
+        self, config: BaseConfig, model: BaseConfig, is_incremental: bool = False
+    ) -> str:
+        location_root = config.get("location_root")
+        database = model.get("database", "hive_metastore")
+        schema = model.get("schema", "default")
+        identifier = model.get("alias")
+        if location_root is None:
+            raise DbtConfigError("location_root is required for external tables.")
+        include_full_name_in_path = config.get("include_full_name_in_path", False)
+        if include_full_name_in_path:
+            path = os.path.join(location_root, database, schema, identifier)
+        else:
+            path = os.path.join(location_root, identifier)
+        if is_incremental:
+            path = path + "_tmp"
+        return path
+
     # override/overload
     def acquire_connection(
         self, name: Optional[str] = None, query_header_context: Any = None
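To make the path behavior concrete, here is a minimal standalone sketch of the same logic as `compute_external_path` above, using plain dicts and hypothetical values in place of dbt's `BaseConfig` objects (illustrative only, not the adapter's actual API surface):

```python
import os

def external_path(config: dict, model: dict, is_incremental: bool = False) -> str:
    # Mirrors compute_external_path above; plain dicts stand in for dbt config objects.
    location_root = config.get("location_root")
    if location_root is None:
        raise ValueError("location_root is required for external tables.")
    database = model.get("database", "hive_metastore")
    schema = model.get("schema", "default")
    identifier = model.get("alias")
    if config.get("include_full_name_in_path", False):
        path = os.path.join(location_root, database, schema, identifier)
    else:
        path = os.path.join(location_root, identifier)
    if is_incremental:
        path += "_tmp"  # incremental runs stage writes to a temporary path
    return path

# Hypothetical example values:
cfg = {"location_root": "/mnt/external", "include_full_name_in_path": True}
mdl = {"database": "main", "schema": "silver", "alias": "events"}
print(external_path(cfg, mdl))         # /mnt/external/main/silver/events
print(external_path(cfg, mdl, True))   # /mnt/external/main/silver/events_tmp
print(external_path({"location_root": "/mnt/external"}, mdl))  # /mnt/external/events
```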
7 changes: 2 additions & 5 deletions dbt/include/databricks/macros/adapters/python.sql
@@ -60,11 +60,8 @@ writer.saveAsTable("{{ target_relation }}")
 {%- set buckets = config.get('buckets', validator=validation.any[int]) -%}
 .format("{{ file_format }}")
 {%- if location_root is not none %}
-{%- set identifier = model['alias'] %}
-{%- if is_incremental() %}
-{%- set identifier = identifier + '__dbt_tmp' %}
-{%- endif %}
-.option("path", "{{ location_root }}/{{ identifier }}")
+{%- set model_path = adapter.compute_external_path(config, model, is_incremental()) %}
+.option("path", "{{ model_path }}")
 {%- endif -%}
 {%- if partition_by is not none -%}
 {%- if partition_by is string -%}
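Note that the incremental temp-path handling previously done in this macro (appending `__dbt_tmp`) now lives in `compute_external_path`, which appends `_tmp`. The shape of the rendered writer options is unchanged; a sketch of the output when the adapter call is mocked, mirroring the unit-test expectation near the end of this diff:

```python
# Rendered py_get_writer_options output with location_root set and
# compute_external_path mocked to return "s3://fake_location/schema"
# (mirrors the unit test later in this diff):
expected = '.format("delta")\n.option("path", "s3://fake_location/schema")'
```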
3 changes: 2 additions & 1 deletion dbt/include/databricks/macros/relations/location.sql
@@ -3,7 +3,8 @@
 {%- set file_format = config.get('file_format', default='delta') -%}
 {%- set identifier = model['alias'] -%}
 {%- if location_root is not none %}
-location '{{ location_root }}/{{ identifier }}'
+{%- set model_path = adapter.compute_external_path(config, model, is_incremental()) %}
+location '{{ model_path }}'
 {%- elif (not relation.is_hive_metastore()) and file_format != 'delta' -%}
 {{ exceptions.raise_compiler_error(
 'Incompatible configuration: `location_root` must be set when using a non-delta file format with Unity Catalog'
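For example, with hypothetical values (location_root `/mnt/external`, catalog `main`, schema `silver`, model `events`), the macro now renders location clauses like the following sketch:

```python
# Sketch of the rendered location clause (all names hypothetical):
with_full_name = "location '/mnt/external/main/silver/events'"  # include_full_name_in_path: true
default_path = "location '/mnt/external/events'"                # include_full_name_in_path: false
```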
2 changes: 2 additions & 0 deletions tests/functional/adapter/basic/test_incremental.py
@@ -34,6 +34,7 @@ def project_config_update(self):
"models": {
"+file_format": "parquet",
"+location_root": f"{location_root}/parquet",
"+include_full_name_in_path": "true",
"+incremental_strategy": "append",
},
}
@@ -61,6 +62,7 @@ def project_config_update(self):
"models": {
"+file_format": "csv",
"+location_root": f"{location_root}/csv",
"+include_full_name_in_path": "true",
"+incremental_strategy": "append",
},
}
3 changes: 3 additions & 0 deletions
@@ -52,6 +52,7 @@ def project_config_update(self):
"models": {
"+file_format": "parquet",
"+location_root": f"{location_root}/parquet_append",
"+include_full_name_in_path": "true",
"+incremental_strategy": "append",
},
}
@@ -129,6 +130,7 @@ def project_config_update(self):
"models": {
"+file_format": "parquet",
"+location_root": f"{location_root}/parquet_insert_overwrite",
"+include_full_name_in_path": "true",
"+incremental_strategy": "insert_overwrite",
},
}
@@ -144,6 +146,7 @@ def project_config_update(self):
"models": {
"+file_format": "parquet",
"+location_root": f"{location_root}/parquet_insert_overwrite_partitions",
"+include_full_name_in_path": "true",
"+incremental_strategy": "insert_overwrite",
"+partition_by": "id",
},
2 changes: 2 additions & 0 deletions tests/functional/adapter/persist_docs/fixtures.py
@@ -5,6 +5,7 @@
 description: 'A seed description'
 config:
   location_root: '{{ env_var("DBT_DATABRICKS_LOCATION_ROOT") }}'
+  include_full_name_in_path: true
   persist_docs:
     relation: True
     columns: True
@@ -22,6 +23,7 @@
 description: 'A seed description'
 config:
   location_root: '/mnt/dbt_databricks/seeds'
+  include_full_name_in_path: true
   persist_docs:
     relation: True
     columns: True
3 changes: 2 additions & 1 deletion tests/functional/adapter/python_model/fixtures.py
@@ -117,7 +117,8 @@ def model(dbt, spark):
 materialized: table
 tags: ["python"]
 create_notebook: true
-location_root: "{root}/{schema}"
+include_full_name_in_path: true
+location_root: "{{ env_var('DBT_DATABRICKS_LOCATION_ROOT') }}"
 columns:
   - name: date
     tests:
9 changes: 0 additions & 9 deletions tests/functional/adapter/python_model/test_python_model.py
@@ -119,15 +119,6 @@ def project_config_update(self):
         }

     def test_expected_handling_of_complex_config(self, project):
-        unformatted_schema_yml = util.read_file("models", "schema.yml")
-        util.write_file(
-            unformatted_schema_yml.replace(
-                "root", os.environ["DBT_DATABRICKS_LOCATION_ROOT"]
-            ).replace("{schema}", project.test_schema),
-            "models",
-            "schema.yml",
-        )
-
         util.run_dbt(["seed"])
         util.run_dbt(["build", "-s", "complex_config"])
         util.run_dbt(["build", "-s", "complex_config"])
12 changes: 1 addition & 11 deletions tests/unit/macros/adapters/test_python_macros.py
@@ -1,5 +1,4 @@
 import pytest
-from jinja2 import Template
 from mock import MagicMock

 from tests.unit.macros.base import MacroTestBase
@@ -33,21 +32,12 @@ def test_py_get_writer__specified_file_format(self, config, template):

     def test_py_get_writer__specified_location_root(self, config, template, context):
         config["location_root"] = "s3://fake_location"
+        template.globals["adapter"].compute_external_path.return_value = "s3://fake_location/schema"
         result = self.run_macro_raw(template, "py_get_writer_options")

         expected = '.format("delta")\n.option("path", "s3://fake_location/schema")'
         assert result == expected

-    def test_py_get_writer__specified_location_root_on_incremental(
-        self, config, template: Template, context
-    ):
-        config["location_root"] = "s3://fake_location"
-        context["is_incremental"].return_value = True
-        result = self.run_macro_raw(template, "py_get_writer_options")
-
-        expected = '.format("delta")\n.option("path", "s3://fake_location/schema__dbt_tmp")'
-        assert result == expected
-
     def test_py_get_writer__partition_by_single_column(self, config, template):
         config["partition_by"] = "name"
         result = self.run_macro_raw(template, "py_get_writer_options")
4 changes: 4 additions & 0 deletions tests/unit/macros/relations/test_table_macros.py
@@ -28,6 +28,10 @@ def context(self, template) -> dict:
         return template.globals

     def render_create_table_as(self, template_bundle, temporary=False, sql="select 1"):
+        external_path = f"/mnt/root/{template_bundle.relation.identifier}"
+        template_bundle.template.globals["adapter"].compute_external_path.return_value = (
+            external_path
+        )
         return self.run_macro(
             template_bundle.template,
             "databricks__create_table_as",