Executing child queries on queriers #1730

Closed: wants to merge 97 commits

Commits
edc8851
ast mapping
owen-d Oct 8, 2019
cecd6ed
wrapper types for querier roundtripping and in memory series set
owen-d Oct 8, 2019
42c1640
query sharding middleware
owen-d Oct 9, 2019
f9bdf07
embeds all selectors for distribution to queriers
owen-d Oct 9, 2019
8ac7205
shardsummer in middleware uses vectorsquasher
owen-d Oct 9, 2019
6eb66c1
value conversion unit tests
owen-d Oct 9, 2019
49caa6a
downstreamSeries.Seek method & adds unit tests
owen-d Oct 9, 2019
43dbbc0
corrects embedded queryable label matching/dispatching & adds unit tests
owen-d Oct 10, 2019
0e12f39
Series{Iterator,Set} expects to call Next() before At() initially (as…
owen-d Oct 10, 2019
8b13494
middleware embeds engine + unit tests
owen-d Oct 10, 2019
e1d7d97
exports ConcreteSeries{,Set}
owen-d Oct 10, 2019
c942382
removes custom Series{,Set,Iterator} impls in favor of queriers concr…
owen-d Oct 10, 2019
ff1a385
concreteSeries moved to its own pkg
owen-d Oct 10, 2019
a509b57
wiring for multiple sharding configs
owen-d Oct 16, 2019
1ec82a0
NodeMapper ASTMapper adapter which requires less verbose impls
owen-d Oct 19, 2019
cd35d76
adds NodeMapper ifc to simplify ASTMapper impls
owen-d Oct 19, 2019
c27c58d
queries assume milliseconds
owen-d Oct 21, 2019
5e9c0d4
dont shard requests across multiple shard configs
owen-d Oct 21, 2019
a344f48
lazy queryable and chunkstore to their own packages
owen-d Oct 21, 2019
039eefc
querysharding uses a lazy querier
owen-d Oct 21, 2019
4f00f9e
linter fixes
owen-d Oct 21, 2019
bc50779
__name__ label via constant
owen-d Oct 21, 2019
8879c6a
corrects error source
owen-d Oct 21, 2019
a936b6b
linting
owen-d Oct 21, 2019
197d902
ast unit test cases, removes use of unkeyed fields
owen-d Oct 22, 2019
455cdef
shard aware storage filtering
owen-d Oct 22, 2019
bc29fd3
s/FilterIndexQueries/FilterReadQueries/ for consistency
owen-d Oct 22, 2019
675b08e
add promql test to verify splitting
cyriltovena Oct 23, 2019
d148470
more promql equivalence tests, removes avg as we do not parallelize it
owen-d Oct 23, 2019
235d2ff
corrects astmapping sums behavior
owen-d Oct 23, 2019
cc6403a
adds ShardAnnotation type for enclosing __cortex_shard__ logic
owen-d Oct 24, 2019
91f4485
injects __cortex_shard__ labels for chunks in series_store
owen-d Oct 24, 2019
e40606a
exports LabelsSeriesID from pkg/chunk
owen-d Oct 24, 2019
95a6c16
shard aware ingester queries
owen-d Oct 24, 2019
c085be8
allows all functions as parallel
owen-d Oct 24, 2019
0dce5c9
parallel functions improvements/tests
owen-d Oct 25, 2019
6219d4d
fix missing rebased files
cyriltovena Oct 25, 2019
9253c50
Implement the new queryrange.Handler
cyriltovena Oct 25, 2019
3cb3953
Add missing comment
cyriltovena Oct 25, 2019
f44bafe
Add the new sum shard middleware to the query range
cyriltovena Oct 25, 2019
0a9e32f
moves querysharding into queryrange package
owen-d Oct 27, 2019
ac21eb8
approximates rate test
owen-d Oct 27, 2019
98657c5
approximates more floating point calculation combinations
owen-d Oct 27, 2019
99a2d37
populates chunkstore test shard factor via validation
owen-d Oct 27, 2019
5a11bcb
Adds some trace tags and logs
cyriltovena Oct 30, 2019
1a186b7
logging mapped queries
owen-d Oct 30, 2019
40dfdb7
Adds a warning if no shard exists in config
cyriltovena Oct 30, 2019
242095d
Improve error formatting
cyriltovena Oct 30, 2019
09cf55d
Fixes empty matchers and add some logs
cyriltovena Oct 30, 2019
5587446
simplifies shard-aware series index searching
owen-d Oct 30, 2019
9118a79
Fixes inject shard label
cyriltovena Oct 30, 2019
b86b601
Add more logging in the ingester code
cyriltovena Oct 31, 2019
43097cc
fixes copy/pasted comment
owen-d Oct 31, 2019
d5f51c8
sorts memorySeries metrics for deterministic hashing in ingester
owen-d Oct 31, 2019
7aceecc
Don't change the inMemorySeries metric
cyriltovena Oct 31, 2019
d18dc62
Removes shard label in ingester.QueryStream
cyriltovena Oct 31, 2019
9c89ba2
clean up logs
cyriltovena Nov 1, 2019
6f3d3d4
shardsumming reduces shard result dimensions when possible
owen-d Nov 1, 2019
6319051
astmapper squashes children together to support concat instead of OR
owen-d Nov 4, 2019
856321f
Codec ifc for query sharding, concats instead of ORing when possible
owen-d Nov 4, 2019
1229a3c
logs filtered queries
owen-d Nov 5, 2019
b34dbbd
instruments chunk cache decoding
owen-d Nov 5, 2019
a666624
only sorts metrics for an existing series [ingester]
owen-d Nov 6, 2019
1d0a20a
further instruments cachingIndexClient and chunkstore
owen-d Nov 6, 2019
c48ab5b
addtl instrumentation
owen-d Nov 7, 2019
13fdc4a
fixes now-exported LabelsSeriesID call that was missed in rebase
owen-d Nov 11, 2019
146d505
querier.Chunkstore moved to new pkg, update refs
owen-d Nov 11, 2019
35d3606
user_state shard testing ensures shards are filtered correctly by che…
owen-d Nov 11, 2019
1b69011
changes embeddedSeparator to <|> in order to minimize collision chance
owen-d Nov 13, 2019
6beed96
removes defaultfilterReadQueries indirection
owen-d Nov 13, 2019
3ebf479
computes rowshards in one fn
owen-d Nov 13, 2019
f032cb1
fixes issue with computed rowshards not being injected
owen-d Nov 13, 2019
e9a8b80
middle shard filter test
owen-d Nov 14, 2019
7222772
isolates shard-aware logic in ingesters
owen-d Nov 14, 2019
7bda990
minor function doc improvements
owen-d Nov 14, 2019
eedf0f7
json codec instead of hex to support arbitrary embedded queries, impr…
owen-d Nov 15, 2019
6a4e151
shardannotation methods where possible
owen-d Nov 15, 2019
718051f
exports OrSquasher
owen-d Nov 15, 2019
db6b721
reuses astmapper.ShardFromMatchers
owen-d Nov 15, 2019
03e8070
benchmarks OR nodes vs Concat
owen-d Nov 15, 2019
0fda17b
linting fixes
owen-d Nov 15, 2019
b7e94a2
adds querier.sum-shards to changelog
owen-d Nov 15, 2019
b099b6a
does not attempt to map query AST when conf.RowShards < 2
owen-d Nov 20, 2019
a92927c
benchmarks sharding queries
owen-d Nov 20, 2019
2c83b66
benchmark both linear time queryable and constant time queryables
owen-d Nov 26, 2019
1adf00a
removes OR node combining benchmark
owen-d Nov 27, 2019
0a1cb1c
moves MockShardedQueryable test helper to queryrange pkg
owen-d Nov 27, 2019
a3d812b
adds astmapping test for parallelizing legs of histogram_quantile fn
owen-d Nov 27, 2019
aa08334
cache friendly query sharding
owen-d Nov 27, 2019
c257909
moves querysharding to unreleased/master in changelog
owen-d Dec 2, 2019
a111d1e
subtreeMapper for better query delegation
owen-d Dec 2, 2019
fb40fe3
middleware correctness tests
owen-d Dec 3, 2019
952a04f
gofmt linting
owen-d Dec 3, 2019
df3c38a
subtreeFolder should not fold leaf scalars/string nodes
owen-d Dec 4, 2019
4ea09a4
introduces query-audit, a query correctness tool
owen-d Dec 4, 2019
a4a28e3
increase logging verbosity for query-audit reports
owen-d Dec 4, 2019
b3efc15
query sharding in arguments.md
owen-d Dec 4, 2019
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,16 @@

## master / unreleased

* [FEATURE] Fan out parallelizable queries to backend queriers concurrently.
* `-querier.sum-shards` (bool)
* Requires a shard-compatible schema (v10+)
* This causes the number of traces to increase accordingly.
* The query-frontend now requires a schema config to determine how and when to shard queries, either from a file or from flags (i.e. via the `config-yaml` CLI flag). This is the same schema config the queriers consume.
* It's advised to increase downstream concurrency controls as well:
* `querier.max-outstanding-requests-per-tenant`
* `querier.max-query-parallelism`
* `querier.max-concurrent`
* `server.grpc-max-concurrent-streams` (for both query-frontends and queriers)
* [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861

## 0.4.0 / 2019-12-02
22 changes: 22 additions & 0 deletions docs/arguments.md
@@ -63,6 +63,28 @@ The ingester query API was improved over time, but defaults to the old behaviour

## Query Frontend

- `-querier.sum-shards`

If set to true, will cause the query frontend to mutate incoming queries when possible by turning `sum` operations into sharded `sum` operations. This requires a shard-compatible schema (v10+). An abridged example:
`sum by (foo) (rate(bar{baz="blip"}[1m]))` ->
```
sum by (foo) (
  sum by (foo) (rate(bar{baz="blip",__cortex_shard__="0of16"}[1m])) or
  sum by (foo) (rate(bar{baz="blip",__cortex_shard__="1of16"}[1m])) or
  ...
  sum by (foo) (rate(bar{baz="blip",__cortex_shard__="15of16"}[1m]))
)
```
When enabled, the query-frontend requires a schema config to determine how and when to shard queries, either from a file or from flags (i.e. via the `config-yaml` CLI flag). This is the same schema config the queriers consume.
It's also advised to increase downstream concurrency controls to account for the larger number of smaller queries:

- `querier.max-outstanding-requests-per-tenant`
- `querier.max-query-parallelism`
- `querier.max-concurrent`
- `server.grpc-max-concurrent-streams`

Instrumentation (traces) also scales with the number of sharded queries, so it's suggested to account for the increased throughput there as well.
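
The rewrite above is safe because `sum` is associative: every shard computes a partial sum over its own series and the outer `sum` combines the partials. A minimal, self-contained sketch of that idea (hypothetical values, plain Go, not Cortex code):

```go
package main

import "fmt"

// Why `sum` can be sharded: summing per-shard partial sums gives the same
// result as summing every series in one place.
func main() {
	// Hypothetical per-series rates, split across 4 shards by series hash.
	shards := [][]float64{
		{1.5, 2.0},      // __cortex_shard__="0of4"
		{0.25},          // __cortex_shard__="1of4"
		{3.0, 1.0, 0.5}, // __cortex_shard__="2of4"
		{},              // __cortex_shard__="3of4"
	}

	var total float64
	for _, shard := range shards {
		var partial float64 // the inner, shard-scoped `sum by (foo)`
		for _, v := range shard {
			partial += v
		}
		total += partial // the outer `sum by (foo)` over the sharded legs
	}
	fmt.Println(total) // 8.25, identical to a single unsharded sum
}
```

Non-associative aggregations such as `avg` are not rewritten this way, which is why the commit history above removes `avg` from the parallelized set.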

- `-querier.align-querier-with-step`

If set to true, will cause the query frontend to mutate incoming queries and align their start and end parameters to the step parameter of the query. This improves the cacheability of the query results.
1 change: 1 addition & 0 deletions go.mod
@@ -79,6 +79,7 @@ require (
google.golang.org/api v0.11.0
google.golang.org/grpc v1.25.1
gopkg.in/yaml.v2 v2.2.2
sigs.k8s.io/yaml v1.1.0
)

// Override since git.apache.org is down. The docs say to fetch from github.
6 changes: 6 additions & 0 deletions pkg/chunk/chunk_store.go
@@ -436,6 +436,9 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, userID string, fro
}

func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) {
log, ctx := spanlogger.New(ctx, "store.lookupEntriesByQueries")
defer log.Span.Finish()

var lock sync.Mutex
var entries []IndexEntry
err := c.index.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool {
@@ -459,6 +462,9 @@ func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery
}

func (c *store) parseIndexEntries(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) {
log, ctx := spanlogger.New(ctx, "store.parseIndexEntries")
defer log.Span.Finish()

result := make([]string, 0, len(entries))
for _, entry := range entries {
chunkKey, labelValue, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value)
2 changes: 2 additions & 0 deletions pkg/chunk/chunk_store_test.go
@@ -77,6 +77,8 @@ func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg Sto
tbmConfig TableManagerConfig
schemaCfg = DefaultSchemaConfig("", schemaName, 0)
)
err := schemaCfg.Validate()
require.NoError(t, err)
flagext.DefaultValues(&tbmConfig)
storage := NewMockStorage()
tableManager, err := NewTableManager(tbmConfig, schemaCfg, maxChunkAge, storage, nil)
17 changes: 14 additions & 3 deletions pkg/chunk/chunk_store_utils.go
@@ -10,6 +10,7 @@ import (
"github.com/prometheus/prometheus/promql"

"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)
@@ -146,13 +147,13 @@ func (c *Fetcher) worker() {
// FetchChunks fetches a set of chunks from cache and store. Note that the keys passed in must be
// lexicographically sorted, while the returned chunks are not in the same order as the passed in chunks.
func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string) ([]Chunk, error) {
log, ctx := spanlogger.New(ctx, "ChunkStore.fetchChunks")
log, ctx := spanlogger.New(ctx, "ChunkStore.FetchChunks")
defer log.Span.Finish()

// Now fetch the actual chunk data from Memcache / S3
cacheHits, cacheBufs, _ := c.cache.Fetch(ctx, keys)

fromCache, missing, err := c.processCacheResponse(chunks, cacheHits, cacheBufs)
fromCache, missing, err := c.processCacheResponse(ctx, chunks, cacheHits, cacheBufs)
if err != nil {
level.Warn(log).Log("msg", "error fetching from cache", "err", err)
}
@@ -199,12 +200,14 @@ func (c *Fetcher) writeBackCache(ctx context.Context, chunks []Chunk) error {

// ProcessCacheResponse decodes the chunks coming back from the cache, separating
// hits and misses.
func (c *Fetcher) processCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) ([]Chunk, []Chunk, error) {
func (c *Fetcher) processCacheResponse(ctx context.Context, chunks []Chunk, keys []string, bufs [][]byte) ([]Chunk, []Chunk, error) {
var (
requests = make([]decodeRequest, 0, len(keys))
responses = make(chan decodeResponse)
missing []Chunk
)
log, ctx := spanlogger.New(ctx, "Fetcher.processCacheResponse")
defer log.Span.Finish()

i, j := 0, 0
for i < len(chunks) && j < len(keys) {
@@ -229,6 +232,7 @@ func (c *Fetcher) processCacheResponse(chunks []Chunk, keys []string, bufs [][]b
for ; i < len(chunks); i++ {
missing = append(missing, chunks[i])
}
level.Debug(log).Log("chunks", len(chunks), "decodeRequests", len(requests), "missing", len(missing))

go func() {
for _, request := range requests {
@@ -252,3 +256,10 @@ func (c *Fetcher) processCacheResponse(chunks []Chunk, keys []string, bufs [][]b
}
return found, missing, err
}

// injectShardLabels appends the shard's synthetic __cortex_shard__ label to
// each chunk's metric so sharded queries can match on it downstream.
func injectShardLabels(chunks []Chunk, shard astmapper.ShardAnnotation) {
for i, chunk := range chunks {
chunk.Metric = append(chunk.Metric, shard.Label())
chunks[i] = chunk
}
}
63 changes: 53 additions & 10 deletions pkg/chunk/schema.go
@@ -7,7 +7,11 @@ import (
"fmt"
"strings"

"strconv"

jsoniter "github.com/json-iterator/go"

"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
)
@@ -46,6 +50,7 @@ type Schema interface {
GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error)
GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error)
GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error)
FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery

// If the query resulted in series IDs, use this method to find chunks.
GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error)
@@ -114,7 +119,7 @@ func (s schema) GetCacheKeysAndLabelWriteEntries(from, through model.Time, userI
key := strings.Join([]string{
bucket.tableName,
bucket.hashKey,
string(labelsSeriesID(labels)),
string(LabelsSeriesID(labels)),
},
"-",
)
@@ -216,6 +221,10 @@ func (s schema) GetLabelNamesForSeries(from, through model.Time, userID string,
return result, nil
}

func (s schema) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
return s.entries.FilterReadQueries(queries, shard)
}

type entries interface {
GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
GetLabelWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
@@ -226,13 +235,23 @@ type entries interface {
GetReadMetricLabelValueQueries(bucket Bucket, metricName string, labelName string, labelValue string) ([]IndexQuery, error)
GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error)
GetLabelNamesForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error)
FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery
}

// noops is a placeholder which can be embedded to provide default implementations
type noops struct{}

func (n noops) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
return queries
}

// original entries:
// - hash key: <userid>:<bucket>:<metric name>
// - range key: <label name>\0<label value>\0<chunk name>

type originalEntries struct{}
type originalEntries struct {
noops
}

func (originalEntries) GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
chunkIDBytes := []byte(chunkID)
@@ -349,7 +368,9 @@ func (base64Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName st
// - range key: \0<base64(label value)>\0<chunk name>\0<version 2>
// 2) - hash key: <userid>:<hour bucket>:<metric name>
// - range key: \0\0<chunk name>\0<version 3>
type labelNameInHashKeyEntries struct{}
type labelNameInHashKeyEntries struct {
noops
}

func (labelNameInHashKeyEntries) GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
chunkIDBytes := []byte(chunkID)
@@ -423,7 +444,9 @@ func (labelNameInHashKeyEntries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]I
// v5 schema is an extension of v4, with the chunk end time in the
// range key to improve query latency. However, it did it wrong
// so the chunk end times are ignored.
type v5Entries struct{}
type v5Entries struct {
noops
}

func (v5Entries) GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
chunkIDBytes := []byte(chunkID)
@@ -496,7 +519,9 @@ func (v5Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error

// v6Entries fixes issues with v5 time encoding being wrong (see #337), and
// moves label value out of range key (see #199).
type v6Entries struct{}
type v6Entries struct {
noops
}

func (v6Entries) GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
chunkIDBytes := []byte(chunkID)
@@ -576,14 +601,15 @@ func (v6Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error

// v9Entries adds a layer of indirection between labels -> series -> chunks.
type v9Entries struct {
noops
}

func (v9Entries) GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
return nil, ErrNotSupported
}

func (v9Entries) GetLabelWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
seriesID := labelsSeriesID(labels)
seriesID := LabelsSeriesID(labels)

entries := []IndexEntry{
// Entry for metricName -> seriesID
Expand Down Expand Up @@ -613,7 +639,7 @@ func (v9Entries) GetLabelWriteEntries(bucket Bucket, metricName string, labels l
}

func (v9Entries) GetChunkWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
seriesID := labelsSeriesID(labels)
seriesID := LabelsSeriesID(labels)
encodedThroughBytes := encodeTime(bucket.through)

entries := []IndexEntry{
Expand Down Expand Up @@ -683,7 +709,7 @@ func (v10Entries) GetWriteEntries(bucket Bucket, metricName string, labels label
}

func (s v10Entries) GetLabelWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
seriesID := labelsSeriesID(labels)
seriesID := LabelsSeriesID(labels)

// read first 32 bits of the hash and use this to calculate the shard
shard := binary.BigEndian.Uint32(seriesID) % s.rowShards
@@ -716,7 +742,7 @@ func (s v10Entries) GetLabelWriteEntries(bucket Bucket, metricName string, label
}

func (v10Entries) GetChunkWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
seriesID := labelsSeriesID(labels)
seriesID := LabelsSeriesID(labels)
encodedThroughBytes := encodeTime(bucket.through)

entries := []IndexEntry{
Expand Down Expand Up @@ -782,13 +808,29 @@ func (v10Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, erro
return nil, ErrNotSupported
}

// FilterReadQueries will return only queries that match a certain shard
func (v10Entries) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) (matches []IndexQuery) {
if shard == nil {
return queries
}

for _, query := range queries {
s := strings.Split(query.HashValue, ":")[0]
n, err := strconv.Atoi(s)
if err == nil && n == shard.Shard {
matches = append(matches, query)
}
}
return matches
}
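
For context, a small self-contained sketch of the filter above (illustrative only; it assumes v10-style hash keys whose first `:`-separated component is the row shard, which is what the `strings.Split` relies on, and the example keys are hypothetical):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// filterByShard keeps only the hash keys whose leading component matches the
// requested shard, mirroring what v10Entries.FilterReadQueries does for
// IndexQuery.HashValue.
func filterByShard(hashValues []string, shard int) []string {
	var matches []string
	for _, hv := range hashValues {
		prefix := strings.Split(hv, ":")[0]
		if n, err := strconv.Atoi(prefix); err == nil && n == shard {
			matches = append(matches, hv)
		}
	}
	return matches
}

func main() {
	keys := []string{
		"0:userA:d18500:http_requests_total",
		"7:userA:d18500:http_requests_total",
		"15:userA:d18500:http_requests_total",
	}
	// Only the index query for shard 7 survives; the other shards' rows are
	// never read, which is what makes shard-aware querying cheaper.
	fmt.Println(filterByShard(keys, 7))
}
```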

// v11Entries builds on v10 but adds index entries for each series to store respective labels.
type v11Entries struct {
v10Entries
}

func (s v11Entries) GetLabelWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
seriesID := labelsSeriesID(labels)
seriesID := LabelsSeriesID(labels)

// read first 32 bits of the hash and use this to calculate the shard
shard := binary.BigEndian.Uint32(seriesID) % s.rowShards
@@ -845,4 +887,5 @@ func (v11Entries) GetLabelNamesForSeries(bucket Bucket, seriesID []byte) ([]Inde
HashValue: string(seriesID),
},
}, nil

}
20 changes: 17 additions & 3 deletions pkg/chunk/schema_config.go
@@ -192,15 +192,29 @@ func (cfg *SchemaConfig) loadFromFile() error {
// Validate the schema config and returns an error if the validation
// doesn't pass
func (cfg *SchemaConfig) Validate() error {
for _, periodCfg := range cfg.Configs {
for i, periodCfg := range cfg.Configs {
if err := periodCfg.validate(); err != nil {
return err
}
}

// apply default row shards
if periodCfg.RowShards == 0 {
periodCfg.RowShards = defaultRowShards(periodCfg.Schema)
cfg.Configs[i] = periodCfg
}
}
return nil
}

func defaultRowShards(schema string) uint32 {
switch schema {
case "v1", "v2", "v3", "v4", "v5", "v6", "v9":
return 0
default:
return 16
}
}
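
As a quick illustration of the defaults above (a standalone sketch that simply mirrors `defaultRowShards`, no Cortex imports): pre-v10 schemas keep a zero shard factor, so `Validate` leaves them unsharded, while v10 and later default to 16 row shards unless `RowShards` is set explicitly.

```go
package main

import "fmt"

// Mirrors defaultRowShards above: schemas without row sharding default to 0,
// shard-compatible schemas (v10+) default to 16.
func defaultRowShards(schema string) uint32 {
	switch schema {
	case "v1", "v2", "v3", "v4", "v5", "v6", "v9":
		return 0
	default:
		return 16
	}
}

func main() {
	for _, schema := range []string{"v9", "v10", "v11"} {
		fmt.Printf("%s: %d row shards\n", schema, defaultRowShards(schema))
	}
}
```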

// ForEachAfter will call f() on every entry after t, splitting
// entries if necessary so there is an entry starting at t
func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig)) {
@@ -219,7 +233,7 @@ func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig)

// CreateSchema returns the schema defined by the PeriodConfig
func (cfg PeriodConfig) CreateSchema() Schema {
rowShards := uint32(16)
rowShards := defaultRowShards(cfg.Schema)
if cfg.RowShards > 0 {
rowShards = cfg.RowShards
}