From 9834d6db62900490470d1def152a90c8f52c492e Mon Sep 17 00:00:00 2001
From: Goutham Veeramachaneni
Date: Thu, 19 Nov 2020 16:32:30 +0100
Subject: [PATCH] Refactor consistency check so it can be reused. (#3512)

This is in anticipation of thanos-io/thanos#3469 where we would need to
utilise the consistency check to query for labels as well.

Signed-off-by: Goutham Veeramachaneni

Former-commit-id: 1216d0087fdbaf7f4ba3d534ac8e749af7e20e8d
---
 pkg/querier/blocks_store_queryable.go      | 103 ++++++++++++++-------
 pkg/querier/blocks_store_queryable_test.go |   2 +-
 2 files changed, 68 insertions(+), 37 deletions(-)

diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go
index 29db5ee3e83..429e4bd22bb 100644
--- a/pkg/querier/blocks_store_queryable.go
+++ b/pkg/querier/blocks_store_queryable.go
@@ -315,6 +315,55 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
 		minT, maxT = sp.Start, sp.End
 	}
 
+	var (
+		convertedMatchers = convertMatchersToLabelMatcher(matchers)
+		resSeriesSets     = []storage.SeriesSet(nil)
+		resWarnings       = storage.Warnings(nil)
+
+		maxChunksLimit  = q.limits.MaxChunksPerQuery(q.userID)
+		leftChunksLimit = maxChunksLimit
+
+		resultMtx sync.Mutex
+	)
+
+	queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) {
+		seriesSets, queriedBlocks, warnings, numChunks, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, convertedMatchers, maxChunksLimit, leftChunksLimit)
+		if err != nil {
+			return nil, err
+		}
+
+		resultMtx.Lock()
+
+		resSeriesSets = append(resSeriesSets, seriesSets...)
+		resWarnings = append(resWarnings, warnings...)
+
+		// Given a single block is guaranteed to not be queried twice, we can safely decrease the number of
+		// chunks we can still read before hitting the limit (max == 0 means disabled).
+		if maxChunksLimit > 0 {
+			leftChunksLimit -= numChunks
+		}
+
+		resultMtx.Unlock()
+
+		return queriedBlocks, nil
+	}
+
+	err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, queryFunc)
+	if err != nil {
+		return storage.ErrSeriesSet(err)
+	}
+
+	if len(resSeriesSets) == 0 {
+		storage.EmptySeriesSet()
+	}
+
+	return series.NewSeriesSetWithWarnings(
+		storage.NewMergeSeriesSet(resSeriesSets, storage.ChainedSeriesMerge),
+		resWarnings)
+}
+
+func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logger log.Logger, minT, maxT int64,
+	queryFunc func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error)) error {
 	// If queryStoreAfter is enabled, we do manipulate the query maxt to query samples up until
 	// now - queryStoreAfter, because the most recent time range is covered by ingesters. This
 	// optimization is particularly important for the blocks storage because can be used to skip
@@ -325,29 +374,29 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
 		maxT = util.Min64(maxT, util.TimeToMillis(now.Add(-q.queryStoreAfter)))
 
 		if origMaxT != maxT {
-			level.Debug(spanLog).Log("msg", "the max time of the query to blocks storage has been manipulated", "original", origMaxT, "updated", maxT)
+			level.Debug(logger).Log("msg", "the max time of the query to blocks storage has been manipulated", "original", origMaxT, "updated", maxT)
 		}
 
 		if maxT < minT {
 			q.metrics.storesHit.Observe(0)
-			level.Debug(spanLog).Log("msg", "empty query time range after max time manipulation")
-			return storage.EmptySeriesSet()
+			level.Debug(logger).Log("msg", "empty query time range after max time manipulation")
+			return nil
 		}
 	}
 
 	// Find the list of blocks we need to query given the time range.
 	knownMetas, knownDeletionMarks, err := q.finder.GetBlocks(q.userID, minT, maxT)
 	if err != nil {
-		return storage.ErrSeriesSet(err)
+		return err
 	}
 
 	if len(knownMetas) == 0 {
 		q.metrics.storesHit.Observe(0)
-		level.Debug(spanLog).Log("msg", "no blocks found")
-		return storage.EmptySeriesSet()
+		level.Debug(logger).Log("msg", "no blocks found")
+		return nil
 	}
 
-	level.Debug(spanLog).Log("msg", "found blocks to query", "expected", BlockMetas(knownMetas).String())
+	level.Debug(logger).Log("msg", "found blocks to query", "expected", BlockMetas(knownMetas).String())
 
 	var (
 		// At the beginning the list of blocks to query are all known blocks.
@@ -355,13 +404,7 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
 		attemptedBlocks = map[ulid.ULID][]string{}
 		touchedStores   = map[string]struct{}{}
 
-		convertedMatchers = convertMatchersToLabelMatcher(matchers)
-		resSeriesSets     = []storage.SeriesSet(nil)
-		resWarnings       = storage.Warnings(nil)
-		resQueriedBlocks  = []ulid.ULID(nil)
-
-		maxChunksLimit  = q.limits.MaxChunksPerQuery(q.userID)
-		leftChunksLimit = maxChunksLimit
+		resQueriedBlocks = []ulid.ULID(nil)
 	)
 
 	for attempt := 1; attempt <= maxFetchSeriesAttempts; attempt++ {
@@ -372,32 +415,24 @@
 			// If it's a retry and we get an error, it means there are no more store-gateways left
 			// from which running another attempt, so we're just stopping retrying.
 			if attempt > 1 {
-				level.Warn(spanLog).Log("msg", "unable to get store-gateway clients while retrying to fetch missing blocks", "err", err)
+				level.Warn(logger).Log("msg", "unable to get store-gateway clients while retrying to fetch missing blocks", "err", err)
 				break
 			}
 
-			return storage.ErrSeriesSet(err)
+			return err
 		}
-		level.Debug(spanLog).Log("msg", "found store-gateway instances to query", "num instances", len(clients), "attempt", attempt)
+		level.Debug(logger).Log("msg", "found store-gateway instances to query", "num instances", len(clients), "attempt", attempt)
 
 		// Fetch series from stores. If an error occur we do not retry because retries
 		// are only meant to cover missing blocks.
-		seriesSets, queriedBlocks, warnings, numChunks, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, convertedMatchers, maxChunksLimit, leftChunksLimit)
+		queriedBlocks, err := queryFunc(clients, minT, maxT)
 		if err != nil {
-			return storage.ErrSeriesSet(err)
+			return err
 		}
-		level.Debug(spanLog).Log("msg", "received series from all store-gateways", "queried blocks", strings.Join(convertULIDsToString(queriedBlocks), " "))
+		level.Debug(logger).Log("msg", "received series from all store-gateways", "queried blocks", strings.Join(convertULIDsToString(queriedBlocks), " "))
 
-		resSeriesSets = append(resSeriesSets, seriesSets...)
-		resWarnings = append(resWarnings, warnings...)
 		resQueriedBlocks = append(resQueriedBlocks, queriedBlocks...)
 
-		// Given a single block is guaranteed to not be queried twice, we can safely decrease the number of
-		// chunks we can still read before hitting the limit (max == 0 means disabled).
-		if maxChunksLimit > 0 {
-			leftChunksLimit -= numChunks
-		}
-
 		// Update the map of blocks we attempted to query.
 		for client, blockIDs := range clients {
 			touchedStores[client.RemoteAddress()] = struct{}{}
@@ -413,22 +448,18 @@
 			q.metrics.storesHit.Observe(float64(len(touchedStores)))
 			q.metrics.refetches.Observe(float64(attempt - 1))
 
-			return series.NewSeriesSetWithWarnings(
-				storage.NewMergeSeriesSet(resSeriesSets, storage.ChainedSeriesMerge),
-				resWarnings)
+			return nil
 		}
 
-		level.Debug(spanLog).Log("msg", "consistency check failed", "attempt", attempt, "missing blocks", strings.Join(convertULIDsToString(missingBlocks), " "))
+		level.Debug(logger).Log("msg", "consistency check failed", "attempt", attempt, "missing blocks", strings.Join(convertULIDsToString(missingBlocks), " "))
 
 		// The next attempt should just query the missing blocks.
 		remainingBlocks = missingBlocks
 	}
 
 	// We've not been able to query all expected blocks after all retries.
-	err = fmt.Errorf("consistency check failed because some blocks were not queried: %s", strings.Join(convertULIDsToString(remainingBlocks), " "))
-	level.Warn(util.WithContext(spanCtx, spanLog)).Log("msg", "failed consistency check", "err", err)
-
-	return storage.ErrSeriesSet(err)
+	level.Warn(util.WithContext(ctx, logger)).Log("msg", "failed consistency check", "err", err)
+	return fmt.Errorf("consistency check failed because some blocks were not queried: %s", strings.Join(convertULIDsToString(remainingBlocks), " "))
 }
 
 func (q *blocksStoreQuerier) fetchSeriesFromStores(
diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go
index 3f2d20dd187..a4be09c8840 100644
--- a/pkg/querier/blocks_store_queryable_test.go
+++ b/pkg/querier/blocks_store_queryable_test.go
@@ -608,7 +608,7 @@ func TestBlocksStoreQuerier_SelectSortedShouldHonorQueryStoreAfter(t *testing.T)
 				End:   testData.queryMaxT,
 			}
 
-			set := q.selectSorted(sp, nil)
+			set := q.selectSorted(sp)
 			require.NoError(t, set.Err())
 
 			if testData.expectedMinT == 0 && testData.expectedMaxT == 0 {
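
The extracted queryWithConsistencyCheck keeps the store-gateway selection, retry and consistency-check logic in one place, while each caller supplies a queryFunc that performs the actual fetch and owns result accumulation and per-query limits. As a rough illustration of the reuse anticipated above (thanos-io/thanos#3469, querying labels), a label-names query in the same package could plug into the helper roughly as sketched below. This is a sketch only: fetchLabelNamesFromStores is a hypothetical helper mirroring fetchSeriesFromStores, and the q.ctx, q.logger, q.minT and q.maxT field names are assumptions rather than part of this patch.

// Illustrative sketch (pkg/querier), not part of this patch: reusing
// queryWithConsistencyCheck for a label-names query. fetchLabelNamesFromStores
// is hypothetical; only queryWithConsistencyCheck comes from the change above.
func (q *blocksStoreQuerier) LabelNames() ([]string, storage.Warnings, error) {
	var (
		resNameSets = [][]string(nil)
		resWarnings = storage.Warnings(nil)
		resultMtx   sync.Mutex
	)

	queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) {
		// Hypothetical per-attempt fetch from the given store-gateway clients,
		// returning the label names per store and the blocks actually queried.
		nameSets, queriedBlocks, warnings, err := q.fetchLabelNamesFromStores(q.ctx, clients, minT, maxT)
		if err != nil {
			return nil, err
		}

		// Accumulate results under a mutex, mirroring selectSorted() above.
		resultMtx.Lock()
		resNameSets = append(resNameSets, nameSets...)
		resWarnings = append(resWarnings, warnings...)
		resultMtx.Unlock()

		return queriedBlocks, nil
	}

	if err := q.queryWithConsistencyCheck(q.ctx, q.logger, q.minT, q.maxT, queryFunc); err != nil {
		return nil, nil, err
	}

	// Merge and deduplicate the per-store name sets before returning them.
	uniq := map[string]struct{}{}
	for _, set := range resNameSets {
		for _, name := range set {
			uniq[name] = struct{}{}
		}
	}
	names := make([]string, 0, len(uniq))
	for name := range uniq {
		names = append(names, name)
	}
	sort.Strings(names)

	return names, resWarnings, nil
}

Because queryFunc only reports which block ULIDs were actually queried, the helper can keep retrying missing blocks against other store-gateways without knowing whether the caller is accumulating series, label names or label values.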