[HUDI-7149] Add a dbt example project with CDC capability (apache#10192)
xushiyan authored Nov 29, 2023
1 parent 817d81a commit 8370c62
Showing 7 changed files with 307 additions and 18 deletions.
1 change: 1 addition & 0 deletions hudi-examples/hudi-examples-dbt/.gitignore
@@ -6,3 +6,4 @@ logs/
.DS_Store
.vscode
*.log
dbt-env/
107 changes: 95 additions & 12 deletions hudi-examples/hudi-examples-dbt/README.md
@@ -14,21 +14,21 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-## Testing dbt project: `hudi_examples_dbt`
+# Testing dbt project: `hudi_examples_dbt`

This dbt project demonstrates hudi integration with dbt; it has a few models showing the different ways in which you can create hudi datasets using dbt.

This directory serves as a self-contained playground dbt project, useful for testing out scripts, and communicating some of the core dbt concepts.

-### Setup
+## Setup

Switch to the project directory and make sure `python3` is installed.

```shell
cd hudi-examples/hudi-examples-dbt
```

-### Install dbt
+## Install dbt

Create python virtual environment ([Reference](https://docs.getdbt.com/docs/installation)).
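
A minimal sketch of the collapsed setup commands, assuming the virtual environment is named `dbt-env` (matching the `.gitignore` entry above) and the `dbt-spark[PyHive]` adapter is used; the exact steps in the collapsed hunk may differ:

```shell
python3 -m venv dbt-env                    # create the virtual environment
source dbt-env/bin/activate                # activate it in this shell
python -m pip install "dbt-spark[PyHive]"  # dbt core + the Spark Thrift adapter
```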

@@ -54,7 +54,7 @@ spark:
    dev:
      type: spark
      method: thrift
-      schema: my_schema
+      schema: hudi_examples_dbt
      host: localhost
      port: 10000
      server_side_parameters:
@@ -63,7 +63,7 @@ spark:
_If you have access to a data warehouse, you can use those credentials – we recommend setting your [target schema](https://docs.getdbt.com/docs/configure-your-profile#section-populating-your-profile) to be a new schema (dbt will create the schema for you, as long as you have the right privileges). If you don't have access to an existing data warehouse, you can also set up a local postgres database and connect to it in your profile._
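
For reference, a complete `~/.dbt/profiles.yml` sketch matching the fragment above (the `server_side_parameters` entries are collapsed in this diff and therefore omitted here):

```yaml
spark:
  target: dev
  outputs:
    dev:
      type: spark
      method: thrift
      schema: hudi_examples_dbt
      host: localhost
      port: 10000
```
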
-### Start Spark Thrift server
+## Start Spark Thrift server
> **NOTE** Using these versions
> - Spark 3.2.3 (with Derby 10.14.2.0)
@@ -104,7 +104,7 @@ $SPARK_HOME/sbin/start-thriftserver.sh \
--hiveconf 'javax.jdo.option.ConnectionURL=jdbc:derby://localhost:1527/default;create=true'
```
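
The Thrift server above expects a Derby network server listening on port 1527 for its metastore. A hedged sketch of starting one (the download step is collapsed in this diff, and the `DERBY_HOME` path is an assumption):

```shell
# Assumes Derby 10.14.2.0 has been downloaded, per the NOTE above
export DERBY_HOME=/path/to/db-derby-10.14.2.0-bin
$DERBY_HOME/bin/startNetworkServer -p 1527 &
```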

-### Verify dbt setup
+## Verify dbt setup

```shell
dbt debug
@@ -116,13 +116,16 @@ Output of the above command should show this text at the end of the output:
All checks passed!
```

-### Run the models
+## Run the models

### Run `example`

```shell
-dbt run
+dbt run -m example
```
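
To preview which models a selector matches before running anything, dbt's `ls` command can help (it accepts the same `-m` selector):

```shell
dbt ls -m example
```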

-Output should look like this
+<details>
+<summary>Output should look like this</summary>

```
05:47:28 Running with dbt=1.0.0
@@ -145,14 +148,16 @@ Output should look like this
05:47:42
05:47:42 Completed successfully
```
</details>

-### Test the output of the models
+### Test `example`

```shell
-dbt test
+dbt test -m example
```

-Output should look like this
+<details>
+<summary>Output should look like this</summary>

```
05:48:17 Running with dbt=1.0.0
@@ -187,6 +192,84 @@ Output should look like this
05:48:26
05:48:26 Done. PASS=10 WARN=0 ERROR=0 SKIP=0 TOTAL=10
```
</details>

### Run `example_cdc`

Bootstrap the `raw_updates` and `profiles` tables.

```shell
dbt run -m example_cdc.raw_updates -m example_cdc.profiles
```

Launch a `spark-sql` shell to interact with the tables created by `example_cdc`.

```shell
spark-sql \
--packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.14.0 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.sql.warehouse.dir=/tmp/hudi/hive/warehouse \
--conf spark.hadoop.hive.metastore.warehouse.dir=/tmp/hudi/hive/warehouse \
--conf spark.hadoop.hive.metastore.schema.verification=false \
--conf spark.hadoop.datanucleus.schema.autoCreateAll=true \
--conf spark.hadoop.javax.jdo.option.ConnectionDriverName=org.apache.derby.jdbc.ClientDriver \
--conf 'spark.hadoop.javax.jdo.option.ConnectionURL=jdbc:derby://localhost:1527/default;create=true' \
--conf 'spark.hadoop.hive.cli.print.header=true'
```
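
Before inserting updates, it can help to confirm the bootstrapped tables are visible from this shell (a quick check, not part of the original walkthrough):

```sql
use hudi_examples_dbt;
show tables;
select user_id, city, updated_at from profiles order by updated_at;
```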

Insert sample records.

```sql
use hudi_examples_dbt;
insert into raw_updates values ('101', 'D', UNIX_TIMESTAMP());
insert into raw_updates values ('102', 'E', UNIX_TIMESTAMP());
insert into raw_updates values ('103', 'F', UNIX_TIMESTAMP());
```
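
Optionally verify the staged rows in the same `spark-sql` session before re-running the downstream model:

```sql
select user_id, city, updated_at from raw_updates order by updated_at;
```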

Process the updates and write new data to `profiles`.

```shell
dbt run -m example_cdc.profiles
```

<details>
<summary>Check `profiles` records.</summary>

```shell
spark-sql> refresh table profiles;
spark-sql> select _hoodie_commit_time, user_id, city, updated_at from profiles order by updated_at;
_hoodie_commit_time user_id city updated_at
20231128013722030 101 D 1701157027
20231128013722030 102 E 1701157031
20231128013722030 103 F 1701157035
Time taken: 0.219 seconds, Fetched 3 row(s)
```
</details>

Extract changed data from `profiles` to `profile_changes`.

```shell
dbt run -m example_cdc.profile_changes
```

<details>
<summary>Check `profile_changes` records.</summary>

```shell
spark-sql> refresh table profile_changes;
spark-sql> select user_id, old_city, new_city from profile_changes order by process_ts;
user_id old_city new_city
101 Nil A
102 Nil B
103 Nil C
101 A D
102 B E
103 C F
Time taken: 0.129 seconds, Fetched 6 row(s)
```
</details>

### Generate documentation
15 changes: 9 additions & 6 deletions hudi-examples/hudi-examples-dbt/dbt_project.yml
@@ -28,12 +28,12 @@ profile: 'spark'
# These configurations specify where dbt should look for different types of files.
# The `source-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
-model-paths: ["models"]
+model-paths: [ "models" ]

target-path: "target" # directory which will store compiled SQL files
-clean-targets:         # directories to be removed by `dbt clean`
-    - "target"
-    - "dbt_modules"
+clean-targets: # directories to be removed by `dbt clean`
+  - "target"
+  - "dbt_modules"

# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
@@ -44,6 +44,9 @@ clean-targets: # directories to be removed by `dbt clean`
models:
  +file_format: hudi
  hudi_examples_dbt:
-    example:
-      materialized: table
+    example:
+      # Applies to all files under models/example/
+      materialized: table
+    example_cdc:
+      # Applies to all files under models/example_cdc/
+      materialized: table
43 changes: 43 additions & 0 deletions hudi-examples/hudi-examples-dbt/models/example_cdc/profile_changes.sql
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

{{
config(
materialized='incremental',
file_format='hudi'
)
}}

with new_changes as (
select
GET_JSON_OBJECT(after, '$.user_id') AS user_id,
COALESCE(GET_JSON_OBJECT(before, '$.city'), 'Nil') AS old_city,
GET_JSON_OBJECT(after, '$.city') AS new_city,
ts_ms as process_ts

from hudi_table_changes('hudi_examples_dbt.profiles', 'cdc',
from_unixtime(unix_timestamp() - 3600 * 24, 'yyyyMMddHHmmss'))

{% if is_incremental() %}
where ts_ms > (select max(process_ts) from {{ this }})
{% endif %}
)
select user_id, old_city, new_city, process_ts
from new_changes
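
The model above reads Hudi's change-data feed through the `hudi_table_changes` table-valued function. For reference, a direct `spark-sql` query against the same feed might look like this sketch (it assumes CDC is enabled on `profiles`, as configured in `profiles.sql` below, and that `'earliest'` is an acceptable begin time):

```sql
select op, ts_ms, before, after
from hudi_table_changes('hudi_examples_dbt.profiles', 'cdc', 'earliest');
```
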
49 changes: 49 additions & 0 deletions hudi-examples/hudi-examples-dbt/models/example_cdc/profiles.sql
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

{{
config(
materialized='incremental',
incremental_strategy='merge',
merge_update_columns = ['city', 'updated_at'],
unique_key='user_id',
file_format='hudi',
options={
'type': 'cow',
'primaryKey': 'user_id',
'preCombineField': 'updated_at',
'hoodie.table.cdc.enabled': 'true',
'hoodie.table.cdc.supplemental.logging.mode': 'DATA_BEFORE_AFTER'
}
)
}}

with new_updates as (
select user_id, city, updated_at from {{ ref('raw_updates') }}

{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
)

select
user_id, city, updated_at
from new_updates
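
The config block above is roughly what the following hand-written Spark SQL DDL would produce (a sketch: the exact DDL and table properties emitted by dbt-spark may differ):

```sql
-- Hedged hand-written equivalent of the dbt config in profiles.sql
create table hudi_examples_dbt.profiles (
  user_id string,
  city string,
  updated_at bigint
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'user_id',
  preCombineField = 'updated_at',
  'hoodie.table.cdc.enabled' = 'true',
  'hoodie.table.cdc.supplemental.logging.mode' = 'DATA_BEFORE_AFTER'
);
```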

40 changes: 40 additions & 0 deletions hudi-examples/hudi-examples-dbt/models/example_cdc/raw_updates.sql
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

{{
config(
materialized='incremental',
file_format='hudi',
incremental_strategy='insert_overwrite'
)
}}

with source_data as (

select '101' as user_id, 'A' as city, unix_timestamp() as updated_at
union all
select '102' as user_id, 'B' as city, unix_timestamp() as updated_at
union all
select '103' as user_id, 'C' as city, unix_timestamp() as updated_at

)

select *
from source_data
