Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added support for incremental predicates in delete+insert strategy #86

Merged
merged 4 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
test:
strategy:
matrix:
python: ['3.7', '3.8', '3.9','3.10', '3.11']
python: ['3.7.16', '3.8', '3.9','3.10', '3.11']
sc250072 marked this conversation as resolved.
Show resolved Hide resolved
runs-on: macos-12
name: Functional test
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

{% macro teradata__get_incremental_sql(strategy, target_relation, tmp_relation, unique_key, dest_columns,incremental_predicates) %}
{% if strategy == 'delete+insert' %}
{% do return(teradata__get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %}
{% do return(teradata__get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns, incremental_predicates)) %}
{% elif strategy == 'append' %}
{% do return(teradata__get_incremental_append_sql(target_relation, tmp_relation, dest_columns)) %}
{% elif strategy == 'merge' %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
-- {#-- Validate early so we don't run SQL if the strategy is invalid --#}
{% set strategy = teradata__validate_get_incremental_strategy(config) %}

{% set incremental_predicates = config.get('incremental_predicates', none) %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}

{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,38 @@
{% endmacro %}


{% macro teradata__get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) %}
{% macro teradata__get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns, incremental_predicates) %}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

{%- if unique_key is not none -%}
DELETE
FROM {{ target_relation }}
WHERE ({{ unique_key }}) IN (
SELECT ({{ unique_key }})
FROM {{ tmp_relation }}
);
{% if unique_key is sequence and unique_key is not string %}
delete from {{target_relation }}
where (
{% for key in unique_key %}
{{ tmp_relation }}.{{ key }} = {{ target_relation }}.{{ key }}
{{ "and " if not loop.last}}
{% endfor %}
{% if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{% endif %}
);
{% else %}
DELETE
FROM {{ target_relation }}
WHERE ({{ unique_key }}) IN (
SELECT ({{ unique_key }})
FROM {{ tmp_relation }}
)
{%- if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{%- endif -%};

{% endif %}

{%- endif %}

INSERT INTO {{ target_relation }} ({{ dest_cols_csv }})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import pytest

from dbt.tests.adapter.incremental.test_incremental_predicates import BaseIncrementalPredicates

models__delete_insert_incremental_predicates_sql = """
{{ config(
materialized = 'incremental',
unique_key = 'id'
) }}

{% if not is_incremental() %}

select * from (select 1 as id, 'goodbye' as msg, 'purple' as color) as "dual"
union all
select * from (select 2 as id, 'anyway' as msg, 'red' as color) as "dual"
union all
select * from (select 3 as id, 'hey' as msg, 'blue' as color) as "dual"

{% else %}

-- delete will not happen on the above record where id = 2, so new record will be inserted instead
select * from (select 1 as id, 'goodbye' as msg, 'purple' as color) as "dual"
union all
select * from (select 2 as id, 'yo' as msg, 'green' as color) as "dual"
union all
select * from (select 3 as id, 'hi' as msg, 'blue' as color) as "dual"

{% endif %}
"""

seeds__expected_delete_insert_incremental_predicates_csv = """id,msg,color
1,goodbye,purple
2,anyway,red
2,yo,green
3,hi,blue
"""

class TestIncrementalPredicatesDeleteInsertTeradata(BaseIncrementalPredicates):
@pytest.fixture(scope="class")
def models(self):
return {
"delete_insert_incremental_predicates.sql": models__delete_insert_incremental_predicates_sql
}

@pytest.fixture(scope="class")
def seeds(self):
return {
"expected_delete_insert_incremental_predicates.csv": seeds__expected_delete_insert_incremental_predicates_csv
}

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {
"+incremental_predicates": [
"id <> 2"
],
"+incremental_strategy": "delete+insert"
}
}
pass


class TestPredicatesDeleteInsertTeradata(BaseIncrementalPredicates):
@pytest.fixture(scope="class")
def models(self):
return {
"delete_insert_incremental_predicates.sql": models__delete_insert_incremental_predicates_sql
}

@pytest.fixture(scope="class")
def seeds(self):
return {
"expected_delete_insert_incremental_predicates.csv": seeds__expected_delete_insert_incremental_predicates_csv
}

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {
"+predicates": [
"id <> 2"
],
"+incremental_strategy": "delete+insert"
}
}