Course
data-engineering-zoomcamp
Question
What is the difference between rest_api_source({...}) and @dlt.resource in dlt, and when should I use each?
Answer
Both are official dlt patterns. The main difference is level of control.
- JSON config (rest_api_source) is declarative: you describe the API in a JSON-like Python dict.
- Custom code (@dlt.resource) is programmatic and more flexible: you write the request loop yourself.
Use JSON config when:
- API is simple and consistent
- pagination/params/selectors are standard
- you want fast setup with less custom code
Use custom resource when:
- response schema is inconsistent or dynamic
- you need custom stop/retry/error rules
- you need custom preprocessing/validation logic
- you need fine-grained control over how each request and page is handled, for example in production pipelines
Quick summary:
- JSON config: faster and cleaner for standard APIs
- Custom resource: more flexible for real-world custom APIs
The execution lifecycle is the same for both:
pipeline.run(...) -> extract -> normalize -> load
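As a rough mental model only, the three stages can be imitated in plain Python. This toy sketch is not dlt's implementation: the function names, the sample records, and the in-memory SQLite target are all assumptions made for illustration. The double-underscore column names mirror the way dlt names columns flattened from nested fields.

```python
import sqlite3
from typing import Any, Dict, Iterable, Iterator


def extract() -> Iterator[Dict[str, Any]]:
    """Stage 1: pull raw, possibly nested records from the source."""
    yield {"id": 1, "fare": {"amount": 12.5, "currency": "USD"}}
    yield {"id": 2, "fare": {"amount": 7.0, "currency": "USD"}}


def normalize(record: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Stage 2: flatten nested dicts into flat, table-friendly column names."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(normalize(value, prefix=f"{name}__"))
        else:
            flat[name] = value
    return flat


def load(rows: Iterable[Dict[str, Any]], conn: sqlite3.Connection) -> None:
    """Stage 3: write the flat rows to the destination."""
    conn.execute(
        "CREATE TABLE rides (id INTEGER, fare__amount REAL, fare__currency TEXT)"
    )
    conn.executemany(
        "INSERT INTO rides VALUES (:id, :fare__amount, :fare__currency)", rows
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
load([normalize(record) for record in extract()], conn)
loaded = conn.execute(
    "SELECT id, fare__amount, fare__currency FROM rides ORDER BY id"
).fetchall()
print(loaded)
```

Whether the records come from a declarative source or a hand-written resource only changes the first stage; the later stages see the same stream of records.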
Example 1: JSON config (rest_api_source)
import dlt
from dlt.sources.rest_api import rest_api_source


def ny_taxi_source():
    # Declarative description of the API: client, defaults, and resources.
    return rest_api_source({
        "client": {
            "base_url": "https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api"
        },
        "resource_defaults": {
            "write_disposition": "replace"
        },
        "resources": [
            {
                "name": "rides",
                "endpoint": {
                    "path": "",
                    "params": {
                        "page_size": 1000
                    },
                    # Assumes records sit under a top-level "data" key;
                    # drop this line if the API returns a bare JSON list.
                    "data_selector": "data",
                    "paginator": {
                        "type": "page_number",
                        "page_param": "page",
                        "base_page": 1,
                        # Assumes the response carries no total page count,
                        # so pagination ends on the first empty page.
                        "total_path": None,
                        "stop_after_empty_page": True
                    }
                }
            }
        ]
    })


pipeline = dlt.pipeline(
    pipeline_name="taxi_pipeline",
    destination="duckdb",
    dataset_name="taxi_data"
)

info = pipeline.run(ny_taxi_source())
print(info)

Example 2: Custom resource (@dlt.resource)
import dlt
from dlt.sources.helpers.rest_client import RESTClient

BASE_URL = "https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api"


def extract_records(payload):
    # Accept either a bare list or a dict that wraps the list under a known key.
    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict):
        for k in ("data", "results", "rides", "items"):
            v = payload.get(k)
            if isinstance(v, list):
                return v
    return []


@dlt.resource(name="rides", write_disposition="replace")
def ny_taxi_rides():
    client = RESTClient(base_url=BASE_URL)
    page = 1
    while True:
        response = client.get("", params={"page": page, "page_size": 1000})
        rows = extract_records(response.json())
        if not rows:
            # Custom stop rule: the first empty page ends the loop.
            break
        yield from rows
        page += 1


pipeline = dlt.pipeline(
    pipeline_name="taxi_pipeline",
    destination="duckdb",
    dataset_name="taxi_data"
)

info = pipeline.run(ny_taxi_rides())
print(info)

Checklist
- I have searched existing FAQs and this question is not already answered
- The answer provides accurate, helpful information
- I have included any relevant code examples or links