-
Notifications
You must be signed in to change notification settings - Fork 238
/
Copy pathsnapshot.sql
197 lines (148 loc) · 7.16 KB
/
snapshot.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
{% macro spark__snapshot_hash_arguments(args) -%}
    {#-
        Renders a single md5(...) SQL expression over `args`.
        Each argument is cast to string and NULLs are coalesced to '' so that a
        NULL in one argument does not null out the whole concatenation; the
        pieces are concatenated with a literal '|' separator.
    -#}
    {%- set null_safe_parts = [] -%}
    {%- for arg in args -%}
        {%- do null_safe_parts.append("coalesce(cast(" ~ arg ~ " as string ), '')") -%}
    {%- endfor -%}
    md5({{ null_safe_parts | join(" || '|' || ") }})
{%- endmacro %}
{% macro spark__snapshot_string_as_time(timestamp) -%}
    {#-
        Returns the SQL text to_timestamp('<timestamp>') for the given value.
        The value is embedded verbatim between single quotes.
    -#}
    {{ return("to_timestamp('" ~ timestamp ~ "')") }}
{%- endmacro %}
{% macro spark__snapshot_merge_sql(target, source, insert_cols) -%}
    {#-
        Renders the MERGE statement that applies staged snapshot changes to `target`.

        Rows are matched on dbt_scd_id:
          * matched rows whose destination dbt_valid_to is still null and whose
            staged change type is 'update' or 'delete' get dbt_valid_to set from the source;
          * unmatched rows with change type 'insert' are inserted with `insert *`.

        `insert_cols` is part of the macro signature but is not referenced here,
        because the insert uses `*` rather than an explicit column list.
    -#}
    {#- create view only supports a name (no catalog, or schema), so iceberg targets
        reference the source by its bare identifier -#}
    {%- set merge_source = source.identifier if target.is_iceberg else source %}
    merge into {{ target }} as DBT_INTERNAL_DEST
    using {{ merge_source }} as DBT_INTERNAL_SOURCE
    on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id
    when matched
     and DBT_INTERNAL_DEST.dbt_valid_to is null
     and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
        then update
        set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to
    when not matched
     and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
        then insert *
    ;
{% endmacro %}
{% macro spark_build_snapshot_staging_table(strategy, sql, target_relation) %}
    {#
        Creates a staging view named '<target identifier>__dbt_tmp' holding the
        output of snapshot_staging_table(strategy, sql, target_relation), and
        returns the Relation describing that view.
    #}
    {# iceberg catalog does not support create view, but regular spark does;
       for iceberg targets the view is therefore created without catalog and schema #}
    {%- set staging_schema = none if target_relation.is_iceberg else target_relation.schema -%}
    {%- set staging_view = api.Relation.create(
            identifier=target_relation.identifier ~ '__dbt_tmp',
            schema=staging_schema,
            database=none,
            type='view') -%}

    {% set staging_select = snapshot_staging_table(strategy, sql, target_relation) %}

    {# needs to be a non-temp view so that its columns can be ascertained via `describe` #}
    {% call statement('build_snapshot_staging_relation') %}
        {{ create_view_as(staging_view, staging_select) }}
    {% endcall %}

    {% do return(staging_view) %}
{% endmacro %}
{% macro spark__post_snapshot(staging_relation) %}
{# Cleanup step: drops `staging_relation` by calling adapter.drop_relation on it. #}
{% do adapter.drop_relation(staging_relation) %}
{% endmacro %}
{% macro spark__create_columns(relation, columns) %}
    {#
        Adds `columns` to `relation` with one `alter table ... add columns (...)`
        statement. Each column is rendered as a backtick-quoted name followed by
        its data_type. Nothing is executed when `columns` is empty.
    #}
    {% if columns | length > 0 %}
        {% set column_defs = [] %}
        {% for col in columns %}
            {% do column_defs.append('`' ~ col.name ~ '` ' ~ col.data_type) %}
        {% endfor %}
        {% call statement() %}
            alter table {{ relation }} add columns (
                {{ column_defs | join(', ') }}
            );
        {% endcall %}
    {% endif %}
{% endmacro %}
{% materialization snapshot, adapter='spark' %}
{#
  Snapshot materialization for the 'spark' adapter.

  Flow, in order:
    1. Read the model config and resolve (or create) the target table relation.
    2. Validate that the configured file_format, and the format of an already
       existing target, is one of 'delta', 'iceberg' or 'hudi'.
    3. Create the schema if it does not exist and check the target is a table.
    4. Run pre-hooks, then build the snapshot strategy.
    5. First run: build the snapshot table with create_table_as.
       Later runs: build a staging view, add any missing columns to the target,
       and render a merge statement.
    6. Execute the resulting SQL as the 'main' statement, apply grants, persist
       docs, run post-hooks, commit, drop the staging view if one was created.

  Returns a dict with the target relation under the 'relations' key.
#}
{%- set config = model['config'] -%}
{# Target table name: the model alias when present, otherwise the model name. #}
{%- set target_table = model.get('alias', model.get('name')) -%}
{%- set strategy_name = config.get('strategy') -%}
{# NOTE(review): unique_key is read here but not referenced again in this materialization; the strategy macro receives the whole config instead. #}
{%- set unique_key = config.get('unique_key') %}
{# file_format defaults to 'parquet', which is rejected by the check below, so a supported format has to be configured explicitly. #}
{%- set file_format = config.get('file_format', 'parquet') -%}
{%- set grant_config = config.get('grants') -%}
{# The relation is looked up without a database component (database=none). #}
{% set target_relation_exists, target_relation = get_or_create_relation(
database=none,
schema=model.schema,
identifier=target_table,
type='table') -%}
{# Reject any configured file format other than delta, iceberg or hudi. #}
{%- if file_format not in ['delta', 'iceberg', 'hudi'] -%}
{% set invalid_format_msg -%}
Invalid file format: {{ file_format }}
Snapshot functionality requires file_format be set to 'delta' or 'iceberg' or 'hudi'
{%- endset %}
{% do exceptions.raise_compiler_error(invalid_format_msg) %}
{% endif %}
{# An existing target must itself already be a delta, iceberg or hudi table. #}
{%- if target_relation_exists -%}
{%- if not target_relation.is_delta and not target_relation.is_iceberg and not target_relation.is_hudi -%}
{% set invalid_format_msg -%}
The existing table {{ model.schema }}.{{ target_table }} is in another format than 'delta' or 'iceberg' or 'hudi'
{%- endset %}
{% do exceptions.raise_compiler_error(invalid_format_msg) %}
{% endif %}
{% endif %}
{# Create the target schema when the adapter reports that it does not exist. #}
{% if not adapter.check_schema_exists(model.database, model.schema) %}
{% do create_schema(model.schema) %}
{% endif %}
{# The snapshot target has to be a table; any other relation type is an error. #}
{%- if not target_relation.is_table -%}
{% do exceptions.relation_wrong_type(target_relation, 'table') %}
{%- endif -%}
{# Pre-hooks: first those flagged to run outside a transaction, then those flagged to run inside one. #}
{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{# Resolve the strategy macro by name and build the strategy object used below. #}
{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}
{% if not target_relation_exists %}
{# First run: the snapshot table is created directly from the model's compiled code. #}
{% set build_sql = build_snapshot_table(strategy, model['compiled_code']) %}
{% set final_sql = create_table_as(False, target_relation, build_sql) %}
{% else %}
{# Subsequent runs: validate the existing target, stage the changes in a view, then merge. #}
{{ adapter.valid_snapshot_target(target_relation) }}
{% set staging_table = spark_build_snapshot_staging_table(strategy, sql, target_relation) %}
-- this may no-op if the database does not require column expansion
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}
{# Columns present in staging but missing from the target, excluding dbt_change_type and dbt_unique_key in either letter case. #}
{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}
{% do create_columns(target_relation, missing_columns) %}
{# Staging columns with the same two names filtered out; their quoted names are passed to the merge as insert_cols. #}
{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}
{% set quoted_source_columns = [] %}
{% for column in source_columns %}
{% do quoted_source_columns.append(adapter.quote(column.name)) %}
{% endfor %}
{% set final_sql = snapshot_merge_sql(
target = target_relation,
source = staging_table,
insert_cols = quoted_source_columns
)
%}
{% endif %}
{# Execute whichever SQL was prepared above (create-table or merge) as the 'main' statement. #}
{% call statement('main') %}
{{ final_sql }}
{% endcall %}
{# Grants are applied after the main statement; should_revoke is computed from target existence and full_refresh_mode. #}
{% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke) %}
{% do persist_docs(target_relation, model) %}
{{ run_hooks(post_hooks, inside_transaction=True) }}
{{ adapter.commit() }}
{# staging_table is only set on the merge path, so cleanup is skipped on the first run. #}
{% if staging_table is defined %}
{% do post_snapshot(staging_table) %}
{% endif %}
{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}