feat: add code samples for dbt bigframes integration #1898

Open · wants to merge 3 commits into main
13 changes: 13 additions & 0 deletions dbt_bigframes_integration/.dbt.yml
@@ -0,0 +1,13 @@
dbt_sample_project:
  outputs:
    dev: # The target environment name (e.g., dev, prod)
      compute_region: us-central1 # Region used for compute operations
      dataset: dbt_sample_dataset # BigQuery dataset where dbt will create models
      gcs_bucket: dbt_sample_bucket # GCS bucket to store output files
      location: US # BigQuery dataset location
      method: oauth # Authentication method
      priority: interactive # Job priority: "interactive" or "batch"
      project: bigframes-dev # GCP project ID
      threads: 1 # Number of threads dbt can use for running models in parallel
      type: bigquery # Specifies the dbt adapter
  target: dev # The default target environment
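
The profile above is consumed by dbt itself. When prototyping the same transformations interactively (for example in a notebook), the corresponding settings can be applied to a standalone BigFrames session. The sketch below mirrors the `project` and `location` values from the `dev` target and is illustrative rather than part of this change.

# Minimal sketch: point an interactive BigFrames session at the same project
# and location as the "dev" target above. Values mirror the profile; adjust
# them for your own environment.
import bigframes.pandas as bpd

bpd.options.bigquery.project = "bigframes-dev"  # `project` in the profile
bpd.options.bigquery.location = "US"            # `location` in the profile

# Quick connectivity check against a public dataset.
df = bpd.read_gbq("bigquery-public-data.epa_historical_air_quality.temperature_hourly_summary")
print(df.shape)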
39 changes: 39 additions & 0 deletions dbt_bigframes_integration/dbt_sample_project/dbt_project.yml
@@ -0,0 +1,39 @@

# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'dbt_sample_project'
version: '1.0.0'

# This setting configures which "profile" dbt uses for this project.
profile: 'dbt_sample_project'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_packages"


# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build all models in the example/
# directory as views. These settings can be overridden in the individual model
# files using the `{{ config(...) }}` macro.
models:
  dbt_sample_project:
    # Optional: These settings (e.g., submission_method, notebook_template_id,
    # etc.) can also be defined directly in the Python model using dbt.config.
    submission_method: bigframes
    # Config indicated by + and applies to all files under models/example/
    example:
      +materialized: view
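
As the comments above note, the same settings can also be declared per model. The sketch below shows the `dbt.config` equivalent for a hypothetical Python model file (the file name and body are illustrative); values set this way take precedence over dbt_project.yml. Note that dbt Python models are typically materialized as table or incremental, not view.

# Sketch: per-model equivalent of the project-level configuration above,
# inside a hypothetical Python model file such as models/example/my_model.py.
def model(dbt, session):
    dbt.config(
        submission_method="bigframes",  # same effect as the project-level key
        materialized="table",           # Python models support table/incremental
    )
    # Trivial body so the sketch is a complete dbt Python model.
    return session.read_gbq(
        "bigquery-public-data.epa_historical_air_quality.temperature_hourly_summary"
    ).head(10)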
@@ -0,0 +1,58 @@
# This example demonstrates one of the most general usages of transforming raw
# BigQuery data into a processed table using a dbt Python model with BigFrames.
# See more from: https://cloud.google.com/bigquery/docs/dataframes-dbt.
#
# Key defaults when using BigFrames in a dbt Python model for BigQuery:
# - The default materialization is 'table' unless specified otherwise. This
# means dbt will create a new BigQuery table from the result of this model.
# - The default timeout for the job is 3600 seconds (60 minutes). This can be
# adjusted if your processing requires more time.
# - If no runtime template is provided, dbt will automatically create and reuse
# a default one for executing the Python code in BigQuery.
#
# BigFrames provides a pandas-like API for BigQuery data, enabling familiar
# data manipulation directly within your dbt project. This code sample
# illustrates a basic pattern for:
# 1. Reading data from an existing BigQuery dataset.
# 2. Processing it using pandas-like DataFrame operations powered by BigFrames.
# 3. Outputting a cleaned and transformed table, managed by dbt.


def model(dbt, session):
    # Optional: Override settings from your dbt_project.yml file.
    # When both are set, dbt.config takes precedence over dbt_project.yml.
    #
    # Use `dbt.config(submission_method="bigframes")` to tell dbt to execute
    # this Python model using BigQuery DataFrames (BigFrames). This allows you
    # to write pandas-like code that operates directly on BigQuery data
    # without needing to pull all data into memory.
    dbt.config(submission_method="bigframes")

    # Define the BigQuery table path from which to read data.
    table = "bigquery-public-data.epa_historical_air_quality.temperature_hourly_summary"

    # Define the specific columns to select from the BigQuery table.
    columns = ["state_name", "county_name", "date_local", "time_local", "sample_measurement"]

    # Read data from the specified BigQuery table into a BigFrames DataFrame.
    df = session.read_gbq(table, columns=columns)

    # Sort the DataFrame by the specified columns. This prepares the data for
    # `drop_duplicates` to ensure consistent duplicate removal.
    df = df.sort_values(columns).drop_duplicates(columns)

    # Group the DataFrame by 'state_name', 'county_name', and 'date_local'. For
    # each group, calculate the minimum and maximum of the 'sample_measurement'
    # column. The result will be a BigFrames DataFrame with a MultiIndex.
    result = df.groupby(["state_name", "county_name", "date_local"])["sample_measurement"]\
        .agg(["min", "max"])

    # Rename some columns and convert the MultiIndex of the 'result' DataFrame
    # into regular columns. This flattens the DataFrame so 'state_name',
    # 'county_name', and 'date_local' become regular columns again.
    result = result.rename(columns={'min': 'min_temperature', 'max': 'max_temperature'})\
        .reset_index()

    # Return the processed BigFrames DataFrame. In a dbt Python model, this
    # DataFrame will be materialized as a table.
    return result
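
For debugging, the same transformation can be prototyped interactively with BigFrames outside dbt before committing it as a model. The sketch below assumes an already-configured BigFrames session and simply mirrors the logic above.

# Sketch: interactive prototype of the model above, e.g. in a notebook.
import bigframes.pandas as bpd

table = "bigquery-public-data.epa_historical_air_quality.temperature_hourly_summary"
columns = ["state_name", "county_name", "date_local", "time_local", "sample_measurement"]

df = bpd.read_gbq(table, columns=columns)
df = df.sort_values(columns).drop_duplicates(columns)
result = (
    df.groupby(["state_name", "county_name", "date_local"])["sample_measurement"]
    .agg(["min", "max"])
    .rename(columns={"min": "min_temperature", "max": "max_temperature"})
    .reset_index()
)
print(result.head())  # runs a BigQuery job and previews the output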
@@ -0,0 +1,67 @@
# This example demonstrates how to build an **incremental dbt Python model**
# using BigFrames.
#
# Incremental models are essential for efficiently processing large datasets by
# only transforming new or changed data, rather than reprocessing the entire
# dataset every time. If the target table already exists, dbt will perform a
# merge based on the specified unique keys; otherwise, it will create a new
# table automatically.
#
# This model also showcases the definition and application of a **BigFrames
# User-Defined Function (UDF)** to add a descriptive summary column based on
# temperature data. BigFrames UDFs allow you to execute custom Python logic
# directly within BigQuery, leveraging BigQuery's scalability.


import bigframes.pandas as bpd

def model(dbt, session):
    # Optional: override settings from dbt_project.yml.
    # When both are set, dbt.config takes precedence over dbt_project.yml.
    dbt.config(
        # Use BigFrames mode to execute this Python model. This enables
        # pandas-like operations directly on BigQuery data.
        submission_method="bigframes",
        # Materialize this model as an 'incremental' table. This tells dbt to
        # only process new or updated data on subsequent runs.
        materialized='incremental',
        # Use the MERGE strategy to update rows during incremental runs.
        incremental_strategy='merge',
        # Define the composite key that uniquely identifies a row in the
        # target table. This key is used by the 'merge' strategy to match
        # existing rows for updates during incremental runs.
        unique_key=["state_name", "county_name", "date_local"],
    )

    # Reference an upstream dbt model or an existing BigQuery table as a
    # BigFrames DataFrame. This allows you to seamlessly use the output of
    # another dbt model as input to this one.
    df = dbt.ref("dbt_bigframes_code_sample_1")

    # Define a BigFrames UDF to generate a temperature description.
    # BigFrames UDFs allow you to define custom Python logic that executes
    # directly within BigQuery. This is powerful for complex transformations.
    @bpd.udf(dataset='dbt_sample_dataset', name='describe_udf')
    def describe(
        max_temperature: float,
        min_temperature: float,
    ) -> str:
        is_hot = max_temperature > 85.0
        is_cold = min_temperature < 50.0

        if is_hot and is_cold:
            return "Expect both hot and cold conditions today."
        if is_hot:
            return "Overall, it's a hot day."
        if is_cold:
            return "Overall, it's a cold day."
        return "Comfortable throughout the day."

    # Apply the UDF using `combine` and store the result in a column "describe".
    df["describe"] = df["max_temperature"].combine(df["min_temperature"], describe)

    # Return the transformed BigFrames DataFrame.
    # This DataFrame will be the final output of your incremental dbt model.
    # On subsequent runs, only new or changed rows will be processed and merged
    # into the target BigQuery table based on the `unique_key`.
    return df
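
Because the UDF body is plain Python, its branching logic can be checked locally before it is deployed as a BigQuery-managed function. The sketch below duplicates the thresholds used in `describe` above purely for illustration.

# Sketch: local check of the UDF's logic with plain Python values.
def describe_local(max_temperature: float, min_temperature: float) -> str:
    is_hot = max_temperature > 85.0
    is_cold = min_temperature < 50.0
    if is_hot and is_cold:
        return "Expect both hot and cold conditions today."
    if is_hot:
        return "Overall, it's a hot day."
    if is_cold:
        return "Overall, it's a cold day."
    return "Comfortable throughout the day."

assert describe_local(90.0, 45.0) == "Expect both hot and cold conditions today."
assert describe_local(90.0, 60.0) == "Overall, it's a hot day."
assert describe_local(70.0, 40.0) == "Overall, it's a cold day."
assert describe_local(75.0, 60.0) == "Comfortable throughout the day."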