From e607691857e0609433685de06d9257df8c5c0265 Mon Sep 17 00:00:00 2001
From: Dimitar Dimitrov
Date: Wed, 27 Nov 2024 15:59:08 +0100
Subject: [PATCH] ingest: add dashboard panels for concurrent ingestion/fetching (#10021)

* Add batch latency and fetch throughput
Signed-off-by: Dimitar Dimitrov

* Add exemplars to latency histograms, improve metadata
Signed-off-by: Dimitar Dimitrov

* Add more panels
Signed-off-by: Dimitar Dimitrov

* Add CHANGELOG.md entry
Signed-off-by: Dimitar Dimitrov

* Rename metric to include estimated
Signed-off-by: Dimitar Dimitrov

* Remove double overlapping
Signed-off-by: Dimitar Dimitrov

* Replace sample with series
Signed-off-by: Dimitar Dimitrov

---------

Signed-off-by: Dimitar Dimitrov
---
 CHANGELOG.md                                  |   1 +
 .../dashboards/dashboard-utils.libsonnet      |   8 ++
 .../mimir-mixin/dashboards/writes.libsonnet   | 125 ++++++++++++++----
 pkg/storage/ingest/fetcher.go                 |   9 +-
 pkg/storage/ingest/fetcher_test.go            |   6 +-
 pkg/storage/ingest/pusher_metrics.go          |   2 +-
 pkg/storage/ingest/reader.go                  |  21 +--
 7 files changed, 129 insertions(+), 43 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 48f955548fc..d358abfd3b2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -105,6 +105,7 @@
 * [ENHANCEMENT] Dashboards: visualize the age of source blocks in the "Mimir / Compactor" dashboard. #9697
 * [ENHANCEMENT] Dashboards: Include block compaction level on queried blocks in 'Mimir / Queries' dashboard. #9706
 * [ENHANCEMENT] Alerts: add `MimirIngesterMissedRecordsFromKafka` to detect gaps in consumed records in the ingester when using the experimental Kafka-based storage. #9921 #9972
+* [ENHANCEMENT] Dashboards: Add more panels to 'Mimir / Writes' for concurrent ingestion and fetching when using ingest storage. #10021
 * [BUGFIX] Dashboards: Fix autoscaling metrics joins when series churn. #9412 #9450 #9432
 * [BUGFIX] Alerts: Fix autoscaling metrics joins in `MimirAutoscalerNotActive` when series churn. #9412
 * [BUGFIX] Alerts: Exclude failed cache "add" operations from alerting since failures are expected in normal operation. #9658
diff --git a/operations/mimir-mixin/dashboards/dashboard-utils.libsonnet b/operations/mimir-mixin/dashboards/dashboard-utils.libsonnet
index 6724553683a..fe8e64dfe51 100644
--- a/operations/mimir-mixin/dashboards/dashboard-utils.libsonnet
+++ b/operations/mimir-mixin/dashboards/dashboard-utils.libsonnet
@@ -1935,4 +1935,12 @@ local utils = import 'mixin-utils/utils.libsonnet';
       defaults+: { unit: 's' },
     },
   },
+
+  withExemplars(queryPanel)::
+    queryPanel {
+      targets: [
+        target { exemplar: true }
+        for target in queryPanel.targets
+      ],
+    },
 }
diff --git a/operations/mimir-mixin/dashboards/writes.libsonnet b/operations/mimir-mixin/dashboards/writes.libsonnet
index b9915f13c6b..0f7bf92c29d 100644
--- a/operations/mimir-mixin/dashboards/writes.libsonnet
+++ b/operations/mimir-mixin/dashboards/writes.libsonnet
@@ -239,6 +239,88 @@ local filename = 'mimir-writes.json';
         ],
       ) + $.aliasColors({ successful: $._colors.success, failed: $._colors.failed, 'read errors': $._colors.failed }) + $.stack,
     )
+    .addPanel(
+      $.timeseriesPanel('Kafka records batch latency') +
+      $.panelDescription(
+        'Kafka records batch latency',
+        |||
+          This panel displays the two stages in processing a record batch from Kafka. Fetching batches happens asynchronously to processing them.
+
+          If processing batches is the bottleneck, then "Awaiting next batch avg" will be close to zero. In this case you can consider tuning the ingestion-concurrency configuration parameters.
+
+          If fetching is the bottleneck, then "Awaiting next batch avg" will be in the high tens of milliseconds or higher.
+          It is normal for fetching to be the bottleneck during normal operation because a lot of the time is spent waiting for new records to be produced.
+        |||
+      ) +
+      $.withExemplars($.queryPanel(
+        [
+          'histogram_avg(sum(rate(cortex_ingest_storage_reader_records_batch_process_duration_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
+          'histogram_avg(sum(rate(cortex_ingest_storage_reader_records_batch_wait_duration_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
+        ],
+        [
+          'Batch processing avg',
+          'Awaiting next batch avg',
+        ],
+      )) + {
+        fieldConfig+: {
+          defaults+: { unit: 's', exemplar: true },
+        },
+      } +
+      $.stack,
+    )
+    .addPanel(
+      $.timeseriesPanel('Record size') +
+      $.panelDescription(
+        'Record size',
+        |||
+          Concurrent fetching estimates the size of records.
+          The estimation is used to enforce the max-buffered-bytes limit and to reduce discarded bytes.
+        |||
+      ) +
+      $.queryPanel([
+        |||
+          sum(rate(cortex_ingest_storage_reader_fetch_bytes_total{%s}[$__rate_interval]))
+          /
+          sum(rate(cortex_ingest_storage_reader_records_per_fetch_sum{%s}[$__rate_interval]))
+        ||| % [$.jobMatcher($._config.job_names.ingester), $.jobMatcher($._config.job_names.ingester)],
+        |||
+          histogram_avg(sum(rate(cortex_ingest_storage_reader_estimated_bytes_per_record{%s}[$__rate_interval])))
+        |||
+        % [$.jobMatcher($._config.job_names.ingester)],
+      ], [
+        'Actual bytes per record (avg)',
+        'Estimated bytes per record (avg)',
+      ]) +
+      { fieldConfig+: { defaults+: { unit: 'bytes' } } },
+    )
+  )
+  .addRowIf(
+    $._config.show_ingest_storage_panels,
+    $.row('')
+    .addPanel(
+      $.timeseriesPanel('Kafka fetch throughput') +
+      $.panelDescription(
+        'Kafka fetch throughput',
+        |||
+          Throughput of fetches received from Kafka brokers.
+          This panel shows the rate of bytes fetched from Kafka brokers, and the rate of bytes discarded.
+          The discarded bytes are due to concurrently fetching overlapping offsets.
+
+          Discarded bytes amounting to up to 10% of the total fetched bytes are expected during startup when there is higher concurrency in fetching.
+          Discarded bytes amounting to around 1% of the total fetched bytes are expected during normal operation.
+
+          High values of discarded bytes might indicate inaccurate estimation of record size.
+        |||
+      ) +
+      $.queryPanel([
+        'sum(rate(cortex_ingest_storage_reader_fetch_bytes_total{%s}[$__rate_interval]))' % [$.jobMatcher($._config.job_names.ingester)],
+        'sum(rate(cortex_ingest_storage_reader_fetched_discarded_bytes_total{%s}[$__rate_interval]))' % [$.jobMatcher($._config.job_names.ingester)],
+      ], [
+        'Fetched bytes (decompressed)',
+        'Discarded bytes (decompressed)',
+      ]) +
+      { fieldConfig+: { defaults+: { unit: 'Bps' } } },
+    )
     .addPanel(
       $.timeseriesPanel('Write request batches processed / sec') +
       $.panelDescription(
         'Write request batches processed / sec',
@@ -271,9 +353,9 @@ local filename = 'mimir-writes.json';
           'sum (
              # This is the old metric name. We\'re keeping support for backward compatibility.
             rate(cortex_ingest_storage_reader_records_failed_total{%s, cause="client"}[$__rate_interval])
-            or
-            rate(cortex_ingest_storage_reader_requests_failed_total{%s, cause="client"}[$__rate_interval])
-          )' % [$.jobMatcher($._config.job_names.ingester), $.jobMatcher($._config.job_names.ingester)],
+             or
+             rate(cortex_ingest_storage_reader_requests_failed_total{%s, cause="client"}[$__rate_interval])
+           )' % [$.jobMatcher($._config.job_names.ingester), $.jobMatcher($._config.job_names.ingester)],
           'sum (
              # This is the old metric name. We\'re keeping support for backward compatibility.
             rate(cortex_ingest_storage_reader_records_failed_total{%s, cause="server"}[$__rate_interval])
@@ -289,31 +371,23 @@ local filename = 'mimir-writes.json';
       ) + $.aliasColors({ successful: $._colors.success, 'failed (client)': $._colors.clientError, 'failed (server)': $._colors.failed }) + $.stack,
     )
     .addPanel(
-      $.timeseriesPanel('Kafka records batch processing latency') +
+      $.timeseriesPanel('Ingested series / sec') +
       $.panelDescription(
-        'Kafka records batch processing latency',
+        'Ingested series',
         |||
-          Time taken to process a batch of Kafka records (each record contains a write request).
+          Concurrent ingestion estimates the number of timeseries per batch to choose the optimal concurrency settings.
+
+          Normally one timeseries contains one sample, but it can contain multiple samples.
         |||
       ) +
-      $.queryPanel(
-        [
-          'histogram_avg(sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
-          'histogram_quantile(0.99, sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
-          'histogram_quantile(0.999, sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
-          'histogram_quantile(1.0, sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
-        ],
-        [
-          'avg',
-          '99th percentile',
-          '99.9th percentile',
-          '100th percentile',
-        ],
-      ) + {
-        fieldConfig+: {
-          defaults+: { unit: 's' },
-        },
-      },
+      $.queryPanel([
+        'histogram_sum(sum(rate(cortex_ingest_storage_reader_pusher_timeseries_per_flush{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
+        'sum(rate(cortex_ingest_storage_reader_pusher_estimated_timeseries_total{%s}[$__rate_interval]))' % [$.jobMatcher($._config.job_names.ingester)],
+      ], [
+        'Actual series',
+        'Estimated series',
+      ]) +
+      { fieldConfig+: { defaults+: { unit: 'short' } } },
     )
   )
   .addRowIf(
@@ -321,7 +395,8 @@ local filename = 'mimir-writes.json';
     $.row('Ingester – end-to-end latency (ingest storage)')
     .addPanel(
       $.ingestStorageIngesterEndToEndLatencyWhenRunningPanel(),
-    ).addPanel(
+    )
+    .addPanel(
       $.ingestStorageIngesterEndToEndLatencyOutliersWhenRunningPanel(),
     )
     .addPanel(
diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go
index 4c0fc61ba31..96d3c8e8f62 100644
--- a/pkg/storage/ingest/fetcher.go
+++ b/pkg/storage/ingest/fetcher.go
@@ -15,6 +15,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/backoff"
+	"github.com/grafana/dskit/instrument"
 	"github.com/opentracing/opentracing-go"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/twmb/franz-go/pkg/kadm"
@@ -64,8 +65,8 @@ type fetcher interface {
 	// BufferedBytes returns the number of bytes that have been fetched but not yet consumed.
 	BufferedBytes() int64

-	// BytesPerRecord returns the current estimation for how many bytes each record is.
-	BytesPerRecord() int64
+	// EstimatedBytesPerRecord returns the current estimation for how many bytes each record is.
+	EstimatedBytesPerRecord() int64
 }

 // fetchWant represents a range of offsets to fetch.
@@ -373,7 +374,7 @@ func (r *concurrentFetchers) BufferedBytes() int64 {
 	return r.bufferedFetchedBytes.Load()
 }

-func (r *concurrentFetchers) BytesPerRecord() int64 {
+func (r *concurrentFetchers) EstimatedBytesPerRecord() int64 {
 	return r.estimatedBytesPerRecord.Load()
 }

@@ -493,7 +494,7 @@ func recordIndexAfterOffset(records []*kgo.Record, offset int64) int {
 func (r *concurrentFetchers) recordOrderedFetchTelemetry(f fetchResult, firstReturnedRecordIndex int, waitStartTime time.Time) {
 	waitDuration := time.Since(waitStartTime)
 	level.Debug(r.logger).Log("msg", "received ordered fetch", "num_records", len(f.Records), "wait_duration", waitDuration)
-	r.metrics.fetchWaitDuration.Observe(waitDuration.Seconds())
+	instrument.ObserveWithExemplar(f.ctx, r.metrics.fetchWaitDuration, waitDuration.Seconds())

 	var (
 		doubleFetchedBytes = 0
diff --git a/pkg/storage/ingest/fetcher_test.go b/pkg/storage/ingest/fetcher_test.go
index f6658987bd1..5e3cec8a16d 100644
--- a/pkg/storage/ingest/fetcher_test.go
+++ b/pkg/storage/ingest/fetcher_test.go
@@ -296,9 +296,9 @@ func TestFranzGoErrorStrings(t *testing.T) {
 type noopReaderMetricsSource struct {
 }

-func (n noopReaderMetricsSource) BufferedBytes() int64   { return 0 }
-func (n noopReaderMetricsSource) BufferedRecords() int64 { return 0 }
-func (n noopReaderMetricsSource) BytesPerRecord() int64  { return 0 }
+func (n noopReaderMetricsSource) BufferedBytes() int64           { return 0 }
+func (n noopReaderMetricsSource) BufferedRecords() int64         { return 0 }
+func (n noopReaderMetricsSource) EstimatedBytesPerRecord() int64 { return 0 }

 func TestConcurrentFetchers(t *testing.T) {
 	const (
diff --git a/pkg/storage/ingest/pusher_metrics.go b/pkg/storage/ingest/pusher_metrics.go
index c14382fe2f6..3b1e25c731b 100644
--- a/pkg/storage/ingest/pusher_metrics.go
+++ b/pkg/storage/ingest/pusher_metrics.go
@@ -22,7 +22,7 @@ func newPusherConsumerMetrics(reg prometheus.Registerer) *pusherConsumerMetrics
 		storagePusherMetrics: newStoragePusherMetrics(reg),
 		processingTimeSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
 			Name: "cortex_ingest_storage_reader_records_processing_time_seconds",
-			Help: "Time taken to process a batch of fetched records. Fetched records are effectively a set of WriteRequests read from Kafka.",
+			Help: "Time taken to process a batch of fetched records. Fetched records are effectively a set of WriteRequests read from Kafka. This is the latency of a single attempt and does not include retries.",
 			NativeHistogramBucketFactor:     1.1,
 			NativeHistogramMaxBucketNumber:  100,
 			NativeHistogramMinResetDuration: 1 * time.Hour,
diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go
index dab044f8521..f021d43e880 100644
--- a/pkg/storage/ingest/reader.go
+++ b/pkg/storage/ingest/reader.go
@@ -12,6 +12,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/backoff"
+	"github.com/grafana/dskit/instrument"
 	"github.com/grafana/dskit/multierror"
 	"github.com/grafana/dskit/services"
 	"github.com/opentracing/opentracing-go"
@@ -164,9 +165,9 @@ func (r *PartitionReader) BufferedBytes() int64 {
 	return fcount + ccount
 }

-func (r *PartitionReader) BytesPerRecord() int64 {
+func (r *PartitionReader) EstimatedBytesPerRecord() int64 {
 	if f := r.getFetcher(); f != nil && f != r {
-		return f.BytesPerRecord()
+		return f.EstimatedBytesPerRecord()
 	}

 	return 0
@@ -622,7 +623,7 @@ func (r *PartitionReader) consumeFetches(ctx context.Context, fetches kgo.Fetche
 		MaxRetries: 0, // retry forever
 	})
 	defer func(consumeStart time.Time) {
-		r.metrics.consumeLatency.Observe(time.Since(consumeStart).Seconds())
+		instrument.ObserveWithExemplar(ctx, r.metrics.consumeLatency, time.Since(consumeStart).Seconds())
 	}(time.Now())

 	logger := spanlogger.FromContext(ctx, r.logger)
@@ -1010,7 +1011,7 @@ type readerMetrics struct {
 	bufferedFetchedRecords   prometheus.GaugeFunc
 	bufferedFetchedBytes     prometheus.GaugeFunc
-	bytesPerRecord           prometheus.Histogram
+	estimatedBytesPerRecord  prometheus.Histogram
 	receiveDelayWhenStarting prometheus.Observer
 	receiveDelayWhenRunning  prometheus.Observer
 	recordsPerFetch          prometheus.Histogram
@@ -1028,7 +1029,7 @@ type readerMetrics struct {
 type readerMetricsSource interface {
 	BufferedBytes() int64
 	BufferedRecords() int64
-	BytesPerRecord() int64
+	EstimatedBytesPerRecord() int64
 }

 func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSource readerMetricsSource) readerMetrics {
@@ -1062,9 +1063,9 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc
 			Name: "cortex_ingest_storage_reader_buffered_fetched_bytes",
 			Help: "The number of bytes fetched or requested from Kafka by both concurrent fetchers and the Kafka client but not yet processed. The value depends on -ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes.",
 		}, func() float64 { return float64(metricsSource.BufferedBytes()) }),
-		bytesPerRecord: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
-			Name: "cortex_ingest_storage_reader_bytes_per_record",
-			Help: "Observations with the current size of records fetched from Kafka. Sampled at 10Hz.",
+		estimatedBytesPerRecord: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
+			Name: "cortex_ingest_storage_reader_estimated_bytes_per_record",
+			Help: "Observations with the current size estimation of records fetched from Kafka. Sampled at 10Hz.",
 			NativeHistogramBucketFactor: 1.1,
 		}),
 		receiveDelayWhenStarting: receiveDelay.WithLabelValues("starting"),
@@ -1093,7 +1094,7 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc
 		}),
 		consumeLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
 			Name: "cortex_ingest_storage_reader_records_batch_process_duration_seconds",
-			Help: "How long a consumer spent processing a batch of records from Kafka.",
+			Help: "How long a consumer spent processing a batch of records from Kafka. This includes retries on server errors.",
 			NativeHistogramBucketFactor: 1.1,
 		}),
 		strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg),
@@ -1106,7 +1107,7 @@
 	}

 	m.Service = services.NewTimerService(100*time.Millisecond, nil, func(context.Context) error {
-		m.bytesPerRecord.Observe(float64(metricsSource.BytesPerRecord()))
+		m.estimatedBytesPerRecord.Observe(float64(metricsSource.EstimatedBytesPerRecord()))
 		return nil
 	}, nil)
 	return m
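
The sketch below illustrates, outside the patch itself, the exemplar-aware observation pattern that this change adopts together with the new `withExemplars` jsonnet helper: instead of calling `Observe` directly, the consumer records the latency through `instrument.ObserveWithExemplar`, which can attach the trace from the context as an exemplar that the `exemplar: true` panel targets then surface. This is a minimal, hypothetical example: the package, metric name, `consumer` type, and `processBatch` function are invented for illustration; only `instrument.ObserveWithExemplar`, the Prometheus types, and the native-histogram options mirror what the patch uses.

// Hypothetical sketch of the exemplar-aware latency instrumentation adopted by
// the patch. Only instrument.ObserveWithExemplar, the Prometheus types, and the
// native-histogram options mirror the patch; everything else is made up.
package example

import (
	"context"
	"time"

	"github.com/grafana/dskit/instrument"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type consumer struct {
	// Native histogram, in the same style as the metrics touched by the patch.
	processDuration prometheus.Histogram
}

func newConsumer(reg prometheus.Registerer) *consumer {
	return &consumer{
		processDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
			Name:                        "example_records_batch_process_duration_seconds",
			Help:                        "How long the consumer spent processing a batch of records (example metric).",
			NativeHistogramBucketFactor: 1.1,
		}),
	}
}

func (c *consumer) processBatch(ctx context.Context, records []string) {
	defer func(start time.Time) {
		// Instead of c.processDuration.Observe(...), the dskit helper records the
		// observation and, when ctx carries a sampled trace, attaches its trace ID
		// as an exemplar. Panels wrapped in the new withExemplars helper (which
		// sets exemplar: true on each target) can then link latency spikes to traces.
		instrument.ObserveWithExemplar(ctx, c.processDuration, time.Since(start).Seconds())
	}(time.Now())

	for range records {
		// ... handle each record here ...
	}
}

On the dashboard side, the patch applies the same idea by wrapping the "Kafka records batch latency" query panel in `$.withExemplars(...)`, so Grafana requests exemplars for those targets.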