From c8a9532cc564c9a93600738040cb11245b4a075f Mon Sep 17 00:00:00 2001
From: Christian Bosdorf
Date: Mon, 22 Aug 2022 14:28:02 +0200
Subject: [PATCH] Add support for Trino

---
Reviewer notes:
 * The `index` blob hashes below are carried over from the original patch
   and do not reflect the edited macro and sample contents.
 * Indentation and whitespace-only lines were reconstructed from a collapsed
   copy of this patch; verify context lines against the target tree.
 * NOTE(review): Trino's Hive connector appears to restrict CSV tables to
   unbounded varchar data columns. Confirm that the int / varchar(n) /
   timestamp columns in trino_external.yml and sample_sources/trino.yml
   are accepted before relying on them.

 .circleci/config.yml                          | 14 ++++-
 integration_tests/ci/sample.profiles.yml      | 11 ++++
 integration_tests/dbt_project.yml             |  4 ++
 .../macros/plugins/trino/prep_external.sql    | 24 +++++++++
 .../models/plugins/trino/trino_external.yml   | 54 +++++++++++++++++++
 .../plugins/trino/create_external_table.sql   | 48 +++++++++++++++++
 .../plugins/trino/get_external_build_plan.sql | 25 +++++++++
 macros/plugins/trino/helpers/dropif.sql       |  7 +++
 .../plugins/trino/refresh_external_tables.sql | 22 ++++++++
 run_test.sh                                   |  4 ++
 sample_sources/trino.yml                      | 25 +++++++++
 11 files changed, 237 insertions(+), 1 deletion(-)
 create mode 100644 integration_tests/macros/plugins/trino/prep_external.sql
 create mode 100644 integration_tests/models/plugins/trino/trino_external.yml
 create mode 100644 macros/plugins/trino/create_external_table.sql
 create mode 100644 macros/plugins/trino/get_external_build_plan.sql
 create mode 100644 macros/plugins/trino/helpers/dropif.sql
 create mode 100644 macros/plugins/trino/refresh_external_tables.sql
 create mode 100644 sample_sources/trino.yml

diff --git a/.circleci/config.yml b/.circleci/config.yml
index ede02ac..080eb72 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -5,7 +5,18 @@ orbs:
   azure-cli: circleci/azure-cli@1.1.0
 
 jobs:
-  
+
+  integration-trino:
+    docker:
+      - image: cimg/python:3.9.9
+    steps:
+      - checkout
+      - run:
+          name: "Run Tests - Trino"
+          command: ./run_test.sh trino
+      - store_artifacts:
+          path: ./logs
+
   integration-redshift:
     docker:
       - image: cimg/python:3.9.9
@@ -111,6 +122,7 @@ workflows:
       - integration-snowflake
       - integration-bigquery
       - integration-databricks
+      - integration-trino
       #- integration-synapse
       #- integration-azuresql:
       #    requires:
diff --git a/integration_tests/ci/sample.profiles.yml b/integration_tests/ci/sample.profiles.yml
index 0bd2127..893ec50 100644
--- a/integration_tests/ci/sample.profiles.yml
+++ b/integration_tests/ci/sample.profiles.yml
@@ -10,6 +10,17 @@ integration_tests:
   target: postgres
   outputs:
 
+    trino:
+      type: trino
+      method: none
+      user: "{{ env_var('TRINO_TEST_USER') }}"
+      password: "{{ env_var('TRINO_TEST_PASSWORD') }}"
+      host: "{{ env_var('TRINO_TEST_HOST') }}"
+      port: "{{ env_var('TRINO_TEST_PORT') | as_number }}"
+      database: "{{ env_var('TRINO_TEST_DBNAME') }}"
+      schema: dbt_external_tables_integration_tests_trino
+      threads: 1
+
     redshift:
       type: redshift
       host: "{{ env_var('REDSHIFT_TEST_HOST') }}"
diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml
index 42cd38c..1e1c98b 100644
--- a/integration_tests/dbt_project.yml
+++ b/integration_tests/dbt_project.yml
@@ -27,6 +27,8 @@ seeds:
 sources:
   dbt_external_tables_integration_tests:
     plugins:
+      trino:
+        +enabled: "{{ target.type == 'trino' }}"
       redshift:
         +enabled: "{{ target.type == 'redshift' }}"
       snowflake:
@@ -43,6 +45,8 @@ sources:
 tests:
   dbt_external_tables_integration_tests:
     plugins:
+      trino:
+        +enabled: "{{ target.type == 'trino' }}"
       redshift:
         +enabled: "{{ target.type == 'redshift' }}"
       snowflake:
