diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index af4b45388241..8a01514cdf2f 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -14,6 +14,7 @@ import ( ringclient "github.com/grafana/dskit/ring/client" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" "golang.org/x/exp/slices" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" @@ -238,20 +239,8 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo } } - if len(servers) > 0 { - // cache locality score (higher is better): - // `% keyspace / % instances`. Ideally converges to 1 (querying x% of keyspace requires x% of instances), - // but can be less if the keyspace is not evenly distributed across instances. Ideal operation will see the range of - // `1-2/num_instances` -> `1`, where the former represents slight - // overlap on instances to the left and right of the range. - pctKeyspace := float64(lastFp-firstFp) / float64(math.MaxUint64) - pctInstances := float64(len(servers)) / float64(max(1, len(c.pool.Addrs()))) - cacheLocalityScore := pctKeyspace / pctInstances - c.metrics.cacheLocalityScore.Observe(cacheLocalityScore) - } - results := make([][]*logproto.GroupedChunkRefs, len(servers)) - count := 0 + count := atomic.NewInt64(0) err := concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error { rs := servers[i] @@ -269,10 +258,24 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo } resp, err := client.FilterChunkRefs(ctx, req) if err != nil { - return err + // We don't want a single bloom-gw failure to fail the entire query, + // so instrument & move on + level.Error(c.logger).Log( + "msg", "filter failed for instance, skipping", + "addr", rs.addr, + "series", len(rs.groups), + "blocks", len(rs.blocks), + "err", err, + ) + // filter none of the results on failed request + c.metrics.clientRequests.WithLabelValues(typeError).Inc() + results[i] = rs.groups + } else { + c.metrics.clientRequests.WithLabelValues(typeSuccess).Inc() + results[i] = resp.ChunkRefs } - results[i] = resp.ChunkRefs - count += len(resp.ChunkRefs) + + count.Add(int64(len(results[i]))) return nil }) }) @@ -281,7 +284,7 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo return nil, err } - buf := make([]*logproto.GroupedChunkRefs, 0, count) + buf := make([]*logproto.GroupedChunkRefs, 0, int(count.Load())) return mergeSeries(results, buf) } diff --git a/pkg/bloomgateway/metrics.go b/pkg/bloomgateway/metrics.go index 0d408991b40c..0885bc2ae7cb 100644 --- a/pkg/bloomgateway/metrics.go +++ b/pkg/bloomgateway/metrics.go @@ -15,21 +15,25 @@ type metrics struct { *serverMetrics } +const ( + typeSuccess = "success" + typeError = "error" +) + type clientMetrics struct { - cacheLocalityScore prometheus.Histogram - requestLatency *prometheus.HistogramVec - clients prometheus.Gauge + clientRequests *prometheus.CounterVec + requestLatency *prometheus.HistogramVec + clients prometheus.Gauge } func newClientMetrics(registerer prometheus.Registerer) *clientMetrics { return &clientMetrics{ - cacheLocalityScore: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ + clientRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: constants.Loki, Subsystem: "bloom_gateway_client", - Name: "cache_locality_score", - Help: "Cache locality score of the bloom filter, as measured by % of keyspace touched / % of bloom_gws required", - Buckets: prometheus.LinearBuckets(0.01, 0.2, 5), - }), + Name: "requests_total", + Help: "Total number of requests made to the bloom gateway", + }, []string{"type"}), requestLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ Namespace: constants.Loki, Subsystem: "bloom_gateway_client",