This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

Add store-gateway components #701

Merged
merged 52 commits on Jun 1, 2023
Changes from 1 commit

Commits (52)
d3f473d
WIP How to gather block metadata quickly from bucket
simonswine Feb 22, 2023
391a85a
Download metadata asynchronously
cyriltovena May 15, 2023
24cc7f5
Add Go module mimir/store-gateway dependency
cyriltovena May 16, 2023
776446c
Merge bucket listing branch
cyriltovena May 16, 2023
ffe1c80
Add store-gateway skeleton
cyriltovena May 16, 2023
eb17c54
Resolve conflicts in dependencies
cyriltovena May 16, 2023
babcfe0
Resolve lint issues
cyriltovena May 16, 2023
2f59938
Add users listing and fixes yaml dependency
cyriltovena May 16, 2023
ec177db
Fixes config files parsing and tsdb tests
cyriltovena May 16, 2023
d88fc0d
Hook into Pyroscope modules and disabled ip6
cyriltovena May 16, 2023
a9c545f
Fixes tenants listing
cyriltovena May 16, 2023
69de899
Adding initial code to list blocks
cyriltovena May 16, 2023
786c706
Implement the initial block listing without caching.
cyriltovena May 16, 2023
f465bb9
Speed up block listing
cyriltovena May 17, 2023
5c1f3a9
load from disk
cyriltovena May 17, 2023
e7c0016
Experiment with ristretto cache
cyriltovena May 17, 2023
d1f3cd4
fix block path
cyriltovena May 22, 2023
e9dd0bd
Revert dskit overrides now that the required update is merged
cyriltovena May 22, 2023
048acee
Merge remote-tracking branch 'origin/store-gateway-component' into st…
cyriltovena May 22, 2023
7926215
Fixes opening blocks from remote storage
cyriltovena May 22, 2023
0637dd1
Merge branch 'cache-store-gateway' into store-gateway-component
cyriltovena May 22, 2023
7ad32a0
Add the API skeleton for the storegateway
cyriltovena May 23, 2023
2b2e1a8
Implement the merge API in the query path
cyriltovena May 23, 2023
63d522a
Implement k-way merge of blocks and refactor sort iterator
cyriltovena May 23, 2023
2ef8e75
Refactor querier query replication
cyriltovena May 24, 2023
a9586a1
Initialize the store-gateway querier
cyriltovena May 25, 2023
738e295
Merge remote-tracking branch 'origin/main' into store-gateway-component
cyriltovena May 25, 2023
9f19111
Fixes the make generate target
cyriltovena May 25, 2023
e250daf
Merge commit '9f191111512ad0a7f45a042ca5d33dfe399bf2db' into store-ga…
cyriltovena May 25, 2023
b417de0
Regenerate docs
cyriltovena May 25, 2023
6f772fb
Fixes iterator tests
cyriltovena May 25, 2023
bb54d4d
Query the SelectMergeStacktraces API from Querier with store-gateway
cyriltovena May 25, 2023
189cf76
Fixes the store gateway querier initialization
cyriltovena May 26, 2023
aee9089
Fixes bucket sync on other providers
cyriltovena May 26, 2023
12a0986
Fixes wrong cancellation
cyriltovena May 26, 2023
91e53b2
Implement Select Series API
cyriltovena May 26, 2023
f02b165
Fixes lint issues.
cyriltovena May 27, 2023
4d17815
Merge remote-tracking branch 'origin/main' into store-gateway-component
cyriltovena May 30, 2023
cbfbb9d
Improve tracing instrumentation
cyriltovena May 30, 2023
b03dab8
generate help
cyriltovena May 30, 2023
1301f38
make fmt
cyriltovena May 30, 2023
23b896b
make helm
cyriltovena May 30, 2023
b7b32a0
Fixes deduplications in store-gateway
cyriltovena May 31, 2023
9c71811
Setting parquet read buffer size to 2MB
cyriltovena May 31, 2023
c4159ef
Merge remote-tracking branch 'origin/main' into store-gateway-component
cyriltovena May 31, 2023
4a3e4a6
make helm
cyriltovena May 31, 2023
888415b
Uses CloneVT from vtproto
cyriltovena May 31, 2023
355726b
Fixes objstore metrics
cyriltovena May 31, 2023
10649c3
make fmt
cyriltovena May 31, 2023
8e09dad
Fixes filesystem Bucket implementation
cyriltovena May 31, 2023
9036c3f
Merge remote-tracking branch 'origin/main' into store-gateway-component
cyriltovena Jun 1, 2023
a6bc302
Fixes the help text
cyriltovena Jun 1, 2023
Implement the merge API in the query path
cyriltovena committed May 23, 2023
commit 2b2e1a83343a779538e0e5a854edc73b9869a483
53 changes: 43 additions & 10 deletions pkg/phlaredb/block_querier.go
@@ -372,6 +372,7 @@ func (b *singleBlockQuerier) Close() error {
errs.Add(err)
}
}
+	b.opened = false
return errs.Err()
}

@@ -483,16 +484,32 @@ type Querier interface {
MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*ingestv1.MergeProfilesStacktracesResult, error)
MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error)
MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error)

+	Open(ctx context.Context) error
// Sorts profiles for retrieval.
Sort([]Profile) []Profile
}

type Queriers []Querier

+func (queriers Queriers) Open(ctx context.Context) error {
+	g, ctx := errgroup.WithContext(ctx)
+	g.SetLimit(128)
+	for _, q := range queriers {
+		q := q
+		g.Go(func() error {
+			if err := q.Open(ctx); err != nil {
+				return err
+			}
+			return nil
+		})
+	}
+	return g.Wait()
+}

func (queriers Queriers) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) {
iters := make([]iter.Iterator[Profile], 0, len(queriers))

// todo: if blocks overlap in time we should be deduping
// otherwise we can just append
for _, q := range queriers {
it, err := q.SelectMatchingProfiles(ctx, params)
if err != nil {
@@ -504,17 +521,19 @@ func (queriers Queriers) SelectMatchingProfiles(ctx context.Context, params *ing
return iter.NewSortProfileIterator(iters), nil
}

-func (queriers Queriers) ForTimeRange(start, end model.Time) Queriers {
+func (queriers Queriers) ForTimeRange(_ context.Context, start, end model.Time) (Queriers, error) {
result := make(Queriers, 0, len(queriers))
for _, q := range queriers {
if q.InRange(start, end) {
result = append(result, q)
}
}
-	return result
+	return result, nil
}

-func (q Queriers) MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error {
+type BlockGetter func(ctx context.Context, start, end model.Time) (Queriers, error)
+
+func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse], blockGetter BlockGetter) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesStacktraces")
defer sp.Finish()

@@ -537,7 +556,10 @@ func (q Queriers) MergeProfilesStacktraces(ctx context.Context, stream *connect.
otlog.String("profile_id", request.Type.ID),
)

-	queriers := q.ForTimeRange(model.Time(request.Start), model.Time(request.End))
+	queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End))
+	if err != nil {
+		return err
+	}

result := make([]*ingestv1.MergeProfilesStacktracesResult, 0, len(queriers))
var lock sync.Mutex
@@ -600,7 +622,7 @@ func (q Queriers) MergeProfilesStacktraces(ctx context.Context, stream *connect.
return nil
}

-func (q Queriers) MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse]) error {
+func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse], blockGetter BlockGetter) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesLabels")
defer sp.Finish()

@@ -626,7 +648,10 @@ func (q Queriers) MergeProfilesLabels(ctx context.Context, stream *connect.BidiS
otlog.String("by", strings.Join(by, ",")),
)

-	queriers := q.ForTimeRange(model.Time(request.Start), model.Time(request.End))
+	queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End))
+	if err != nil {
+		return err
+	}
result := make([][]*typesv1.Series, 0, len(queriers))
g, ctx := errgroup.WithContext(ctx)
s := lo.Synchronize()
@@ -686,7 +711,7 @@ func (q Queriers) MergeProfilesLabels(ctx context.Context, stream *connect.BidiS
return nil
}

-func (q Queriers) MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse]) error {
+func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse], blockGetter BlockGetter) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesPprof")
defer sp.Finish()

@@ -709,7 +734,10 @@ func (q Queriers) MergeProfilesPprof(ctx context.Context, stream *connect.BidiSt
otlog.String("profile_id", request.Type.ID),
)

-	queriers := q.ForTimeRange(model.Time(request.Start), model.Time(request.End))
+	queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End))
+	if err != nil {
+		return err
+	}

result := make([]*profile.Profile, 0, len(queriers))
var lock sync.Mutex
@@ -1079,6 +1107,8 @@ func (r *parquetReader[M, P]) Close() error {
if r.reader != nil {
	err := r.reader.Close()
	// release references so the underlying buffers can be reclaimed
	r.reader = nil
	r.file = nil
	return err
}
return nil
}

@@ -1193,6 +1223,9 @@ func (r *inMemoryparquetReader[M, P]) Close() error {
if r.reader != nil {
	err := r.reader.Close()
	// release references so the cached pages can be reclaimed
	r.reader = nil
	r.file = nil
	r.cache = nil
	return err
}
return nil
}

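Taken together, these block_querier.go changes turn the merge endpoints into free functions driven by a BlockGetter, so any component that can resolve time-filtered queriers can serve the same streaming APIs. Below is a minimal sketch of a custom getter built only from the exported names added above; the openAndFilter helper and its package are illustrative, not part of the PR.

package example

import (
	"context"

	"github.com/prometheus/common/model"

	"github.com/grafana/phlare/pkg/phlaredb"
)

// openAndFilter builds a phlaredb.BlockGetter that narrows the candidate
// queriers to the requested time range and opens them (bounded-concurrency
// fan-out via Queriers.Open) before the merge functions consume them.
func openAndFilter(all phlaredb.Queriers) phlaredb.BlockGetter {
	return func(ctx context.Context, start, end model.Time) (phlaredb.Queriers, error) {
		queriers, err := all.ForTimeRange(ctx, start, end)
		if err != nil {
			return nil, err
		}
		if err := queriers.Open(ctx); err != nil {
			return nil, err
		}
		return queriers, nil
	}
}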
11 changes: 9 additions & 2 deletions pkg/phlaredb/head_queriers.go
@@ -29,6 +29,10 @@ func (q *headOnDiskQuerier) rowGroup() *rowGroupOnDisk {
return q.head.profiles.rowGroups[q.rowGroupIdx]
}

+func (q *headOnDiskQuerier) Open(_ context.Context) error {
+	return nil
+}

func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - HeadOnDisk")
defer sp.Finish()
@@ -157,6 +161,10 @@ type headInMemoryQuerier struct {
head *Head
}

+func (q *headInMemoryQuerier) Open(_ context.Context) error {
+	return nil
+}

func (q *headInMemoryQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - HeadInMemory")
defer sp.Finish()
@@ -184,7 +192,7 @@ func (q *headInMemoryQuerier) SelectMatchingProfiles(ctx context.Context, params
continue
}

-	var profiles = make([]*schemav1.Profile, len(profileSeries.profiles))
+	profiles := make([]*schemav1.Profile, len(profileSeries.profiles))
copy(profiles, profileSeries.profiles)

iters = append(iters,
@@ -259,7 +267,6 @@ func (q *headInMemoryQuerier) MergePprof(ctx context.Context, rows iter.Iterator
}

return q.head.resolvePprof(ctx, stacktraceSamples), nil
-
}

func (q *headInMemoryQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) {
6 changes: 3 additions & 3 deletions pkg/phlaredb/phlaredb.go
@@ -190,15 +190,15 @@ func (f *PhlareDB) Queriers() Queriers {
}

func (f *PhlareDB) MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error {
-	return f.Queriers().MergeProfilesStacktraces(ctx, stream)
+	return MergeProfilesStacktraces(ctx, stream, f.blockQuerier.Queriers().ForTimeRange)
}

func (f *PhlareDB) MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse]) error {
-	return f.Queriers().MergeProfilesLabels(ctx, stream)
+	return MergeProfilesLabels(ctx, stream, f.blockQuerier.Queriers().ForTimeRange)
}

func (f *PhlareDB) MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse]) error {
-	return f.Queriers().MergeProfilesPprof(ctx, stream)
+	return MergeProfilesPprof(ctx, stream, f.blockQuerier.Queriers().ForTimeRange)
}

type BidiServerMerge[Res any, Req any] interface {
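The PhlareDB wiring shows the point of the indirection: the BidiStream handlers no longer care where queriers come from. The sketch below suggests how a store-gateway service could reuse the same implementation; the StoreGateway type, its queriers field, blocksForTimeRange, and the generated-code import path are assumptions for illustration, not code from this commit.

package storegateway

import (
	"context"

	connect "github.com/bufbuild/connect-go"
	"github.com/prometheus/common/model"

	ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
	"github.com/grafana/phlare/pkg/phlaredb"
)

// StoreGateway is a stand-in; the real type is introduced elsewhere in this PR.
type StoreGateway struct {
	queriers phlaredb.Queriers
}

// blocksForTimeRange satisfies phlaredb.BlockGetter for bucket-backed blocks.
func (s *StoreGateway) blocksForTimeRange(ctx context.Context, start, end model.Time) (phlaredb.Queriers, error) {
	return s.queriers.ForTimeRange(ctx, start, end)
}

// MergeProfilesStacktraces reuses the shared streaming merge implementation.
func (s *StoreGateway) MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error {
	return phlaredb.MergeProfilesStacktraces(ctx, stream, s.blocksForTimeRange)
}

Passing a method value keeps the handler signature identical to the ingester's, which is what lets the query path fan out to either service.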
12 changes: 12 additions & 0 deletions pkg/phlaredb/phlaredb_test.go
@@ -98,6 +98,18 @@ type ingesterHandlerPhlareDB struct {
// *PhlareDB
}

+func (i *ingesterHandlerPhlareDB) MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error {
+	return MergeProfilesStacktraces(ctx, stream, i.ForTimeRange)
+}
+
+func (i *ingesterHandlerPhlareDB) MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse]) error {
+	return MergeProfilesLabels(ctx, stream, i.ForTimeRange)
+}
+
+func (i *ingesterHandlerPhlareDB) MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse]) error {
+	return MergeProfilesPprof(ctx, stream, i.ForTimeRange)
+}

func (i *ingesterHandlerPhlareDB) Push(context.Context, *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error) {
return nil, errors.New("not implemented")
}
69 changes: 69 additions & 0 deletions pkg/storegateway/block.go
@@ -0,0 +1,69 @@
package storegateway

import (
	"context"
	"os"
	"path/filepath"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/pkg/errors"
	"github.com/prometheus/common/model"

	"github.com/grafana/phlare/pkg/phlaredb"
	"github.com/grafana/phlare/pkg/phlaredb/block"
	"github.com/grafana/phlare/pkg/util"
)

type BlockCloser interface {
	phlaredb.Querier
	Close() error
}

type Block struct {
	BlockCloser
	meta   *block.Meta
	logger log.Logger
}

func (bs *BucketStore) createBlock(ctx context.Context, meta *block.Meta) (*Block, error) {
	blockLocalPath := filepath.Join(bs.syncDir, meta.ULID.String())
	// create the block's local dir if it doesn't exist
	if _, err := os.Stat(blockLocalPath); errors.Is(err, os.ErrNotExist) {
		if err := os.MkdirAll(blockLocalPath, 0o750); err != nil {
			return nil, errors.Wrap(err, "create dir")
		}
	}
	metaPath := filepath.Join(blockLocalPath, block.MetaFilename)
	if _, err := os.Stat(metaPath); errors.Is(err, os.ErrNotExist) {
		// write meta.json if it does not exist
		if _, err := meta.WriteToFile(bs.logger, blockLocalPath); err != nil {
			return nil, errors.Wrap(err, "write meta.json")
		}
	} else {
		// read meta.json if it exists and validate it against the bucket metadata
		diskMeta, _, err := block.MetaFromDir(blockLocalPath)
		if err != nil {
			return nil, errors.Wrap(err, "read meta.json")
		}
		if meta.String() != diskMeta.String() {
			return nil, errors.New("meta.json does not match")
		}
	}

	blk := phlaredb.NewSingleBlockQuerierFromMeta(ctx, bs.bucket, meta)
	// Load the block into memory if it's within the last 24 hours.
	// TODO: make this configurable.
	if blk.InRange(model.Now().Add(-24*time.Hour), model.Now()) {
		go func() {
			if err := blk.Open(ctx); err != nil {
				level.Error(util.Logger).Log("msg", "open block", "err", err)
			}
		}()
	}
	return &Block{
		meta:        meta,
		logger:      bs.logger,
		BlockCloser: blk,
	}, nil
}
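createBlock eagerly opens any block overlapping the trailing 24 hours, and the TODO notes the cut-off should be configurable. A small sketch of one way to lift it into configuration follows; the Config field, InRanger interface, and shouldEagerOpen helper are hypothetical and not part of this commit.

package storegateway

import (
	"time"

	"github.com/prometheus/common/model"
)

// InRanger matches the small slice of the block API used here.
type InRanger interface {
	InRange(start, end model.Time) bool
}

// Config sketches a place to hang the cut-off; EagerOpenWindow is hypothetical.
type Config struct {
	EagerOpenWindow time.Duration `yaml:"eager_open_window"`
}

// shouldEagerOpen reports whether a block overlaps the trailing window and
// should therefore be opened in the background at sync time.
func shouldEagerOpen(blk InRanger, window time.Duration) bool {
	now := model.Now()
	return blk.InRange(now.Add(-window), now)
}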