Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tsdb/inverted index #6233

Merged
merged 3 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 240 additions & 0 deletions pkg/ingester/index/bitprefix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
package index

import (
"fmt"
"sort"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

// BitPrefixInvertedIndex is another inverted index implementation
// that uses the bit prefix sharding algorithm in tsdb/index/shard.go
// instead of a modulo approach.
// This is the standard for TSDB compatibility because
// the same series must resolve to the same shard (for each period config),
// whether it's resolved on the ingester or via the store.
//
// Instances are built with NewBitPrefixWithShards.
type BitPrefixInvertedIndex struct {
// totalShards is the shard factor the index was created with.
totalShards uint32
// shards holds one entry per shard; a fingerprint's position in this
// slice is derived from its leading bits (see shardForFP).
shards []*indexShard
}

// NewBitPrefixWithShards builds a BitPrefixInvertedIndex made of totalShards
// empty shards, numbered 0 through totalShards-1.
//
// An error is returned when totalShards is not a power of two, because the
// index picks a shard from the leading bits of a fingerprint. Zero is
// rejected up front: it is not a power of two, and an index with no shards
// would fail on the first attempt to store a fingerprint.
func NewBitPrefixWithShards(totalShards uint32) (*BitPrefixInvertedIndex, error) {
	// The zero check comes first so the bit computation below only ever
	// sees a positive shard factor.
	if totalShards == 0 {
		return nil, fmt.Errorf("shard factor must be a power of two, got %d", totalShards)
	}

	// A power of two is exactly reproduced by shifting 1 by the number of
	// bits needed to address every shard.
	if requiredBits := index.NewShard(0, totalShards).RequiredBits(); 1<<requiredBits != totalShards {
		return nil, fmt.Errorf("shard factor must be a power of two, got %d", totalShards)
	}

	shards := make([]*indexShard, totalShards)
	for i := range shards {
		shards[i] = &indexShard{
			idx:   map[string]indexEntry{},
			shard: uint32(i),
		}
	}

	return &BitPrefixInvertedIndex{
		totalShards: totalShards,
		shards:      shards,
	}, nil
}

// getShards returns the contiguous run of index shards that may hold
// fingerprints belonging to the requested shard annotation, together with a
// flag telling the caller whether results taken from those shards must still
// be filtered by fingerprint.
// A nil shard selects every index shard and never requires filtering.
func (ii *BitPrefixInvertedIndex) getShards(shard *astmapper.ShardAnnotation) ([]*indexShard, bool) {
if shard == nil {
return ii.shards, false
}

// When comparing a higher shard factor to a lower inverted index shard factor
// we must filter resulting fingerprints as the lower shard factor in the
// inverted index is a superset of the requested factor.
//
// For instance, the 3_of_4 shard factor maps to the bit prefix 0b11.
// If the inverted index only has a factor of 2, we'll need to check the 0b1
// prefixed shard (which contains the 0b10 and 0b11 prefixes).
// Conversely, if the requested shard is 1_of_2, but the index has a factor of 4,
// we can _exactly_ match 0b1 => (0b10, 0b11) and know all fingerprints in those
// resulting shards have the requested 0b1 prefix (don't need to filter).
var filter bool
// len(ii.shards) is an int and is compared directly with shard.Of;
// ii.totalShards is a uint32 and would need a conversion to be used here.
if shard.Of > len(ii.shards) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between len(ii.shards) and ii.totalShards? Since it seems that the length of ii.shards does not change, we could use ii.totalShards instead.

Suggested change
if shard.Of > len(ii.shards) {
if shard.Of > ii.totalShards {

filter = true
}

requestedShard := shard.TSDB()
minFp, maxFp := requestedShard.Bounds()

// Determine how many bits we need to take from
// the requested shard's min/max fingerprint values
// in order to calculate the indices for the inverted index's
// shard factor.
requiredBits := index.NewShard(0, uint32(len(ii.shards))).RequiredBits()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for index.NewShard(), why not index.NewShard(0, ii.totalShards)?

// Keep only the top requiredBits bits of each bound; what remains is an
// index into ii.shards.
lowerIdx := int(minFp >> (64 - requiredBits))
upperIdx := int(maxFp >> (64 - requiredBits))

// If the upper bound's shard doesn't align exactly
// with the maximum fingerprint, we must also
// check the subsequent shard.
// This happens in two cases:
// 1) When requesting the last shard of any factor.
// This accounts for zero indexing in our sharding logic
// to successfully request `shards[start:len(shards)]`
// 2) When requesting the _first_ shard of a larger factor
// than the index uses. In this case, the required_bits are not
// enough and the requested end prefix gets trimmed.
// If confused, comment out this line and see which tests fail.
if (upperIdx << (64 - requiredBits)) != int(maxFp) {
upperIdx++
}

// upperIdx is used as the exclusive end of the slice expression.
return ii.shards[lowerIdx:upperIdx], filter
}

// shardForFP returns the position in ii.shards of the shard responsible for
// fp. The position is the value of the fingerprint's leading bits, using as
// many bits as the current number of shards requires.
func (ii *BitPrefixInvertedIndex) shardForFP(fp model.Fingerprint) int {
	prefixBits := index.NewShard(0, uint32(len(ii.shards))).RequiredBits()
	return int(fp >> (64 - prefixBits))
}

// validateShard checks that a requested shard annotation can be served by
// this index. A nil shard is always accepted. Otherwise the shard factor
// (shard.Of) must equal 1 shifted left by the TSDB shard's RequiredBits();
// the returned error describes a failure of that check as the factor not
// being a power of two.
func (ii *BitPrefixInvertedIndex) validateShard(shard *astmapper.ShardAnnotation) error {
if shard == nil {
return nil
}

if 1<<(shard.TSDB().RequiredBits()) != shard.Of {
return fmt.Errorf("Shard factor must be a power of two, got %d", shard.Of)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error strings should not be capitalized

Suggested change
return fmt.Errorf("Shard factor must be a power of two, got %d", shard.Of)
return fmt.Errorf("shard factor must be a power of two, got %d", shard.Of)

}
return nil
}

// Add registers fp under the given labels in the shard that owns the
// fingerprint and returns the label set produced by that shard.
// NOTE: the memory backing `labels` is unsafe to hold on to; anything that
// must outlive this call has to be copied.
func (ii *BitPrefixInvertedIndex) Add(labels []logproto.LabelAdapter, fp model.Fingerprint) labels.Labels {
	owner := ii.shards[ii.shardForFP(fp)]

	// The shard's add() hands back 'interned' values, so the caller's
	// original labels are not retained by the index.
	return owner.add(labels, fp)
}

// Lookup returns the fingerprints matching the provided matchers, optionally
// restricted to the given shard. With no matchers, every fingerprint held by
// the selected index shards is returned. An error is returned when the shard
// annotation fails validateShard.
func (ii *BitPrefixInvertedIndex) Lookup(matchers []*labels.Matcher, shard *astmapper.ShardAnnotation) ([]model.Fingerprint, error) {
if err := ii.validateShard(shard); err != nil {
return nil, err
}

var result []model.Fingerprint
shards, filter := ii.getShards(shard)

// if no matcher is specified, all fingerprints would be returned
if len(matchers) == 0 {
for i := range shards {
fps := shards[i].allFPs()
result = append(result, fps...)
}
} else {
for i := range shards {
fps := shards[i].lookup(matchers)
result = append(result, fps...)
}
}
Comment on lines +127 to +137
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this extra logic for no matchers be abstracted into the lookup() function of indexShard?
I saw that we do the same logic in the Lookup() function of InvertedIndex as well.


// Because bit prefix order is also ascending order,
// the merged fingerprints from ascending shards are also in order.
// NOTE(review): the binary searches below are only correct if each shard's
// allFPs()/lookup() result is itself sorted ascending - confirm in indexShard.
if filter {
minFP, maxFP := shard.TSDB().Bounds()
// First position whose fingerprint is at or above the lower bound.
minIdx := sort.Search(len(result), func(i int) bool {
return result[i] >= minFP
})

// First position whose fingerprint is at or above the upper bound;
// used as an exclusive end, so a fingerprint equal to maxFP is dropped.
maxIdx := sort.Search(len(result), func(i int) bool {
return result[i] >= maxFP
})

result = result[minIdx:maxIdx]
}

return result, nil
}

// LabelNames returns the label names found in the index shards selected for
// the given shard annotation, merged across those shards with
// mergeStringSlices. An error is returned when the shard annotation fails
// validateShard.
func (ii *BitPrefixInvertedIndex) LabelNames(shard *astmapper.ShardAnnotation) ([]string, error) {
if err := ii.validateShard(shard); err != nil {
return nil, err
}

// extractor stays nil unless per-fingerprint filtering is required; it is
// passed to labelNames as-is in either case.
var extractor func(unlockIndex) []string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wdyt about extracting a type for this function?

shards, filter := ii.getShards(shard)

// If we need to check shard inclusion, we have to do it the expensive way :(
// Therefore it's more performant to request shard factors lower or equal to the
// inverted index factor
if filter {
s := shard.TSDB()

extractor = func(x unlockIndex) (results []string) {

// Keep a label name as soon as one fingerprint under any of its values
// matches the requested shard, then move on to the next name.
outer:
for name, entry := range x {
for _, valEntry := range entry.fps {
for _, fp := range valEntry.fps {
if s.Match(fp) {
results = append(results, name)
continue outer
}
}
}
}

return results
}
}

// Collect one slice of names per index shard, then merge them.
results := make([][]string, 0, len(shards))
for i := range shards {
shardResult := shards[i].labelNames(extractor)
results = append(results, shardResult)
}

return mergeStringSlices(results), nil
}

// LabelValues returns the values recorded for the label called name in the
// index shards selected for the given shard annotation, merged across those
// shards. An error is returned when the shard annotation fails validation.
func (ii *BitPrefixInvertedIndex) LabelValues(name string, shard *astmapper.ShardAnnotation) ([]string, error) {
	if err := ii.validateShard(shard); err != nil {
		return nil, err
	}

	candidates, needsFilter := ii.getShards(shard)

	// The extractor stays nil unless the selected index shards may hold
	// fingerprints outside the requested shard.
	var extractor func(indexEntry) []string
	if needsFilter {
		requested := shard.TSDB()

		extractor = func(entry indexEntry) []string {
			kept := make([]string, 0, len(entry.fps))
			for value, fpSet := range entry.fps {
				// A value is kept as soon as one of its fingerprints
				// belongs to the requested shard.
				for _, fp := range fpSet.fps {
					if !requested.Match(fp) {
						continue
					}
					kept = append(kept, value)
					break
				}
			}
			return kept
		}
	}

	perShard := make([][]string, 0, len(candidates))
	for _, s := range candidates {
		perShard = append(perShard, s.labelValues(name, extractor))
	}

	return mergeStringSlices(perShard), nil
}

// Delete removes the fingerprint fp, registered under the given label pairs,
// from the shard that owns it.
func (ii *BitPrefixInvertedIndex) Delete(labels labels.Labels, fp model.Fingerprint) {
	// Resolve the owning shard through shardForFP, exactly as Add does, so
	// the two operations cannot drift apart on where a fingerprint lives.
	ii.shards[ii.shardForFP(fp)].delete(labels, fp)
}
Loading