Skip to content

Commit

Permalink
loads previously created TSDBs into shipper on startup (#6117)
Browse files Browse the repository at this point in the history
* loads previously created TSDBs into shipper on startup

* noopTSDBManager impl

* info logs for initial tsdb loading
  • Loading branch information
owen-d authored May 6, 2022
1 parent 756fc19 commit 729bf3f
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 5 deletions.
18 changes: 13 additions & 5 deletions pkg/storage/stores/indexshipper/uploads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,20 @@ func (lt *table) ForEach(userID string, callback index.ForEachIndexCallback) err
lt.indexSetMtx.RLock()
defer lt.indexSetMtx.RUnlock()

idxSet, ok := lt.indexSet[userID]
if !ok {
return nil
}
// TODO(owen-d): refactor? Uploads mgr never has user indices,
// only common (multitenant) ones.
// iterate through both user and common index
for _, uid := range []string{userID, ""} {
idxSet, ok := lt.indexSet[uid]
if !ok {
continue
}

return idxSet.ForEach(callback)
if err := idxSet.ForEach(callback); err != nil {
return err
}
}
return nil
}

// Upload uploads the index to object storage.
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/stores/tsdb/head_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,16 @@ func (m *HeadManager) Start() error {
if err != nil {
return err
}
level.Info(m.log).Log("msg", "loaded wals by period", "groups", len(walsByPeriod))

m.activeHeads = newTenantHeads(now, m.shards, m.metrics, m.log)

// Load the shipper with any previously built TSDBs
if err := m.tsdbManager.Start(); err != nil {
return errors.Wrap(err, "failed to start tsdb manager")
}

// Build any old WALs into TSDBs for the shipper
for _, group := range walsByPeriod {
if group.period < curPeriod {
if err := m.tsdbManager.BuildFromWALs(
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/stores/tsdb/head_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
type noopTSDBManager struct{ NoopIndex }

func (noopTSDBManager) BuildFromWALs(_ time.Time, _ []WALIdentifier) error { return nil }
func (noopTSDBManager) Start() error { return nil }

func chunkMetasToChunkRefs(user string, fp uint64, xs index.ChunkMetas) (res []ChunkRef) {
for _, x := range xs {
Expand Down
84 changes: 84 additions & 0 deletions pkg/storage/stores/tsdb/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package tsdb
import (
"context"
"fmt"
"io/ioutil"
"math"
"path/filepath"
"strconv"
"sync"
"time"

Expand All @@ -24,6 +26,7 @@ import (
// TSDBManager wraps the index shipper and writes/manages
// TSDB files on disk
type TSDBManager interface {
Start() error
Index
// Builds a new TSDB file from a set of WALs
BuildFromWALs(time.Time, []WALIdentifier) error
Expand Down Expand Up @@ -68,6 +71,87 @@ func NewTSDBManager(
}
}

func (m *tsdbManager) Start() (err error) {
var (
buckets, indices, loadingErrors int
)

defer func() {
level.Info(m.log).Log(
"msg", "loaded leftover local indices",
"err", err,
"successful", err == nil,
"buckets", buckets,
"indices", indices,
"failures", loadingErrors,
)
}()

// load list of multitenant tsdbs
mulitenantDir := managerMultitenantDir(m.dir)
files, err := ioutil.ReadDir(mulitenantDir)
if err != nil {
return err
}

for _, f := range files {
if !f.IsDir() {
continue
}

bucket, err := strconv.Atoi(f.Name())
if err != nil {
level.Warn(m.log).Log(
"msg", "failed to parse bucket in multitenant dir ",
"err", err.Error(),
)
continue
}
buckets++

tsdbs, err := ioutil.ReadDir(filepath.Join(mulitenantDir, f.Name()))
if err != nil {
level.Warn(m.log).Log(
"msg", "failed to open period bucket dir",
"bucket", bucket,
"err", err.Error(),
)
continue
}

for _, db := range tsdbs {
id, ok := parseMultitenantTSDBPath(db.Name())
if !ok {
continue
}
indices++

prefixed := newPrefixedIdentifier(id, filepath.Join(mulitenantDir, f.Name()), "")
loaded, err := NewShippableTSDBFile(
prefixed,
false,
)

if err != nil {
level.Warn(m.log).Log(
"msg", "",
"tsdbPath", prefixed.Path(),
"err", err.Error(),
)
loadingErrors++
}

if err := m.shipper.AddIndex(f.Name(), "", loaded); err != nil {
loadingErrors++
return err
}
}

}

return nil
}

func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier) (err error) {
level.Debug(m.log).Log("msg", "building WALs", "n", len(ids), "ts", t)
// get relevant wals
Expand Down

0 comments on commit 729bf3f

Please sign in to comment.