Skip to content

Commit 22edebf

Browse files
committed
Add custom materialization for a merge and delete incremental
1 parent 58a0027 commit 22edebf

File tree

1 file changed

+115
-0
lines changed

1 file changed

+115
-0
lines changed
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
2+
{% macro dbt_snowflake_validate_get_incremental_strategy(config) %}
3+
{#-- Find and validate the incremental strategy #}
4+
{%- set strategy = config.get("incremental_strategy", default="merge") -%}
5+
6+
{% set invalid_strategy_msg -%}
7+
Invalid incremental strategy provided: {{ strategy }}
8+
Expected one of: 'merge', 'delete+insert', 'insert_overwrite'
9+
{%- endset %}
10+
{% if strategy not in ['merge', 'delete+insert', 'insert_overwrite'] %}
11+
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
12+
{% endif %}
13+
14+
{% do return(strategy) %}
15+
{% endmacro %}
16+
17+
{% macro dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
18+
{% if strategy == 'merge' %}
19+
{% do return(get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %}
20+
{% elif strategy == 'delete+insert' %}
21+
{% do return(get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %}
22+
{% elif strategy == 'insert_overwrite' %}
23+
{% do return(get_insert_overwrite_sql(target_relation, tmp_relation, unique_key, dest_columns)) %}
24+
{% else %}
25+
{% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %}
26+
{% endif %}
27+
{% endmacro %}
28+
29+
{% macro incremental_validate_delete_target_not_in_source(delete_target_not_in_source, strategy, unique_key, default) %}
30+
31+
{% if not delete_target_not_in_source %}
32+
{{ return(False) }}
33+
{% elif delete_target_not_in_source and strategy not in ['merge', 'delete+insert'] %}
34+
{% do exceptions.raise_compiler_error('invalid strategy for delete_target_not_in_source, must be one of: [merge, delete+insert]') %}
35+
{% elif delete_target_not_in_source and not unique_key %}
36+
{% do exceptions.raise_compiler_error('invalid configuration, must specify a unique_key to when delete_target_not_in_source is set to True') %}
37+
{% else %}
38+
{{ return(True) }}
39+
{% endif %}
40+
41+
{% endmacro %}
42+
43+
{% macro delete_from_target_not_in_source(tmp_relation, target_relation, unique_key) %}
44+
45+
delete from {{ target_relation }} where {{ unique_key }} not in (select {{ unique_key }} from {{ tmp_relation }} );
46+
47+
{% endmacro %}
48+
49+
{% materialization incremental_custom, adapter='snowflake' -%}
50+
51+
{% set original_query_tag = set_query_tag() %}
52+
53+
{%- set unique_key = config.get('unique_key') -%}
54+
{%- set full_refresh_mode = (should_full_refresh()) -%}
55+
56+
{% set target_relation = this %}
57+
{% set existing_relation = load_relation(this) %}
58+
{% set tmp_relation = make_temp_relation(this) %}
59+
60+
{#-- Validate early so we don't run SQL if the strategy is invalid --#}
61+
{% set strategy = dbt_snowflake_validate_get_incremental_strategy(config) -%}
62+
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
63+
{% set delete_target_not_in_source = incremental_validate_delete_target_not_in_source(
64+
delete_target_not_in_source = config.get('delete_target_not_in_source'),
65+
strategy=strategy,
66+
unique_key=unique_key,
67+
default=False
68+
)
69+
%}
70+
71+
{{ run_hooks(pre_hooks) }}
72+
73+
{% if existing_relation is none %}
74+
{% set build_sql = create_table_as(False, target_relation, sql) %}
75+
76+
{% elif existing_relation.is_view %}
77+
{#-- Can't overwrite a view with a table - we must drop --#}
78+
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
79+
{% do adapter.drop_relation(existing_relation) %}
80+
{% set build_sql = create_table_as(False, target_relation, sql) %}
81+
82+
{% elif full_refresh_mode %}
83+
{% set build_sql = create_table_as(False, target_relation, sql) %}
84+
85+
{% else %}
86+
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
87+
{% do adapter.expand_target_column_types(
88+
from_relation=tmp_relation,
89+
to_relation=target_relation) %}
90+
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
91+
{% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
92+
{% if not dest_columns %}
93+
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
94+
{% endif %}
95+
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
96+
{% set delete_sql = delete_from_target_not_in_source(tmp_relation, target_relation, unique_key) %}
97+
{% endif %}
98+
99+
{%- call statement('main') -%}
100+
{{ build_sql }}
101+
{% if delete_target_not_in_source %}
102+
{{ delete_sql }}
103+
{% endif %}
104+
{%- endcall -%}
105+
106+
{{ run_hooks(post_hooks) }}
107+
108+
{% set target_relation = target_relation.incorporate(type='table') %}
109+
{% do persist_docs(target_relation, model) %}
110+
111+
{% do unset_query_tag(original_query_tag) %}
112+
113+
{{ return({'relations': [target_relation]}) }}
114+
115+
{%- endmaterialization %}

0 commit comments

Comments
 (0)