diff --git a/integration_tests/macros/plugins/trino/prep_external.sql b/integration_tests/macros/plugins/trino/prep_external.sql
new file mode 100644
index 0000000..9195299
--- /dev/null
+++ b/integration_tests/macros/plugins/trino/prep_external.sql
@@ -0,0 +1,24 @@
+{#
+    Integration-test setup for Trino: creates the schema for the external
+    tables in the `hive` catalog at a fixed S3 location. The location is
+    rendered as a quoted string literal so the statement parses.
+#}
+{% macro trino__prep_external() %}
+
+    {% set external_schema = target.schema %}
+    {% set external_schema_location = 's3://test_bucket/external_schema.db' %}
+
+    {% set create_external_schema %}
+
+        create schema if not exists
+            hive.{{ external_schema }}
+            with (
+                location = '{{ external_schema_location }}'
+            )
+
+    {% endset %}
+
+    {% do log('Creating external schema ' ~ external_schema, info = true) %}
+    {% do run_query(create_external_schema) %}
+
+{% endmacro %}
diff --git a/integration_tests/models/plugins/trino/trino_external.yml b/integration_tests/models/plugins/trino/trino_external.yml
new file mode 100644
index 0000000..6df9aef
--- /dev/null
+++ b/integration_tests/models/plugins/trino/trino_external.yml
@@ -0,0 +1,54 @@
+version: 2
+
+sources:
+  - name: trino_external
+    schema: "{{ target.schema }}"
+    tables:
+      - name: people_csv_unpartitioned
+        external: &csv-people
+          location: "s3://dbt-external-tables-testing/csv/"
+          file_format: CSV
+          table_properties:
+            skip_header_line_count: 1
+        columns: &cols-of-the-people
+          - name: id
+            data_type: int
+          - name: first_name
+            data_type: varchar(64)
+          - name: last_name
+            data_type: varchar(64)
+          - name: email
+            data_type: varchar(64)
+        tests: &equal-to-the-people
+          - dbt_utils.equality:
+              compare_model: ref('people')
+              compare_columns:
+                - id
+                - first_name
+                - last_name
+                - email
+
+      - name: people_csv_partitioned
+        external:
+          <<: *csv-people
+          partitions: &parts-of-the-people
+            - name: section
+              data_type: varchar(1)
+        columns: *cols-of-the-people
+        tests: *equal-to-the-people
+
+      # ensure that all partitions are created
+      - name: people_csv_multipartitioned
+        external:
+          <<: *csv-people
+          location: "s3://dbt-external-tables-testing/"
+          partitions:
+            - name: file_format
+              data_type: varchar(4)
+            - name: section
+              data_type: varchar(1)
+            - name: some_date
+              data_type: date
+            - name: file_name
+              data_type: varchar(10)
+        columns: *cols-of-the-people
diff --git a/macros/plugins/trino/create_external_table.sql b/macros/plugins/trino/create_external_table.sql
new file mode 100644
index 0000000..ff97637
--- /dev/null
+++ b/macros/plugins/trino/create_external_table.sql
@@ -0,0 +1,48 @@
+{#
+    Renders the `create table` DDL for an external source on Trino.
+    Partition columns are appended after the regular columns and listed
+    again in `partitioned_by`. `file_format` and every `table_properties`
+    entry become extra `with (...)` properties; property values are
+    rendered with `'{!r}'.format(value)`.
+#}
+{% macro trino__create_external_table(source_node) %}
+{%- set columns = source_node.columns.values() -%}
+{%- set external = source_node.external -%}
+
+create table {{source(source_node.source_name, source_node.name)}} (
+
+    {% for column in columns %}
+    {{column.name}} {{column.data_type}} {{- ',' if not loop.last -}}
+    {% endfor %}
+    {%- if external.partitions %} {{- ',' }}
+    {% for partition in external.partitions -%}
+    {{ partition.name }} {{ partition.data_type }} {{- ',' if not loop.last }}
+    {% endfor %}
+    {%- endif %}
+
+)
+{% if external.comment -%} comment '{{external.comment}}' {%- endif %}
+with (
+
+    external_location = '{{external.location}}'
+
+    {%- if external.file_format %} {{- ',' }}
+    format = '{{external.file_format}}'
+    {%- endif -%}
+
+    {%- if external.partitions %} {{- ',' }}
+    partitioned_by = ARRAY[
+    {%- for partition in external.partitions -%}
+    '{{ partition.name }}' {{- ', ' if not loop.last }}
+    {%- endfor -%}
+    ]
+    {%- endif -%}
+
+    {%- if external.table_properties %} {{- ',' }}
+    {% for name, value in external.table_properties.items() -%}
+    {{ name }}={{ '{!r}'.format(value) }} {{- ',' if not loop.last }}
+    {% endfor -%}
+    {%- endif %}
+
+)
+{% endmacro %}
diff --git a/macros/plugins/trino/get_external_build_plan.sql b/macros/plugins/trino/get_external_build_plan.sql
new file mode 100644
index 0000000..5b5fadd
--- /dev/null
+++ b/macros/plugins/trino/get_external_build_plan.sql
@@ -0,0 +1,25 @@
+{#
+    Returns the list of statements needed to build one external source.
+    When the relation does not exist yet, or the `ext_full_refresh` var is
+    true, the table is dropped and recreated before partitions are
+    refreshed; otherwise only the partition refresh statements are returned.
+#}
+{% macro trino__get_external_build_plan(source_node) %}
+    {% set build_plan = [] %}
+    {% set old_relation = adapter.get_relation(
+        database = source_node.database,
+        schema = source_node.schema,
+        identifier = source_node.identifier
+    ) %}
+    {% set create_or_replace = (old_relation is none or var('ext_full_refresh', false)) %}
+    {% if create_or_replace %}
+        {% set build_plan = [
+            dbt_external_tables.dropif(source_node),
+            dbt_external_tables.create_external_table(source_node)
+        ] + dbt_external_tables.refresh_external_table(source_node)
+        %}
+    {% else %}
+        {% set build_plan = dbt_external_tables.refresh_external_table(source_node) %}
+    {% endif %}
+    {% do return(build_plan) %}
+{% endmacro %}
diff --git a/macros/plugins/trino/helpers/dropif.sql b/macros/plugins/trino/helpers/dropif.sql
new file mode 100644
index 0000000..1d55fcd
--- /dev/null
+++ b/macros/plugins/trino/helpers/dropif.sql
@@ -0,0 +1,7 @@
+{# Returns a `drop table if exists` statement for the given source node. #}
+{% macro trino__dropif(node) %}
+    {% set ddl %}
+        drop table if exists {{source(node.source_name, node.name)}}
+    {% endset %}
+    {{return(ddl)}}
+{% endmacro %}
diff --git a/macros/plugins/trino/refresh_external_tables.sql b/macros/plugins/trino/refresh_external_tables.sql
new file mode 100644
index 0000000..12fcf3b
--- /dev/null
+++ b/macros/plugins/trino/refresh_external_tables.sql
@@ -0,0 +1,22 @@
+{#
+    Returns the statements that resync Hive partition metadata for a
+    partitioned external source (one call in DROP mode, then one in ADD
+    mode), or an empty list when the source declares no partitions.
+
+    The procedure is addressed through the source's own catalog and is given
+    the schema and identifier the relation resolves to; the dbt source name
+    is only a logical grouping and need not match the schema.
+#}
+{% macro trino__refresh_external_table(source_node) %}
+    {%- set partitions = source_node.external.partitions -%}
+    {%- if partitions -%}
+        {% set drop_partitions -%}
+            call {{ source_node.database }}.system.sync_partition_metadata('{{ source_node.schema }}', '{{ source_node.identifier }}', 'DROP', false)
+        {%- endset %}
+        {% set add_partitions -%}
+            call {{ source_node.database }}.system.sync_partition_metadata('{{ source_node.schema }}', '{{ source_node.identifier }}', 'ADD', false)
+        {%- endset %}
+        {{ return([drop_partitions, add_partitions]) }}
+    {% endif %}
+    {% do return([]) %}
+{% endmacro %}
diff --git a/run_test.sh b/run_test.sh
index f6be5a6..9e1facc 100755
--- a/run_test.sh
+++ b/run_test.sh
@@ -10,6 +10,10 @@ if [[ ! -f $VENV ]]; then
     then
         echo "Installing dbt-spark"
         pip install dbt-spark[ODBC] --upgrade --pre
+    elif [ $1 == 'trino' ]
+    then
+        echo "Installing dbt-trino"
+        pip install dbt-trino --upgrade --pre
     elif [ $1 == 'azuresql' ]
     then
         echo "Installing dbt-sqlserver"
diff --git a/sample_sources/trino.yml b/sample_sources/trino.yml
new file mode 100644
index 0000000..13eb06b
--- /dev/null
+++ b/sample_sources/trino.yml
@@ -0,0 +1,25 @@
+version: 2
+
+sources:
+  - name: trino
+    tables:
+      - name: snowplow
+        description: "Snowplow events stored as CSV files in HDFS"
+        external:
+          location: "s3a://.../events/"         # s3://, hdfs://, ... (a directory, not a single file)
+          file_format: CSV                      # file format: CSV, ORC, PARQUET, ...
+          table_properties:                     # as needed
+            skip_header_line_count: 1
+            csv_separator: ";"
+            csv_quote: '"'
+
+        columns:
+          - name: app_id
+            data_type: varchar
+            description: "Application ID"
+          - name: domain_sessionidx
+            data_type: int
+            description: "A visit / session index"
+          - name: etl_tstamp
+            data_type: timestamp
+            description: "Timestamp event began ETL"