
allow custom metrics to be added to dlt resources and collect in extract info #3072

@rudolfix

Description

Background
We want resources and transform steps to be able to generate custom metrics and then collect them at the end of the extract step, when ExtractInfo is generated. Examples:

  1. Incremental generating a metric of total items vs. filtered-out items, and a metric related to key granularity (number of unique hashes in state), so we can detect common pitfalls, i.e. an Incremental that filters through the whole dataset on every run or has a low-granularity cursor column
  2. quality checks / validations that run after pipeline.run in an additional extract step, take pipeline.dataset() as input, and store their results in the metrics dict

Those metrics will be visible in the trace and ultimately may be used by the Workspace Dashboard and other tools. Note that metrics are only stored in the trace and in the ExtractInfo object, and should be visible on the DltResource object after the pipeline has run.

We want a nice user interface for it:

  • metrics are just a dictionary that is scoped to a resource, similar to resource state; the user just sets values in it
  • users can get the writable dict via dlt.current.resource_metrics()
  • users implementing a custom ItemTransform have access to the metrics property of that transform, which will be collected
  • we add add_metrics to DltResource, which adds a transform step used to generate metrics from the flowing data (this should be the recommended way; see the sketch below)
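
For illustration, a minimal sketch of how a resource author could use the proposed dlt.current.resource_metrics() (not implemented yet; the final API may differ). fetch_pages() is a hypothetical data source added only to make the example self-contained:

```python
import dlt


def fetch_pages():
    # stand-in for a real paginated API; yields two small pages of items
    yield [{"id": 1, "active": True}, {"id": 2, "active": False}]
    yield [{"id": 3, "active": True}]


@dlt.resource
def users():
    total, kept = 0, 0
    for page in fetch_pages():
        total += len(page)
        active = [row for row in page if row.get("active")]
        kept += len(active)
        yield active
    # writable dict scoped to this resource, analogous to dlt.current.resource_state()
    metrics = dlt.current.resource_metrics()  # proposed API, does not exist yet
    metrics["total_items"] = total
    metrics["filtered_out_items"] = total - kept
```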

Tasks
We can split this into a few PRs:

PR 1:
Implement custom metrics on a DltResource

    • add a metrics prop on DltResource and return it via dlt.current.resource_metrics()
    • add a metrics prop on ItemTransform
    • when the extract step is completed, the extract module must collect metrics from all steps of all participating resources; the _compute_metrics method is a good place to do this. Just merge all dictionaries: the metrics on DltResource, the job metrics in resource_metrics, and the step metrics (see the sketch after this list)
    • Make sure those get into ExtractInfo
    • Write tests checking that metrics are collected and available, that they appear in ExtractInfo, and that they are persisted in the schema
    • Write tests to see what happens when a user loads the trace with dlt (what the shape of the data will be); mind that we have trace tests that are also checked against the trace schema, so find them and add metrics to these tests
    • Write a chapter on custom metrics in resource.md
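
A rough sketch of the merge described above, assuming the resource-level metrics, the per-step metrics and the job metrics all arrive as plain dicts; the function name and call site are illustrative, not the actual extract internals:

```python
from typing import Any, Dict, Iterable


def merge_metrics(
    resource_metrics: Dict[str, Any],
    step_metrics: Iterable[Dict[str, Any]],
    job_metrics: Dict[str, Any],
) -> Dict[str, Any]:
    merged: Dict[str, Any] = {}
    merged.update(job_metrics)        # metrics already computed per extract job
    merged.update(resource_metrics)   # metrics set via dlt.current.resource_metrics()
    for metrics in step_metrics:      # metrics exposed by each ItemTransform step
        merged.update(metrics)
    return merged
```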

PR 2:
Practical implementation:

    • update the Incremental class so it collects custom metrics:
        • unfiltered_items_count: the number of items entering the step (before Incremental filters them out); remember to count rows like LimitItem does
        • unfiltered_batches_count: the number of batches (pages) entering the step
        • uniq_hashes_count: the number of unique hashes in state

def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems] looks like a good place to count these metrics; a sketch follows.
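
A hedged sketch of where those counters could live. IncrementalSketch stands in for the real Incremental transform, _filter for its existing filtering logic, and the list-based row counting is an assumption about how LimitItem counts:

```python
from typing import Any, List, Optional


class IncrementalSketch:
    def __init__(self) -> None:
        self.metrics: dict = {}
        # mirrors the deduplication hashes kept in incremental state
        self.unique_hashes: List[str] = []

    def _filter(self, rows: Any, meta: Any) -> Any:
        return rows  # placeholder for the existing incremental filtering

    def __call__(self, rows: Any, meta: Any = None) -> Optional[Any]:
        if rows is None:
            return rows
        # one call is one batch (page) entering the step
        self.metrics["unfiltered_batches_count"] = self.metrics.get("unfiltered_batches_count", 0) + 1
        # assumed LimitItem-style counting: a list counts as len(list) items,
        # anything else as a single item
        count = len(rows) if isinstance(rows, list) else 1
        self.metrics["unfiltered_items_count"] = self.metrics.get("unfiltered_items_count", 0) + count
        rows = self._filter(rows, meta)
        # granularity metric: number of unique hashes kept in incremental state
        self.metrics["uniq_hashes_count"] = len(self.unique_hashes)
        return rows
```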

PR 3: (move to separate ticket)

    • Implement the MetricsItem transform and add_metrics working like add_map (it will create a MetricsItem from a function with the right signature).
    • The metrics function should take items, meta and metrics (a dict) as input arguments.
    • implement a practical example: a batch_counter function that counts batches entering the step (see the sketch below)
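
A sketch of what the metrics function and add_metrics usage could look like; add_metrics and MetricsItem do not exist yet, and fetch_event_pages() is a hypothetical source used only for the example:

```python
import dlt
from typing import Any


def batch_counter(items: Any, meta: Any, metrics: dict) -> Any:
    # proposed signature: items, meta and the writable metrics dict
    metrics["batches_count"] = metrics.get("batches_count", 0) + 1
    return items


def fetch_event_pages():
    # stand-in for a real paginated source
    yield [{"event": "start"}, {"event": "stop"}]
    yield [{"event": "start"}]


@dlt.resource
def events():
    yield from fetch_event_pages()


# analogous to add_map: would wrap batch_counter in a MetricsItem transform step
events.add_metrics(batch_counter)
```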
