Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
test:
strategy:
matrix:
python: ['3.7', '3.8', '3.9','3.10', '3.11']
python: ['3.7.16', '3.8', '3.9','3.10', '3.11']
runs-on: macos-12
name: Functional test
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

{% macro teradata__get_incremental_sql(strategy, target_relation, tmp_relation, unique_key, dest_columns,incremental_predicates) %}
{% if strategy == 'delete+insert' %}
{% do return(teradata__get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %}
{% do return(teradata__get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns, incremental_predicates)) %}
{% elif strategy == 'append' %}
{% do return(teradata__get_incremental_append_sql(target_relation, tmp_relation, dest_columns)) %}
{% elif strategy == 'merge' %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
-- {#-- Validate early so we don't run SQL if the strategy is invalid --#}
{% set strategy = teradata__validate_get_incremental_strategy(config) %}

{% set incremental_predicates = config.get('incremental_predicates', none) %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}

{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,38 @@
{% endmacro %}


{% macro teradata__get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) %}
{% macro teradata__get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns, incremental_predicates) %}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

{%- if unique_key is not none -%}
DELETE
FROM {{ target_relation }}
WHERE ({{ unique_key }}) IN (
SELECT ({{ unique_key }})
FROM {{ tmp_relation }}
);
{% if unique_key is sequence and unique_key is not string %}
delete from {{target_relation }}
where (
{% for key in unique_key %}
{{ tmp_relation }}.{{ key }} = {{ target_relation }}.{{ key }}
{{ "and " if not loop.last}}
{% endfor %}
{% if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{% endif %}
);
{% else %}
DELETE
FROM {{ target_relation }}
WHERE ({{ unique_key }}) IN (
SELECT ({{ unique_key }})
FROM {{ tmp_relation }}
)
{%- if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{%- endif -%};

{% endif %}

{%- endif %}

INSERT INTO {{ target_relation }} ({{ dest_cols_csv }})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import pytest

from dbt.tests.adapter.incremental.test_incremental_predicates import BaseIncrementalPredicates

models__delete_insert_incremental_predicates_sql = """
{{ config(
materialized = 'incremental',
unique_key = 'id'
) }}

{% if not is_incremental() %}

select * from (select 1 as id, 'goodbye' as msg, 'purple' as color) as "dual"
union all
select * from (select 2 as id, 'anyway' as msg, 'red' as color) as "dual"
union all
select * from (select 3 as id, 'hey' as msg, 'blue' as color) as "dual"

{% else %}

-- delete will not happen on the above record where id = 2, so new record will be inserted instead
select * from (select 1 as id, 'goodbye' as msg, 'purple' as color) as "dual"
union all
select * from (select 2 as id, 'yo' as msg, 'green' as color) as "dual"
union all
select * from (select 3 as id, 'hi' as msg, 'blue' as color) as "dual"

{% endif %}
"""

seeds__expected_delete_insert_incremental_predicates_csv = """id,msg,color
1,goodbye,purple
2,anyway,red
2,yo,green
3,hi,blue
"""

class TestIncrementalPredicatesDeleteInsertTeradata(BaseIncrementalPredicates):
@pytest.fixture(scope="class")
def models(self):
return {
"delete_insert_incremental_predicates.sql": models__delete_insert_incremental_predicates_sql
}

@pytest.fixture(scope="class")
def seeds(self):
return {
"expected_delete_insert_incremental_predicates.csv": seeds__expected_delete_insert_incremental_predicates_csv
}

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {
"+incremental_predicates": [
"id <> 2"
],
"+incremental_strategy": "delete+insert"
}
}
pass


class TestPredicatesDeleteInsertTeradata(BaseIncrementalPredicates):
@pytest.fixture(scope="class")
def models(self):
return {
"delete_insert_incremental_predicates.sql": models__delete_insert_incremental_predicates_sql
}

@pytest.fixture(scope="class")
def seeds(self):
return {
"expected_delete_insert_incremental_predicates.csv": seeds__expected_delete_insert_incremental_predicates_csv
}

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {
"+predicates": [
"id <> 2"
],
"+incremental_strategy": "delete+insert"
}
}