Skip to content

Commit

Permalink
tsdb migrate versions (grafana#9275)
Browse files Browse the repository at this point in the history
Update of grafana#9198 while
@sandeepsukhani is on holiday :)

---------

Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
  • Loading branch information
owen-d and sandeepsukhani authored Apr 25, 2023
1 parent 03531f5 commit 08af884
Show file tree
Hide file tree
Showing 10 changed files with 543 additions and 13 deletions.
35 changes: 32 additions & 3 deletions pkg/storage/stores/tsdb/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,27 @@ func (b *Builder) DropChunk(streamID string, chk index.ChunkMeta) (bool, error)
return chunkFound, nil
}

func (b *Builder) BuildWithVersion(
ctx context.Context,
version int, // build TSDB with specified version. 0 means default version.
scratchDir string,
createFn func(from, through model.Time, checksum uint32) Identifier,
) (id Identifier, err error) {
return b.buildWithVersion(ctx, version, scratchDir, createFn)
}

func (b *Builder) Build(
ctx context.Context,
scratchDir string,
createFn func(from, through model.Time, checksum uint32) Identifier,
) (id Identifier, err error) {
return b.buildWithVersion(ctx, 0, scratchDir, createFn)
}

func (b *Builder) buildWithVersion(
ctx context.Context,
version int, // build TSDB with specified version. 0 means default version.
scratchDir string,
// Determines how to create the resulting Identifier and file name.
// This is variable as we use Builder for multiple reasons,
// such as building multi-tenant tsdbs on the ingester
Expand All @@ -107,9 +125,20 @@ func (b *Builder) Build(
name := fmt.Sprintf("%s-%x.staging", index.IndexFilename, rng)
tmpPath := filepath.Join(scratchDir, name)

writer, err := index.NewWriter(ctx, tmpPath)
if err != nil {
return id, err
var writer *index.Writer

if version == 0 {
var err error
writer, err = index.NewWriter(ctx, tmpPath)
if err != nil {
return id, err
}
} else {
var err error
writer, err = index.NewWriterWithVersion(ctx, version, tmpPath)
if err != nil {
return id, err
}
}
// TODO(owen-d): multithread

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/tsdb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (c *compactedIndex) ToIndexFile() (index_shipper.Index, error) {
Through: through,
Checksum: checksum,
}
return newPrefixedIdentifier(id, c.workingDir, "")
return NewPrefixedIdentifier(id, c.workingDir, "")
})
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/tsdb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func setupMultiTenantIndex(t *testing.T, userStreams map[string][]stream, destDi
}
}

dst := newPrefixedIdentifier(
dst := NewPrefixedIdentifier(
MultitenantTSDBIdentifier{
nodeName: "test",
ts: ts,
Expand Down Expand Up @@ -174,7 +174,7 @@ func setupPerTenantIndex(t *testing.T, streams []stream, destDir string, ts time
Through: through,
Checksum: checksum,
}
return newPrefixedIdentifier(id, destDir, "")
return NewPrefixedIdentifier(id, destDir, "")
},
)

Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/stores/tsdb/identifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func identifierFromPath(p string) (Identifier, error) {
// try parsing as single tenant since the filename is more deterministic without an arbitrary nodename for uploader
id, ok := parseSingleTenantTSDBPath(p)
if ok {
return newPrefixedIdentifier(id, filepath.Dir(p), ""), nil
return NewPrefixedIdentifier(id, filepath.Dir(p), ""), nil
}

multiID, ok := parseMultitenantTSDBPath(p)
Expand All @@ -34,10 +34,10 @@ func identifierFromPath(p string) (Identifier, error) {
}

parent := filepath.Dir(p)
return newPrefixedIdentifier(multiID, parent, ""), nil
return NewPrefixedIdentifier(multiID, parent, ""), nil
}

func newPrefixedIdentifier(id Identifier, path, name string) prefixedIdentifier {
func NewPrefixedIdentifier(id Identifier, path, name string) Identifier {
return prefixedIdentifier{
Identifier: id,
parentPath: path,
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/tsdb/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (m *tsdbManager) Start() (err error) {
}
indices++

prefixed := newPrefixedIdentifier(id, filepath.Join(mulitenantDir, bucket), "")
prefixed := NewPrefixedIdentifier(id, filepath.Join(mulitenantDir, bucket), "")
loaded, err := NewShippableTSDBFile(prefixed)

if err != nil {
Expand Down Expand Up @@ -196,7 +196,7 @@ func (m *tsdbManager) buildFromHead(heads *tenantHeads, shipper indexshipper.Ind

for p, b := range periods {
dstDir := filepath.Join(managerMultitenantDir(m.dir), fmt.Sprint(p))
dst := newPrefixedIdentifier(
dst := NewPrefixedIdentifier(
MultitenantTSDBIdentifier{
nodeName: m.nodeName,
ts: heads.start,
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/tsdb/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestQueryIndex(t *testing.T) {
Through: through,
Checksum: checksum,
}
return newPrefixedIdentifier(id, dir, dir)
return NewPrefixedIdentifier(id, dir, dir)
})
require.Nil(t, err)

Expand Down
50 changes: 50 additions & 0 deletions pkg/storage/stores/tsdb/single_file_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,24 @@ package tsdb

import (
"context"
"errors"
"io"
"math"
"path/filepath"
"time"

"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/storage/chunk"
index_shipper "github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
util_log "github.com/grafana/loki/pkg/util/log"
)

var ErrAlreadyOnDesiredVersion = errors.New("tsdb file already on desired version")

// GetRawFileReaderFunc returns an io.ReadSeeker for reading raw tsdb file from disk
type GetRawFileReaderFunc func() (io.ReadSeeker, error)

Expand All @@ -25,6 +32,49 @@ func OpenShippableTSDB(p string) (index_shipper.Index, error) {
return NewShippableTSDBFile(id)
}

func RebuildWithVersion(ctx context.Context, path string, desiredVer int) (index_shipper.Index, error) {
indexFile, err := OpenShippableTSDB(path)
if err != nil {
return nil, err
}

defer func() {
if err := indexFile.Close(); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to close index file", "err", err)
}
}()

currVer := indexFile.(*TSDBFile).Index.(*TSDBIndex).reader.(*index.Reader).Version()
if currVer == desiredVer {
return nil, ErrAlreadyOnDesiredVersion
}

builder := NewBuilder()
err = indexFile.(*TSDBFile).Index.(*TSDBIndex).ForSeries(ctx, nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
builder.AddSeries(lbls.Copy(), fp, chks)
}, labels.MustNewMatcher(labels.MatchEqual, "", ""))
if err != nil {
return nil, err
}

parentDir := filepath.Dir(path)

id, err := builder.BuildWithVersion(ctx, desiredVer, parentDir, func(from, through model.Time, checksum uint32) Identifier {
id := SingleTenantTSDBIdentifier{
TS: time.Now(),
From: from,
Through: through,
Checksum: checksum,
}
return NewPrefixedIdentifier(id, parentDir, "")
})

if err != nil {
return nil, err
}
return NewShippableTSDBFile(id)
}

// nolint
// TSDBFile is backed by an actual file and implements the indexshipper/index.Index interface
type TSDBFile struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/tsdb/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func BuildIndex(t testing.TB, dir string, cases []LoadableSeries) *TSDBFile {
Through: through,
Checksum: checksum,
}
return newPrefixedIdentifier(id, dir, dir)
return NewPrefixedIdentifier(id, dir, dir)
})
require.Nil(t, err)

Expand Down
Loading

0 comments on commit 08af884

Please sign in to comment.