Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dlt/sources/rest_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ def process(
resource.add_filter(step["filter"])
if "map" in step:
resource.add_map(step["map"])
if "yield_map" in step:
resource.add_yield_map(step["yield_map"])
return resource

if resolved_params is None:
Expand Down
2 changes: 2 additions & 0 deletions dlt/sources/rest_api/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Any,
Callable,
Dict,
Iterator,
List,
Literal,
Optional,
Expand Down Expand Up @@ -262,6 +263,7 @@ class Endpoint(TypedDict, total=False):
class ProcessingSteps(TypedDict, total=False):
filter: Optional[Callable[[Any], bool]] # noqa: A003
map: Optional[Callable[[Any], Any]] # noqa: A003
yield_map: Optional[Callable[[Any], Iterator[Any]]] # noqa: A003


class ResourceBase(TResourceHintsBase, total=False):
Expand Down
38 changes: 36 additions & 2 deletions docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ def repositories() -> Generator[Dict[str, Any], Any, Any]:

The `processing_steps` field in the resource configuration allows you to apply transformations to the data fetched from the API before it is loaded into your destination. This is useful when you need to filter out certain records, modify the data structure, or anonymize sensitive information.

Each processing step is a dictionary specifying the type of operation (`filter` or `map`) and the function to apply. Steps apply in the order they are listed.
Each processing step is a dictionary specifying the type of operation (`filter`, `map` or `yield_map`) and the function to apply. Steps apply in the order they are listed.

#### Quick example

Expand All @@ -981,6 +981,12 @@ def lower_title(record):
record["title"] = record["title"].lower()
return record

def flatten_reactions(post):
post_without_reactions = copy.deepcopy(post)
post_without_reactions.pop("reactions")
for reaction in post["reactions"]:
yield {"reaction": reaction, **post_without_reactions}

config: RESTAPIConfig = {
"client": {
"base_url": "https://api.example.com",
Expand All @@ -991,6 +997,7 @@ config: RESTAPIConfig = {
"processing_steps": [
{"filter": lambda x: x["id"] < 10},
{"map": lower_title},
{"yield_map": flatten_reactions},
],
},
],
Expand All @@ -1000,7 +1007,9 @@ config: RESTAPIConfig = {
In the example above:

- First, the `filter` step uses a lambda function to include only records where `id` is less than 10.
- Thereafter, the `map` step applies the `lower_title` function to each remaining record.
- Then, the `map` step applies the `lower_title` function to each remaining record.
- Finally, the `yield_map` step applies the `flatten_reactions` function to each transformed record,
yielding a set of records, one for each reaction for the given post.

#### Using `filter`

Expand Down Expand Up @@ -1042,6 +1051,31 @@ config: RESTAPIConfig = {
}
```

#### Using `yield_map`

The `yield_map` step allows you to transform a record into multiple records. The provided function should take a record as an argument and return an iterator of records. For example, to flatten the `reactions` field:

```py
def flatten_reactions(post):
post_without_reactions = copy.deepcopy(post)
post_without_reactions.pop("reactions")
for reaction in post["reactions"]:
yield {"reaction": reaction, **post_without_reactions}

config: RESTAPIConfig = {
"client": {
"base_url": "https://api.example.com",
},
"resources": [
{
"name": "posts",
"processing_steps": [
{"yield_map": flatten_reactions},
],
},
],
}
```
#### Combining `filter` and `map`

You can combine multiple processing steps to achieve complex transformations:
Expand Down
23 changes: 22 additions & 1 deletion tests/sources/rest_api/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
DEFAULT_PAGE_SIZE = 5
DEFAULT_TOTAL_PAGES = 5
DEFAULT_LIMIT = 10
DEFAULT_REACTIONS_COUNT = 5
DEFAULT_COMMENTS_COUNT = 50


router = APIRouter(MOCK_BASE_URL)
Expand All @@ -27,7 +29,22 @@ def generate_posts(count=DEFAULT_PAGE_SIZE * DEFAULT_TOTAL_PAGES):
return [{"id": i, "title": f"Post {i}"} for i in range(count)]


def generate_comments(post_id, count=50):
def generate_posts_with_reactions(
count=DEFAULT_PAGE_SIZE * DEFAULT_TOTAL_PAGES, count_reactions=DEFAULT_REACTIONS_COUNT
):
return [
{
"id": i,
"title": f"Post {i}",
"reactions": [
{"id": j, "title": f"Reaction {j} for post {i}"} for j in range(count_reactions)
],
}
for i in range(count)
]


def generate_comments(post_id, count=DEFAULT_COMMENTS_COUNT):
return [
{"id": i, "post_id": post_id, "body": f"Comment {i} for post {post_id}"}
for i in range(count)
Expand Down Expand Up @@ -389,6 +406,10 @@ def posts_raw_data(request, context):
def posts(request, context):
return paginate_by_page_number(request, generate_posts())

@router.get(r"/posts_with_reactions(\?.*)?$")
def posts_with_reactions(request, context):
return paginate_by_page_number(request, generate_posts_with_reactions())

router.register_routes(m)

yield m
Expand Down
150 changes: 150 additions & 0 deletions tests/sources/rest_api/integration/test_processing_steps.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import copy
from typing import Any, Callable, Dict, List

import pytest

import dlt
from dlt.sources.rest_api import RESTAPIConfig, rest_api_source
from ..conftest import (
DEFAULT_COMMENTS_COUNT,
DEFAULT_PAGE_SIZE,
DEFAULT_TOTAL_PAGES,
DEFAULT_REACTIONS_COUNT,
)


def _make_pipeline(destination_name: str):
Expand Down Expand Up @@ -259,3 +266,146 @@ def extend_body(row):

data = list(mock_source.with_resources("comments"))
assert data[0]["body"] == "Post 2 - Comment 0 for post 2"


def flatten_reactions(post):
post_without_reactions = copy.deepcopy(post)
post_without_reactions.pop("reactions")
for reaction in post["reactions"]:
yield {"reaction": reaction, **post_without_reactions}


def test_rest_api_source_yield_map(mock_api_server) -> None:
config: RESTAPIConfig = {
"client": {
"base_url": "https://api.example.com",
},
"resources": [
{
"name": "posts",
"endpoint": "posts_with_reactions",
"processing_steps": [
{"yield_map": flatten_reactions},
],
},
],
}
mock_source = rest_api_source(config)

data = list(mock_source.with_resources("posts"))

assert len(data) == DEFAULT_PAGE_SIZE * DEFAULT_TOTAL_PAGES * DEFAULT_REACTIONS_COUNT
assert all("reaction" in record and "reactions" not in record for record in data)
assert all(
record["reaction"]["title"]
== f"Reaction {record['reaction']['id']} for post {record['id']}"
for record in data
)


def test_rest_api_source_filter_then_yield_map(mock_api_server) -> None:
config: RESTAPIConfig = {
"client": {
"base_url": "https://api.example.com",
},
"resources": [
{
"name": "posts",
"endpoint": "posts_with_reactions",
"processing_steps": [
{"filter": lambda x: x["id"] != 1},
{"yield_map": flatten_reactions},
],
},
],
}
mock_source = rest_api_source(config)

data = list(mock_source.with_resources("posts"))

assert len(data) == (DEFAULT_PAGE_SIZE * DEFAULT_TOTAL_PAGES - 1) * DEFAULT_REACTIONS_COUNT
assert all(record["id"] != 1 for record in data)


def test_rest_api_source_yield_map_then_filter_reactions(mock_api_server) -> None:
config: RESTAPIConfig = {
"client": {
"base_url": "https://api.example.com",
},
"resources": [
{
"name": "posts",
"endpoint": "posts_with_reactions",
"processing_steps": [
{"yield_map": flatten_reactions},
{"filter": lambda x: x["reaction"]["id"] != 0},
],
},
],
}
mock_source = rest_api_source(config)

data = list(mock_source.with_resources("posts"))

assert len(data) == DEFAULT_PAGE_SIZE * DEFAULT_TOTAL_PAGES * (DEFAULT_REACTIONS_COUNT - 1)
assert all(record["reaction"]["id"] != 0 for record in data)


@pytest.mark.parametrize(
"comments_endpoint",
[
{
"path": "/posts/{post_id}/comments",
"params": {
"post_id": {
"type": "resolve",
"resource": "posts",
"field": "id",
}
},
},
"posts/{resources.posts.id}/comments",
],
)
def test_rest_api_source_yield_map_child(mock_api_server, comments_endpoint) -> None:
def extend_body(row):
row["body"] = f"{row['_posts_title']} - {row['body']}"
return row

def flatten_comment_reactions(comment_enriched):
comment_without_reactions = copy.deepcopy(comment_enriched)
comment_without_reactions.pop("_posts_reactions")
for reaction in comment_enriched["_posts_reactions"]:
yield {"_posts_reaction": reaction, **comment_without_reactions}

config: RESTAPIConfig = {
"client": {
"base_url": "https://api.example.com",
},
"resources": [
{
"name": "posts",
"endpoint": "posts_with_reactions",
"processing_steps": [
{"filter": lambda x: x["id"] in (1, 2)},
],
},
{
"name": "comments",
"endpoint": comments_endpoint,
"include_from_parent": ["title", "reactions"],
"processing_steps": [
{"map": extend_body},
{"filter": lambda x: x["body"].startswith("Post 2")},
{"yield_map": flatten_comment_reactions},
],
},
],
}
mock_source = rest_api_source(config)

data = list(mock_source.with_resources("comments"))

assert len(data) == DEFAULT_COMMENTS_COUNT * DEFAULT_REACTIONS_COUNT
assert data[0]["body"] == "Post 2 - Comment 0 for post 2"
assert data[0]["_posts_reaction"]["title"] == "Reaction 0 for post 2"
Loading