description: Build a data pipeline - the comprehensive guide
keywords: [build a data pipeline]
---

## Building data pipelines with `dlt`, from basic to advanced

This in-depth overview takes you through the main areas of pipelining with `dlt`. Go to the
related pages if you are instead looking for the [demo](try-in-colab.md) or the
[walkthroughs](../walkthroughs).

## Why build pipelines with `dlt`?

`dlt` offers functionality to support the entire extract and load process. Let's look at the high-level diagram:

![dlt source resource pipe diagram](/img/dlt-high-level.png)

First, we have a `pipeline` function that can infer a schema from data and load the data to the destination.
We can use this pipeline with JSON data, dataframes, or other iterable objects such as generator functions.

This pipeline provides effortless loading via a schema discovery, versioning, and evolution
engine that ensures you can "just load" any data with row and column level lineage.

By utilizing a `dlt` pipeline, we can easily adapt and structure data as it evolves, reducing the time spent on
maintenance and development.

This allows our data team to focus on leveraging the data and driving value, while ensuring
effective governance through timely notifications of any changes.

For extraction, `dlt` also provides `source` and `resource` decorators that enable defining
how extracted data should be loaded, while supporting graceful,
scalable extraction via micro-batching and parallelism.
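
A minimal sketch of what that looks like, assuming an invented `users` resource that yields pages of rows:

```python
import dlt

@dlt.resource(table_name="users", write_disposition="append")
def users():
    # yield data page by page instead of building one huge list in memory
    for page_number in range(3):
        yield [{"id": page_number * 100 + i, "page": page_number} for i in range(100)]

pipeline = dlt.pipeline(destination="duckdb", dataset_name="example_data")
pipeline.run(users())
```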

## The simplest pipeline: 1 liner to load data with schema evolution

```python
import dlt

# create a pipeline to DuckDB and load a list of objects into the "countries"
# table of the "country_data" dataset (the example rows are illustrative)
info = dlt.pipeline(destination="duckdb", dataset_name="country_data").run(
    [{"country": "USA", "capital": "Washington, D.C."}],
    table_name="countries",
)
```

In this example, the `pipeline` function creates a pipeline with the specified
destination (DuckDB) and dataset name ("country_data"). The `run` method is then called to load
the data from a list of objects into the table named "countries". The `info` variable stores
information about the loaded data, such as package IDs and job metadata.

The data you can pass to it should be iterable: lists of rows, generators, or `dlt` sources will do
just fine.

If you want to configure how the data is loaded, you can choose between `write_disposition`s
such as `replace`, `append`, and `merge` in the pipeline function.

Here is an example where we load some data to DuckDB by upserting (merging) on the `id` column found in the data.
In this example, we also run a dbt package and then load the outcomes of the load jobs into their respective tables.
This enables us to log when schema changes occurred and match them to the loaded data, giving us both column and row level lineage.
We also alert schema changes to a Slack channel where, hopefully, the producer and consumer are subscribed.

```python
import dlt

# have data? dlt likes data
data = [{'id': 1, 'name': 'John'}]

# open connection
pipeline = dlt.pipeline(
    destination='duckdb',
    dataset_name='raw_data'
)

# Upsert/merge: update old records, insert new
load_info = pipeline.run(
    data,
    write_disposition="merge",
    primary_key="id",
    table_name="users"
)
```
Add the dbt runner, optionally with a venv:
```python
venv = dlt.dbt.get_venv(pipeline)
dbt = dlt.dbt.package(
    pipeline,
    "https://github.com/dbt-labs/jaffle_shop.git",
    venv=venv
)
models_info = dbt.run_all()

# Load metadata for monitoring and load package lineage.
# This allows for both row and column level lineage,
# as it contains schema update info linked to the loaded data.
pipeline.run([load_info], table_name="loading_status", write_disposition='append')
pipeline.run([models_info], table_name="transform_status", write_disposition='append')
```

Let's alert any schema changes:
```python
from dlt.common.runtime.slack import send_slack_message

slack_hook = "https://hooks.slack.com/services/xxx/xxx/xxx"

for package in load_info.load_packages:
    for table_name, table in package.schema_update.items():
        for column_name, column in table["columns"].items():
            send_slack_message(
                slack_hook,
                message=f"\tTable updated: {table_name}: Column changed: {column_name}: {column['data_type']}"
            )
```

## Extracting data with `dlt`

Extracting data with `dlt` is simple: you decorate your data-producing functions with loading
or incremental extraction metadata, which enables `dlt` to extract and load data according to your custom logic.

Technically, two key aspects contribute to `dlt`'s effectiveness:

- Scalability through iterators, chunking, and parallelization.
- The utilization of implicit extraction DAGs that allow efficient API calls for data
  enrichments or transformations.

### Scalability via iterators, chunking, and parallelization

`dlt` offers scalable data extraction by leveraging iterators, chunking, and parallelization
techniques. This approach allows for efficient processing of large datasets by breaking them down
into manageable chunks.

By processing multiple data chunks simultaneously, `dlt` takes advantage of parallel processing capabilities,
resulting in significantly reduced extraction times. This parallelization enhances performance,
especially when dealing with high-volume data sources.
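
To give a concrete feel for the chunking side of this, here is a minimal sketch (the data source, page size, and table name are made up) of a resource that yields fixed-size chunks so extraction never materializes the full dataset in memory:

```python
import dlt
from itertools import islice

def read_records():
    # stand-in for a real data source, e.g. an API client or file reader
    for i in range(10_000):
        yield {"id": i}

@dlt.resource(table_name="records")
def records(chunk_size: int = 1000):
    records_iter = read_records()
    # hand dlt one chunk at a time; each chunk is buffered and processed independently
    while chunk := list(islice(records_iter, chunk_size)):
        yield chunk

dlt.pipeline(destination="duckdb", dataset_name="chunked_demo").run(records())
```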

### Implicit extraction DAGs

`dlt` incorporates the concept of implicit extraction DAGs to handle the dependencies between
data sources and their transformations automatically. A DAG represents a directed graph without
cycles, where each node is a data source or a transformation step and the edges define the dependencies between them.

The extraction DAG determines the order in which resources are evaluated, ensuring that data is processed in
the correct order, accounting for any dependencies and transformations.

When deploying to Airflow, the internal DAG is unpacked into Airflow tasks in such a way as to ensure
consistency and allow granular loading.
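
A minimal sketch of how such a dependency can be expressed with a transformer that enriches records coming from a parent resource (the resource names and the enrichment itself are made up):

```python
import dlt

@dlt.resource(table_name="users")
def users():
    yield [{"id": 1, "country_code": "US"}, {"id": 2, "country_code": "DE"}]

@dlt.transformer(data_from=users)
def users_with_country_name(items):
    # enrichment step: dlt evaluates `users` first, then feeds its output here
    names = {"US": "United States", "DE": "Germany"}
    for item in items:
        item["country_name"] = names.get(item["country_code"])
        yield item

dlt.pipeline(destination="duckdb", dataset_name="dag_demo").run(users_with_country_name)
```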

## Defining Incremental Loading

[Incremental loading](../general-usage/incremental-loading.md) is a crucial concept in data pipelines that involves loading only new or changed
data instead of reloading the entire dataset. This approach provides several benefits, including
low-latency data transfer and cost savings.
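
For instance, a resource can declare a cursor field with `dlt.sources.incremental`, so that only records newer than the last loaded value are requested on subsequent runs. A rough sketch (the `tickets` endpoint and `fetch_pages` helper are made up):

```python
import dlt

@dlt.resource(primary_key="id", write_disposition="merge")
def tickets(
    updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01T00:00:00Z")
):
    # fetch_pages is a stand-in for your API client; only ask for records
    # changed since the last successful run
    for page in fetch_pages(since=updated_at.last_value):
        yield page
```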

### Declarative loading

Declarative loading allows you to specify the desired state of the data in the target destination,
enabling efficient incremental updates. With `dlt`, you can define the incremental loading
behavior using the `write_disposition` parameter. There are three options available:

1. **Append**: with `write_disposition='append'`, new data is appended to the data already at the destination.
2. **Replace**: with `write_disposition='replace'`, the data at the destination is replaced by the new load.
3. **Merge**: with a `primary_key` or `merge_key` hint and
   `write_disposition='merge'`, you can perform merge-based incremental loading.

For example, let's say you want to load GitHub events and update them in the destination, ensuring
that only one instance of each event is present.

You can use the merge write disposition as follows:

```python
@dlt.resource(primary_key="id", write_disposition="merge")
def github_repo_events():
    # yield pages of events from the GitHub API (fetching logic omitted here)
    yield from fetch_events()
```

In this example, the `github_repo_events` resource uses the merge write disposition, ensuring that only one instance of each event
is present in the `github_repo_events` table. `dlt` takes care of loading the data
incrementally, deduplicating it, and performing the necessary merge operations.

### Advanced state management

Advanced state management in `dlt` allows you to store and retrieve values across pipeline runs
by persisting them at the destination while accessing them as a dictionary in code. This enables you
to track and manage incremental loading effectively. By leveraging the pipeline state, you can
preserve information, such as last values, checkpoints, or column renames, and utilize them later in
the pipeline.
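
A small sketch of reading and writing that dictionary from inside a resource, using `dlt.current.resource_state()` (the checkpoint key and values are illustrative):

```python
import dlt

@dlt.resource(table_name="documents")
def documents():
    state = dlt.current.resource_state()
    # read the checkpoint left by the previous run (or start from scratch)
    last_seen = state.get("last_seen_id", 0)
    new_rows = [{"id": i} for i in range(last_seen + 1, last_seen + 4)]
    yield new_rows
    # persist the new checkpoint; it is stored at the destination together with the load
    state["last_seen_id"] = new_rows[-1]["id"]
```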

## Transforming the Data

Data transformation plays a crucial role in the data loading process. You can perform
transformations both before and after loading the data. Here's how you can achieve it:

### Before Loading

Before loading the data, you have the flexibility to perform transformations using Python. You can
leverage Python's extensive libraries and functions to manipulate and preprocess the data as needed.

A simple example is pseudonymizing a column before loading, replacing the original values with deterministic hashes that keep a
consistent mapping. The `dummy_source` generates dummy data with an `id` and `name`
column, and the `add_map` function applies the `pseudonymize_name` transformation to each
record.

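A minimal sketch of that pattern, assuming the hashing details (salt and algorithm) are up to you:

```python
import dlt
import hashlib

@dlt.source
def dummy_source():
    @dlt.resource
    def dummy_data():
        for i in range(3):
            yield {"id": i, "name": f"user_{i}"}
    return dummy_data

def pseudonymize_name(row):
    # deterministic hash: the same name always maps to the same token
    salt = "my-secret-salt"  # illustrative value
    row["name"] = hashlib.sha256((row["name"] + salt).encode()).hexdigest()
    return row

source = dummy_source()
source.dummy_data.add_map(pseudonymize_name)
dlt.pipeline(destination="duckdb", dataset_name="pseudo_demo").run(source)
```
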
### After Loading

For transformations after loading the data, you have several options available:

#### [Using dbt](../dlt-ecosystem/transformations/dbt.md)

dbt is a powerful framework for transforming data. It enables you to structure your transformations
into DAGs, providing cross-database compatibility and various features such as templating,
backfills, testing, and troubleshooting. You can use the dbt runner in `dlt` to seamlessly
integrate dbt into your pipeline. Here's an example of running a dbt package after loading the data:

```python
import dlt
from pipedrive import pipedrive_source

# load to raw
pipeline = dlt.pipeline(
    pipeline_name='pipedrive',
    destination='bigquery',
    dataset_name='pipedrive_raw'
)

load_info = pipeline.run(pipedrive_source())
print(load_info)
```
Now transform the loaded data into a dbt dataset:
```python
pipeline = dlt.pipeline(
    pipeline_name='pipedrive',
    destination='bigquery',
    dataset_name='pipedrive_dbt'
)

# make venv and install dbt in it
venv = dlt.dbt.get_venv(pipeline)

# get the runner for the "pipedrive" dbt package (the package location is illustrative)
dbt = dlt.dbt.package(pipeline, "pipedrive/dbt_pipedrive/pipedrive", venv=venv)
models = dbt.run_all()
```

In this example, the
pipeline performs transformations using a dbt package called `pipedrive` after loading the data.
The `dbt.package` function sets up the dbt runner, and `dbt.run_all()` executes the dbt
models defined in the package.

#### [Using the `dlt` SQL client](../dlt-ecosystem/transformations/sql.md)

Another option is to leverage the `dlt` SQL client to query the loaded data and perform
transformations using SQL statements. You can execute SQL statements that change the database schema
or manipulate data within tables.
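
A minimal sketch of that pattern, assuming a pipeline that has already loaded a `customers` table (the destination, dataset, and column values are made up):

```python
import dlt

pipeline = dlt.pipeline(destination="duckdb", dataset_name="crm")

with pipeline.sql_client() as client:
    # run an arbitrary SQL statement against the destination dataset
    client.execute_sql(
        "INSERT INTO customers VALUES (%s, %s, %s)", 10, "Fred", "fred@fred.com"
    )
```
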
In this example, the `execute_sql` method of the SQL client allows you to execute SQL
statements. The statement inserts a row with values into the `customers` table.

#### [Using Pandas](../dlt-ecosystem/transformations/pandas.md)

You can fetch query results as Pandas data frames and perform transformations using Pandas
functionalities. Here's an example of reading data from the `issues` table in DuckDB and
counting reaction types with Pandas.
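
A minimal sketch of that pattern; the pipeline name and reaction columns are assumptions for illustration, not the exact original example:

```python
import dlt

pipeline = dlt.pipeline(pipeline_name="github_pipeline", destination="duckdb", dataset_name="github_reactions")

with pipeline.sql_client() as client:
    with client.execute_query("SELECT reactions__laugh, reactions__hooray, reactions__rocket FROM issues") as cursor:
        # calling `df` on a cursor returns the data as a Pandas data frame
        reactions = cursor.df()

counts = reactions.sum(0).sort_values(0, ascending=False)
```
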
By leveraging these transformation options, you can shape and manipulate the data before or after
loading it, allowing you to meet specific requirements and ensure data quality and consistency.

## Adjusting the automated normalisation

To streamline the process, `dlt` recommends attaching schemas to sources implicitly instead of
creating them explicitly. You can provide a few global schema settings and let the table and column
schemas be generated from the resource hints and the data itself.

By adjusting the automated normalization process in `dlt`, you can ensure that the resulting
schema meets your specific requirements and aligns with your preferred naming conventions, data
types, and other customization needs.

### Customizing the Normalization Process

Customizing the normalization process in `dlt` allows you to adapt it to your specific requirements.

You can tailor
the normalization process to meet your unique needs and achieve optimal results.

Read more about how to configure [schema generation.](../general-usage/schema.md)
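
For example, one setting you can tweak is how deeply nested data is unpacked into child tables; a small sketch, assuming the `max_table_nesting` hint on a source (the source, resource, and field names are made up):

```python
import dlt

# limit how deeply nested structures are unpacked into child tables;
# deeper levels are kept as JSON columns instead
@dlt.source(max_table_nesting=1)
def my_source():
    @dlt.resource
    def events():
        yield [{"id": 1, "payload": {"tags": ["a", "b"], "meta": {"origin": "web"}}}]
    return events

dlt.pipeline(destination="duckdb", dataset_name="nesting_demo").run(my_source())
```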

### Exporting and Importing Schema Files

`dlt` allows you to export and import schema files, which contain the structure and instructions for
processing and loading the data. Exporting schema files enables you to modify them directly, making
adjustments as needed; importing them back lets you
use them in your pipeline.
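
A small sketch of wiring this up, assuming the `import_schema_path` and `export_schema_path` arguments of `dlt.pipeline` and illustrative folder and pipeline names:

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb",
    dataset_name="my_data",
    import_schema_path="schemas/import",   # edited schema files are picked up from here
    export_schema_path="schemas/export",   # inferred schemas are written here after each run
)
```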

Read more: [Adjust a schema docs.](../walkthroughs/adjust-a-schema.md)

## Governance Support in `dlt` Pipelines

`dlt` pipelines offer robust governance support through three key mechanisms: pipeline metadata
utilization, schema enforcement and curation, and schema change alerts.

### Pipeline Metadata

`dlt` pipelines leverage metadata to provide governance capabilities. This metadata includes load IDs,
which consist of a timestamp and pipeline name. Load IDs enable incremental transformations and data
vaulting by tracking data loads and facilitating data lineage and traceability.
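
For instance, after a load you can inspect the `_dlt_loads` table that `dlt` maintains at the destination; a sketch, assuming its `load_id`, `status`, and `inserted_at` columns and a pipeline that has already completed a load:

```python
# `pipeline` is any dlt pipeline that has completed a load
with pipeline.sql_client() as client:
    with client.execute_query(
        "SELECT load_id, status, inserted_at FROM _dlt_loads ORDER BY inserted_at DESC"
    ) as cursor:
        # each row records one load package, which downstream models can join on load_id
        print(cursor.fetchall())
```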

Read more about [lineage.](../dlt-ecosystem/visualizations/understanding-the-tables.md#load-ids)

### Schema Enforcement and Curation

`dlt` empowers users to enforce and curate schemas, ensuring data consistency and quality. Schemas
define the structure of normalized data and guide the processing and loading of data. By adhering to
predefined schemas, pipelines maintain data integrity and facilitate standardized data handling
practices.

Read more: [Adjust a schema docs.](../walkthroughs/adjust-a-schema.md)

### Schema evolution

`dlt` enables proactive governance by alerting users to schema changes. When modifications occur in
the source data's schema, such as table or column alterations, `dlt` notifies stakeholders, allowing
them to take necessary actions, such as reviewing and validating the changes, updating downstream
processes, or performing impact analysis.

These governance features contribute to better data management practices, compliance adherence, and
overall data governance, promoting data consistency, traceability, and
control throughout the data processing lifecycle.

Read more about [schema evolution.](../reference/explainers/schema-evolution.md)

### Scaling and finetuning

`dlt` offers several mechanisms and configuration options to scale up and finetune pipelines:

- running extraction, normalization, and load in parallel;
- writing sources and resources that are run in parallel via thread pools and async execution;
- fine-tuning the memory buffers, intermediary file sizes, and compression options.

Read more about [performance.](../reference/performance.md)

### Other advanced topics

`dlt` is a constantly growing library that supports many features and use cases needed by the
community. [Join our Slack](https://join.slack.com/t/dlthub-community/shared_invite/zt-1slox199h-HAE7EQoXmstkP_bTqal65g)
to find recent releases or discuss what you can build with `dlt`.