20 changes: 19 additions & 1 deletion dlt/extract/items_transform.py
@@ -9,7 +9,6 @@
Generic,
Iterator,
Mapping,
-    MutableMapping,
Optional,
Union,
Dict,
@@ -30,6 +29,7 @@
ItemTransformFunctionNoMeta = Callable[[TDataItem], TAny]
ItemTransformFunc = Union[ItemTransformFunctionWithMeta[TAny], ItemTransformFunctionNoMeta[TAny]]

MetricsFunctionWithMeta = Callable[[TDataItems, Any, Dict[str, Any]], None]

TCustomMetrics = TypeVar(
"TCustomMetrics", covariant=True, bound=Mapping[str, Any], default=Dict[str, Any]
@@ -225,3 +225,21 @@ def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
return None

return item


class MetricsItem(ItemTransform[None]):
"""Collects custom metrics from data flowing through the pipe without modifying items.

    The metrics function receives the items, the meta value, and a metrics dictionary that it can
    update in place. The items are passed through unchanged.
"""

_metrics_f: MetricsFunctionWithMeta = None

def __init__(self, metrics_f: MetricsFunctionWithMeta) -> None:
BaseItemTransform.__init__(self)
self._metrics_f = metrics_f

def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
self._metrics_f(item, meta, self._custom_metrics)
return item
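
For orientation, a minimal sketch of a function that satisfies `MetricsFunctionWithMeta` and of wiring it through `MetricsItem`; the `row_counter` name and its batch handling are illustrative, not part of this PR:

```py
from typing import Any, Dict

from dlt.common.typing import TDataItems
from dlt.extract.items_transform import MetricsItem


def row_counter(items: TDataItems, meta: Any, metrics: Dict[str, Any]) -> None:
    # count rows whether the pipe delivers a single item or a batch
    batch = items if isinstance(items, list) else [items]
    metrics["row_count"] = metrics.get("row_count", 0) + len(batch)


# MetricsItem calls the function and returns the items unchanged,
# so the step can sit anywhere in the pipe without altering the data
step = MetricsItem(row_counter)
```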
25 changes: 22 additions & 3 deletions dlt/extract/resource.py
@@ -12,9 +12,6 @@
Union,
Any,
Optional,
-    Mapping,
-    List,
-    Tuple,
Dict,
)

@@ -66,8 +63,10 @@
YieldMapItem,
ValidateItem,
LimitItem,
MetricsItem,
ItemTransformFunc,
ItemTransformFunctionWithMeta,
MetricsFunctionWithMeta,
)
from dlt.extract.state import resource_state
from dlt.extract.pipe_iterator import ManagedPipeIterator
@@ -345,6 +344,26 @@ def add_map(
self._pipe.insert_step(MapItem(item_map), insert_at)
return self

def add_metrics(
self, metrics_f: MetricsFunctionWithMeta, insert_at: int = None
    ) -> Self:
"""Adds metrics collection function defined in `metrics_f` to the resource pipe at position `inserted_at`
`metrics_f` receives data items, meta, and metrics dictionary to update in-place.
Items are passed through unchanged.

Args:
            metrics_f (MetricsFunctionWithMeta): A function taking items, the meta argument, and a metrics dict to update in place. Returns None.
            insert_at (int, optional): At which step in the pipe to insert the metrics collector. Defaults to None, which appends it after the last step.

Returns:
"DltResource": returns self
"""
if insert_at is None:
self._pipe.append_step(MetricsItem(metrics_f))
else:
self._pipe.insert_step(MetricsItem(metrics_f), insert_at)
return self

def add_yield_map(
self,
item_map: ItemTransformFunc[Iterator[TDataItem]],
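A minimal usage sketch of the method above (the resource and function names are illustrative); per the docstring, omitting `insert_at` appends the collector after the last step:

```py
import dlt


@dlt.resource
def numbers():
    yield from [1, 2, 3, 4, 5, 6]


def count_rows(items, meta, metrics) -> None:
    metrics["rows_after_filter"] = metrics.get("rows_after_filter", 0) + 1


numbers.add_filter(lambda n: n > 3)
# with insert_at omitted the collector lands after the filter, so it only
# counts surviving rows; an explicit insert_at would place it earlier in
# the pipe to observe unfiltered traffic instead
numbers.add_metrics(count_rows)
```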
3 changes: 2 additions & 1 deletion docs/website/docs/dlt-ecosystem/transformations/add-map.md
@@ -30,9 +30,10 @@ This page covers how `add_map` works, where it fits in the pipeline, and how to

In addition to `add_map`, dlt provides:

-- **`add_filter`**: Excludes records based on a condition. Works like a filter function that removes items you don't want to load. ([`resource.add_filter`)](../../api_reference/dlt/extract/resource#add_filter).
+- **`add_filter`**: Excludes records based on a condition. Works like a filter function that removes items you don't want to load. ([`resource.add_filter`](../../api_reference/dlt/extract/resource#add_filter)).
- **`add_yield_map`**: Produces multiple outputs from a single input item. Returns an iterator instead of a single item. ([`resource.add_yield_map`](../../api_reference/dlt/extract/resource#add_yield_map)).
- **`add_limit`**: Limits the number of records processed by a resource. Useful for testing or reducing data volume during development. ([`resource.add_limit`](../../api_reference/dlt/extract/resource#add_limit)).
- **`add_metrics`**: Collects custom metrics from data flowing through the pipeline without modifying items. Useful for tracking statistics like counts, data quality metrics, or processing information. ([`resource.add_metrics`](../../api_reference/dlt/extract/resource#add_metrics)).

These methods help you control the shape and flow of data during transformation.
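A minimal sketch of how these helpers chain on a single resource (the resource, data, and metric key are made up for illustration):

```py
import dlt


@dlt.resource
def events():
    yield from [{"level": "info"}, {"level": "error"}, {"level": "info"}]


def count_rows(items, meta, metrics) -> None:
    metrics["rows_seen"] = metrics.get("rows_seen", 0) + 1


# observe every row, keep only the errors, then cap the volume
errors = (
    events()
    .add_metrics(count_rows)
    .add_filter(lambda e: e["level"] == "error")
    .add_limit(2)  # stop after the first two yields
)
```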

72 changes: 68 additions & 4 deletions docs/website/docs/general-usage/resource.md
@@ -354,12 +354,14 @@ resource. The available transformation types:
- **filter** - filter the data item (`resource.add_filter`).
- [**yield map**](../dlt-ecosystem/transformations/add-map#add_yield_map) - a map that returns an iterator (so a single row may generate many rows -
`resource.add_yield_map`).
- [**metrics**](#using-add_metrics-as-a-transformation-step) - collect custom metrics without modifying data items (`resource.add_metrics`).

Example: We have a resource that loads a list of users from an API endpoint. We want to customize it
so:

-1. We remove users with `user_id == "me"`.
-2. We anonymize user data.
+1. We track how many users with `user_id == "me"` are being filtered out.
+2. We remove users with `user_id == "me"`.
+3. We anonymize user data.

Here's our resource:

@@ -384,8 +386,20 @@ def anonymize_user(user_data):
user_data["user_email"] = _hash_str(user_data["user_email"])
return user_data

-# add the filter and anonymize function to users resource and enumerate
-for user in users().add_filter(lambda user: user["user_id"] != "me").add_map(anonymize_user):
def track_filtered(items, meta, metrics):
"""Track how many 'me' users were filtered out."""
users_list = items if isinstance(items, list) else [items]
for user in users_list:
if user["user_id"] == "me":
metrics["filtered_me_users"] = metrics.get("filtered_me_users", 0) + 1

# add metrics, filter, and anonymize functions to the users resource
for user in (
users()
.add_metrics(track_filtered)
.add_filter(lambda user: user["user_id"] != "me")
.add_map(anonymize_user)
):
print(user)
```

@@ -702,6 +716,9 @@ The `with_name` method returns a deep copy of the original resource, its data pi

## Collect custom metrics


### Using `dlt.current.resource_metrics()` within a resource

You can use `dlt.current.resource_metrics()` to track custom statistics during resource extraction that would otherwise be lost:

```py
# ... (example body collapsed in the diff view)
print(f"Custom metrics: {resource_metrics.custom_metrics}")
```
As shown above, custom metrics are included in pipeline traces. Refer to [pipeline trace loading](../running-in-production/running.md#inspect-and-save-the-load-info-and-trace) for more details.

### Using `add_metrics` as a transformation step

Alternatively, you can collect metrics using `add_metrics`, which works as a transformation step in the pipeline.

```py
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import JSONLinkPaginator

pokeapi_client = RESTClient(
base_url="https://pokeapi.co/api/v2",
paginator=JSONLinkPaginator(next_url_path="next"),
data_selector="results",
)

@dlt.resource
def get_pokemons():
    for page in pokeapi_client.paginate(
"/pokemon",
params={
"limit": 100,
},
):
yield page

def page_counter(items, meta, metrics) -> None:
metrics["page_count"] = metrics.get("page_count", 0) + 1

get_pokemons.add_metrics(page_counter)

pipeline = dlt.pipeline(
pipeline_name="get_pokemons",
destination="duckdb",
dataset_name="github_data",
)
load_info = pipeline.run(get_pokemons)
print(load_info)

# Access custom metrics from last trace
trace = pipeline.last_trace
load_id = load_info.loads_ids[0]
resource_metrics = trace.last_extract_info.metrics[load_id][0]["resource_metrics"]["get_pokemons"]

print(f"Custom metrics: {resource_metrics.custom_metrics}")
```


## Load resources

You can pass individual resources or a list of resources to the `dlt.pipeline` object. The resources loaded outside the source context will be added to the [default schema](schema.md) of the pipeline.
97 changes: 96 additions & 1 deletion tests/extract/test_extract.py
@@ -12,6 +12,7 @@
NormalizeStorageConfiguration,
)
from dlt.common.storages.schema_storage import SchemaStorage
from dlt.common.utils import uniq_id

from dlt.common.typing import TTableNames, TDataItems
from dlt.extract import DltResource, DltSource
@@ -20,7 +21,7 @@
from dlt.extract.hints import TResourceNestedHints, make_hints
from dlt.extract.items_transform import ValidateItem

-from dlt.extract.items import TableNameMeta
+from dlt.extract.items import TableNameMeta, DataItemWithMeta
from tests.utils import MockPipeline, clean_test_storage, TEST_STORAGE_ROOT
from tests.extract.utils import expect_extracted_file

@@ -727,3 +728,97 @@ def resource_with_step_metrics():
assert resource_metrics.custom_metrics == expected_metrics
else:
assert resource_metrics.custom_metrics == {}


@pytest.mark.parametrize(
"as_single_batch",
[True, False],
ids=["single_batch", "multiple_batches"],
)
def test_add_metrics(extract_step: Extract, as_single_batch: bool) -> None:
"""Test metrics collection with add_metrics"""

    # 1. Test metrics at different pipeline stages (before/after filter)
@dlt.resource
def some_data():
data = [1, 2, 3, 4, 5, 6]
if as_single_batch:
yield data
else:
yield from data

def early_counter(items: TDataItems, meta: Any, metrics: Dict[str, Any]) -> None:
metrics["early_count"] = metrics.get("early_count", 0) + 1

def late_counter(items: TDataItems, meta: Any, metrics: Dict[str, Any]) -> None:
metrics["late_count"] = metrics.get("late_count", 0) + 1

some_data.add_metrics(early_counter).add_filter(lambda x: x > 3).add_metrics(late_counter)

# 2. Test metrics with TableNameMeta
@dlt.resource
def multi_table_data():
yield dlt.mark.with_table_name({"id": 1, "name": "Alice"}, "users")
yield dlt.mark.with_table_name({"id": 2, "name": "Bob"}, "users")
yield dlt.mark.with_table_name({"product": "A"}, "products")
yield dlt.mark.with_table_name({"product": "B"}, "products")

def count_by_table(items: TDataItems, meta: Any, metrics: Dict[str, Any]) -> None:
if isinstance(meta, TableNameMeta):
table_key = f"count_{meta.table_name}"
metrics[table_key] = metrics.get(table_key, 0) + 1

multi_table_data.add_metrics(count_by_table)

# 3. Test metrics with custom metadata
@dlt.resource
def data_with_priority():
yield DataItemWithMeta(meta={"priority": "high"}, data={"id": 1})
yield DataItemWithMeta(meta={"priority": "high"}, data={"id": 2})
yield DataItemWithMeta(meta={"priority": "low"}, data={"id": 3})
yield DataItemWithMeta(meta={"priority": "low"}, data={"id": 4})
yield DataItemWithMeta(meta={"priority": "low"}, data={"id": 5})

def count_by_priority(items: TDataItems, meta: Any, metrics: Dict[str, Any]) -> None:
if isinstance(meta, dict) and "priority" in meta:
priority = meta["priority"]
key = f"{priority}_priority_count"
metrics[key] = metrics.get(key, 0) + 1

data_with_priority.add_metrics(count_by_priority)

source = DltSource(
dlt.Schema("metrics"), "module", [some_data, multi_table_data, data_with_priority]
)
load_id = extract_step.extract(source, 20, 1)

assert source.resources["some_data"].custom_metrics == {
"early_count": 1 if as_single_batch else 6,
"late_count": 1 if as_single_batch else 3,
}
assert source.resources["multi_table_data"].custom_metrics == {
"count_users": 2,
"count_products": 2,
}
assert source.resources["data_with_priority"].custom_metrics == {
"high_priority_count": 2,
"low_priority_count": 3,
}

step_info = extract_step.get_step_info(MockPipeline("buba", first_run=False)) # type: ignore[abstract]
all_resource_metrics = step_info.metrics[load_id][0]["resource_metrics"]
assert "some_data" in all_resource_metrics
assert "multi_table_data" in all_resource_metrics
assert "data_with_priority" in all_resource_metrics
assert all_resource_metrics["some_data"].custom_metrics == {
"early_count": 1 if as_single_batch else 6,
"late_count": 1 if as_single_batch else 3,
}
assert all_resource_metrics["multi_table_data"].custom_metrics == {
"count_users": 2,
"count_products": 2,
}
assert all_resource_metrics["data_with_priority"].custom_metrics == {
"high_priority_count": 2,
"low_priority_count": 3,
}