dbt-deltastream is the official dbt adapter for DeltaStream, a unified streaming and batch processing engine built on Apache Flink. This adapter enables data teams to apply the power and simplicity of dbt's transformation workflows to real-time streaming data pipelines.
- 🔄 Unified Analytics: Combine batch and streaming transformations in a single dbt project
- ⚡ Real-time Insights: Build continuously updating materialized views and streaming pipelines
- 🛠️ Developer Experience: Leverage dbt's familiar SQL-first approach for stream processing
- 🔌 Ecosystem Integration: Connect to Kafka, Kinesis, PostgreSQL, and other data systems
- 📊 Stream as Code: Version control, test, and document your streaming transformations
DeltaStream bridges the gap between traditional data warehousing and real-time stream processing, and this adapter brings dbt's best practices to the streaming world.
Before installing dbt-deltastream, ensure you have:
- Python 3.11 or higher installed on your system
- dbt-core 1.10 or higher (will be installed automatically as a dependency)
- A DeltaStream account with:
- Organization ID (found on the settings page of your DeltaStream dashboard)
- API Token (can be generated on the integration page in your DeltaStream dashboard)
📘 New to DeltaStream? Sign up at deltastream.io to get started.
Install dbt-deltastream using pip:
pip install dbt-deltastream
Or with uv for faster installation:
uv init && uv add dbt-deltastream
If you don't have a dbt project yet, create one:
dbt init my_deltastream_project
When prompted for the database, select deltastream.
Edit your ~/.dbt/profiles.yml (or create profiles.yml in your project directory):
my_deltastream_project:
target: dev
outputs:
dev:
type: deltastream
# Required Parameters
token: "{{ env_var('DELTASTREAM_API_TOKEN') }}" # Your API token
organization_id: your-org-id # Your organization ID
database: my_database # Target database name
schema: my_schema # Target schema name
# Optional Parameters
url: https://api.deltastream.io/v2 # API endpoint (default)
timezone: UTC # Timezone (default: UTC)
role: AccountAdmin # User role
compute_pool: default_pool # Compute pool name
🔐 Security Tip: Store sensitive credentials in environment variables rather than hardcoding them.
Verify the connection:
dbt debug
You should see a successful connection to DeltaStream!
Create models/sources.yml to define a source stream from the trial store:
version: 2
sources:
- name: kafka
schema: public
tables:
- name: pageviews
description: "Pageviews stream from trial store"
config:
materialized: stream
parameters:
store: trial_store
topic: pageviews
'value.format': JSON
columns:
- name: viewtime
type: BIGINT
- name: userid
type: VARCHAR
- name: pageid
type: VARCHAR
Run the operation to create the source stream in DeltaStream:
dbt run-operation create_sources
This will create the pageviews stream in DeltaStream based on your YAML configuration.
Create models/user_pageviews.sql:
{{ config(
materialized='materialized_view'
) }}
SELECT
userid,
COUNT(*) as pageview_count,
COUNT(DISTINCT pageid) as unique_pages_viewed
FROM {{ source('kafka', 'pageviews') }}
GROUP BY userid
Run your models:
dbt run
Congratulations! 🎉 You've created your first streaming transformation with dbt-deltastream. You've set up a source stream from Kafka and created a materialized view that continuously aggregates pageview data.
- ✅ Materializations: Table, View, Materialized View, Incremental, Stream, Changelog
- ✅ Seeds: Load CSV data into existing DeltaStream entities
- ✅ Tests: Data quality tests with dbt's testing framework (see the sketch after this list)
- ✅ Documentation: Auto-generate docs with `dbt docs generate`
- ✅ Sources: Define and test source data
- ✅ Macros: Full Jinja2 support for reusable SQL
- ⏳ Snapshots: Not yet supported (streaming context differs from batch)
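For example, generic dbt tests can be declared in YAML against the quickstart's user_pageviews model. This is a minimal sketch using dbt's built-in not_null and unique tests; whether a given test is meaningful depends on the materialization you chose:

```yaml
version: 2

models:
  - name: user_pageviews          # the materialized view from the quickstart
    columns:
      - name: userid
        tests:
          - not_null
          - unique                # one aggregated row per user
      - name: pageview_count
        tests:
          - not_null
```

Run them with dbt test.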
This adapter extends dbt with streaming-first materializations:
- stream: Pure streaming transformation with continuous processing
- changelog: Change data capture (CDC) stream with primary keys
- materialized_view: Continuously updated aggregations
- table: Traditional batch table materialization
- store: External system connections (Kafka, Kinesis, PostgreSQL, etc.)
- entity: Entity definitions within stores
- database: Database resource definitions
- function: User-defined functions (UDFs) in Java
- function_source: JAR file sources for UDFs
- descriptor_source: Protocol buffer schema sources
- schema_registry: Schema registry connections (Confluent, etc.)
- compute_pool: Dedicated compute resource pools
- 🔄 Automatic Retry Logic: Smart retry for function creation with exponential backoff
- 📁 File Attachment: Seamless JAR and Protocol Buffer file handling
- 🎯 Query Management: Macros to list, terminate, and restart queries
- 🔗 Multi-statement Applications: Execute multiple statements as atomic units
- 🏗️ Infrastructure as Code: Define stores, databases, and compute pools in YAML
Detailed configuration options for profiles.yml:
Required parameters:

| Parameter | Description | Example |
|---|---|---|
| `token` | Authentication token for DeltaStream API | your-api-token |
| `database` | Target default database name | my_database |
| `schema` | Target default schema name | public |
| `organization_id` | Organization identifier | org-12345 |
Optional parameters:

| Parameter | Description | Default |
|---|---|---|
| `url` | DeltaStream API URL | https://api.deltastream.io/v2 |
| `timezone` | Timezone for operations | UTC |
| `session_id` | Custom session identifier for debugging | Auto-generated |
| `compute_pool` | Compute pool name for models requiring one | Default pool |
| `role` | User role | - |
| `store` | Target default store name | - |
Security best practices:
- Use environment variables for sensitive credentials:
  your_profile_name:
    target: prod
    outputs:
      prod:
        type: deltastream
        token: "{{ env_var('DELTASTREAM_API_TOKEN') }}"
        organization_id: "{{ env_var('DELTASTREAM_ORG_ID') }}"
        # ... other parameters
- Separate profiles for different environments (dev, staging, prod); see the sketch after this list
- Document required environment variables in your project README
- Use dbt Cloud for secure credential management in production
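A minimal sketch of separating environments with targets in a single profile, assuming dbt's standard target selection; the DELTASTREAM_API_TOKEN_PROD variable and the database/schema names are illustrative:

```yaml
my_deltastream_project:
  target: dev                     # default target
  outputs:
    dev:
      type: deltastream
      token: "{{ env_var('DELTASTREAM_API_TOKEN') }}"
      organization_id: "{{ env_var('DELTASTREAM_ORG_ID') }}"
      database: dev_db            # illustrative
      schema: dev_schema          # illustrative
    prod:
      type: deltastream
      token: "{{ env_var('DELTASTREAM_API_TOKEN_PROD') }}"  # hypothetical variable name
      organization_id: "{{ env_var('DELTASTREAM_ORG_ID') }}"
      database: prod_db           # illustrative
      schema: prod_schema         # illustrative
```

Select a non-default target at runtime, for example dbt run --target prod.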
DeltaStream supports two types of model definitions:
- YAML-only resources for defining infrastructure components
- SQL models for data transformations
These models don't contain SQL SELECT statements; instead, they define infrastructure components, such as streams, changelogs, and stores (external system connections), using YAML configuration. They can be either managed or unmanaged by the dbt DAG.
Use managed resources if you plan to recreate all the infrastructure in different environments, or to use graph operators to build only specific resources and their downstream transformations. Otherwise, it may be simpler to use unmanaged resources and avoid placeholder files.
When a YAML-only resource is managed by the dbt DAG, it is automatically included in the DAG by defining it as a model, for instance:
version: 2
models:
- name: my_kafka_stream
config:
materialized: stream
parameters:
topic: "user_events"
value.format: "json"
store: "my_kafka_store"In that case, we're running into a dbt limitation where we need to create a placeholder .sql file for the model to be detected. That .sql file would contain any content as long as it doesn't contain a "SELECT". We expect that limitation to be lifted in future dbt versions but it's still part of discussions.
For example, you may create my_kafka_stream.sql with the following content:
-- Placeholder
The model can then be referenced in downstream models using the regular ref function:
SELECT * FROM {{ ref('my_kafka_stream') }}
When a YAML-only resource is not managed by the dbt DAG, it has to be defined as a source, for instance:
version: 2
sources:
- name: kafka
schema: public
tables:
- name: pageviews
description: "Pageviews stream"
config:
materialized: stream
parameters:
topic: pageviews
store: "my_kafka_store"
"value.format": JSON
columns:
- name: viewtime
type: BIGINT
- name: userid
type: VARCHAR
- name: pageid
type: VARCHAR
Unmanaged resources require specific macros to be executed to create them on demand. To create all sources, run:
dbt run-operation create_sources
To create a specific source, run:
dbt run-operation create_source_by_name --args '{source_name: user_events}'
The source can then be referenced in downstream models using the regular source function:
SELECT * FROM {{ source('kafka', 'pageviews') }}
The following example can be created either as managed (models) or as unmanaged (sources).
version: 2
models:
- name: my_kafka_store
config:
materialized: store
parameters:
type: KAFKA # required
access_region: "AWS us-east-1"
uris: "kafka.broker1.url:9092,kafka.broker2.url:9092"
tls.ca_cert_file: "@/certs/us-east-1/self-signed-kafka-ca.crt"
- name: ps_store
config:
materialized: store
parameters:
type: POSTGRESQL # required
access_region: "AWS us-east-1"
uris: "postgresql://mystore.com:5432/demo"
postgres.username: "user"
postgres.password: "password"
- name: user_events_stream
config:
materialized: stream
columns:
event_time:
type: TIMESTAMP
not_null: true
user_id:
type: VARCHAR
action:
type: VARCHAR
parameters:
topic: 'user_events'
value.format: 'json'
key.format: 'primitive'
key.type: 'VARCHAR'
timestamp: 'event_time'
- name: order_changes
config:
materialized: changelog
columns:
order_id:
type: VARCHAR
not_null: true
status:
type: VARCHAR
updated_at:
type: TIMESTAMP
primary_key:
- order_id
parameters:
topic: 'order_updates'
value.format: 'json'
- name: pv_kinesis
config:
materialized: entity
store: kinesis_store
parameters:
'kinesis.shards': 3
- name: my_compute_pool
config:
materialized: compute_pool
parameters:
'compute_pool.size': 'small'
'compute_pool.timeout_min': 5
- name: my_function_source
config:
materialized: function_source
parameters:
file: '@/path/to/my-functions.jar'
description: 'Custom utility functions'
- name: my_descriptor_source
config:
materialized: descriptor_source
parameters:
file: '@/path/to/schemas.desc'
description: 'Protocol buffer schemas for data structures'
- name: my_custom_function
config:
materialized: function
parameters:
args:
- name: input_text
type: VARCHAR
returns: VARCHAR
language: JAVA
source.name: 'my_function_source'
class.name: 'com.example.TextProcessor'
- name: my_schema_registry
config:
materialized: schema_registry
parameters:
type: "CONFLUENT",
access_region: "AWS us-east-1",
uris: "https://url.to.schema.registry.listener:8081",
'confluent.username': 'fake_username',
'confluent.password': 'fake_password',
'tls.client.cert_file': '@/path/to/tls/client_cert_file',
'tls.client.key_file': '@/path/to/tls_key'version: 2
sources:
- name: example # source name, not used in DeltaStream but required by dbt for the {{ source("example", "...") }}
tables:
- name: my_kafka_store
config:
materialized: store
parameters:
type: KAFKA # required
access_region: "AWS us-east-1"
uris: "kafka.broker1.url:9092,kafka.broker2.url:9092"
tls.ca_cert_file: "@/certs/us-east-1/self-signed-kafka-ca.crt"
- name: ps_store
config:
materialized: store
parameters:
type: POSTGRESQL # required
access_region: "AWS us-east-1"
uris: "postgresql://mystore.com:5432/demo"
postgres.username: "user"
postgres.password: "password"
- name: user_events_stream
config:
materialized: stream
columns:
event_time:
type: TIMESTAMP
not_null: true
user_id:
type: VARCHAR
action:
type: VARCHAR
parameters:
topic: 'user_events'
value.format: 'json'
key.format: 'primitive'
key.type: 'VARCHAR'
timestamp: 'event_time'
- name: order_changes
config:
materialized: changelog
columns:
order_id:
type: VARCHAR
not_null: true
status:
type: VARCHAR
updated_at:
type: TIMESTAMP
primary_key:
- order_id
parameters:
topic: 'order_updates'
value.format: 'json'
- name: pv_kinesis
config:
materialized: entity
store: kinesis_store
parameters:
'kinesis.shards': 3
- name: my_compute_pool
config:
materialized: compute_pool
parameters:
'compute_pool.size': 'small'
'compute_pool.timeout_min': 5
- name: my_function_source
config:
materialized: function_source
parameters:
file: '@/path/to/my-functions.jar'
description: 'Custom utility functions'
- name: my_descriptor_source
config:
materialized: descriptor_source
parameters:
file: '@/path/to/schemas.desc'
description: 'Protocol buffer schemas for data structures'
- name: my_custom_function
config:
materialized: function
parameters:
args:
- name: input_text
type: VARCHAR
returns: VARCHAR
language: JAVA
source.name: 'my_function_source'
class.name: 'com.example.TextProcessor'
- name: my_schema_registry
config:
materialized: schema_registry
parameters:
type: "CONFLUENT",
access_region: "AWS us-east-1",
uris: "https://url.to.schema.registry.listener:8081",
'confluent.username': 'fake_username',
'confluent.password': 'fake_password',
'tls.client.cert_file': '@/path/to/tls/client_cert_file',
'tls.client.key_file': '@/path/to/tls_key'These models contain SQL SELECT statements for data transformations.
A stream model creates a continuous streaming transformation:
{{ config(
materialized='stream',
parameters={
'topic': 'purchase_events',
'value.format': 'json'
}
) }}
SELECT
event_time,
user_id,
action
FROM {{ ref('source_stream') }}
WHERE action = 'purchase'
A changelog model captures changes in the data stream:
{{ config(
materialized='changelog',
parameters={
'topic': 'order_updates',
'value.format': 'json'
}
) }}
SELECT
order_id,
status,
updated_at
FROM {{ ref('orders_stream') }}
A table model creates a traditional batch table:
{{ config(materialized='table') }}
SELECT
date,
SUM(amount) as daily_total
FROM {{ ref('transactions') }}
GROUP BY date
A materialized_view model creates a continuously updated view:
{{ config(materialized='materialized_view') }}
SELECT
product_id,
COUNT(*) as purchase_count
FROM {{ ref('purchase_events') }}
GROUP BY product_id
Seeds load CSV data into existing DeltaStream entities using the seed materialization. Unlike traditional dbt seeds that create new tables, DeltaStream seeds insert data into pre-existing entities.
Seeds must be configured in YAML with the following properties:
Required:
- entity: The name of the target entity to insert data into

Optional:
- store: The name of the store containing the entity (omit if the entity is not in a store)
- with_params: A dictionary of parameters for the WITH clause
- quote_columns: Controls which columns are quoted. Default: false (no columns quoted). Can be:
  - true: quote all columns
  - false: quote no columns (default)
  - a string: if set to '*', quote all columns
  - a list: only the listed column names are quoted
With Store (quoting enabled):
# seeds.yml
version: 2
seeds:
- name: user_data_with_store_quoted
config:
entity: 'user_events'
store: 'kafka_store'
with_params:
kafka.topic.retention.ms: '86400000'
partitioned: true
quote_columns: true # Quote all columns
To load seed data:
- Place CSV files in your seeds/ directory
- Configure seeds in YAML with the required entity parameter
- Optionally specify store if the entity is in a store
- Run dbt seed to load the data
Important: The target entity must already exist in DeltaStream before running seeds. Seeds only insert data, they do not create entities.
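For an entity that is not inside a store, the store key is simply omitted. A minimal sketch; the seed and entity names here are illustrative, and the matching CSV (seeds/user_data.csv) must already conform to the entity's columns:

```yaml
# seeds.yml
version: 2

seeds:
  - name: user_data               # illustrative; expects seeds/user_data.csv
    config:
      entity: 'user_events'       # pre-existing DeltaStream entity (illustrative name)
      quote_columns: false        # default: no columns quoted
```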
DeltaStream supports user-defined functions (UDFs) and their dependencies through specialized materializations.
The adapter provides seamless file attachment for function sources and descriptor sources:
- Standardized Interface: Common file handling logic for both function sources and descriptor sources
- Path Resolution: Supports both absolute paths and relative paths (including the @ syntax for project-relative paths)
- Automatic Validation: Files are validated for existence and accessibility before attachment
A function_source model creates a function source from a JAR file containing Java functions:
{{ config(
materialized='function_source',
parameters={
'file': '@/path/to/my-functions.jar',
'description': 'Custom utility functions'
}
) }}
SELECT 1 as placeholder
A descriptor_source model creates a descriptor source from compiled protocol buffer descriptor files:
{{ config(
materialized='descriptor_source',
parameters={
'file': '@/path/to/schemas.desc',
'description': 'Protocol buffer schemas for data structures'
}
) }}
SELECT 1 as placeholder
Note: Descriptor sources require compiled .desc files, not raw .proto files. Compile your protobuf schemas using:
protoc --descriptor_set_out=schemas/my_schemas.desc schemas/my_schemas.proto
A function model creates a user-defined function that references a function source:
{{ config(
materialized='function',
parameters={
'args': [
{'name': 'input_text', 'type': 'VARCHAR'}
],
'returns': 'VARCHAR',
'language': 'JAVA',
'source.name': 'my_function_source',
'class.name': 'com.example.TextProcessor'
}
) }}
SELECT 1 as placeholder
The DeltaStream dbt adapter provides macros to help you manage and terminate running queries directly from dbt.
Use the terminate_query macro to terminate a query by its ID:
dbt run-operation terminate_query --args '{query_id: "<QUERY_ID>"}'
Use the terminate_all_queries macro to terminate all currently running queries:
dbt run-operation terminate_all_queries
These macros leverage DeltaStream's LIST QUERIES; and TERMINATE QUERY <query_id>; SQL commands to identify and terminate running queries. This is useful for cleaning up long-running or stuck jobs during development or operations.
The list_all_queries macro displays all queries currently known to DeltaStream, including their state, owner, and SQL. It prints a formatted table to the dbt logs for easy inspection.
Usage:
dbt run-operation list_all_queries
Example Output:
ID | Name | Version | IntendedState | ActualState | Query | Owner | CreatedAt | UpdatedAt
-----------------------------------------------------------------------------------------
<query row 1>
<query row 2>
...
This macro is useful for debugging, monitoring, and operational tasks. It leverages DeltaStream's LIST QUERIES; SQL command and prints the results in a readable table format.
Use the restart_query macro to restart a failed query by its ID:
dbt run-operation restart_query --args '{query_id: "<QUERY_ID>"}'
Before restarting a query, you can use the describe_query macro to check the logs and determine whether a restart is worthwhile:
dbt run-operation describe_query --args '{query_id: "<QUERY_ID>"}'
This will display the query's current state and any error information to help you understand why the query failed.
The application macro allows you to execute multiple DeltaStream SQL statements as a single unit of work with all-or-nothing semantics. This leverages DeltaStream's APPLICATION syntax for better efficiency and resource utilization.
Usage:
dbt run-operation application --args '{
application_name: "my_data_pipeline",
statements: [
"USE DATABASE my_db",
"CREATE STREAM user_events WITH (topic='"'"'events'"'"', value.format='"'"'json'"'"')",
"CREATE MATERIALIZED VIEW user_counts AS SELECT user_id, COUNT(*) FROM user_events GROUP BY user_id"
]
}'
If you encounter "function source is not ready" errors when creating functions:
- Automatic Retry: The adapter automatically retries function creation with exponential backoff
- Timeout Configuration: The default 30-second timeout can be extended if needed for large JAR files
- Dependency Order: Ensure function sources are created before dependent functions
- Manual Retry: If automatic retry fails, wait a few minutes and retry the operation
For problems with file attachments in function sources and descriptor sources:
- File Paths: Use the @/path/to/file syntax for project-relative paths
- File Types:
  - Function sources require .jar files
  - Descriptor sources require compiled .desc files (not .proto)
- File Validation: The adapter validates file existence before attempting attachment
- Compilation: For descriptor sources, ensure protobuf files are compiled:
  protoc --descriptor_set_out=output.desc input.proto
- DeltaStream Documentation - Complete DeltaStream platform documentation
- dbt Documentation - Official dbt documentation
Check out the /examples directory for complete working examples:
- snowflake_with_deltastream/ - Integration with Snowflake
- databricks_with_deltastream/ - Integration with Databricks
Contributions are welcome and encouraged! Whether you're fixing bugs, adding features, improving documentation, or creating examples, your help makes this adapter better for everyone.
Ways to Contribute:
- 🐛 Report bugs via GitHub Issues
- 💡 Suggest features or enhancements
- 📖 Improve documentation - even small fixes help!
- 🧪 Add tests to increase coverage
- ⭐ Star the repository to show your support
Please see CONTRIBUTING.md for detailed guidelines on:
- Setting up your development environment
- Running tests and quality checks
- Submitting pull requests
- Using changie for changelog management