Skip to content

Commit

Permalink
dedupe index on all the queries for a table instead of query batches (#3338)
Browse files Browse the repository at this point in the history

* dedupe index on all the queries for a table instead of query batches

* convert sync.Mutex to sync.RWMutex in deduper to allow concurrent checks for isSeen
  • Loading branch information
sandeepsukhani authored Feb 16, 2021
1 parent 9e33f8f commit 3ff0881
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 16 deletions.
6 changes: 1 addition & 5 deletions pkg/storage/stores/shipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,6 @@ func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca

level.Debug(log).Log("table-name", t.name, "query-count", len(queries))

id := shipper_util.NewIndexDeduper(callback)

for name, db := range t.dbs {
err := db.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName)
Expand All @@ -292,9 +290,7 @@ func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca
}

for _, query := range queries {
if err := t.boltDBIndexClient.QueryWithCursor(ctx, bucket.Cursor(), query, func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) {
return id.Callback(query, batch)
}); err != nil {
if err := t.boltDBIndexClient.QueryWithCursor(ctx, bucket.Cursor(), query, callback); err != nil {
return err
}
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/storage/stores/shipper/uploads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,6 @@ func (lt *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, c
lt.dbSnapshotsMtx.RLock()
defer lt.dbSnapshotsMtx.RUnlock()

id := shipper_util.NewIndexDeduper(callback)

for _, db := range lt.dbSnapshots {
err := db.boltdb.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName)
Expand All @@ -199,9 +197,7 @@ func (lt *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, c
}

for _, query := range queries {
if err := lt.boltdbIndexClient.QueryWithCursor(ctx, bucket.Cursor(), query, func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) {
return id.Callback(query, batch)
}); err != nil {
if err := lt.boltdbIndexClient.QueryWithCursor(ctx, bucket.Cursor(), query, callback); err != nil {
return err
}
}
Expand Down
27 changes: 21 additions & 6 deletions pkg/storage/stores/shipper/util/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ func QueriesByTable(queries []chunk.IndexQuery) map[string][]chunk.IndexQuery {
func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
errs := make(chan error)

id := NewIndexDeduper(callback)

for i := 0; i < len(queries); i += maxQueriesPerGoroutine {
q := queries[i:util_math.Min(i+maxQueriesPerGoroutine, len(queries))]
go func(queries []chunk.IndexQuery) {
errs <- tableQuerier.MultiQueries(ctx, queries, callback)
errs <- tableQuerier.MultiQueries(ctx, queries, id.Callback)
}(q)
}

Expand All @@ -56,7 +58,7 @@ func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries [
type IndexDeduper struct {
callback chunk_util.Callback
seenRangeValues map[string]map[string]struct{}
mtx sync.Mutex
mtx sync.RWMutex
}

func NewIndexDeduper(callback chunk_util.Callback) *IndexDeduper {
Expand All @@ -75,19 +77,32 @@ func (i *IndexDeduper) Callback(query chunk.IndexQuery, batch chunk.ReadBatch) b
}

func (i *IndexDeduper) isSeen(hashValue string, rangeValue []byte) bool {
i.mtx.Lock()
defer i.mtx.Unlock()
i.mtx.RLock()

// index entries are never modified during query processing so it should be safe to reference a byte slice as a string.
rangeValueStr := yoloString(rangeValue)
if _, ok := i.seenRangeValues[hashValue]; !ok {
i.seenRangeValues[hashValue] = map[string]struct{}{}

if _, ok := i.seenRangeValues[hashValue][rangeValueStr]; ok {
i.mtx.RUnlock()
return true
}

i.mtx.RUnlock()

i.mtx.Lock()
defer i.mtx.Unlock()

// re-check if another concurrent call added the values already, if so do not add it again and return true
if _, ok := i.seenRangeValues[hashValue][rangeValueStr]; ok {
return true
}

// add the hashValue first if missing
if _, ok := i.seenRangeValues[hashValue]; !ok {
i.seenRangeValues[hashValue] = map[string]struct{}{}
}

// add the rangeValue
i.seenRangeValues[hashValue][rangeValueStr] = struct{}{}
return false
}
Expand Down

0 comments on commit 3ff0881

Please sign in to comment.