feat: add support for python models (#28)
tomsej authored Oct 25, 2022
1 parent 04fa9a7 commit b2f6ad8
Showing 5 changed files with 306 additions and 12 deletions.
40 changes: 37 additions & 3 deletions dbt/adapters/duckdb/impl.py
@@ -1,18 +1,52 @@
-from dbt.adapters.duckdb import DuckDBConnectionManager
+from dbt.adapters.duckdb.connections import DuckDBConnectionManager
 from dbt.adapters.sql import SQLAdapter
+from dbt.contracts.connection import AdapterResponse
+from dbt.exceptions import InternalException, RuntimeException


 class DuckDBAdapter(SQLAdapter):
     ConnectionManager = DuckDBConnectionManager

     @classmethod
-    def date_function(cls):
+    def date_function(cls) -> str:
         return "now()"

     @classmethod
-    def is_cancelable(cls):
+    def is_cancelable(cls) -> bool:
         return False

     def valid_incremental_strategies(self):
         """DuckDB does not currently support MERGE statement."""
         return ["append", "delete+insert"]

+    def commit_if_has_connection(self) -> None:
+        """This is just a quick fix: Python models do not execute the begin function, so transaction_open is always false."""
+        try:
+            self.connections.commit_if_has_connection()
+        except InternalException:
+            pass
+
+    def submit_python_job(self, parsed_model: dict, compiled_code: str) -> AdapterResponse:
+
+        connection = self.connections.get_if_exists()
+        if not connection:
+            connection = self.connections.get_thread_connection()
+        con = connection.handle._conn
+
+        def load_df_function(table_name: str):
+            """
+            Currently the con.table method does not support fully qualified names - https://github.com/duckdb/duckdb/issues/5038
+            It can be replaced by con.table after that issue is fixed.
+            """
+            return con.query(f"select * from {table_name}")
+
+        try:
+            exec(compiled_code, {}, {"load_df_function": load_df_function, "con": con})
+        except SyntaxError as err:
+            raise RuntimeException(
+                f"Python model has a syntactic error at line {err.lineno}:\n" f"{err}\n"
+            )
+        except Exception as err:
+            raise RuntimeException(f"Python model failed:\n" f"{err}")
+        return AdapterResponse(_message="OK")
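
For context, a minimal sketch (not part of the commit) of how the objects handed to exec fit together, using a standalone DuckDB connection; the table name and data are invented for illustration, and in the adapter con is the connection.handle._conn handle:

import duckdb

con = duckdb.connect()
con.execute("create table my_sql_model as select 1 as id union all select 2 as id")

def load_df_function(table_name: str):
    # returns a DuckDBPyRelation that a model can turn into a DataFrame with .df()
    return con.query(f"select * from {table_name}")

rel = load_df_function("my_sql_model")
print(rel.limit(1).df())  # pandas DataFrame with a single row (requires pandas)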
62 changes: 53 additions & 9 deletions dbt/include/duckdb/macros/adapters.sql
@@ -28,16 +28,60 @@
   {{ return(run_query(sql)) }}
 {% endmacro %}

-{% macro duckdb__create_table_as(temporary, relation, sql) -%}
-  {%- set sql_header = config.get('sql_header', none) -%}
-
-  {{ sql_header if sql_header is not none }}
-
-  create {% if temporary: -%}temporary{%- endif %} table
-    {{ relation.include(database=False, schema=(not temporary)) }}
-  as (
-    {{ sql }}
-  );
+{% macro duckdb__create_table_as(temporary, relation, compiled_code, language='sql') -%}
+  {%- if language == 'sql' -%}
+    {%- set sql_header = config.get('sql_header', none) -%}
+
+    {{ sql_header if sql_header is not none }}
+
+    create {% if temporary: -%}temporary{%- endif %} table
+      {{ relation.include(database=False, schema=(not temporary)) }}
+    as (
+      {{ compiled_code }}
+    );
+  {%- elif language == 'python' -%}
+    {{ py_write_table(temporary=temporary, relation=relation, compiled_code=compiled_code) }}
+  {%- else -%}
+    {% do exceptions.raise_compiler_error("duckdb__create_table_as macro didn't get supported language, it got %s" % language) %}
+  {%- endif -%}
 {% endmacro %}

+{% macro py_write_table(temporary, relation, compiled_code) -%}
+# MODEL ----------
+{{ compiled_code }}
+
+extend_globals = {
+    "config": config,
+    "this": this,
+    "ref": ref,
+    "source": source
+}
+
+globals().update(extend_globals)
+
+dbt = dbtObj(load_df_function)
+df = model(dbt, con)
+
+# make sure pandas exists before using it
+try:
+    import pandas
+    pandas_available = True
+except ImportError:
+    pandas_available = False
+
+# make sure pyarrow exists before using it
+try:
+    import pyarrow
+    pyarrow_available = True
+except ImportError:
+    pyarrow_available = False
+
+if pandas_available and isinstance(df, pandas.core.frame.DataFrame):
+    con.execute('create table {{ relation.include(database=False, schema=(not temporary)) }} as select * from df')
+elif pyarrow_available and isinstance(df, pyarrow.Table):
+    con.execute('create table {{ relation.include(database=False, schema=(not temporary)) }} as select * from df')
+else:
+    raise Exception(str(type(df)) + " is not a supported type for dbt Python materialization")
+
+{% endmacro %}

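A rough sketch (not from the commit) of the DuckDB behaviour that py_write_table relies on: a pandas DataFrame visible as a local variable named df can be scanned by name, so create table ... as select * from df materializes whatever the model returned. The table name below is invented:

import duckdb
import pandas as pd

con = duckdb.connect()
df = pd.DataFrame({"id": [1, 2, 3]})  # stand-in for the DataFrame a Python model returns
# DuckDB's replacement scan resolves the local variable `df` by name
con.execute("create table my_python_model as select * from df")
print(con.query("select count(*) from my_python_model").fetchall())  # [(3,)]
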
101 changes: 101 additions & 0 deletions dbt/include/duckdb/macros/materializations/incremental.sql
@@ -0,0 +1,101 @@
{% materialization incremental, adapter="duckdb", supported_languages=['sql', 'python'] -%}

  {%- set language = model['language'] -%}

  -- relations
  {%- set existing_relation = load_cached_relation(this) -%}
  {%- set target_relation = this.incorporate(type='table') -%}
  {%- set temp_relation = make_temp_relation(target_relation)-%}
  {%- set intermediate_relation = make_intermediate_relation(target_relation)-%}
  {%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
  {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}

  -- configs
  {%- set unique_key = config.get('unique_key') -%}
  {%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%}
  {%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}

  -- the temp_ and backup_ relations should not already exist in the database; get_relation
  -- will return None in that case. Otherwise, we get a relation that we can drop
  -- later, before we try to use this name for the current operation. This has to happen before
  -- BEGIN, in a separate transaction
  {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%}
  {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
  -- grab current tables grants config for comparison later on
  {% set grant_config = config.get('grants') %}
  {{ drop_relation_if_exists(preexisting_intermediate_relation) }}
  {{ drop_relation_if_exists(preexisting_backup_relation) }}

  {{ run_hooks(pre_hooks, inside_transaction=False) }}

  -- `BEGIN` happens here:
  {{ run_hooks(pre_hooks, inside_transaction=True) }}

  {% set to_drop = [] %}

  {% if existing_relation is none %}
    {% set build_sql = create_table_as(False, target_relation, compiled_code, language) %}
  {% elif full_refresh_mode %}
    {% set build_sql = create_table_as(False, intermediate_relation, compiled_code, language) %}
    {% set need_swap = true %}
  {% else %}
    {% if language == 'python' %}
      {% set build_python = create_table_as(False, temp_relation, compiled_code, language) %}
      {% call statement("pre", language=language) %}
        {{- build_python }}
      {% endcall %}
    {% else %} {# SQL #}
      {% do run_query(create_table_as(True, temp_relation, compiled_code, language)) %}
    {% endif %}
    {% do adapter.expand_target_column_types(
           from_relation=temp_relation,
           to_relation=target_relation) %}
    {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
    {% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %}
    {% if not dest_columns %}
      {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
    {% endif %}

    {#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
    {% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
    {% set incremental_predicates = config.get('incremental_predicates', none) %}
    {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
    {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'predicates': incremental_predicates }) %}
    {% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}
    {% set language = "sql" %}

  {% endif %}

  {% call statement("main", language=language) %}
    {{- build_sql }}
  {% endcall %}

  {% if need_swap %}
    {% do adapter.rename_relation(target_relation, backup_relation) %}
    {% do adapter.rename_relation(intermediate_relation, target_relation) %}
    {% do to_drop.append(backup_relation) %}
  {% endif %}

  {% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
  {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

  {% do persist_docs(target_relation, model) %}

  {% if existing_relation is none or existing_relation.is_view or should_full_refresh() %}
    {% do create_indexes(target_relation) %}
  {% endif %}

  {{ run_hooks(post_hooks, inside_transaction=True) }}

  -- `COMMIT` happens here
  {% do adapter.commit() %}

  {% for rel in to_drop %}
    {% do adapter.drop_relation(rel) %}
  {% endfor %}

  {{ run_hooks(post_hooks, inside_transaction=False) }}

  {{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}
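
To make the python branch of this materialization concrete, a hedged sketch of the two-step flow in plain DuckDB (the relation name target__dbt_tmp and the data are illustrative only): the model's result is first written to a temp relation, then the configured SQL strategy, here delete+insert on unique_key=id, merges it into the target.

import duckdb
import pandas as pd

con = duckdb.connect()
con.execute("create table target (id integer, val varchar)")
con.execute("insert into target values (1, 'old'), (5, 'old')")

# step 1: the Python model's returned DataFrame lands in a temp relation
df = pd.DataFrame({"id": [1, 2], "val": ["new", "new"]})
con.execute("create temporary table target__dbt_tmp as select * from df")

# step 2: delete+insert replaces rows with matching keys and appends the rest
con.execute("delete from target where id in (select id from target__dbt_tmp)")
con.execute("insert into target select * from target__dbt_tmp")
print(con.query("select * from target order by id").fetchall())
# [(1, 'new'), (2, 'new'), (5, 'old')]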
63 changes: 63 additions & 0 deletions dbt/include/duckdb/macros/materializations/table.sql
@@ -0,0 +1,63 @@
{% materialization table, adapter="duckdb", supported_languages=['sql', 'python'] %}

  {%- set language = model['language'] -%}

  {%- set existing_relation = load_cached_relation(this) -%}
  {%- set target_relation = this.incorporate(type='table') %}
  {%- set intermediate_relation = make_intermediate_relation(target_relation) -%}
  -- the intermediate_relation should not already exist in the database; get_relation
  -- will return None in that case. Otherwise, we get a relation that we can drop
  -- later, before we try to use this name for the current operation
  {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%}
  /*
      See ../view/view.sql for more information about this relation.
  */
  {%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
  {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}
  -- as above, the backup_relation should not already exist
  {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
  -- grab current tables grants config for comparison later on
  {% set grant_config = config.get('grants') %}

  -- drop the temp relations if they exist already in the database
  {{ drop_relation_if_exists(preexisting_intermediate_relation) }}
  {{ drop_relation_if_exists(preexisting_backup_relation) }}

  {{ run_hooks(pre_hooks, inside_transaction=False) }}

  -- `BEGIN` happens here:
  {{ run_hooks(pre_hooks, inside_transaction=True) }}

  {{ log(compiled_code, True) }}

  -- build model
  {% call statement('main', language=language) -%}
    {{- create_table_as(False, intermediate_relation, compiled_code, language) }}
  {%- endcall %}

  -- cleanup
  {% if existing_relation is not none %}
    {{ adapter.rename_relation(existing_relation, backup_relation) }}
  {% endif %}

  {{ adapter.rename_relation(intermediate_relation, target_relation) }}

  {% do create_indexes(target_relation) %}

  {{ run_hooks(post_hooks, inside_transaction=True) }}

  {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
  {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

  {% do persist_docs(target_relation, model) %}

  -- `COMMIT` happens here
  {{ adapter.commit() }}

  -- finally, drop the existing/backup relation after the commit
  {{ drop_relation_if_exists(backup_relation) }}

  {{ run_hooks(post_hooks, inside_transaction=False) }}

  {{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
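
A hedged sketch (not the macro itself) of the rename-and-swap this table materialization performs, in plain DuckDB; the __dbt_tmp and __dbt_backup suffixes stand in for the intermediate and backup relations:

import duckdb

con = duckdb.connect()
con.execute("create table my_model as select 1 as id")              # existing relation
con.execute("create table my_model__dbt_tmp as select 2 as id")     # intermediate, built from the model
con.execute("alter table my_model rename to my_model__dbt_backup")  # keep the old table until commit
con.execute("alter table my_model__dbt_tmp rename to my_model")     # promote the new build
con.execute("drop table my_model__dbt_backup")                      # finally, drop the backup
print(con.query("select * from my_model").fetchall())  # [(2,)]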
52 changes: 52 additions & 0 deletions tests/functional/adapter/test_python_model.py
@@ -0,0 +1,52 @@
import pytest
from dbt.tests.adapter.python_model.test_python_model import (
    BasePythonIncrementalTests,
    BasePythonModelTests,
    basic_sql,
    m_1,
    schema_yml,
    second_sql,
)

basic_python = """
def model(dbt, _):
    dbt.config(
        materialized='table',
    )
    df = dbt.ref("my_sql_model")
    df2 = dbt.source('test_source', 'test_table')
    df = df.limit(2)
    return df.df()
"""


class TestBasePythonModel(BasePythonModelTests):
    @pytest.fixture(scope="class")
    def models(self):
        return {
            "schema.yml": schema_yml,
            "my_sql_model.sql": basic_sql,
            "my_python_model.py": basic_python,
            "second_sql_model.sql": second_sql,
        }


incremental_python = """
def model(dbt, session):
    dbt.config(materialized="incremental", unique_key='id')
    df = dbt.ref("m_1")
    if dbt.is_incremental:
        # incremental runs should only apply to part of the data
        df = df.filter("id > 5")
    return df.df()
"""


class TestBasePythonIncremental(BasePythonIncrementalTests):
    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {"models": {"+incremental_strategy": "delete+insert"}}

    @pytest.fixture(scope="class")
    def models(self):
        return {"m_1.sql": m_1, "incremental.py": incremental_python}
