Background
We want resources and transform steps to be able to generate custom metrics that are collected at the end of the extract step, when `ExtractInfo` is generated. Examples:
- `Incremental` generating a metric of total items vs. filtered-out items, and a metric related to key granularity (the number of unique hashes in state), so we can detect common pitfalls, i.e. an `Incremental` that filters through the whole dataset every time or has a low-granularity cursor column
- quality checks / validations that run after `pipeline.run` in an additional `extract` step with `pipeline.dataset()` as input and store their results in the metrics dict
Those metrics will be visible in the trace and may ultimately be used by the Workspace Dashboard and other tools. Note that metrics are stored only in the trace and in the `ExtractInfo` object, and should be visible on the `DltResource` object after the pipeline has run.
We want a nice user interface for it:
- metrics are just a dictionary scoped to a resource (resource state works similarly); the user simply sets values in it
- users can get the writable dict via `dlt.current.resource_metrics()`
- users implementing a custom `ItemTransform` have access to the `metrics` property of that item, which will be collected
- we add `add_metrics` to `DltResource`, which adds a transform step used to generate metrics from flowing data (this should be the recommended way; see the sketch below)
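
A minimal sketch of the intended user interface, assuming the proposed `dlt.current.resource_metrics()` accessor described above (none of this exists yet):

```python
import dlt

@dlt.resource
def users():
    # proposed API: writable metrics dict scoped to this resource,
    # analogous to dlt.current.resource_state()
    metrics = dlt.current.resource_metrics()
    items = [{"id": 1}, {"id": 2}]
    metrics["items_yielded"] = len(items)
    yield items

pipeline = dlt.pipeline(pipeline_name="metrics_demo", destination="duckdb")
pipeline.run(users())
# after the run, the metrics should be visible in the trace / ExtractInfo
# and on the resource object itself
```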
Tasks
We can split this into a few PRs:
PR 1: implement custom metrics on a `DltResource`
- add `metrics` prop on `DltResource` and return it via `dlt.current.resource_metrics()`
- add `metrics` prop on `ItemTransform`
- when the extract step is completed, the `extract` module must collect metrics from all steps of all participating resources. The `_compute_metrics` method is a good place to do this: just merge all the dictionaries, i.e. `metrics` from `DltResource`, job metrics in `resource_metrics`, and step metrics (see the sketch after this list)
- make sure those get into `ExtractInfo`
- write tests checking that metrics are collected and available, that they appear in the extract info, and that they are persisted in the schema
- write tests to see what happens when a user loads the trace with dlt (what the shape of the data will be); mind that we have trace tests that are also checked against the trace schema: find them and add metrics to those tests
- write a chapter on custom metrics in `resource.md`
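
A hypothetical sketch of the merge in `_compute_metrics`; the pipe traversal and attribute names are assumptions about the internals, not the actual implementation:

```python
from typing import Any, Dict

def _compute_metrics(resource, job_metrics: Dict[str, Any]) -> Dict[str, Any]:
    # merge the three metric sources into a single dict per resource
    metrics: Dict[str, Any] = {}
    # 1. metrics set directly on the resource via dlt.current.resource_metrics()
    metrics.update(resource.metrics)
    # 2. job metrics already tracked in resource_metrics
    metrics.update(job_metrics)
    # 3. metrics collected by each ItemTransform step in the resource's pipe
    #    (attribute names assumed)
    for step in resource._pipe.steps:
        step_metrics = getattr(step, "metrics", None)
        if step_metrics:
            metrics.update(step_metrics)
    return metrics
```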
PR 2: practical implementation
- update the `Incremental` class so it collects custom metrics:
  - `unfiltered_items_count`: number of items entering the step (before the incremental filters them out); remember to count rows like `LimitItem` does
  - `unfiltered_batches_count`: number of batches (pages) entering the step
  - `uniq_hashes_count`: number of unique hashes in state
- `def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]` looks like a good place to count the metrics (sketched below).
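
A rough sketch of that counting, assuming the `metrics` dict from PR 1 is available on the step; `_transform` and `_cached_state` stand in for the existing internals and are assumptions:

```python
from typing import Any, Optional
from dlt.common.typing import TDataItems

def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]:
    # count batches and items before filtering: a list counts as one batch
    # of many items, anything else as a single item (same rule LimitItem uses)
    self.metrics["unfiltered_batches_count"] = self.metrics.get("unfiltered_batches_count", 0) + 1
    items_count = len(rows) if isinstance(rows, list) else 1
    self.metrics["unfiltered_items_count"] = self.metrics.get("unfiltered_items_count", 0) + items_count
    rows = self._transform(rows)  # existing filtering logic (name assumed)
    # cursor granularity: unique hashes kept in incremental state (key assumed)
    self.metrics["uniq_hashes_count"] = len(self._cached_state.get("unique_hashes", []))
    return rows
```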
PR 3: (move to separate ticket)
- implement a `MetricsItem` transform step and `add_metrics` working like `add_map` (it will create a metrics step from a function with the right signature)
- the metrics function should take `items`, `meta` and `metrics` (dict) as input arguments
- implement a practical example: a `batch_counter` function that counts batches entering the step (see the sketch below)
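
A sketch of that example under the proposed (`items`, `meta`, `metrics`) signature; the `batches_count` metric name and the `add_metrics` usage are assumptions:

```python
from typing import Any, Dict

def batch_counter(items: Any, meta: Any, metrics: Dict[str, Any]) -> Any:
    # count each batch (page) flowing through the step; pass data on unchanged
    metrics["batches_count"] = metrics.get("batches_count", 0) + 1
    return items

# assumed usage, mirroring add_map:
# my_resource.add_metrics(batch_counter)
```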