ingest: add dashboard panels for concurrent ingestion/fetching (#10021)
* Add batch latency and fetch throughput

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add exemplars to latency histograms, improve metadata

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add more panels

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add CHANGELOG.md entry

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Rename metric to include estimated

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Remove double overlapping

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Replace sample with series

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

---------

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
dimitarvdimitrov authored Nov 27, 2024
1 parent 39dd512 commit e607691
Showing 7 changed files with 129 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -105,6 +105,7 @@
* [ENHANCEMENT] Dashboards: visualize the age of source blocks in the "Mimir / Compactor" dashboard. #9697
* [ENHANCEMENT] Dashboards: Include block compaction level on queried blocks in 'Mimir / Queries' dashboard. #9706
* [ENHANCEMENT] Alerts: add `MimirIngesterMissedRecordsFromKafka` to detect gaps in consumed records in the ingester when using the experimental Kafka-based storage. #9921 #9972
* [ENHANCEMENT] Dashboards: Add more panels to 'Mimir / Writes' for concurrent ingestion and fetching when using ingest storage. #10021
* [BUGFIX] Dashboards: Fix autoscaling metrics joins when series churn. #9412 #9450 #9432
* [BUGFIX] Alerts: Fix autoscaling metrics joins in `MimirAutoscalerNotActive` when series churn. #9412
* [BUGFIX] Alerts: Exclude failed cache "add" operations from alerting since failures are expected in normal operation. #9658
8 changes: 8 additions & 0 deletions operations/mimir-mixin/dashboards/dashboard-utils.libsonnet
@@ -1935,4 +1935,12 @@ local utils = import 'mixin-utils/utils.libsonnet';
defaults+: { unit: 's' },
},
},

withExemplars(queryPanel)::
queryPanel {
targets: [
target { exemplar: true }
for target in queryPanel.targets
],
},
}
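
Note: the `withExemplars` helper above only sets `exemplar: true` on each Grafana query target; exemplars are rendered only if the server-side histograms are actually observed with a trace attached, which the Go changes further down do via dskit's `instrument.ObserveWithExemplar`. The following is a minimal sketch of that pattern using plain client_golang; the helper and metric name are illustrative assumptions, not Mimir code.

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// observeWithTraceExemplar records a latency observation and, when a trace ID is
// available, attaches it as an exemplar so Grafana can link the data point to a trace.
// Hypothetical helper for illustration; Mimir itself calls dskit's instrument.ObserveWithExemplar.
func observeWithTraceExemplar(h prometheus.Histogram, d time.Duration, traceID string) {
	if eo, ok := h.(prometheus.ExemplarObserver); ok && traceID != "" {
		eo.ObserveWithExemplar(d.Seconds(), prometheus.Labels{"trace_id": traceID})
		return
	}
	h.Observe(d.Seconds())
}

func main() {
	hist := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:                        "example_batch_process_duration_seconds", // illustrative metric name
		Help:                        "Example latency histogram with exemplars.",
		NativeHistogramBucketFactor: 1.1,
	})
	prometheus.MustRegister(hist)

	start := time.Now()
	// ... process a batch of records ...
	observeWithTraceExemplar(hist, time.Since(start), "4bf92f3577b34da6a3ce929d0e0e4736")
}
```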
125 changes: 100 additions & 25 deletions operations/mimir-mixin/dashboards/writes.libsonnet
@@ -239,6 +239,88 @@ local filename = 'mimir-writes.json';
],
) + $.aliasColors({ successful: $._colors.success, failed: $._colors.failed, 'read errors': $._colors.failed }) + $.stack,
)
.addPanel(
$.timeseriesPanel('Kafka records batch latency') +
$.panelDescription(
'Kafka records batch latency',
|||
This panel displays the two stages in processing a record batch from Kafka. Fetching batches happens asynchronously to processing them.
If processing batches is the bottleneck, then "Awaiting next batch avg" will be close to zero. In that case, consider tuning the ingestion-concurrency configuration parameters.
If fetching is the bottleneck, then "Awaiting next batch avg" will be in the high tens of milliseconds or higher.
Fetching is usually the bottleneck during normal operation, because most of the time is spent waiting for new records to be produced.
|||
) +
$.withExemplars($.queryPanel(
[
'histogram_avg(sum(rate(cortex_ingest_storage_reader_records_batch_process_duration_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
'histogram_avg(sum(rate(cortex_ingest_storage_reader_records_batch_wait_duration_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
],
[
'Batch processing avg',
'Awaiting next batch avg',
],
)) + {
fieldConfig+: {
defaults+: { unit: 's', exemplar: true },
},
} +
$.stack,
)
.addPanel(
$.timeseriesPanel('Record size') +
$.panelDescription(
'Record size',
|||
Concurrent fetching estimates the size of records.
The estimation is used to enforce the max-buffered-bytes limit and to reduce discarded bytes.
|||
) +
$.queryPanel([
|||
sum(rate(cortex_ingest_storage_reader_fetch_bytes_total{%s}[$__rate_interval]))
/
sum(rate(cortex_ingest_storage_reader_records_per_fetch_sum{%s}[$__rate_interval]))
||| % [$.jobMatcher($._config.job_names.ingester), $.jobMatcher($._config.job_names.ingester)],
|||
histogram_avg(sum(rate(cortex_ingest_storage_reader_estimated_bytes_per_record{%s}[$__rate_interval])))
|||
% [$.jobMatcher($._config.job_names.ingester)],
], [
'Actual bytes per record (avg)',
'Estimated bytes per record (avg)',
]) +
{ fieldConfig+: { defaults+: { unit: 'bytes' } } },
)
)
.addRowIf(
$._config.show_ingest_storage_panels,
$.row('')
.addPanel(
$.timeseriesPanel('Kafka fetch throughput') +
$.panelDescription(
'Kafka fetch throughput',
|||
Throughput of fetches received from Kafka brokers.
This panel shows the rate of bytes fetched from Kafka brokers, and the rate of bytes discarded.
The discarded bytes are due to concurrently fetching overlapping offsets.
Discarded bytes amounting to up to 10% of the total fetched bytes are expected during startup, when fetch concurrency is higher.
Discarded bytes amounting to around 1% of the total fetched bytes are expected during normal operation.
High values of discarded bytes might indicate inaccurate estimation of record size.
|||
) +
$.queryPanel([
'sum(rate(cortex_ingest_storage_reader_fetch_bytes_total{%s}[$__rate_interval]))' % [$.jobMatcher($._config.job_names.ingester)],
'sum(rate(cortex_ingest_storage_reader_fetched_discarded_bytes_total{%s}[$__rate_interval]))' % [$.jobMatcher($._config.job_names.ingester)],
], [
'Fetched bytes (decompressed)',
'Discarded bytes (decompressed)',
]) +
{ fieldConfig+: { defaults+: { unit: 'Bps' } } },
)
.addPanel(
$.timeseriesPanel('Write request batches processed / sec') +
$.panelDescription(
@@ -271,9 +353,9 @@ local filename = 'mimir-writes.json';
'sum (
# This is the old metric name. We\'re keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_failed_total{%s, cause="client"}[$__rate_interval])
or
rate(cortex_ingest_storage_reader_requests_failed_total{%s, cause="client"}[$__rate_interval])
)' % [$.jobMatcher($._config.job_names.ingester), $.jobMatcher($._config.job_names.ingester)],
'sum (
# This is the old metric name. We\'re keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_failed_total{%s, cause="server"}[$__rate_interval])
@@ -289,39 +371,32 @@
) + $.aliasColors({ successful: $._colors.success, 'failed (client)': $._colors.clientError, 'failed (server)': $._colors.failed }) + $.stack,
)
.addPanel(
$.timeseriesPanel('Kafka records batch processing latency') +
$.timeseriesPanel('Ingested series / sec') +
$.panelDescription(
'Kafka records batch processing latency',
'Ingested series',
|||
Time taken to process a batch of Kafka records (each record contains a write request).
Concurrent ingestion estimates the number of timeseries per batch to choose the optimal concurrency settings.
Normally one timeseries contains one sample, but it can contain multiple samples.
|||
) +
$.queryPanel(
[
'histogram_avg(sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
'histogram_quantile(0.99, sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
'histogram_quantile(0.999, sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
'histogram_quantile(1.0, sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
],
[
'avg',
'99th percentile',
'99.9th percentile',
'100th percentile',
],
) + {
fieldConfig+: {
defaults+: { unit: 's' },
},
},
$.queryPanel([
'histogram_sum(sum(rate(cortex_ingest_storage_reader_pusher_timeseries_per_flush{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
'sum(rate(cortex_ingest_storage_reader_pusher_estimated_timeseries_total{%s}[$__rate_interval]))' % [$.jobMatcher($._config.job_names.ingester)],
], [
'Actual series',
'Estimated series',
]) +
{ fieldConfig+: { defaults+: { unit: 'short' } } },
)
)
.addRowIf(
$._config.show_ingest_storage_panels,
$.row('Ingester – end-to-end latency (ingest storage)')
.addPanel(
$.ingestStorageIngesterEndToEndLatencyWhenRunningPanel(),
).addPanel(
)
.addPanel(
$.ingestStorageIngesterEndToEndLatencyOutliersWhenRunningPanel(),
)
.addPanel(
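
Note: the "Kafka records batch latency" panel added above describes two stages that run concurrently, waiting for the next fetched batch and processing it. The sketch below is a hedged, illustrative consumer loop (not the Mimir implementation) showing where the two observed durations, "Awaiting next batch" and "Batch processing", come from; all names are assumptions for the example. If the channel receive rarely blocks, processing is the bottleneck; if it blocks for long, fetching is.

```go
package main

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// record stands in for a fetched Kafka record.
type record struct{}

// consumeLoop fetches batches on one goroutine and processes them on another,
// timing the two stages separately. waitDuration and processDuration stand in for
// the histograms behind "Awaiting next batch avg" and "Batch processing avg".
func consumeLoop(ctx context.Context,
	fetch func(context.Context) []record,
	process func(context.Context, []record) error,
	waitDuration, processDuration prometheus.Observer) {

	batches := make(chan []record, 1)

	// Fetching runs ahead of processing, so a slow consumer does not stall fetches.
	go func() {
		defer close(batches)
		for ctx.Err() == nil {
			batches <- fetch(ctx)
		}
	}()

	for {
		waitStart := time.Now()
		batch, ok := <-batches // "Awaiting next batch": time spent blocked here
		if !ok {
			return
		}
		waitDuration.Observe(time.Since(waitStart).Seconds())

		processStart := time.Now()
		_ = process(ctx, batch) // "Batch processing": time spent handling the batch
		processDuration.Observe(time.Since(processStart).Seconds())
	}
}

func main() {
	wait := promauto.NewHistogram(prometheus.HistogramOpts{Name: "example_batch_wait_seconds"})
	proc := promauto.NewHistogram(prometheus.HistogramOpts{Name: "example_batch_process_seconds"})

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	consumeLoop(ctx,
		func(context.Context) []record { time.Sleep(10 * time.Millisecond); return make([]record, 8) },
		func(context.Context, []record) error { time.Sleep(5 * time.Millisecond); return nil },
		wait, proc)
}
```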
9 changes: 5 additions & 4 deletions pkg/storage/ingest/fetcher.go
@@ -15,6 +15,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/instrument"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kadm"
@@ -64,8 +65,8 @@ type fetcher interface {
// BufferedBytes returns the number of bytes that have been fetched but not yet consumed.
BufferedBytes() int64

// BytesPerRecord returns the current estimation for how many bytes each record is.
BytesPerRecord() int64
// EstimatedBytesPerRecord returns the current estimation for how many bytes each record is.
EstimatedBytesPerRecord() int64
}

// fetchWant represents a range of offsets to fetch.
@@ -373,7 +374,7 @@ func (r *concurrentFetchers) BufferedBytes() int64 {
return r.bufferedFetchedBytes.Load()
}

func (r *concurrentFetchers) BytesPerRecord() int64 {
func (r *concurrentFetchers) EstimatedBytesPerRecord() int64 {
return r.estimatedBytesPerRecord.Load()
}

@@ -493,7 +494,7 @@ func recordIndexAfterOffset(records []*kgo.Record, offset int64) int {
func (r *concurrentFetchers) recordOrderedFetchTelemetry(f fetchResult, firstReturnedRecordIndex int, waitStartTime time.Time) {
waitDuration := time.Since(waitStartTime)
level.Debug(r.logger).Log("msg", "received ordered fetch", "num_records", len(f.Records), "wait_duration", waitDuration)
r.metrics.fetchWaitDuration.Observe(waitDuration.Seconds())
instrument.ObserveWithExemplar(f.ctx, r.metrics.fetchWaitDuration, waitDuration.Seconds())

var (
doubleFetchedBytes = 0
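
Note: `EstimatedBytesPerRecord` exposes the fetcher's running estimate of record size, which the "Record size" panel compares against the actual bytes per record. As a rough illustration of why such an estimate is kept (this is an assumption-laden sketch, not how `concurrentFetchers` computes it), a moving average can translate a max-buffered-bytes budget into a record count for the next fetch, which is what keeps buffered and discarded bytes in check.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// recordSizeEstimator keeps a rough exponentially weighted moving average of the
// bytes per fetched record. Purely illustrative: the real estimation lives in
// pkg/storage/ingest/fetcher.go and is exposed via EstimatedBytesPerRecord().
type recordSizeEstimator struct {
	estimatedBytesPerRecord atomic.Int64
}

// update folds the latest fetch into the estimate, giving 20% weight to new data.
func (e *recordSizeEstimator) update(fetchedBytes, fetchedRecords int64) {
	if fetchedRecords == 0 {
		return
	}
	observed := fetchedBytes / fetchedRecords
	current := e.estimatedBytesPerRecord.Load()
	if current == 0 {
		e.estimatedBytesPerRecord.Store(observed)
		return
	}
	e.estimatedBytesPerRecord.Store((current*8 + observed*2) / 10)
}

// maxRecordsForBudget returns how many more records can be requested without the
// buffered-but-unconsumed bytes exceeding maxBufferedBytes.
func (e *recordSizeEstimator) maxRecordsForBudget(bufferedBytes, maxBufferedBytes int64) int64 {
	perRecord := e.estimatedBytesPerRecord.Load()
	if perRecord <= 0 {
		return 1 // no estimate yet: fetch conservatively
	}
	remaining := maxBufferedBytes - bufferedBytes
	if remaining <= 0 {
		return 0
	}
	return remaining / perRecord
}

func main() {
	var e recordSizeEstimator
	e.update(1<<20, 1000) // a 1 MiB fetch containing 1000 records
	fmt.Println("estimated bytes/record:", e.estimatedBytesPerRecord.Load())
	fmt.Println("records fitting a 32 MiB budget:", e.maxRecordsForBudget(0, 32<<20))
}
```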
6 changes: 3 additions & 3 deletions pkg/storage/ingest/fetcher_test.go
@@ -296,9 +296,9 @@ func TestFranzGoErrorStrings(t *testing.T) {
type noopReaderMetricsSource struct {
}

func (n noopReaderMetricsSource) BufferedBytes() int64 { return 0 }
func (n noopReaderMetricsSource) BufferedRecords() int64 { return 0 }
func (n noopReaderMetricsSource) BytesPerRecord() int64 { return 0 }
func (n noopReaderMetricsSource) BufferedBytes() int64 { return 0 }
func (n noopReaderMetricsSource) BufferedRecords() int64 { return 0 }
func (n noopReaderMetricsSource) EstimatedBytesPerRecord() int64 { return 0 }

func TestConcurrentFetchers(t *testing.T) {
const (
2 changes: 1 addition & 1 deletion pkg/storage/ingest/pusher_metrics.go
@@ -22,7 +22,7 @@ func newPusherConsumerMetrics(reg prometheus.Registerer) *pusherConsumerMetrics
storagePusherMetrics: newStoragePusherMetrics(reg),
processingTimeSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingest_storage_reader_records_processing_time_seconds",
Help: "Time taken to process a batch of fetched records. Fetched records are effectively a set of WriteRequests read from Kafka.",
Help: "Time taken to process a batch of fetched records. Fetched records are effectively a set of WriteRequests read from Kafka. This is the latency of a single attempt and does not include retries.",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
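
Note: the updated Help strings distinguish `cortex_ingest_storage_reader_records_processing_time_seconds` (one push attempt, retries excluded) from `cortex_ingest_storage_reader_records_batch_process_duration_seconds` (whole batch, retries included). A generic sketch of where each observation would sit relative to a retry loop; this is an assumed structure, not the Mimir pusher.

```go
package main

import (
	"context"
	"errors"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// processBatchWithRetries shows the two measurement points: perAttempt is observed
// once per push attempt (no retries included), total once per batch including every retry.
func processBatchWithRetries(ctx context.Context, push func(context.Context) error,
	perAttempt, total prometheus.Observer, maxAttempts int) error {

	batchStart := time.Now()
	defer func() { total.Observe(time.Since(batchStart).Seconds()) }()

	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		attemptStart := time.Now()
		err = push(ctx)
		perAttempt.Observe(time.Since(attemptStart).Seconds())
		if err == nil {
			return nil
		}
	}
	return err
}

func main() {
	perAttempt := promauto.NewHistogram(prometheus.HistogramOpts{Name: "example_processing_time_seconds"})
	total := promauto.NewHistogram(prometheus.HistogramOpts{Name: "example_batch_process_duration_seconds"})

	push := func(context.Context) error { return errors.New("transient server error") }
	_ = processBatchWithRetries(context.Background(), push, perAttempt, total, 3)
}
```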
21 changes: 11 additions & 10 deletions pkg/storage/ingest/reader.go
@@ -12,6 +12,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/instrument"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/services"
"github.com/opentracing/opentracing-go"
@@ -164,9 +165,9 @@ func (r *PartitionReader) BufferedBytes() int64 {
return fcount + ccount
}

func (r *PartitionReader) BytesPerRecord() int64 {
func (r *PartitionReader) EstimatedBytesPerRecord() int64 {
if f := r.getFetcher(); f != nil && f != r {
return f.BytesPerRecord()
return f.EstimatedBytesPerRecord()
}

return 0
@@ -622,7 +623,7 @@ func (r *PartitionReader) consumeFetches(ctx context.Context, fetches kgo.Fetche
MaxRetries: 0, // retry forever
})
defer func(consumeStart time.Time) {
r.metrics.consumeLatency.Observe(time.Since(consumeStart).Seconds())
instrument.ObserveWithExemplar(ctx, r.metrics.consumeLatency, time.Since(consumeStart).Seconds())
}(time.Now())

logger := spanlogger.FromContext(ctx, r.logger)
@@ -1010,7 +1011,7 @@ type readerMetrics struct {

bufferedFetchedRecords prometheus.GaugeFunc
bufferedFetchedBytes prometheus.GaugeFunc
bytesPerRecord prometheus.Histogram
estimatedBytesPerRecord prometheus.Histogram
receiveDelayWhenStarting prometheus.Observer
receiveDelayWhenRunning prometheus.Observer
recordsPerFetch prometheus.Histogram
@@ -1028,7 +1029,7 @@ type readerMetrics struct {
type readerMetricsSource interface {
BufferedBytes() int64
BufferedRecords() int64
BytesPerRecord() int64
EstimatedBytesPerRecord() int64
}

func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSource readerMetricsSource) readerMetrics {
@@ -1062,9 +1063,9 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc
Name: "cortex_ingest_storage_reader_buffered_fetched_bytes",
Help: "The number of bytes fetched or requested from Kafka by both concurrent fetchers and the Kafka client but not yet processed. The value depends on -ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes.",
}, func() float64 { return float64(metricsSource.BufferedBytes()) }),
bytesPerRecord: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingest_storage_reader_bytes_per_record",
Help: "Observations with the current size of records fetched from Kafka. Sampled at 10Hz.",
estimatedBytesPerRecord: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingest_storage_reader_estimated_bytes_per_record",
Help: "Observations with the current size estimation of records fetched from Kafka. Sampled at 10Hz.",
NativeHistogramBucketFactor: 1.1,
}),
receiveDelayWhenStarting: receiveDelay.WithLabelValues("starting"),
@@ -1093,7 +1094,7 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc
}),
consumeLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingest_storage_reader_records_batch_process_duration_seconds",
Help: "How long a consumer spent processing a batch of records from Kafka.",
Help: "How long a consumer spent processing a batch of records from Kafka. This includes retries on server errors.",
NativeHistogramBucketFactor: 1.1,
}),
strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg),
@@ -1106,7 +1107,7 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc
}

m.Service = services.NewTimerService(100*time.Millisecond, nil, func(context.Context) error {
m.bytesPerRecord.Observe(float64(metricsSource.BytesPerRecord()))
m.estimatedBytesPerRecord.Observe(float64(metricsSource.EstimatedBytesPerRecord()))
return nil
}, nil)
return m
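
Note: the renamed `cortex_ingest_storage_reader_estimated_bytes_per_record` histogram is fed by the timer service at the end of this hunk, which samples the current estimate every 100ms ("Sampled at 10Hz") rather than on every fetch. Below is a minimal stand-alone sketch of that sampling pattern using a plain ticker; the real code uses dskit's `services.NewTimerService`, and the names here are illustrative.

```go
package main

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// sampleEstimate polls a point-in-time estimate at a fixed interval and records each
// sample into a histogram, so the estimate's distribution over time can be graphed.
func sampleEstimate(ctx context.Context, interval time.Duration,
	estimate func() int64, hist prometheus.Histogram) {

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			hist.Observe(float64(estimate()))
		}
	}
}

func main() {
	hist := promauto.NewHistogram(prometheus.HistogramOpts{
		Name:                        "example_estimated_bytes_per_record", // illustrative name
		Help:                        "Sampled estimate of bytes per record.",
		NativeHistogramBucketFactor: 1.1,
	})

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	sampleEstimate(ctx, 100*time.Millisecond, func() int64 { return 1024 }, hist)
}
```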
