Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dedupe index on all the queries for a table instead of query batches #3338

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions pkg/storage/stores/shipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,6 @@ func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca

level.Debug(log).Log("table-name", t.name, "query-count", len(queries))

id := shipper_util.NewIndexDeduper(callback)

for name, db := range t.dbs {
err := db.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName)
Expand All @@ -292,9 +290,7 @@ func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca
}

for _, query := range queries {
if err := t.boltDBIndexClient.QueryWithCursor(ctx, bucket.Cursor(), query, func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) {
return id.Callback(query, batch)
}); err != nil {
if err := t.boltDBIndexClient.QueryWithCursor(ctx, bucket.Cursor(), query, callback); err != nil {
return err
}
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/storage/stores/shipper/uploads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,6 @@ func (lt *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, c
lt.dbSnapshotsMtx.RLock()
defer lt.dbSnapshotsMtx.RUnlock()

id := shipper_util.NewIndexDeduper(callback)

for _, db := range lt.dbSnapshots {
err := db.boltdb.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName)
Expand All @@ -199,9 +197,7 @@ func (lt *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, c
}

for _, query := range queries {
if err := lt.boltdbIndexClient.QueryWithCursor(ctx, bucket.Cursor(), query, func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) {
return id.Callback(query, batch)
}); err != nil {
if err := lt.boltdbIndexClient.QueryWithCursor(ctx, bucket.Cursor(), query, callback); err != nil {
return err
}
}
Expand Down
27 changes: 21 additions & 6 deletions pkg/storage/stores/shipper/util/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ func QueriesByTable(queries []chunk.IndexQuery) map[string][]chunk.IndexQuery {
func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
errs := make(chan error)

id := NewIndexDeduper(callback)

for i := 0; i < len(queries); i += maxQueriesPerGoroutine {
q := queries[i:util_math.Min(i+maxQueriesPerGoroutine, len(queries))]
go func(queries []chunk.IndexQuery) {
errs <- tableQuerier.MultiQueries(ctx, queries, callback)
errs <- tableQuerier.MultiQueries(ctx, queries, id.Callback)
}(q)
}

Expand All @@ -56,7 +58,7 @@ func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries [
type IndexDeduper struct {
callback chunk_util.Callback
seenRangeValues map[string]map[string]struct{}
mtx sync.Mutex
mtx sync.RWMutex
}

func NewIndexDeduper(callback chunk_util.Callback) *IndexDeduper {
Expand All @@ -75,19 +77,32 @@ func (i *IndexDeduper) Callback(query chunk.IndexQuery, batch chunk.ReadBatch) b
}

func (i *IndexDeduper) isSeen(hashValue string, rangeValue []byte) bool {
i.mtx.Lock()
defer i.mtx.Unlock()
i.mtx.RLock()

// index entries are never modified during query processing so it should be safe to reference a byte slice as a string.
rangeValueStr := yoloString(rangeValue)
if _, ok := i.seenRangeValues[hashValue]; !ok {
i.seenRangeValues[hashValue] = map[string]struct{}{}

if _, ok := i.seenRangeValues[hashValue][rangeValueStr]; ok {
i.mtx.RUnlock()
return true
}

i.mtx.RUnlock()

i.mtx.Lock()
defer i.mtx.Unlock()

// re-check whether a concurrent call has already added the value; if so, do not add it again and return true
if _, ok := i.seenRangeValues[hashValue][rangeValueStr]; ok {
return true
}

// add the hashValue first if missing
if _, ok := i.seenRangeValues[hashValue]; !ok {
i.seenRangeValues[hashValue] = map[string]struct{}{}
}

// add the rangeValue
i.seenRangeValues[hashValue][rangeValueStr] = struct{}{}
return false
}
Expand Down