TSDB shipper + WAL (grafana#6049)
* begins speccing out TSDB Head

* auto incrementing series ref + mempostings

* mintime/maxtime methods

* tsdb head IndexReader impl

* head correctly populates ref lookup

* tsdb head tests

* adds prometheus license to tsdb head

* linting

* [WIP] speccing out tsdb head wal

* fix length check and adds tsdb wal encoding tests

* exposes wal structs & removes closed semantics

* logs start time in the tsdb wal

* wal interface + testing

* exports walrecord + returns ref when appending

* specs out head manager

* tsdb head manager wal initialization

* tsdb wal rotation

* wals don't use node name, but tsdb files do

* cleans up fn signature

* multi tsdb idx now just wraps Index interfaces

* no longer sorts indices when creating multi-idx

* tenantHeads & HeadManager index impls

* head mgr tests

* bugfixes & head manager tests

* tsdb dir selection now helper fns

* period utility

* pulls out more code to helpers, fixes some var races

* head recovery is more generic

* tsdb manager builds from wals

* pulls more helpers out of headmanager

* lockedIdx, Close() on idx, tsdbManager update

* removes mmap from index reader implementation

* tsdb file

* adds tsdb shipper config and refactors initStore

* removes unused tsdbManager code

* implements stores.Index and stores.ChunkWriter for tsdb

* chunk.Data now supports an Entries() method

* moves walreader to new util/wal pkg to avoid circular dep + tsdb storage alignment

* tsdb store

* passes indexWriter to chunkWriter

* build a tsdb per index bucket in accordance with shipper conventions

* don't open tsdb files until necessary for indexshipper

* tsdbManager Index impl

* tsdb defaults + initStore fix for invalid looping

* fixes UsingTSDB helper

* disables deleteRequestStore when using TSDB

* pass limits to tsdb store

* always start headmanager for tsdb
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* fixes copy bug
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* more logging
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* fixes duplicate tenant label bug
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* debug logs, uses label builder, removes __name__=logs for tsdb
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* tsdb fixes labels at earlier pt
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* account for setting tenant label in head manager test

* changing tsdb dir names

* identifier interface, builder to tsdb pkg

* tsdb version path prefix

* fixes buildfromwals identifier

* fixes tsdb shipper paths

* split buckets once per user set

* refactors combining single and multi tenant tsdb indices on shipper reads

* indexshipper ignores old gzip logic

* method name refactor

* remove unused record type

* removes v1 prefix in tsdb paths and refactors indices method

* ignores double optimization in tsdb looking for multitenant idx, shipper handles this

* removes 5-digit requirement on shipper tablename regexp

* groups identifiers, begins removing multitenant prefix in shipped files

* passes open fn to indexshipper

* exposes RealByteSlice

* TSDBFile no longer needs a file descriptor, parses gzip extensions

* method signature fixing

* stop masquerading as compressed indices post-download in indexshipper

* variable bucket regexp

* removes accidental configs committed

* label matcher handling for multitenancy and metricname in tsdb

* explicitly require fingerprint when creating tsdb index

* only add tenant label when creating multitenant tsdb

write fingerprints without synthetic tenant label

strip out tenant labels from queries

* linting + unused removal

* more linting :(

* goimports

* removes uploadername from indexshipper

* maxuint32 for arm32 builds

* tsdb chunk filterer support

* always set ingester name when using object storage index

Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
owen-d and sandeepsukhani authored May 5, 2022
1 parent 03153e8 commit b45efd4
Showing 49 changed files with 2,926 additions and 297 deletions.
11 changes: 10 additions & 1 deletion pkg/chunkenc/facade.go
@@ -73,7 +73,9 @@ func (f Facade) Utilization() float64 {
return f.c.Utilization()
}

// Size implements encoding.Chunk.
// Size implements encoding.Chunk, which unfortunately uses
// the Size method to refer to the byte size and not the entry count
// like chunkenc.Chunk does.
func (f Facade) Size() int {
if f.c == nil {
return 0
@@ -82,6 +84,13 @@ func (f Facade) Size() int {
return f.c.CompressedSize()
}

func (f Facade) Entries() int {
if f.c == nil {
return 0
}
return f.c.Size()
}

// LokiChunk returns the chunkenc.Chunk.
func (f Facade) LokiChunk() Chunk {
return f.c
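
The Entries() addition exists because the two chunk interfaces disagree about what Size() means: the storage-facing encoding.Chunk treats it as a byte count, while chunkenc.Chunk treats it as an entry count. A minimal standalone sketch (stand-in types, not the real Loki ones) of how the facade bridges the two:

package main

import "fmt"

// logChunk mirrors the chunkenc-style contract, where Size() is an entry count.
type logChunk struct {
	entries        int
	compressedSize int
}

func (c logChunk) Size() int           { return c.entries }        // number of log lines
func (c logChunk) CompressedSize() int { return c.compressedSize } // bytes

// facade adapts the chunk to the storage-side contract, where Size() means bytes,
// and exposes the entry count under a separate Entries() method.
type facade struct{ c logChunk }

func (f facade) Size() int    { return f.c.CompressedSize() }
func (f facade) Entries() int { return f.c.Size() }

func main() {
	f := facade{c: logChunk{entries: 30, compressedSize: 2048}}
	fmt.Println(f.Size(), f.Entries()) // 2048 30
}
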
3 changes: 2 additions & 1 deletion pkg/ingester/ingester.go
@@ -38,6 +38,7 @@ import (
"github.com/grafana/loki/pkg/util"
errUtil "github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/wal"
"github.com/grafana/loki/pkg/validation"
)

@@ -420,7 +421,7 @@ func (i *Ingester) starting(ctx context.Context) error {
)

level.Info(util_log.Logger).Log("msg", "recovering from WAL")
segmentReader, segmentCloser, err := newWalReader(i.cfg.WAL.Dir, -1)
segmentReader, segmentCloser, err := wal.NewWalReader(i.cfg.WAL.Dir, -1)
if err != nil {
return err
}
34 changes: 0 additions & 34 deletions pkg/ingester/recovery.go
@@ -30,40 +30,6 @@ func (NoopWALReader) Err() error { return nil }
func (NoopWALReader) Record() []byte { return nil }
func (NoopWALReader) Close() error { return nil }

// If startSegment is <0, it means all the segments.
func newWalReader(dir string, startSegment int) (*wal.Reader, io.Closer, error) {
var (
segmentReader io.ReadCloser
err error
)
if startSegment < 0 {
segmentReader, err = wal.NewSegmentsReader(dir)
if err != nil {
return nil, nil, err
}
} else {
first, last, err := wal.Segments(dir)
if err != nil {
return nil, nil, err
}
if startSegment > last {
return nil, nil, errors.New("start segment is beyond the last WAL segment")
}
if first > startSegment {
startSegment = first
}
segmentReader, err = wal.NewSegmentsRangeReader(wal.SegmentRange{
Dir: dir,
First: startSegment,
Last: -1, // Till the end.
})
if err != nil {
return nil, nil, err
}
}
return wal.NewReader(segmentReader), segmentReader, nil
}

func newCheckpointReader(dir string) (WALReader, io.Closer, error) {
lastCheckpointDir, idx, err := lastCheckpoint(dir)
if err != nil {
29 changes: 29 additions & 0 deletions pkg/loki/config_wrapper.go
@@ -103,6 +103,10 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
betterBoltdbShipperDefaults(r, &defaults)
}

if len(r.SchemaConfig.Configs) > 0 && config.UsingTSDB(r.SchemaConfig.Configs) {
betterTSDBShipperDefaults(r, &defaults)
}

applyFIFOCacheConfig(r)
applyIngesterFinalSleep(r)
applyIngesterReplicationFactor(r)
@@ -497,6 +501,31 @@ func betterBoltdbShipperDefaults(cfg, defaults *ConfigWrapper) {
}
}

func betterTSDBShipperDefaults(cfg, defaults *ConfigWrapper) {
currentSchemaIdx := config.ActivePeriodConfig(cfg.SchemaConfig.Configs)
currentSchema := cfg.SchemaConfig.Configs[currentSchemaIdx]

if cfg.StorageConfig.TSDBShipperConfig.SharedStoreType == defaults.StorageConfig.TSDBShipperConfig.SharedStoreType {
cfg.StorageConfig.TSDBShipperConfig.SharedStoreType = currentSchema.ObjectType
}

if cfg.CompactorConfig.SharedStoreType == defaults.CompactorConfig.SharedStoreType {
cfg.CompactorConfig.SharedStoreType = currentSchema.ObjectType
}

if cfg.Common.PathPrefix != "" {
prefix := strings.TrimSuffix(cfg.Common.PathPrefix, "/")

if cfg.StorageConfig.TSDBShipperConfig.ActiveIndexDirectory == "" {
cfg.StorageConfig.TSDBShipperConfig.ActiveIndexDirectory = fmt.Sprintf("%s/tsdb-shipper-active", prefix)
}

if cfg.StorageConfig.TSDBShipperConfig.CacheLocation == "" {
cfg.StorageConfig.TSDBShipperConfig.CacheLocation = fmt.Sprintf("%s/tsdb-shipper-cache", prefix)
}
}
}

// applyFIFOCacheConfig turns on FIFO cache for the chunk store and for the query range results,
// but only if no other cache storage is configured (redis or memcache).
//
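
A small standalone sketch of what betterTSDBShipperDefaults (above) derives when a common path_prefix is set; the directory names follow the diff, while the helper name and inputs here are just for illustration:

package main

import (
	"fmt"
	"strings"
)

// tsdbShipperDirs mimics the prefix handling above: trim a trailing slash,
// then derive the active-index and cache directories from the common prefix.
func tsdbShipperDirs(pathPrefix string) (activeDir, cacheDir string) {
	prefix := strings.TrimSuffix(pathPrefix, "/")
	return fmt.Sprintf("%s/tsdb-shipper-active", prefix),
		fmt.Sprintf("%s/tsdb-shipper-cache", prefix)
}

func main() {
	active, cache := tsdbShipperDirs("/loki/")
	fmt.Println(active) // /loki/tsdb-shipper-active
	fmt.Println(cache)  // /loki/tsdb-shipper-cache
}
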
111 changes: 79 additions & 32 deletions pkg/loki/modules.go
@@ -48,6 +48,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/cache"
chunk_util "github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
@@ -382,19 +383,25 @@ func (t *Loki) initTableManager() (services.Service, error) {
}

func (t *Loki) initStore() (_ services.Service, err error) {
// Always set these configs
// TODO(owen-d): Do the same for TSDB when we add IndexGatewayClientConfig
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Mode = t.Cfg.IndexGateway.Mode
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Ring = t.indexGatewayRing

// If RF > 1 and current or upcoming index type is boltdb-shipper then disable index dedupe and write dedupe cache.
// This is to ensure that index entries are replicated to all the boltdb files in ingesters flushing replicated data.
if t.Cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
if t.Cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs) {
t.Cfg.ChunkStoreConfig.DisableIndexDeduplication = true
t.Cfg.ChunkStoreConfig.WriteDedupeCacheConfig = cache.Config{}
}

if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
// Set configs pertaining to object storage based indices
if config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs) {
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.Cfg.Ingester.LifecyclerConfig.ID
t.Cfg.StorageConfig.TSDBShipperConfig.IngesterName = t.Cfg.Ingester.LifecyclerConfig.ID

switch true {
case t.Cfg.isModuleEnabled(Ingester), t.Cfg.isModuleEnabled(Write):
// We do not want ingester to unnecessarily keep downloading files
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly
// Use fifo cache for caching index in memory, this also significantly helps performance.
t.Cfg.StorageConfig.IndexQueriesCacheConfig = cache.Config{
EnableFifoCache: true,
@@ -412,22 +419,53 @@ func (t *Loki) initStore() (_ services.Service, err error) {
// have query gaps on chunks flushed after an index entry is cached by keeping them retained in the ingester
// and queried as part of live data until the cache TTL expires on the index entry.
t.Cfg.Ingester.RetainPeriod = t.Cfg.StorageConfig.IndexCacheValidity + 1*time.Minute
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg)

// We do not want ingester to unnecessarily keep downloading files
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval)

t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeWriteOnly
t.Cfg.StorageConfig.TSDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval)

case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read), t.isModuleActive(IndexGateway):
// We do not want query to do any updates to index
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly
default:
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadWrite
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg)
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval)
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadWrite
t.Cfg.StorageConfig.TSDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval)

}
}

t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Mode = t.Cfg.IndexGateway.Mode
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Ring = t.indexGatewayRing
if config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs) {
var asyncStore bool

shipperConfigIdx := config.ActivePeriodConfig(t.Cfg.SchemaConfig.Configs)
iTy := t.Cfg.SchemaConfig.Configs[shipperConfigIdx].IndexType
if iTy != config.BoltDBShipperType && iTy != config.TSDBType {
shipperConfigIdx++
}

// TODO(owen-d): make helper more agnostic between boltdb|tsdb
var resyncInterval time.Duration
switch t.Cfg.SchemaConfig.Configs[shipperConfigIdx].IndexType {
case config.BoltDBShipperType:
resyncInterval = t.Cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval
case config.TSDBType:
resyncInterval = t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval
}

minIngesterQueryStoreDuration := shipperMinIngesterQueryStoreDuration(
t.Cfg.Ingester.MaxChunkAge,
shipperQuerierIndexUpdateDelay(
t.Cfg.StorageConfig.IndexCacheValidity,
resyncInterval,
),
)

var asyncStore bool
if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.Cfg)
switch true {
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read):
// Do not use the AsyncStore if the querier is configured with QueryStoreOnly set to true
@@ -439,30 +477,34 @@ func (t *Loki) initStore() (_ services.Service, err error) {
asyncStore = true
case t.Cfg.isModuleEnabled(IndexGateway):
// we want to use the actual storage when running the index-gateway, so we remove the Addr from the config
// TODO(owen-d): Do the same for TSDB when we add IndexGatewayClientConfig
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled = true
case t.Cfg.isModuleEnabled(All):
// We want ingester to also query the store when using boltdb-shipper but only when running with target All.
// We do not want to use AsyncStore otherwise it would start spiraling around doing queries over and over again to the ingesters and store.
// ToDo: See if we can avoid doing this when not running loki in clustered mode.
t.Cfg.Ingester.QueryStore = true
boltdbShipperConfigIdx := config.ActivePeriodConfig(t.Cfg.SchemaConfig.Configs)
if t.Cfg.SchemaConfig.Configs[boltdbShipperConfigIdx].IndexType != config.BoltDBShipperType {
boltdbShipperConfigIdx++
}
mlb, err := calculateMaxLookBack(t.Cfg.SchemaConfig.Configs[boltdbShipperConfigIdx], t.Cfg.Ingester.QueryStoreMaxLookBackPeriod,
boltdbShipperMinIngesterQueryStoreDuration)

mlb, err := calculateMaxLookBack(
t.Cfg.SchemaConfig.Configs[shipperConfigIdx],
t.Cfg.Ingester.QueryStoreMaxLookBackPeriod,
minIngesterQueryStoreDuration,
)
if err != nil {
return nil, err
}
t.Cfg.Ingester.QueryStoreMaxLookBackPeriod = mlb
}
}

if asyncStore {
t.Cfg.StorageConfig.EnableAsyncStore = true
t.Cfg.StorageConfig.AsyncStoreConfig = storage.AsyncStoreCfg{
IngesterQuerier: t.ingesterQuerier,
QueryIngestersWithin: calculateAsyncStoreQueryIngestersWithin(t.Cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration(t.Cfg)),
if asyncStore {
t.Cfg.StorageConfig.EnableAsyncStore = true
t.Cfg.StorageConfig.AsyncStoreConfig = storage.AsyncStoreCfg{
IngesterQuerier: t.ingesterQuerier,
QueryIngestersWithin: calculateAsyncStoreQueryIngestersWithin(
t.Cfg.Querier.QueryIngestersWithin,
minIngesterQueryStoreDuration,
),
}
}
}

@@ -908,6 +950,11 @@ func (t *Loki) initUsageReport() (services.Service, error) {
}

func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) {
// TODO(owen-d): enable delete request storage in tsdb
if config.UsingTSDB(t.Cfg.SchemaConfig.Configs) {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

filteringEnabled, err := deletion.FilteringEnabled(t.Cfg.CompactorConfig.DeletionMode)
if err != nil {
return nil, err
@@ -954,24 +1001,24 @@ func calculateAsyncStoreQueryIngestersWithin(queryIngestersWithinConfig, minDura
return queryIngestersWithinConfig
}

// boltdbShipperQuerierIndexUpdateDelay returns duration it could take for queriers to serve the index since it was uploaded.
// shipperQuerierIndexUpdateDelay returns duration it could take for queriers to serve the index since it was uploaded.
// It considers up to 3 sync attempts for the indexgateway/queriers to be successful in syncing the files to factor in worst case scenarios like
// failures in sync, low download throughput, various kinds of caches in between etc. which can delay the sync operation from getting all the updates from the storage.
// It also considers index cache validity because a querier could have cached index just before it was going to resync which means
// it would keep serving index until the cache entries expire.
func boltdbShipperQuerierIndexUpdateDelay(cfg Config) time.Duration {
return cfg.StorageConfig.IndexCacheValidity + cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval*3
func shipperQuerierIndexUpdateDelay(cacheValidity, resyncInterval time.Duration) time.Duration {
return cacheValidity + resyncInterval*3
}

// boltdbShipperIngesterIndexUploadDelay returns duration it could take for an index file containing id of a chunk to be uploaded to the shared store since it got flushed.
func boltdbShipperIngesterIndexUploadDelay() time.Duration {
// shipperIngesterIndexUploadDelay returns duration it could take for an index file containing id of a chunk to be uploaded to the shared store since it got flushed.
func shipperIngesterIndexUploadDelay() time.Duration {
return uploads.ShardDBsByDuration + shipper.UploadInterval
}

// boltdbShipperMinIngesterQueryStoreDuration returns minimum duration(with some buffer) ingesters should query their stores to
// avoid queriers from missing any logs or chunk ids due to async nature of BoltDB Shipper.
func boltdbShipperMinIngesterQueryStoreDuration(cfg Config) time.Duration {
return cfg.Ingester.MaxChunkAge + boltdbShipperIngesterIndexUploadDelay() + boltdbShipperQuerierIndexUpdateDelay(cfg) + 5*time.Minute
// shipperMinIngesterQueryStoreDuration returns minimum duration(with some buffer) ingesters should query their stores to
// avoid missing any logs or chunk ids due to async nature of shipper.
func shipperMinIngesterQueryStoreDuration(maxChunkAge, querierUpdateDelay time.Duration) time.Duration {
return maxChunkAge + shipperIngesterIndexUploadDelay() + querierUpdateDelay + 5*time.Minute
}

// NewServerService constructs service from Server component.
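
For a sense of scale, a standalone worked example of the renamed helpers above; the input durations are made up rather than Loki defaults, and the ingester upload delay is passed in directly instead of being computed from the uploads-package constants:

package main

import (
	"fmt"
	"time"
)

// shipperQuerierIndexUpdateDelay: same formula as above.
func shipperQuerierIndexUpdateDelay(cacheValidity, resyncInterval time.Duration) time.Duration {
	return cacheValidity + resyncInterval*3
}

// minIngesterQueryStoreDuration: as above, but with the upload delay supplied
// by the caller for this sketch.
func minIngesterQueryStoreDuration(maxChunkAge, uploadDelay, querierUpdateDelay time.Duration) time.Duration {
	return maxChunkAge + uploadDelay + querierUpdateDelay + 5*time.Minute
}

func main() {
	querierDelay := shipperQuerierIndexUpdateDelay(5*time.Minute, 5*time.Minute)
	fmt.Println(querierDelay) // 20m0s
	fmt.Println(minIngesterQueryStoreDuration(2*time.Hour, 16*time.Minute, querierDelay)) // 2h41m0s
}
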
4 changes: 4 additions & 0 deletions pkg/storage/chunk/bigchunk.go
@@ -34,6 +34,10 @@ func newBigchunk() *bigchunk {
return &bigchunk{}
}

// TODO(owen-d): remove bigchunk from our code, we don't use it.
// Hack an Entries() impl
func (b *bigchunk) Entries() int { return 0 }

func (b *bigchunk) Add(sample model.SamplePair) (Data, error) {
if b.remainingSamples == 0 {
if bigchunkSizeCapBytes > 0 && b.Size() > bigchunkSizeCapBytes {
2 changes: 2 additions & 0 deletions pkg/storage/chunk/interface.go
@@ -52,6 +52,8 @@ type Data interface {
Rebound(start, end model.Time, filter filter.Func) (Data, error)
// Size returns the approximate length of the chunk in bytes.
Size() int
// Entries returns the number of entries in a chunk
Entries() int
Utilization() float64
}

39 changes: 35 additions & 4 deletions pkg/storage/config/schema_config.go
@@ -39,6 +39,7 @@ const (
StorageTypeSwift = "swift"
// BoltDBShipperType holds the index type for using boltdb with shipper which keeps flushing them to a shared storage
BoltDBShipperType = "boltdb-shipper"
TSDBType = "tsdb"
)

var (
@@ -184,17 +185,47 @@ func ActivePeriodConfig(configs []PeriodConfig) int {
return i
}

// UsingBoltdbShipper checks whether current or the next index type is boltdb-shipper, returns true if yes.
func UsingBoltdbShipper(configs []PeriodConfig) bool {
func usingForPeriodConfigs(configs []PeriodConfig, fn func(PeriodConfig) bool) bool {
activePCIndex := ActivePeriodConfig(configs)
if configs[activePCIndex].IndexType == BoltDBShipperType ||
(len(configs)-1 > activePCIndex && configs[activePCIndex+1].IndexType == BoltDBShipperType) {

if fn(configs[activePCIndex]) ||
(len(configs)-1 > activePCIndex && fn(configs[activePCIndex+1])) {
return true
}

return false
}

func UsingObjectStorageIndex(configs []PeriodConfig) bool {
fn := func(cfg PeriodConfig) bool {
switch cfg.IndexType {
case BoltDBShipperType, TSDBType:
return true
default:
return false
}
}

return usingForPeriodConfigs(configs, fn)
}

// UsingBoltdbShipper checks whether current or the next index type is boltdb-shipper, returns true if yes.
func UsingBoltdbShipper(configs []PeriodConfig) bool {
fn := func(cfg PeriodConfig) bool {
return cfg.IndexType == BoltDBShipperType
}

return usingForPeriodConfigs(configs, fn)
}

func UsingTSDB(configs []PeriodConfig) bool {
fn := func(cfg PeriodConfig) bool {
return cfg.IndexType == TSDBType
}

return usingForPeriodConfigs(configs, fn)
}

func defaultRowShards(schema string) uint32 {
switch schema {
case "v1", "v2", "v3", "v4", "v5", "v6", "v9":
9 changes: 6 additions & 3 deletions pkg/storage/factory.go
@@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/client/openstack"
"github.com/grafana/loki/pkg/storage/chunk/client/testutils"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/downloads"
@@ -61,8 +62,9 @@ type Config struct {
DisableBroadIndexQueries bool `yaml:"disable_broad_index_queries"`
MaxParallelGetChunk int `yaml:"max_parallel_get_chunk"`

MaxChunkBatchSize int `yaml:"max_chunk_batch_size"`
BoltDBShipperConfig shipper.Config `yaml:"boltdb_shipper"`
MaxChunkBatchSize int `yaml:"max_chunk_batch_size"`
BoltDBShipperConfig shipper.Config `yaml:"boltdb_shipper"`
TSDBShipperConfig indexshipper.Config `yaml:"tsdb_shipper"`

// Config for using AsyncStore when using async index stores like `boltdb-shipper`.
// It is required for getting chunk ids of recently flushed chunks from the ingesters.
@@ -89,6 +91,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxParallelGetChunk, "store.max-parallel-get-chunk", 150, "Maximum number of parallel chunk reads.")
cfg.BoltDBShipperConfig.RegisterFlags(f)
f.IntVar(&cfg.MaxChunkBatchSize, "store.max-chunk-batch-size", 50, "The maximum number of chunks to fetch per batch.")
cfg.TSDBShipperConfig.RegisterFlagsWithPrefix("tsdb.", f)
}

// Validate config and returns error on failure
@@ -150,7 +153,7 @@ func NewIndexClient(name string, cfg Config, schemaCfg config.SchemaConfig, limi
return boltDBIndexClientWithShipper, nil
}

if shouldUseIndexGatewayClient(cfg) {
if shouldUseBoltDBIndexGatewayClient(cfg) {
gateway, err := shipper.NewGatewayClient(cfg.BoltDBShipperConfig.IndexGatewayClientConfig, registerer, util_log.Logger)
if err != nil {
return nil, err
60 changes: 56 additions & 4 deletions pkg/storage/store.go
@@ -23,13 +23,16 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/series"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
"github.com/grafana/loki/pkg/storage/stores/tsdb"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/deletion"
util_log "github.com/grafana/loki/pkg/util/log"
)

var (
@@ -184,7 +187,7 @@ func (s *store) chunkClientForPeriod(p config.PeriodConfig) (client.Client, erro
return chunks, nil
}

func shouldUseIndexGatewayClient(cfg Config) bool {
func shouldUseBoltDBIndexGatewayClient(cfg Config) bool {
if cfg.BoltDBShipperConfig.Mode != shipper.ModeReadOnly || cfg.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled {
return false
}
@@ -198,11 +201,60 @@ func shouldUseIndexGatewayClient(cfg Config) bool {
}

func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client, f *fetcher.Fetcher) (stores.ChunkWriter, stores.Index, func(), error) {
// todo switch tsdb.

indexClientReg := prometheus.WrapRegistererWith(
prometheus.Labels{"component": "index-store-" + p.From.String()}, s.registerer)

if p.IndexType == config.TSDBType {
var (
nodeName = s.cfg.TSDBShipperConfig.IngesterName
dir = s.cfg.TSDBShipperConfig.ActiveIndexDirectory
)
tsdbMetrics := tsdb.NewMetrics(indexClientReg)
objectClient, err := NewObjectClient(s.cfg.TSDBShipperConfig.SharedStoreType, s.cfg, s.clientMetrics)
if err != nil {
return nil, nil, nil, err
}

shpr, err := indexshipper.NewIndexShipper(
s.cfg.TSDBShipperConfig,
objectClient,
s.limits,
tsdb.OpenShippableTSDB,
)
if err != nil {
return nil, nil, nil, err
}
tsdbManager := tsdb.NewTSDBManager(
nodeName,
dir,
shpr,
p.IndexTables.Period,
util_log.Logger,
tsdbMetrics,
)
// TODO(owen-d): Only need HeadManager
// on the ingester. Otherwise, the TSDBManager is sufficient
headManager := tsdb.NewHeadManager(
util_log.Logger,
dir,
tsdbMetrics,
tsdbManager,
)
if err := headManager.Start(); err != nil {
return nil, nil, nil, err
}
idx := tsdb.NewIndexClient(headManager, p)
writer := tsdb.NewChunkWriter(f, p, headManager)

// TODO(owen-d): add TSDB index-gateway support

return writer, idx,
func() {
chunkClient.Stop()
f.Stop()
}, nil
}

idx, err := NewIndexClient(p.IndexType, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, indexClientReg)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "error creating index client")
@@ -222,7 +274,7 @@ func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client,
index stores.Index = seriesdIndex
)

if shouldUseIndexGatewayClient(s.cfg) {
if shouldUseBoltDBIndexGatewayClient(s.cfg) {
// inject the index-gateway client into the index store
gw, err := shipper.NewGatewayClient(s.cfg.BoltDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.logger)
if err != nil {
76 changes: 68 additions & 8 deletions pkg/storage/stores/indexshipper/downloads/index_set.go
@@ -7,13 +7,15 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
@@ -22,6 +24,10 @@ import (
"github.com/grafana/loki/pkg/util/spanlogger"
)

const (
gzipExtension = ".gz"
)

type IndexSet interface {
Init() error
Close()
@@ -300,11 +306,12 @@ func (t *indexSet) checkStorageForUpdates(ctx context.Context, lock bool) (toDow
}

for _, file := range files {
listedDBs[file.Name] = struct{}{}
normalized := strings.TrimSuffix(file.Name, gzipExtension)
listedDBs[normalized] = struct{}{}

// Checking whether file was already downloaded, if not, download it.
// We do not ever upload files in the object store with the same name but different contents so we do not consider downloading modified files again.
_, ok := t.index[file.Name]
_, ok := t.index[normalized]
if !ok {
toDownload = append(toDownload, file)
}
@@ -323,11 +330,65 @@ func (t *indexSet) AwaitReady(ctx context.Context) error {
return t.indexMtx.awaitReady(ctx)
}

func (t *indexSet) downloadFileFromStorage(ctx context.Context, fileName, folderPathForTable string) error {
return shipper_util.DownloadFileFromStorage(filepath.Join(folderPathForTable, fileName), shipper_util.IsCompressedFile(fileName),
true, shipper_util.LoggerWithFilename(t.logger, fileName), func() (io.ReadCloser, error) {
func (t *indexSet) downloadFileFromStorage(ctx context.Context, fileName, folderPathForTable string) (string, error) {
decompress := shipper_util.IsCompressedFile(fileName)
dst := filepath.Join(folderPathForTable, fileName)
if decompress {
dst = strings.Trim(dst, gzipExtension)
}
return filepath.Base(dst), downloadFileFromStorage(
dst,
decompress,
true,
shipper_util.LoggerWithFilename(t.logger, fileName),
func() (io.ReadCloser, error) {
return t.baseIndexSet.GetFile(ctx, t.tableName, t.userID, fileName)
})
},
)
}

// DownloadFileFromStorage downloads a file from storage to given location.
func downloadFileFromStorage(destination string, decompressFile bool, sync bool, logger log.Logger, getFileFunc shipper_util.GetFileFunc) error {
start := time.Now()
readCloser, err := getFileFunc()
if err != nil {
return err
}

defer func() {
if err := readCloser.Close(); err != nil {
level.Error(logger).Log("msg", "failed to close read closer", "err", err)
}
}()

f, err := os.Create(destination)
if err != nil {
return err
}

defer func() {
if err := f.Close(); err != nil {
level.Warn(logger).Log("msg", "failed to close file", "file", destination)
}
}()
var objectReader io.Reader = readCloser
if decompressFile {
decompressedReader := chunkenc.Gzip.GetReader(readCloser)
defer chunkenc.Gzip.PutReader(decompressedReader)

objectReader = decompressedReader
}

_, err = io.Copy(f, objectReader)
if err != nil {
return err
}

level.Info(logger).Log("msg", "downloaded file", "total_time", time.Since(start))
if sync {
return f.Sync()
}
return nil
}

// doConcurrentDownload downloads objects(files) concurrently. It ignores only missing file errors caused by removal of file by compaction.
@@ -337,8 +398,7 @@ func (t *indexSet) doConcurrentDownload(ctx context.Context, files []storage.Ind
downloadedFilesMtx := sync.Mutex{}

err := concurrency.ForEachJob(ctx, len(files), maxDownloadConcurrency, func(ctx context.Context, idx int) error {
fileName := files[idx].Name
err := t.downloadFileFromStorage(ctx, fileName, t.cacheLocation)
fileName, err := t.downloadFileFromStorage(ctx, files[idx].Name, t.cacheLocation)
if err != nil {
if t.baseIndexSet.IsFileNotFoundErr(err) {
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("ignoring missing file %s, possibly removed during compaction", fileName))
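
A quick standalone illustration of the normalization above: gzipped objects lose their .gz suffix once downloaded and decompressed, so lookups against the local index map use the uncompressed name (file names below are invented for the example):

package main

import (
	"fmt"
	"strings"
)

const gzipExtension = ".gz"

func normalize(name string) string {
	// Compressed and uncompressed objects map to the same local file name.
	return strings.TrimSuffix(name, gzipExtension)
}

func main() {
	for _, name := range []string{"compactor-1645644.tsdb.gz", "ingester-1-1645644.tsdb"} {
		fmt.Println(normalize(name))
	}
	// compactor-1645644.tsdb
	// ingester-1-1645644.tsdb
}
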
9 changes: 5 additions & 4 deletions pkg/storage/stores/indexshipper/downloads/table_manager.go
@@ -241,18 +241,19 @@ func (tm *tableManager) ensureQueryReadiness(ctx context.Context) error {
return err
}

// regex for finding daily tables which have a 5 digit number at the end.
re, err := regexp.Compile(`.+[0-9]{5}$`)
// regexp for finding the trailing index bucket number at the end
re, err := regexp.Compile(`[0-9]+$`)
if err != nil {
return err
}

for _, tableName := range tables {
if !re.MatchString(tableName) {
match := re.Find([]byte(tableName))
if match == nil {
continue
}

tableNumber, err := strconv.ParseInt(tableName[len(tableName)-5:], 10, 64)
tableNumber, err := strconv.ParseInt(string(match), 10, 64)
if err != nil {
return err
}
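
The relaxed pattern matters because the bucket number at the end of a table name is no longer guaranteed to be exactly five digits. A quick standalone comparison of old versus new behaviour (table names invented for the example):

package main

import (
	"fmt"
	"regexp"
	"strconv"
)

func main() {
	oldRe := regexp.MustCompile(`.+[0-9]{5}$`) // required exactly five trailing digits
	newRe := regexp.MustCompile(`[0-9]+$`)     // extracts any trailing number

	for _, table := range []string{"index_19123", "index_123"} {
		match := newRe.Find([]byte(table))
		n, _ := strconv.ParseInt(string(match), 10, 64)
		fmt.Println(table, oldRe.MatchString(table), n)
	}
	// index_19123 true 19123
	// index_123 false 123
}
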
7 changes: 3 additions & 4 deletions pkg/storage/stores/indexshipper/shipper.go
@@ -55,7 +55,6 @@ type Config struct {
ResyncInterval time.Duration `yaml:"resync_interval"`
QueryReadyNumDays int `yaml:"query_ready_num_days"`

UploaderName string
IngesterName string
Mode Mode
IngesterDBRetainPeriod time.Duration
@@ -83,9 +82,10 @@ type indexShipper struct {

// NewIndexShipper creates a shipper for providing index store functionality using index files and object storage.
// It manages the whole life cycle of uploading the index and downloading the index at query time.
func NewIndexShipper(cfg Config, storageClient client.ObjectClient, limits downloads.Limits) (IndexShipper, error) {
func NewIndexShipper(cfg Config, storageClient client.ObjectClient, limits downloads.Limits, open index.OpenIndexFileFunc) (IndexShipper, error) {
shipper := indexShipper{
cfg: cfg,
cfg: cfg,
openIndexFileFunc: open,
}

err := shipper.init(storageClient, limits)
@@ -103,7 +103,6 @@ func (s *indexShipper) init(storageClient client.ObjectClient, limits downloads.

if s.cfg.Mode != ModeReadOnly {
cfg := uploads.Config{
Uploader: s.cfg.UploaderName,
UploadInterval: UploadInterval,
DBRetainPeriod: s.cfg.IngesterDBRetainPeriod,
}
12 changes: 1 addition & 11 deletions pkg/storage/stores/indexshipper/uploads/index_set.go
@@ -31,7 +31,6 @@ type indexSet struct {
storageIndexSet storage.IndexSet
tableName, userID string
logger log.Logger
uploader string

index map[string]index.Index
indexMtx sync.RWMutex
@@ -238,14 +237,5 @@ func (t *indexSet) removeIndex(name string) error {
}

func (t *indexSet) buildFileName(indexName string) string {
// Files are stored with <uploader>-<index-name>
fileName := fmt.Sprintf("%s-%s", t.uploader, indexName)

// if the file is a migrated one then don't add its name to the object key otherwise we would re-upload them again here with a different name.
// This is kept for historic reasons of boltdb-shipper.
if t.tableName == indexName {
fileName = t.uploader
}

return fmt.Sprintf("%s.gz", fileName)
return fmt.Sprintf("%s.gz", indexName)
}
4 changes: 1 addition & 3 deletions pkg/storage/stores/indexshipper/uploads/table.go
@@ -32,7 +32,6 @@ type Table interface {
// All the public methods are concurrency safe and take care of mutexes to avoid any data race.
type table struct {
name string
uploader string
baseUserIndexSet, baseCommonIndexSet storage.IndexSet
logger log.Logger

@@ -41,10 +40,9 @@ type table struct {
}

// NewTable create a new table instance.
func NewTable(name, uploader string, storageClient storage.Client) Table {
func NewTable(name string, storageClient storage.Client) Table {
return &table{
name: name,
uploader: uploader,
baseUserIndexSet: storage.NewIndexSet(storageClient, true),
baseCommonIndexSet: storage.NewIndexSet(storageClient, false),
logger: log.With(util_log.Logger, "table-name", name),
3 changes: 1 addition & 2 deletions pkg/storage/stores/indexshipper/uploads/table_manager.go
@@ -13,7 +13,6 @@ import (
)

type Config struct {
Uploader string
UploadInterval time.Duration
DBRetainPeriod time.Duration
}
@@ -101,7 +100,7 @@ func (tm *tableManager) getOrCreateTable(tableName string) Table {

table, ok = tm.tables[tableName]
if !ok {
table = NewTable(tableName, tm.cfg.Uploader, tm.storageClient)
table = NewTable(tableName, tm.storageClient)
tm.tables[tableName] = table
}
}
@@ -30,7 +30,6 @@ func buildTestTableManager(t *testing.T, testDir string) (TableManager, stopFunc
storageClient := buildTestStorageClient(t, testDir)

cfg := Config{
Uploader: "test-table-manager",
UploadInterval: time.Hour,
}
tm, err := NewTableManager(cfg, storageClient)
3 changes: 1 addition & 2 deletions pkg/storage/stores/indexshipper/uploads/table_test.go
@@ -13,13 +13,12 @@ import (

const (
testTableName = "test-table"
uploader = "test-uploader"
)

func TestTable(t *testing.T) {
tempDir := t.TempDir()
storageClient := buildTestStorageClient(t, tempDir)
testTable := NewTable(testTableName, uploader, storageClient)
testTable := NewTable(testTableName, storageClient)
defer testTable.Stop()

for userIdx := 0; userIdx < 2; userIdx++ {
8 changes: 4 additions & 4 deletions pkg/storage/stores/series/series_store_write.go
@@ -20,13 +20,13 @@ import (
)

var (
dedupedChunksTotal = promauto.NewCounter(prometheus.CounterOpts{
DedupedChunksTotal = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Name: "chunk_store_deduped_chunks_total",
Help: "Count of chunks which were not stored because they have already been stored by another replica.",
})

indexEntriesPerChunk = promauto.NewHistogram(prometheus.HistogramOpts{
IndexEntriesPerChunk = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "chunk_store_index_entries_per_chunk",
Help: "Number of entries written to storage per chunk.",
@@ -86,7 +86,7 @@ func (c *Writer) PutOne(ctx context.Context, from, through model.Time, chk chunk

if len(found) > 0 {
writeChunk = false
dedupedChunksTotal.Inc()
DedupedChunksTotal.Inc()
}

// If we dont have to write the chunk and DisableIndexDeduplication is false, we do not have to do anything.
@@ -170,7 +170,7 @@ func (c *Writer) calculateIndexEntries(ctx context.Context, from, through model.
}
entries = append(entries, chunkEntries...)

indexEntriesPerChunk.Observe(float64(len(entries)))
IndexEntriesPerChunk.Observe(float64(len(entries)))

// Remove duplicate entries based on tableName:hashValue:rangeValue
result := c.indexWriter.NewWriteBatch()
24 changes: 14 additions & 10 deletions pkg/storage/stores/shipper/shipper_index_client.go
@@ -80,16 +80,20 @@ type Config struct {

// RegisterFlags registers flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.IndexGatewayClientConfig.RegisterFlagsWithPrefix("boltdb.shipper.index-gateway-client", f)

f.StringVar(&cfg.ActiveIndexDirectory, "boltdb.shipper.active-index-directory", "", "Directory where ingesters would write boltdb files which would then be uploaded by shipper to configured storage")
f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.shared-store", "", "Shared store for keeping boltdb files. Supported types: gcs, s3, azure, filesystem")
f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it")
f.StringVar(&cfg.CacheLocation, "boltdb.shipper.cache-location", "", "Cache location for restoring boltDB files for queries")
f.DurationVar(&cfg.CacheTTL, "boltdb.shipper.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries")
f.DurationVar(&cfg.ResyncInterval, "boltdb.shipper.resync-interval", 5*time.Minute, "Resync downloaded files with the storage")
f.IntVar(&cfg.QueryReadyNumDays, "boltdb.shipper.query-ready-num-days", 0, "Number of days of common index to be kept downloaded for queries. For per tenant index query readiness, use limits overrides config.")
f.BoolVar(&cfg.BuildPerTenantIndex, "boltdb.shipper.build-per-tenant-index", false, "Build per tenant index files")
cfg.RegisterFlagsWithPrefix("boltdb.", f)
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.IndexGatewayClientConfig.RegisterFlagsWithPrefix(prefix+"shipper.index-gateway-client", f)

f.StringVar(&cfg.ActiveIndexDirectory, prefix+"shipper.active-index-directory", "", "Directory where ingesters would write boltdb files which would then be uploaded by shipper to configured storage")
f.StringVar(&cfg.SharedStoreType, prefix+"shipper.shared-store", "", "Shared store for keeping boltdb files. Supported types: gcs, s3, azure, filesystem")
f.StringVar(&cfg.SharedStoreKeyPrefix, prefix+"shipper.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it")
f.StringVar(&cfg.CacheLocation, prefix+"shipper.cache-location", "", "Cache location for restoring boltDB files for queries")
f.DurationVar(&cfg.CacheTTL, prefix+"shipper.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries")
f.DurationVar(&cfg.ResyncInterval, prefix+"shipper.resync-interval", 5*time.Minute, "Resync downloaded files with the storage")
f.IntVar(&cfg.QueryReadyNumDays, prefix+"shipper.query-ready-num-days", 0, "Number of days of common index to be kept downloaded for queries. For per tenant index query readiness, use limits overrides config.")
f.BoolVar(&cfg.BuildPerTenantIndex, prefix+"shipper.build-per-tenant-index", false, "Build per tenant index files")
}

func (cfg *Config) Validate() error {
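
A standalone sketch of what the prefixed registration achieves: the same shipper flags now exist under both the boltdb. and tsdb. prefixes (only two flags are shown, with abbreviated help text, as an illustration rather than the full flag set):

package main

import (
	"flag"
	"fmt"
)

type shipperConfig struct {
	ActiveIndexDirectory string
	CacheLocation        string
}

func (cfg *shipperConfig) registerFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.StringVar(&cfg.ActiveIndexDirectory, prefix+"shipper.active-index-directory", "", "Directory for active index files")
	f.StringVar(&cfg.CacheLocation, prefix+"shipper.cache-location", "", "Cache location for downloaded index files")
}

func main() {
	fs := flag.NewFlagSet("loki", flag.ContinueOnError)
	var boltdb, tsdb shipperConfig
	boltdb.registerFlagsWithPrefix("boltdb.", fs)
	tsdb.registerFlagsWithPrefix("tsdb.", fs)

	_ = fs.Parse([]string{"-tsdb.shipper.active-index-directory=/data/tsdb-index"})
	fmt.Println(tsdb.ActiveIndexDirectory) // /data/tsdb-index
}
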
@@ -1,4 +1,4 @@
package index
package tsdb

import (
"context"
@@ -13,33 +13,9 @@ import (
"github.com/prometheus/prometheus/storage"

chunk_util "github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

// Identifier has all the information needed to resolve a TSDB index
// Notably this abstracts away OS path separators, etc.
type Identifier struct {
Tenant string
From, Through model.Time
Checksum uint32
}

func (i Identifier) String() string {
return filepath.Join(
i.Tenant,
fmt.Sprintf(
"%s-%d-%d-%x.tsdb",
IndexFilename,
i.From,
i.Through,
i.Checksum,
),
)
}

func (i Identifier) FilePath(parentDir string) string {
return filepath.Join(parentDir, i.String())
}

// Builder is a helper used to create tsdb indices.
// It can accept streams in any order and will create the tsdb
// index appropriately via `Build()`
@@ -51,41 +27,50 @@ type Builder struct {

type stream struct {
labels labels.Labels
chunks ChunkMetas
fp model.Fingerprint
chunks index.ChunkMetas
}

func NewBuilder() *Builder {
return &Builder{streams: make(map[string]*stream)}
}

func (b *Builder) AddSeries(ls labels.Labels, chks []ChunkMeta) {
func (b *Builder) AddSeries(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
id := ls.String()
s, ok := b.streams[id]
if !ok {
s = &stream{
labels: ls,
fp: fp,
}
b.streams[id] = s
}

s.chunks = append(s.chunks, chks...)
}

func (b *Builder) Build(ctx context.Context, dir, tenant string) (id Identifier, err error) {
func (b *Builder) Build(
ctx context.Context,
scratchDir string,
// Determines how to create the resulting Identifier and file name.
// This is variable as we use Builder for multiple reasons,
// such as building multi-tenant tsdbs on the ingester
// and per tenant ones during compaction
createFn func(from, through model.Time, checksum uint32) Identifier,
) (id Identifier, err error) {
// Ensure the parent dir exists (i.e. index/<bucket>/<tenant>/)
parent := filepath.Join(dir, tenant)
if parent != "" {
if err := chunk_util.EnsureDirectory(parent); err != nil {
if scratchDir != "" {
if err := chunk_util.EnsureDirectory(scratchDir); err != nil {
return id, err
}
}

// First write tenant/index-bounds-random.staging
rng := rand.Int63()
name := fmt.Sprintf("%s-%x.staging", IndexFilename, rng)
tmpPath := filepath.Join(parent, name)
name := fmt.Sprintf("%s-%x.staging", index.IndexFilename, rng)
tmpPath := filepath.Join(scratchDir, name)

writer, err := NewWriter(ctx, tmpPath)
writer, err := index.NewWriter(ctx, tmpPath)
if err != nil {
return id, err
}
@@ -128,7 +113,7 @@ func (b *Builder) Build(ctx context.Context, dir, tenant string) (id Identifier,

// Add series
for i, s := range streams {
if err := writer.AddSeries(storage.SeriesRef(i), s.labels, s.chunks.finalize()...); err != nil {
if err := writer.AddSeries(storage.SeriesRef(i), s.labels, s.fp, s.chunks.Finalize()...); err != nil {
return id, err
}
}
@@ -137,20 +122,15 @@ func (b *Builder) Build(ctx context.Context, dir, tenant string) (id Identifier,
return id, err
}

reader, err := NewFileReader(tmpPath)
reader, err := index.NewFileReader(tmpPath)
if err != nil {
return id, err
}

from, through := reader.Bounds()

// load the newly compacted index to grab checksum, promptly close
id = Identifier{
Tenant: tenant,
From: model.Time(from),
Through: model.Time(through),
Checksum: reader.Checksum(),
}
dst := createFn(model.Time(from), model.Time(through), reader.Checksum())

reader.Close()
defer func() {
@@ -159,11 +139,13 @@ func (b *Builder) Build(ctx context.Context, dir, tenant string) (id Identifier,
}
}()

dst := id.FilePath(dir)

if err := os.Rename(tmpPath, dst); err != nil {
if err := chunk_util.EnsureDirectory(filepath.Dir(dst.Path())); err != nil {
return id, err
}
dstPath := dst.Path()
if err := os.Rename(tmpPath, dstPath); err != nil {
return id, err
}

return id, nil
return dst, nil
}
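
The createFn parameter is the interesting part of this refactor: naming is now injected, so compaction can emit per-tenant identifiers while the ingester can emit multi-tenant ones from the same builder. A standalone sketch with stand-in types (the path format loosely mirrors the removed Identifier.String above):

package main

import "fmt"

type identifier interface{ Path() string }

type singleTenantID struct {
	tenant        string
	from, through int64
	checksum      uint32
}

func (i singleTenantID) Path() string {
	return fmt.Sprintf("%s/index-%d-%d-%x.tsdb", i.tenant, i.from, i.through, i.checksum)
}

// build stands in for Builder.Build: it produces bounds and a checksum,
// then lets the caller decide how those map to a file identity.
func build(createFn func(from, through int64, checksum uint32) identifier) identifier {
	return createFn(10, 20, 0xabcd)
}

func main() {
	id := build(func(from, through int64, checksum uint32) identifier {
		return singleTenantID{tenant: "fake", from: from, through: through, checksum: checksum}
	})
	fmt.Println(id.Path()) // fake/index-10-20-abcd.tsdb
}
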
100 changes: 100 additions & 0 deletions pkg/storage/stores/tsdb/chunkwriter.go
@@ -0,0 +1,100 @@
package tsdb

import (
"context"

"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/series"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
"github.com/grafana/loki/pkg/util/spanlogger"
)

type IndexWriter interface {
Append(userID string, ls labels.Labels, chks index.ChunkMetas) error
}

type ChunkWriter struct {
schemaCfg config.SchemaConfig
fetcher *fetcher.Fetcher
indexWriter IndexWriter
}

func NewChunkWriter(
fetcher *fetcher.Fetcher,
pd config.PeriodConfig,
indexWriter IndexWriter,
) *ChunkWriter {
return &ChunkWriter{
schemaCfg: config.SchemaConfig{
Configs: []config.PeriodConfig{pd},
},
fetcher: fetcher,
indexWriter: indexWriter,
}
}

func (w *ChunkWriter) Put(ctx context.Context, chunks []chunk.Chunk) error {
for _, chunk := range chunks {
if err := w.PutOne(ctx, chunk.From, chunk.Through, chunk); err != nil {
return err
}
}
return nil
}

func (w *ChunkWriter) PutOne(ctx context.Context, from, through model.Time, chk chunk.Chunk) error {
log, ctx := spanlogger.New(ctx, "SeriesStore.PutOne")
defer log.Finish()

// with local TSDB indices, we _always_ write the index entry
// to avoid data loss if we lose an ingester's disk
// but we can skip writing the chunk if another replica
// has already written it to storage.
writeChunk := true

// If this chunk is in cache it must already be in the database so we don't need to write it again
found, _, _, _ := w.fetcher.Cache().Fetch(ctx, []string{w.schemaCfg.ExternalKey(chk.ChunkRef)})

if len(found) > 0 {
writeChunk = false
series.DedupedChunksTotal.Inc()
}

chunks := []chunk.Chunk{chk}

c := w.fetcher.Client()
if writeChunk {
if err := c.PutChunks(ctx, chunks); err != nil {
return errors.Wrap(err, "writing chunk")
}
}

// Always write the index to benefit durability via replication factor.
metas := index.ChunkMetas{
{
Checksum: chk.ChunkRef.Checksum,
MinTime: int64(chk.ChunkRef.From),
MaxTime: int64(chk.ChunkRef.Through),
KB: uint32(chk.Size()) / (1 << 10),
Entries: uint32(chk.Data.Entries()),
},
}
if err := w.indexWriter.Append(chk.UserID, chk.Metric, metas); err != nil {
return errors.Wrap(err, "writing index entry")
}

if writeChunk {
if cacheErr := w.fetcher.WriteBackCache(ctx, chunks); cacheErr != nil {
level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr)
}
}

return nil
}
44 changes: 37 additions & 7 deletions pkg/storage/stores/tsdb/compact.go
@@ -2,6 +2,7 @@ package tsdb

import (
"context"
"fmt"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
@@ -23,34 +24,63 @@ func NewCompactor(tenant, parentDir string) *Compactor {
}
}

func (c *Compactor) Compact(ctx context.Context, indices ...*TSDBIndex) (res index.Identifier, err error) {
func (c *Compactor) Compact(ctx context.Context, indices ...*TSDBIndex) (res Identifier, err error) {
// No need to compact a single index file
if len(indices) == 1 {
return indices[0].Identifier(c.tenant), nil
return newPrefixedIdentifier(
indices[0].Identifier(c.tenant),
c.parentDir,
c.parentDir,
),
nil
}

b := index.NewBuilder()
multi, err := NewMultiIndex(indices...)
ifcs := make([]Index, 0, len(indices))
for _, idx := range indices {
ifcs = append(ifcs, idx)
}

b := NewBuilder()
multi, err := NewMultiIndex(ifcs...)
if err != nil {
return res, err
}

// TODO(owen-d): introduce parallelism
// Until then,
// Instead of using the MultiIndex.forIndices helper, we loop over each sub-index manually.
// The index builder is single threaded, so we avoid races.
// Additionally, this increases the likelihood we add chunks in order
// by processing the indices in ascending order.
for _, idx := range multi.indices {
if err := idx.forSeries(
casted, ok := idx.(*TSDBIndex)
if !ok {
return nil, fmt.Errorf("expected tsdb index to compact, found :%T", idx)
}
if err := casted.forSeries(
ctx,
nil,
func(ls labels.Labels, _ model.Fingerprint, chks []index.ChunkMeta) {
// AddSeries copies chks into its own slice
b.AddSeries(ls.Copy(), chks)
b.AddSeries(ls.Copy(), model.Fingerprint(ls.Hash()), chks)
},
labels.MustNewMatcher(labels.MatchEqual, "", ""),
); err != nil {
return res, err
}
}

return b.Build(ctx, c.parentDir, c.tenant)
return b.Build(
ctx,
c.parentDir,
func(from, through model.Time, checksum uint32) Identifier {
id := SingleTenantTSDBIdentifier{
Tenant: c.tenant,
From: from,
Through: through,
Checksum: checksum,
}
return newPrefixedIdentifier(id, c.parentDir, c.parentDir)
},
)
}
7 changes: 5 additions & 2 deletions pkg/storage/stores/tsdb/compact_test.go
@@ -356,16 +356,19 @@ func TestCompactor(t *testing.T) {
for _, cases := range tc.input {
idx := BuildIndex(t, dir, "fake", cases)
defer idx.Close()
indices = append(indices, idx)
casted, ok := idx.Index.(*TSDBIndex)
require.Equal(t, true, ok)
indices = append(indices, casted)
}

out, err := c.Compact(context.Background(), indices...)
if tc.err {
require.NotNil(t, err)
return
}
require.Nil(t, err)

idx, err := LoadTSDBIdentifier(dir, out)
idx, err := NewShippableTSDBFile(out, false)
require.Nil(t, err)
defer idx.Close()

59 changes: 42 additions & 17 deletions pkg/storage/stores/tsdb/head.go
@@ -36,6 +36,8 @@ const (
// 1) Heads are per-tenant in Loki
// 2) Loki tends to have a few orders of magnitude less series per node than
// Prometheus|Cortex|Mimir.
// Do not specify without bit shifting. This allows us to
// do shard index calculations via bitwise & rather than modulo.
defaultStripeSize = 64
)

@@ -53,16 +55,36 @@ guaranteeing we maintain querying consistency for the entire data lifecycle.
*/

// TODO(owen-d)
type HeadMetrics struct {
seriesNotFound prometheus.Counter
type Metrics struct {
seriesNotFound prometheus.Counter
tsdbCreationsTotal prometheus.Counter
tsdbCreationFailures prometheus.Counter
tsdbManagerUpdatesTotal prometheus.Counter
tsdbManagerUpdatesFailedTotal prometheus.Counter
}

func NewHeadMetrics(r prometheus.Registerer) *HeadMetrics {
return &HeadMetrics{
func NewMetrics(r prometheus.Registerer) *Metrics {
return &Metrics{
seriesNotFound: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_tsdb_head_series_not_found_total",
Help: "Total number of requests for series that were not found.",
}),
tsdbCreationsTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_tsdb_creations_total",
Help: "Total number of tsdb creations attempted",
}),
tsdbCreationFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_tsdb_creations_failed_total",
Help: "Total number of tsdb creations failed",
}),
tsdbManagerUpdatesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_tsdb_manager_updates_total",
Help: "Total number of tsdb manager updates (loading/rotating tsdbs in mem)",
}),
tsdbManagerUpdatesFailedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_tsdb_manager_updates_failed_total",
Help: "Total number of tsdb manager update failures (loading/rotating tsdbs in mem)",
}),
}
}

@@ -75,7 +97,7 @@ type Head struct {
// in the MemPostings, but is eventually discarded when we create a real TSDB index.
lastSeriesID atomic.Uint64

metrics *HeadMetrics
metrics *Metrics
logger log.Logger

series *stripeSeries
@@ -86,7 +108,7 @@ type Head struct {
closed bool
}

func NewHead(tenant string, metrics *HeadMetrics, logger log.Logger) *Head {
func NewHead(tenant string, metrics *Metrics, logger log.Logger) *Head {
return &Head{
tenant: tenant,
metrics: metrics,
@@ -108,42 +130,44 @@ func (h *Head) MaxTime() int64 {
return h.maxTime.Load()
}

func (h *Head) updateMinMaxTime(mint, maxt int64) {
// Will CAS until successfully updates bounds or the condition is no longer valid
func updateMintMaxt(mint, maxt int64, mintSrc, maxtSrc *atomic.Int64) {
for {
lt := h.MinTime()
if mint >= lt {
lt := mintSrc.Load()
if mint >= lt && lt != 0 {
break
}
if h.minTime.CAS(lt, mint) {
if mintSrc.CAS(lt, mint) {
break
}
}
for {
ht := h.MaxTime()
ht := maxtSrc.Load()
if maxt <= ht {
break
}
if h.maxTime.CAS(ht, maxt) {
if maxtSrc.CAS(ht, maxt) {
break
}
}
}

// Note: chks must not be nil or zero-length
func (h *Head) Append(ls labels.Labels, chks index.ChunkMetas) {
func (h *Head) Append(ls labels.Labels, chks index.ChunkMetas) (created bool, refID uint64) {
from, through := chks.Bounds()
var id uint64
created := h.series.Append(ls, chks, func() *memSeries {
created, refID = h.series.Append(ls, chks, func() *memSeries {
id = h.lastSeriesID.Inc()
return newMemSeries(id, ls)
})
h.updateMinMaxTime(int64(from), int64(through))
updateMintMaxt(int64(from), int64(through), &h.minTime, &h.maxTime)

if !created {
return
}
h.postings.Add(storage.SeriesRef(id), ls)
h.numSeries.Inc()
return
}

// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
@@ -211,7 +235,7 @@ func (s *stripeSeries) Append(
ls labels.Labels,
chks index.ChunkMetas,
createFn func() *memSeries,
) (created bool) {
) (created bool, refID uint64) {
fp := ls.Hash()
i := fp & uint64(s.shards-1)
mtx := &s.locks[i]
@@ -222,7 +246,7 @@ func (s *stripeSeries) Append(
series = createFn()
s.hashes[i].set(fp, series)

// the series locks are modulo'd by the ref, not fingerprint
// the series locks are determined by the ref, not fingerprint
refIdx := series.ref & uint64(s.shards-1)
s.series[refIdx][series.ref] = series
created = true
@@ -231,6 +255,7 @@ func (s *stripeSeries) Append(

series.Lock()
series.chks = append(series.chks, chks...)
refID = series.ref
series.Unlock()

return
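
The stripe-size comment above relies on a standard trick: when the shard count is a power of two, x & (shards-1) equals x % shards, so the hot path avoids a modulo. A standalone check:

package main

import "fmt"

func main() {
	const shards = 64 // must remain a power of two for the bitmask to be valid
	for _, ref := range []uint64{1, 63, 64, 1<<40 + 7} {
		fmt.Println(ref%shards == ref&(shards-1)) // true for every value
	}
}
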
663 changes: 663 additions & 0 deletions pkg/storage/stores/tsdb/head_manager.go

Large diffs are not rendered by default.

310 changes: 310 additions & 0 deletions pkg/storage/stores/tsdb/head_manager_test.go
@@ -0,0 +1,310 @@
package tsdb

import (
"context"
"math"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

type noopTSDBManager struct{ NoopIndex }

func (noopTSDBManager) BuildFromWALs(_ time.Time, _ []WALIdentifier) error { return nil }

func chunkMetasToChunkRefs(user string, fp uint64, xs index.ChunkMetas) (res []ChunkRef) {
for _, x := range xs {
res = append(res, ChunkRef{
User: user,
Fingerprint: model.Fingerprint(fp),
Start: x.From(),
End: x.Through(),
Checksum: x.Checksum,
})
}
return
}

// Test append
func Test_TenantHeads_Append(t *testing.T) {
h := newTenantHeads(time.Now(), defaultHeadManagerStripeSize, NewMetrics(nil), log.NewNopLogger())
ls := mustParseLabels(`{foo="bar"}`)
chks := []index.ChunkMeta{
{
Checksum: 0,
MinTime: 1,
MaxTime: 10,
KB: 2,
Entries: 30,
},
}
_ = h.Append("fake", ls, chks)

found, err := h.GetChunkRefs(
context.Background(),
"fake",
0,
100,
nil, nil,
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
)
require.Nil(t, err)
require.Equal(t, chunkMetasToChunkRefs("fake", ls.Hash(), chks), found)

}

// Test multitenant reads
func Test_TenantHeads_MultiRead(t *testing.T) {
h := newTenantHeads(time.Now(), defaultHeadManagerStripeSize, NewMetrics(nil), log.NewNopLogger())
ls := mustParseLabels(`{foo="bar"}`)
chks := []index.ChunkMeta{
{
Checksum: 0,
MinTime: 1,
MaxTime: 10,
KB: 2,
Entries: 30,
},
}

tenants := []struct {
user string
ls labels.Labels
}{
{
user: "tenant1",
ls: append(ls.Copy(), labels.Label{
Name: "tenant",
Value: "tenant1",
}),
},
{
user: "tenant2",
ls: append(ls.Copy(), labels.Label{
Name: "tenant",
Value: "tenant2",
}),
},
}

// add data for both tenants
for _, tenant := range tenants {
_ = h.Append(tenant.user, tenant.ls, chks)

}

// ensure we're only returned the data from the correct tenant
for _, tenant := range tenants {
found, err := h.GetChunkRefs(
context.Background(),
tenant.user,
0,
100,
nil, nil,
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
)
require.Nil(t, err)
require.Equal(t, chunkMetasToChunkRefs(tenant.user, tenant.ls.Hash(), chks), found)
}

}

// test head recover from wal
func Test_HeadManager_RecoverHead(t *testing.T) {
now := time.Now()
dir := t.TempDir()
cases := []struct {
Labels labels.Labels
Chunks []index.ChunkMeta
User string
}{
{
User: "tenant1",
Labels: mustParseLabels(`{foo="bar", bazz="buzz"}`),
Chunks: []index.ChunkMeta{
{
MinTime: 1,
MaxTime: 10,
Checksum: 3,
},
},
},
{
User: "tenant2",
Labels: mustParseLabels(`{foo="bard", bazz="bozz", bonk="borb"}`),
Chunks: []index.ChunkMeta{
{
MinTime: 1,
MaxTime: 7,
Checksum: 4,
},
},
},
}

mgr := NewHeadManager(log.NewNopLogger(), dir, nil, noopTSDBManager{})
// This bit is normally handled by the Start() fn, but we're testing a smaller surface area
// so ensure our dirs exist
for _, d := range managerRequiredDirs(dir) {
require.Nil(t, util.EnsureDirectory(d))
}

// Call Rotate() to ensure the new head tenant heads exist, etc
require.Nil(t, mgr.Rotate(now))

// now build a WAL independently to test recovery
w, err := newHeadWAL(log.NewNopLogger(), walPath(mgr.dir, now), now)
require.Nil(t, err)

for i, c := range cases {
require.Nil(t, w.Log(&WALRecord{
UserID: c.User,
Series: record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: c.Labels,
},
Chks: ChunkMetasRecord{
Chks: c.Chunks,
Ref: uint64(i),
},
}))
}

require.Nil(t, w.Stop())

grp, ok, err := walsForPeriod(mgr.dir, mgr.period, mgr.period.PeriodFor(now))
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, 1, len(grp.wals))
require.Nil(t, recoverHead(mgr.dir, mgr.activeHeads, grp.wals))

for _, c := range cases {
refs, err := mgr.GetChunkRefs(
context.Background(),
c.User,
0, math.MaxInt64,
nil, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"),
)
require.Nil(t, err)
require.Equal(t, chunkMetasToChunkRefs(c.User, c.Labels.Hash(), c.Chunks), refs)
}

}

// test mgr recover from multiple wals across multiple periods
func Test_HeadManager_Lifecycle(t *testing.T) {
dir := t.TempDir()
curPeriod := time.Now()
cases := []struct {
Labels labels.Labels
Chunks []index.ChunkMeta
User string
}{
{
User: "tenant1",
Labels: mustParseLabels(`{foo="bar", bazz="buzz"}`),
Chunks: []index.ChunkMeta{
{
MinTime: 1,
MaxTime: 10,
Checksum: 3,
},
},
},
{
User: "tenant2",
Labels: mustParseLabels(`{foo="bard", bazz="bozz", bonk="borb"}`),
Chunks: []index.ChunkMeta{
{
MinTime: 1,
MaxTime: 7,
Checksum: 4,
},
},
},
}

mgr := NewHeadManager(log.NewNopLogger(), dir, nil, noopTSDBManager{})
w, err := newHeadWAL(log.NewNopLogger(), walPath(mgr.dir, curPeriod), curPeriod)
require.Nil(t, err)

// Write old WALs
for i, c := range cases {
require.Nil(t, w.Log(&WALRecord{
UserID: c.User,
Series: record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: c.Labels,
},
Chks: ChunkMetasRecord{
Chks: c.Chunks,
Ref: uint64(i),
},
}))
}

require.Nil(t, w.Stop())

// Start, ensuring recovery from old WALs
require.Nil(t, mgr.Start())
// Ensure old WAL data is queryable
for _, c := range cases {
refs, err := mgr.GetChunkRefs(
context.Background(),
c.User,
0, math.MaxInt64,
nil, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"),
)
require.Nil(t, err)

lbls := labels.NewBuilder(c.Labels)
lbls.Set(TenantLabel, c.User)
require.Equal(t, chunkMetasToChunkRefs(c.User, c.Labels.Hash(), c.Chunks), refs)
}

// Add data
newCase := struct {
Labels labels.Labels
Chunks []index.ChunkMeta
User string
}{
User: "tenant3",
Labels: mustParseLabels(`{foo="bard", other="hi"}`),
Chunks: []index.ChunkMeta{
{
MinTime: 1,
MaxTime: 7,
Checksum: 4,
},
},
}

require.Nil(t, mgr.Append(newCase.User, newCase.Labels, newCase.Chunks))

// Ensure old + new data is queryable
for _, c := range append(cases, newCase) {
refs, err := mgr.GetChunkRefs(
context.Background(),
c.User,
0, math.MaxInt64,
nil, nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"),
)
require.Nil(t, err)

lbls := labels.NewBuilder(c.Labels)
lbls.Set(TenantLabel, c.User)
require.Equal(t, chunkMetasToChunkRefs(c.User, c.Labels.Hash(), c.Chunks), refs)
}
}
4 changes: 2 additions & 2 deletions pkg/storage/stores/tsdb/head_read.go
@@ -24,8 +24,8 @@ import (
)

// Index returns an IndexReader against the block.
func (h *Head) Index() (IndexReader, error) {
return h.indexRange(math.MinInt64, math.MaxInt64), nil
func (h *Head) Index() IndexReader {
return h.indexRange(math.MinInt64, math.MaxInt64)
}

func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
213 changes: 213 additions & 0 deletions pkg/storage/stores/tsdb/head_wal.go
@@ -0,0 +1,213 @@
package tsdb

import (
"time"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/wal"

"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
"github.com/grafana/loki/pkg/util/encoding"
)

type WAL interface {
Start(time.Time) error
Log(*WALRecord) error
Stop() error
}

// TODO(owen-d): There are probably some performance gains to be had by utilizing
// pools here, but in the interest of implementation time and given chunks aren't
// flushed often (generally ~5/s), this seems fine.
// This may also be applicable to varint encoding.

// 128KB
// The segment sizes are kept small for the TSDB Head here because
// we only store chunk references
const walSegmentSize = 128 << 10

type RecordType byte

// By prefixing records with versions, we can easily update our wal schema
const (
// WalRecordSeries and WalRecordChunks identify the payload carried by each
// record (a series definition or a batch of chunk metas, respectively),
// so new record types can be introduced without breaking older readers.
WalRecordSeries RecordType = iota
WalRecordChunks
)

type WALRecord struct {
UserID string
Series record.RefSeries
Chks ChunkMetasRecord
}

type ChunkMetasRecord struct {
Chks index.ChunkMetas
Ref uint64
}

func (r *WALRecord) encodeSeries(b []byte) []byte {
buf := encoding.EncWith(b)
buf.PutByte(byte(WalRecordSeries))
buf.PutUvarintStr(r.UserID)

var enc record.Encoder
// The 'encoded' slice already has the type header and userID here, hence re-using
// the remaining part of the slice (i.e. encoded[len(encoded):]) to encode the series.
encoded := buf.Get()
encoded = append(encoded, enc.Series([]record.RefSeries{r.Series}, encoded[len(encoded):])...)

return encoded
}

func (r *WALRecord) encodeChunks(b []byte) []byte {
buf := encoding.EncWith(b)
buf.PutByte(byte(WalRecordChunks))
buf.PutUvarintStr(r.UserID)
buf.PutBE64(r.Chks.Ref)
buf.PutUvarint(len(r.Chks.Chks))

for _, chk := range r.Chks.Chks {
buf.PutBE64(uint64(chk.MinTime))
buf.PutBE64(uint64(chk.MaxTime))
buf.PutBE32(chk.Checksum)
buf.PutBE32(chk.KB)
buf.PutBE32(chk.Entries)
}

return buf.Get()
}
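
For orientation, the byte layout encodeChunks produces, read off the Put* calls above, is roughly the following (a sketch, not an authoritative format spec):

	// | 1B record type (WalRecordChunks)
	// | uvarint length + UserID bytes
	// | 8B big-endian series ref
	// | uvarint chunk count n
	// | n x ( 8B MinTime | 8B MaxTime | 4B Checksum | 4B KB | 4B Entries )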

func decodeChunks(b []byte, rec *WALRecord) error {
if len(b) == 0 {
return nil
}

dec := encoding.DecWith(b)

rec.Chks.Ref = dec.Be64()
if err := dec.Err(); err != nil {
return errors.Wrap(err, "decoding series ref")
}

ln := dec.Uvarint()
if err := dec.Err(); err != nil {
return errors.Wrap(err, "decoding number of chunks")
}
// allocate space for the required number of chunks
rec.Chks.Chks = make(index.ChunkMetas, 0, ln)

for len(dec.B) > 0 && dec.Err() == nil {
rec.Chks.Chks = append(rec.Chks.Chks, index.ChunkMeta{
MinTime: dec.Be64int64(),
MaxTime: dec.Be64int64(),
Checksum: dec.Be32(),
KB: dec.Be32(),
Entries: dec.Be32(),
})
}

if err := dec.Err(); err != nil {
return errors.Wrap(err, "decoding chunk metas")
}

return nil
}

func decodeWALRecord(b []byte, walRec *WALRecord) error {
var (
userID string
dec record.Decoder

decbuf = encoding.DecWith(b)
t = RecordType(decbuf.Byte())
)

switch t {
case WalRecordSeries:
userID = decbuf.UvarintStr()
rSeries, err := dec.Series(decbuf.B, nil)
if err != nil {
return errors.Wrap(err, "decoding head series")
}
// unlike tsdb, we only add one series per record.
if len(rSeries) > 1 {
return errors.New("more than one series detected in tsdb head wal record")
}
if len(rSeries) == 1 {
walRec.Series = rSeries[0]
}
case WalRecordChunks:
userID = decbuf.UvarintStr()
if err := decodeChunks(decbuf.B, walRec); err != nil {
return err
}
default:
return errors.New("unknown record type")
}

if decbuf.Err() != nil {
return decbuf.Err()
}

walRec.UserID = userID
return nil
}

// the headWAL, unlike Head, is multi-tenant. This is just to avoid the need to maintain
// an open segment per tenant (potentially thousands of them)
type headWAL struct {
initialized time.Time
log log.Logger
wal *wal.WAL
}

func newHeadWAL(log log.Logger, dir string, t time.Time) (*headWAL, error) {
// NB: if we use a non-nil Prometheus Registerer, ensure
// that the underlying metrics won't conflict with existing WAL metrics in the ingester.
// Likely, this can be done by adding extra label(s)
wal, err := wal.NewSize(log, nil, dir, walSegmentSize, false)
if err != nil {
return nil, err
}

return &headWAL{
initialized: t,
log: log,
wal: wal,
}, nil
}

func (w *headWAL) Stop() error {
return w.wal.Close()
}

func (w *headWAL) Log(record *WALRecord) error {
if record == nil {
return nil
}

var buf []byte

// Always write series before chunks
if len(record.Series.Labels) > 0 {
buf = record.encodeSeries(buf[:0])
if err := w.wal.Log(buf); err != nil {
return err
}
}

if len(record.Chks.Chks) > 0 {
buf = record.encodeChunks(buf[:0])
if err := w.wal.Log(buf); err != nil {
return err
}
}

return nil
}
102 changes: 102 additions & 0 deletions pkg/storage/stores/tsdb/head_wal_test.go
@@ -0,0 +1,102 @@
package tsdb

import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

func Test_Encoding_Series(t *testing.T) {
record := &WALRecord{
UserID: "foo",
Series: record.RefSeries{
Ref: chunks.HeadSeriesRef(1),
Labels: mustParseLabels(`{foo="bar"}`),
},
}
buf := record.encodeSeries(nil)
decoded := &WALRecord{}

err := decodeWALRecord(buf, decoded)
require.Nil(t, err)
require.Equal(t, record, decoded)
}

func Test_Encoding_Chunks(t *testing.T) {
record := &WALRecord{
UserID: "foo",
Chks: ChunkMetasRecord{
Ref: 1,
Chks: index.ChunkMetas{
{
Checksum: 1,
MinTime: 1,
MaxTime: 4,
KB: 5,
Entries: 6,
},
{
Checksum: 2,
MinTime: 5,
MaxTime: 10,
KB: 7,
Entries: 8,
},
},
},
}
buf := record.encodeChunks(nil)
decoded := &WALRecord{}

err := decodeWALRecord(buf, decoded)
require.Nil(t, err)
require.Equal(t, record, decoded)
}

func Test_HeadWALLog(t *testing.T) {
dir := t.TempDir()
w, err := newHeadWAL(log.NewNopLogger(), dir, time.Now())
require.Nil(t, err)

newSeries := &WALRecord{
UserID: "foo",
Series: record.RefSeries{Ref: 1, Labels: mustParseLabels(`{foo="bar"}`)},
Chks: ChunkMetasRecord{
Chks: []index.ChunkMeta{
{
Checksum: 1,
MinTime: 1,
MaxTime: 10,
KB: 5,
Entries: 50,
},
},
Ref: 1,
},
}
require.Nil(t, w.Log(newSeries))

chunksOnly := &WALRecord{
UserID: "foo",
Chks: ChunkMetasRecord{
Chks: []index.ChunkMeta{
{
Checksum: 2,
MinTime: 5,
MaxTime: 100,
KB: 3,
Entries: 25,
},
},
Ref: 1,
},
}
require.Nil(t, w.Log(chunksOnly))
require.Nil(t, w.Stop())
}
202 changes: 202 additions & 0 deletions pkg/storage/stores/tsdb/identifier.go
@@ -0,0 +1,202 @@
package tsdb

import (
"fmt"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

// Identifier can resolve an index to a name (in object storage)
// and a path (on disk)
type Identifier interface {
Name() string
Path() string
}

// identifierFromPath will detect whether this is a single or multitenant TSDB
func identifierFromPath(p string) (Identifier, error) {
multiID, multitenant := parseMultitenantTSDBPath(p)
if multitenant {
parent := filepath.Dir(p)
return newPrefixedIdentifier(multiID, parent, ""), nil
}

id, parent, ok := parseSingleTenantTSDBPath(p)
if !ok {
return nil, fmt.Errorf("invalid tsdb path: %s", p)
}

return newPrefixedIdentifier(id, parent, ""), nil
}

func newPrefixedIdentifier(id Identifier, path, name string) prefixedIdentifier {
return prefixedIdentifier{
Identifier: id,
parentPath: path,
parentName: name,
}
}

// prefixedIdentifier wraps an Identifier and prepends a parent path (and name) to its methods
type prefixedIdentifier struct {
parentPath, parentName string
Identifier
}

func (p prefixedIdentifier) Path() string {
return filepath.Join(p.parentPath, p.Identifier.Path())
}

func (p prefixedIdentifier) Name() string {
return path.Join(p.parentName, p.Identifier.Name())
}

func newSuffixedIdentifier(id Identifier, pathSuffix string) suffixedIdentifier {
return suffixedIdentifier{
pathSuffix: pathSuffix,
Identifier: id,
}
}

// Generally useful for gzip extensions
type suffixedIdentifier struct {
pathSuffix string
Identifier
}

func (s suffixedIdentifier) Path() string {
return s.Identifier.Path() + s.pathSuffix
}

// Identifier has all the information needed to resolve a TSDB index
// Notably this abstracts away OS path separators, etc.
type SingleTenantTSDBIdentifier struct {
Tenant string
From, Through model.Time
Checksum uint32
}

func (i SingleTenantTSDBIdentifier) str() string {
return fmt.Sprintf(
"%s-%d-%d-%x.tsdb",
index.IndexFilename,
i.From,
i.Through,
i.Checksum,
)
}

func (i SingleTenantTSDBIdentifier) Name() string {
return path.Join(i.Tenant, i.str())
}

func (i SingleTenantTSDBIdentifier) Path() string {
return filepath.Join(i.Tenant, i.str())
}
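
As a concrete illustration (values mirror the parsing test further down), the identifier below yields "fake/index-1-10-ff.tsdb" from both Name() and Path(), modulo OS path separators. A minimal sketch, assuming it lives in this package with a standard "fmt" import:

	func ExampleSingleTenantTSDBIdentifier() {
		id := SingleTenantTSDBIdentifier{
			Tenant:   "fake",
			From:     1,
			Through:  10,
			Checksum: 255,
		}
		fmt.Println(id.Name())
		// Output: fake/index-1-10-ff.tsdb
	}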

func parseSingleTenantTSDBPath(p string) (id SingleTenantTSDBIdentifier, parent string, ok bool) {
// parsing as multitenant didn't work, so try single tenant
file := filepath.Base(p)
parents := filepath.Dir(p)
pathPrefix := filepath.Dir(parents)
tenant := filepath.Base(parents)

// no tenant was provided
if tenant == "." {
return
}

// incorrect suffix
trimmed := strings.TrimSuffix(file, ".tsdb")
if trimmed == file {
return
}

elems := strings.Split(trimmed, "-")
if len(elems) != 4 {
return
}

if elems[0] != index.IndexFilename {
return
}

from, err := strconv.ParseInt(elems[1], 10, 64)
if err != nil {
return
}

through, err := strconv.ParseInt(elems[2], 10, 64)
if err != nil {
return
}

checksum, err := strconv.ParseInt(elems[3], 16, 32)
if err != nil {
return
}

return SingleTenantTSDBIdentifier{
Tenant: tenant,
From: model.Time(from),
Through: model.Time(through),
Checksum: uint32(checksum),
}, pathPrefix, true

}

type MultitenantTSDBIdentifier struct {
nodeName string
ts time.Time
}

func (id MultitenantTSDBIdentifier) Name() string {
return fmt.Sprintf("%d-%s.tsdb", id.ts.Unix(), id.nodeName)
}

func (id MultitenantTSDBIdentifier) Path() string {
// There are no directories, so reuse name
return id.Name()
}

func parseMultitenantTSDBPath(p string) (id MultitenantTSDBIdentifier, ok bool) {
cleaned := filepath.Base(p)
return parseMultitenantTSDBNameFromBase(cleaned)
}

func parseMultitenantTSDBName(p string) (id MultitenantTSDBIdentifier, ok bool) {
cleaned := path.Base(p)
return parseMultitenantTSDBNameFromBase(cleaned)
}

func parseMultitenantTSDBNameFromBase(name string) (res MultitenantTSDBIdentifier, ok bool) {

trimmed := strings.TrimSuffix(name, ".tsdb")

// incorrect suffix
if trimmed == name {
return
}

xs := strings.Split(trimmed, "-")
if len(xs) != 2 {
return
}

ts, err := strconv.Atoi(xs[0])
if err != nil {
return
}

return MultitenantTSDBIdentifier{
ts: time.Unix(int64(ts), 0),
nodeName: xs[1],
}, true
}
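
Multitenant identifiers go the other way around: a unix timestamp plus node name, with no tenant directory. A small sketch with a made-up node name:

	// format is "<unix seconds>-<node>.tsdb"; "ingester-0" is hypothetical
	id := MultitenantTSDBIdentifier{ts: time.Unix(1, 0), nodeName: "ingester-0"}
	_ = id.Name() // "1-ingester-0.tsdb"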
57 changes: 57 additions & 0 deletions pkg/storage/stores/tsdb/identifier_test.go
@@ -0,0 +1,57 @@
package tsdb

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestParseSingleTenantTSDBPath(t *testing.T) {
for _, tc := range []struct {
desc string
input string
id SingleTenantTSDBIdentifier
parent string
ok bool
}{
{
desc: "simple_works",
input: "parent/fake/index-1-10-ff.tsdb",
id: SingleTenantTSDBIdentifier{
Tenant: "fake",
From: 1,
Through: 10,
Checksum: 255,
},
parent: "parent",
ok: true,
},
{
desc: "no tenant dir",
input: "index-1-10-ff.tsdb",
ok: false,
},
{
desc: "wrong index name",
input: "fake/notindex-1-10-ff.tsdb",
ok: false,
},
{
desc: "wrong argument len",
input: "fake/index-10-ff.tsdb",
ok: false,
},
{
desc: "wrong argument encoding",
input: "fake/index-ff-10-ff.tsdb",
ok: false,
},
} {
t.Run(tc.desc, func(t *testing.T) {
id, parent, ok := parseSingleTenantTSDBPath(tc.input)
require.Equal(t, tc.id, id)
require.Equal(t, tc.parent, parent)
require.Equal(t, tc.ok, ok)
})
}
}
24 changes: 24 additions & 0 deletions pkg/storage/stores/tsdb/index.go
@@ -6,6 +6,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

@@ -32,6 +33,8 @@ func (r ChunkRef) Less(x ChunkRef) bool {

type Index interface {
Bounded
SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer)
Close() error
// GetChunkRefs accepts an optional []ChunkRef argument.
// If not nil, it will use that slice to build the result,
// allowing us to avoid unnecessary allocations at the caller's discretion.
@@ -48,3 +51,24 @@ type Index interface {
LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error)
LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error)
}
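
Per the GetChunkRefs comment above, callers may hand in their own slice to avoid allocations; a minimal sketch of that pattern, where idx, ctx, from, and through are assumed to already exist:

	buf := make([]ChunkRef, 0, 1024) // reused as the backing array for results
	refs, err := idx.GetChunkRefs(ctx, "fake", from, through, buf, nil,
		labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))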

type NoopIndex struct{}

func (NoopIndex) Close() error { return nil }
func (NoopIndex) Bounds() (from, through model.Time) { return }
func (NoopIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
return nil, nil
}

// Series follows the same semantics regarding the passed slice and shard as GetChunkRefs.
func (NoopIndex) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) {
return nil, nil
}
func (NoopIndex) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
return nil, nil
}
func (NoopIndex) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error) {
return nil, nil
}

func (NoopIndex) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {}
4 changes: 2 additions & 2 deletions pkg/storage/stores/tsdb/index/chunk.go
@@ -63,10 +63,10 @@ func (c ChunkMetas) Less(i, j int) bool {
return a.Checksum < b.Checksum
}

// finalize sorts and dedupes
// Finalize sorts and dedupes
// TODO(owen-d): can we remove the need for this by ensuring we only push
// in order and without duplicates?
func (c ChunkMetas) finalize() ChunkMetas {
func (c ChunkMetas) Finalize() ChunkMetas {
sort.Sort(c)

if len(c) == 0 {
2 changes: 1 addition & 1 deletion pkg/storage/stores/tsdb/index/chunk_test.go
@@ -135,7 +135,7 @@ func TestChunkMetasFinalize(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.output, tc.input.finalize())
require.Equal(t, tc.output, tc.input.Finalize())
})
}
}
53 changes: 31 additions & 22 deletions pkg/storage/stores/tsdb/index/index.go
@@ -30,10 +30,10 @@ import (
"unsafe"

"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
tsdb_enc "github.com/prometheus/prometheus/tsdb/encoding"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"

"github.com/grafana/loki/pkg/util/encoding"
@@ -134,8 +134,9 @@ type Writer struct {
fingerprintOffsets FingerprintOffsets

// Hold last series to validate that clients insert new series in order.
lastSeries labels.Labels
lastRef storage.SeriesRef
lastSeries labels.Labels
lastSeriesHash uint64
lastRef storage.SeriesRef

crc32 hash.Hash

@@ -436,13 +437,17 @@ func (w *Writer) writeMeta() error {
}

// AddSeries adds the series one at a time along with its chunks.
func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...ChunkMeta) error {
// Requires a specific fingerprint to be passed in the case where the "desired"
// fingerprint differs from what labels.Hash() produces. For example,
// multitenant TSDBs embed a tenant label, but the actual series has no such
// label and so the derived fingerprint differs.
func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, fp model.Fingerprint, chunks ...ChunkMeta) error {
if err := w.ensureStage(idxStageSeries); err != nil {
return err
}

labelHash := lset.Hash()
lastHash := w.lastSeries.Hash()
labelHash := uint64(fp)
lastHash := w.lastSeriesHash
// Ensure series are sorted by the priorities: [`hash(labels)`, `labels`]
if (labelHash < lastHash && len(w.lastSeries) > 0) || labelHash == lastHash && labels.Compare(lset, w.lastSeries) < 0 {
return errors.Errorf("out-of-order series added with label set %q", lset)
@@ -530,6 +535,7 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...
}

w.lastSeries = append(w.lastSeries[:0], lset...)
w.lastSeriesHash = labelHash
w.lastRef = ref

if ref%fingerprintInterval == 0 {
@@ -598,7 +604,7 @@ func (w *Writer) finishSymbols() error {
}

// Load in the symbol table efficiently for the rest of the index writing.
w.symbols, err = NewSymbols(realByteSlice(w.symbolFile.Bytes()), FormatV2, int(w.toc.Symbols))
w.symbols, err = NewSymbols(RealByteSlice(w.symbolFile.Bytes()), FormatV2, int(w.toc.Symbols))
if err != nil {
return errors.Wrap(err, "read symbols")
}
@@ -617,7 +623,7 @@ func (w *Writer) writeLabelIndices() error {
}
defer f.Close()

d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.fPO.pos)))
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.fPO.pos)))
cnt := w.cntPO
current := []byte{}
values := []uint32{}
@@ -787,7 +793,7 @@ func (w *Writer) writePostingsOffsetTable() error {
f.Close()
}
}()
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.fPO.pos)))
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.fPO.pos)))
cnt := w.cntPO
for d.Err() == nil && cnt > 0 {
w.buf1.Reset()
@@ -848,7 +854,9 @@ func (w *Writer) writeFingerprintOffsetsTable() error {

// write length
ln := w.buf1.Len()
if ln > math.MaxUint32 {
// TODO(owen-d): can remove the uint32 cast in the future
// Had to uint32 wrap these for arm32 builds, which we'll remove in the future.
if uint32(ln) > uint32(math.MaxUint32) {
return errors.Errorf("fingerprint offset size exceeds 4 bytes: %d", ln)
}

@@ -905,7 +913,7 @@ func (w *Writer) writePostingsToTmpFiles() error {

// Write out the special all posting.
offsets := []uint32{}
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices)))
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.toc.LabelIndices)))
d.Skip(int(w.toc.Series))
for d.Len() > 0 {
d.ConsumePadding()
@@ -951,7 +959,7 @@ func (w *Writer) writePostingsToTmpFiles() error {
// Label name -> label value -> positions.
postings := map[uint32]map[uint32][]uint32{}

d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices)))
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.toc.LabelIndices)))
d.Skip(int(w.toc.Series))
for d.Len() > 0 {
d.ConsumePadding()
@@ -1167,17 +1175,17 @@ type ByteSlice interface {
Range(start, end int) []byte
}

type realByteSlice []byte
type RealByteSlice []byte

func (b realByteSlice) Len() int {
func (b RealByteSlice) Len() int {
return len(b)
}

func (b realByteSlice) Range(start, end int) []byte {
func (b RealByteSlice) Range(start, end int) []byte {
return b[start:end]
}

func (b realByteSlice) Sub(start, end int) ByteSlice {
func (b RealByteSlice) Sub(start, end int) ByteSlice {
return b[start:end]
}

@@ -1187,18 +1195,19 @@ func NewReader(b ByteSlice) (*Reader, error) {
return newReader(b, ioutil.NopCloser(nil))
}

type nopCloser struct{}

func (nopCloser) Close() error { return nil }

// NewFileReader returns a new index reader against the given index file.
func NewFileReader(path string) (*Reader, error) {
	b, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, err
	}
	r, err := newReader(RealByteSlice(b), nopCloser{})
	if err != nil {
		return r, err
	}

	return r, nil
19 changes: 10 additions & 9 deletions pkg/storage/stores/tsdb/index/index_test.go
@@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/encoding"
@@ -168,10 +169,10 @@ func TestIndexRW_Postings(t *testing.T) {

// Postings lists are only written if a series with the respective
// reference was added before.
require.NoError(t, iw.AddSeries(1, series[0]))
require.NoError(t, iw.AddSeries(2, series[1]))
require.NoError(t, iw.AddSeries(3, series[2]))
require.NoError(t, iw.AddSeries(4, series[3]))
require.NoError(t, iw.AddSeries(1, series[0], model.Fingerprint(series[0].Hash())))
require.NoError(t, iw.AddSeries(2, series[1], model.Fingerprint(series[1].Hash())))
require.NoError(t, iw.AddSeries(3, series[2], model.Fingerprint(series[2].Hash())))
require.NoError(t, iw.AddSeries(4, series[3], model.Fingerprint(series[3].Hash())))

require.NoError(t, iw.Close())

@@ -257,7 +258,7 @@ func TestPostingsMany(t *testing.T) {
})

for i, s := range series {
require.NoError(t, iw.AddSeries(storage.SeriesRef(i), s))
require.NoError(t, iw.AddSeries(storage.SeriesRef(i), s, model.Fingerprint(s.Hash())))
}
require.NoError(t, iw.Close())

@@ -384,7 +385,7 @@ func TestPersistence_index_e2e(t *testing.T) {
mi := newMockIndex()

for i, s := range input {
err = iw.AddSeries(storage.SeriesRef(i), s.labels, s.chunks...)
err = iw.AddSeries(storage.SeriesRef(i), s.labels, model.Fingerprint(s.labels.Hash()), s.chunks...)
require.NoError(t, err)
require.NoError(t, mi.AddSeries(storage.SeriesRef(i), s.labels, s.chunks...))

@@ -465,14 +466,14 @@ func TestPersistence_index_e2e(t *testing.T) {
}

func TestDecbufUvarintWithInvalidBuffer(t *testing.T) {
b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})
b := RealByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})

db := encoding.NewDecbufUvarintAt(b, 0, castagnoliTable)
require.Error(t, db.Err())
}

func TestReaderWithInvalidBuffer(t *testing.T) {
b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})
b := RealByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})

_, err := NewReader(b)
require.Error(t, err)
@@ -509,7 +510,7 @@ func TestSymbols(t *testing.T) {
checksum := crc32.Checksum(buf.Get()[symbolsStart+4:], castagnoliTable)
buf.PutBE32(checksum) // Check sum at the end.

s, err := NewSymbols(realByteSlice(buf.Get()), FormatV2, symbolsStart)
s, err := NewSymbols(RealByteSlice(buf.Get()), FormatV2, symbolsStart)
require.NoError(t, err)

// We store only 4 offsets to symbols.
118 changes: 118 additions & 0 deletions pkg/storage/stores/tsdb/index_client.go
@@ -0,0 +1,118 @@
package tsdb

import (
"context"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

// implements stores.Index
type IndexClient struct {
schema config.SchemaConfig
idx Index
}

func NewIndexClient(idx Index, pd config.PeriodConfig) *IndexClient {
return &IndexClient{
schema: config.SchemaConfig{
Configs: []config.PeriodConfig{pd},
},
idx: idx,
}
}

// TODO(owen-d): This is a hack for compatibility with how the current query-mapping works.
// Historically, Loki reads the index shard factor and the query planner injects shard
// labels accordingly.
// In the future, we should use dynamic sharding in TSDB to determine the shard factors,
// and we may no longer wish to send a shard label inside the queries,
// but rather expose it as part of the stores.Index interface.
func (c *IndexClient) shard(matchers ...*labels.Matcher) ([]*labels.Matcher, *index.ShardAnnotation, error) {
s, shardLabelIndex, err := astmapper.ShardFromMatchers(matchers)
if err != nil {
return nil, nil, err
}

var shard *index.ShardAnnotation
if s != nil {
matchers = append(matchers[:shardLabelIndex], matchers[shardLabelIndex+1:]...)
shard = &index.ShardAnnotation{
Shard: uint32(s.Shard),
Of: uint32(s.Of),
}
}

return matchers, shard, err

}

// TODO(owen-d): synchronize logproto.ChunkRef and tsdb.ChunkRef so we don't have to convert.
// They share almost the same fields, so we can add the missing `KB` field to the proto and then
// use that within the tsdb package.
func (c *IndexClient) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]logproto.ChunkRef, error) {
matchers, shard, err := c.shard(matchers...)
if err != nil {
return nil, err
}

// TODO(owen-d): use a pool to reduce allocs here
chks, err := c.idx.GetChunkRefs(ctx, userID, from, through, nil, shard, matchers...)
if err != nil {
return nil, err
}

refs := make([]logproto.ChunkRef, 0, len(chks))
for _, chk := range chks {
refs = append(refs, logproto.ChunkRef{
Fingerprint: uint64(chk.Fingerprint),
UserID: chk.User,
From: chk.Start,
Through: chk.End,
Checksum: chk.Checksum,
})
}

return refs, err
}

func (c *IndexClient) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
matchers, shard, err := c.shard(matchers...)
if err != nil {
return nil, err
}

xs, err := c.idx.Series(ctx, userID, from, through, nil, shard, matchers...)
if err != nil {
return nil, err
}

res := make([]labels.Labels, 0, len(xs))
for _, x := range xs {
res = append(res, x.Labels)
}
return res, nil
}

// tsdb no longer uses the __name__="logs" hack, so we can ignore metric names!
func (c *IndexClient) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
return c.idx.LabelValues(ctx, userID, from, through, labelName, matchers...)
}

// tsdb no longer uses the __name__="logs" hack, so we can ignore metric names!
func (c *IndexClient) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
return c.idx.LabelNames(ctx, userID, from, through)
}

// SetChunkFilterer sets a chunk filter to be used when retrieving chunks.
// This is only used for GetSeries implementation.
// Todo we might want to pass it as a parameter to GetSeries instead.
func (c *IndexClient) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
c.idx.SetChunkFilterer(chunkFilter)
}
66 changes: 66 additions & 0 deletions pkg/storage/stores/tsdb/lazy_index.go
@@ -0,0 +1,66 @@
package tsdb

import (
"context"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

// Index adapter for a function which returns an index when queried.
type LazyIndex func() (Index, error)

func (f LazyIndex) Bounds() (model.Time, model.Time) {
i, err := f()
if err != nil {
return 0, 0
}
return i.Bounds()
}

func (f LazyIndex) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
i, err := f()
if err == nil {
i.SetChunkFilterer(chunkFilter)
}
}

func (f LazyIndex) Close() error {
i, err := f()
if err != nil {
return err
}
return i.Close()
}

func (f LazyIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
i, err := f()
if err != nil {
return nil, err
}
return i.GetChunkRefs(ctx, userID, from, through, res, shard, matchers...)
}
func (f LazyIndex) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) {
i, err := f()
if err != nil {
return nil, err
}
return i.Series(ctx, userID, from, through, res, shard, matchers...)
}
func (f LazyIndex) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
i, err := f()
if err != nil {
return nil, err
}
return i.LabelNames(ctx, userID, from, through, matchers...)
}
func (f LazyIndex) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error) {
i, err := f()
if err != nil {
return nil, err
}
return i.LabelValues(ctx, userID, from, through, name, matchers...)
}
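
A brief usage sketch for LazyIndex: wrap any function that can produce an Index so the work is deferred until a query actually needs it (NoopIndex stands in for a real index here):

	lazy := LazyIndex(func() (Index, error) {
		// in practice this might open or download a TSDB file on first use
		return NoopIndex{}, nil
	})
	from, through := lazy.Bounds() // the wrapped index is only resolved at call time
	_, _ = from, through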
284 changes: 284 additions & 0 deletions pkg/storage/stores/tsdb/manager.go
@@ -0,0 +1,284 @@
package tsdb

import (
"context"
"fmt"
"math"
"path/filepath"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
shipper_index "github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

// nolint:revive
// TSDBManager wraps the index shipper and writes/manages
// TSDB files on disk
type TSDBManager interface {
Index
// Builds a new TSDB file from a set of WALs
BuildFromWALs(time.Time, []WALIdentifier) error
}

/*
tsdbManager is responsible for:
* Turning WALs into optimized multi-tenant TSDBs when requested
* Serving reads from these TSDBs
* Shipping them to remote storage
* Keeping them available for querying
* Removing old TSDBs which are no longer needed
*/
type tsdbManager struct {
indexPeriod time.Duration
nodeName string // node name
log log.Logger
dir string
metrics *Metrics

sync.RWMutex

chunkFilter chunk.RequestChunkFilterer
shipper indexshipper.IndexShipper
}

func NewTSDBManager(
nodeName,
dir string,
shipper indexshipper.IndexShipper,
indexPeriod time.Duration,
logger log.Logger,
metrics *Metrics,
) TSDBManager {
return &tsdbManager{
indexPeriod: indexPeriod,
nodeName: nodeName,
log: log.With(logger, "component", "tsdb-manager"),
dir: dir,
metrics: metrics,
shipper: shipper,
}
}

func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier) (err error) {
level.Debug(m.log).Log("msg", "building WALs", "n", len(ids), "ts", t)
// get relevant wals
// iterate them, build tsdb in scratch dir
defer func() {
m.metrics.tsdbCreationsTotal.Inc()
if err != nil {
m.metrics.tsdbCreationFailures.Inc()
}
}()

level.Debug(m.log).Log("msg", "recovering tenant heads")
tmp := newTenantHeads(t, defaultHeadManagerStripeSize, m.metrics, m.log)
if err = recoverHead(m.dir, tmp, ids); err != nil {
return errors.Wrap(err, "building TSDB from WALs")
}

periods := make(map[int]*Builder)

if err := tmp.forAll(func(user string, ls labels.Labels, chks index.ChunkMetas) {

// chunks may overlap index period bounds, in which case they're written to multiple index buckets
pds := make(map[int]index.ChunkMetas)
for _, chk := range chks {
for _, bucket := range indexBuckets(m.indexPeriod, chk.From(), chk.Through()) {
pds[bucket] = append(pds[bucket], chk)
}
}

// Embed the tenant label into TSDB
lb := labels.NewBuilder(ls)
lb.Set(TenantLabel, user)
withTenant := lb.Labels()

// Add the chunks to all relevant builders
for pd, matchingChks := range pds {
b, ok := periods[pd]
if !ok {
b = NewBuilder()
periods[pd] = b
}

b.AddSeries(
withTenant,
// use the fingerprint without the added tenant label
// so queries route to the chunks which actually exist.
model.Fingerprint(ls.Hash()),
matchingChks,
)
}

}); err != nil {
level.Error(m.log).Log("err", err.Error(), "msg", "building TSDB from WALs")
return err
}

for p, b := range periods {

dstDir := filepath.Join(managerMultitenantDir(m.dir), fmt.Sprint(p))
dst := newPrefixedIdentifier(
MultitenantTSDBIdentifier{
nodeName: m.nodeName,
ts: t,
},
dstDir,
"",
)

level.Debug(m.log).Log("msg", "building tsdb for period", "pd", p, "dst", dst.Path())
// build+move tsdb to multitenant dir
start := time.Now()
_, err = b.Build(
context.Background(),
managerScratchDir(m.dir),
func(from, through model.Time, checksum uint32) Identifier {
return dst
},
)
if err != nil {
return err
}

level.Debug(m.log).Log("msg", "finished building tsdb for period", "pd", p, "dst", dst.Path(), "duration", time.Since(start))

loaded, err := NewShippableTSDBFile(dst, false)
if err != nil {
return err
}

if err := m.shipper.AddIndex(fmt.Sprintf("%d", p), "", loaded); err != nil {
return err
}
}

return nil
}

func indexBuckets(indexPeriod time.Duration, from, through model.Time) (res []int) {
start := from.Time().UnixNano() / int64(indexPeriod)
end := through.Time().UnixNano() / int64(indexPeriod)
for cur := start; cur <= end; cur++ {
res = append(res, int(cur))
}
return
}
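
A quick worked example of indexBuckets, assuming a 24h index period: because the nanosecond timestamps are integer-divided by the period, a chunk that crosses a period boundary lands in every bucket it touches:

	// from    = Unix 0s     -> 0 / 24h                = bucket 0
	// through = Unix 90000s -> 90000e9ns / 86400e9ns  = bucket 1
	// indexBuckets(24*time.Hour, model.TimeFromUnix(0), model.TimeFromUnix(90000)) == []int{0, 1}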

func (m *tsdbManager) indices(ctx context.Context, from, through model.Time, user string) (Index, error) {
var indices []Index

// Ensure we query both per tenant and multitenant TSDBs

for _, bkt := range indexBuckets(m.indexPeriod, from, through) {
if err := m.shipper.ForEach(ctx, fmt.Sprintf("%d", bkt), user, func(idx shipper_index.Index) error {
_, multitenant := parseMultitenantTSDBName(idx.Name())
impl, ok := idx.(Index)
if !ok {
return fmt.Errorf("unexpected shipper index type: %T", idx)
}
if multitenant {
indices = append(indices, NewMultiTenantIndex(impl))
} else {
indices = append(indices, impl)
}
return nil
}); err != nil {
return nil, err
}

}

if len(indices) == 0 {
return NoopIndex{}, nil
}
idx, err := NewMultiIndex(indices...)
if err != nil {
return nil, err
}

if m.chunkFilter != nil {
idx.SetChunkFilterer(m.chunkFilter)
}
return idx, nil
}

// TODO(owen-d): how to better implement this?
// setting 0->maxint will force the tsdbmanager to always query
// underlying tsdbs, which is safe, but can we optimize this?
func (m *tsdbManager) Bounds() (model.Time, model.Time) {
return 0, math.MaxInt64
}

func (m *tsdbManager) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
m.chunkFilter = chunkFilter
}

// Close implements Index.Close, but we offload this responsibility
// to the index shipper
func (m *tsdbManager) Close() error {
return nil
}

func (m *tsdbManager) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
idx, err := m.indices(ctx, from, through, userID)
if err != nil {
return nil, err
}
matchers = withoutNameLabel(matchers)
return idx.GetChunkRefs(ctx, userID, from, through, res, shard, matchers...)
}

func (m *tsdbManager) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) {
idx, err := m.indices(ctx, from, through, userID)
if err != nil {
return nil, err
}
matchers = withoutNameLabel(matchers)
return idx.Series(ctx, userID, from, through, res, shard, matchers...)
}

func (m *tsdbManager) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
idx, err := m.indices(ctx, from, through, userID)
if err != nil {
return nil, err
}
matchers = withoutNameLabel(matchers)
return idx.LabelNames(ctx, userID, from, through, matchers...)
}

func (m *tsdbManager) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error) {
idx, err := m.indices(ctx, from, through, userID)
if err != nil {
return nil, err
}
matchers = withoutNameLabel(matchers)
return idx.LabelValues(ctx, userID, from, through, name, matchers...)
}

// TODO(owen-d): in the future, handle this by preventing passing the __name__="logs" label
// to TSDB indices at all.
func withoutNameLabel(matchers []*labels.Matcher) []*labels.Matcher {
if len(matchers) == 0 {
return nil
}

dst := make([]*labels.Matcher, 0, len(matchers)-1)
for _, m := range matchers {
if m.Name == labels.MetricName {
continue
}
dst = append(dst, m)
}

return dst
}
67 changes: 40 additions & 27 deletions pkg/storage/stores/tsdb/multi_file_index.go
@@ -3,35 +3,25 @@ package tsdb
import (
"context"
"errors"
"sort"

"github.com/grafana/dskit/multierror"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"golang.org/x/sync/errgroup"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

type MultiIndex struct {
indices []*TSDBIndex
indices []Index
}

func NewMultiIndex(indices ...*TSDBIndex) (*MultiIndex, error) {
func NewMultiIndex(indices ...Index) (*MultiIndex, error) {
if len(indices) == 0 {
return nil, errors.New("must supply at least one index")
}

sort.Slice(indices, func(i, j int) bool {
aFrom, aThrough := indices[i].Bounds()
bFrom, bThrough := indices[j].Bounds()

if aFrom != bFrom {
return aFrom < bFrom
}
// tiebreaker uses through
return aThrough <= bThrough
})

return &MultiIndex{indices: indices}, nil
}

@@ -51,7 +41,24 @@ func (i *MultiIndex) Bounds() (model.Time, model.Time) {
return lowest, highest
}

func (i *MultiIndex) forIndices(ctx context.Context, from, through model.Time, fn func(context.Context, *TSDBIndex) (interface{}, error)) ([]interface{}, error) {
func (i *MultiIndex) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
for _, x := range i.indices {
x.SetChunkFilterer(chunkFilter)
}
}

func (i *MultiIndex) Close() error {
var errs multierror.MultiError
for _, idx := range i.indices {
if err := idx.Close(); err != nil {
errs = append(errs, err)
}
}
return errs.Err()

}

func (i *MultiIndex) forIndices(ctx context.Context, from, through model.Time, fn func(context.Context, Index) (interface{}, error)) ([]interface{}, error) {
queryBounds := newBounds(from, through)
g, ctx := errgroup.WithContext(ctx)

@@ -64,14 +71,20 @@ func (i *MultiIndex) forIndices(ctx context.Context, from, through model.Time, f
if Overlap(queryBounds, idx) {
// run all queries in linked goroutines (cancel after first err),
// bounded by parallelism controls if applicable.
g.Go(func() error {
got, err := fn(ctx, idx)
if err != nil {
return err
}
ch <- got
return nil
})

// must wrap g.Go in anonymous function to capture
// idx variable during iteration
func(idx Index) {
g.Go(func() error {
got, err := fn(ctx, idx)
if err != nil {
return err
}
ch <- got
return nil
})
}(idx)

}
}
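
The anonymous-function wrapper above exists because goroutines that close over a range variable would otherwise all observe its final value; the same fix can be sketched more generally (use is a placeholder):

	for _, idx := range indices {
		idx := idx // rebind per iteration; equivalent to passing idx as an argument as done above
		g.Go(func() error {
			return use(idx)
		})
	}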

@@ -94,7 +107,7 @@ func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, thro
}
res = res[:0]

groups, err := i.forIndices(ctx, from, through, func(ctx context.Context, idx *TSDBIndex) (interface{}, error) {
groups, err := i.forIndices(ctx, from, through, func(ctx context.Context, idx Index) (interface{}, error) {
return idx.GetChunkRefs(ctx, userID, from, through, nil, shard, matchers...)
})
if err != nil {
@@ -128,7 +141,7 @@ func (i *MultiIndex) Series(ctx context.Context, userID string, from, through mo
}
res = res[:0]

groups, err := i.forIndices(ctx, from, through, func(ctx context.Context, idx *TSDBIndex) (interface{}, error) {
groups, err := i.forIndices(ctx, from, through, func(ctx context.Context, idx Index) (interface{}, error) {
return idx.Series(ctx, userID, from, through, nil, shard, matchers...)
})
if err != nil {
@@ -154,7 +167,7 @@ func (i *MultiIndex) Series(ctx context.Context, userID string, from, through mo
}

func (i *MultiIndex) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
groups, err := i.forIndices(ctx, from, through, func(ctx context.Context, idx *TSDBIndex) (interface{}, error) {
groups, err := i.forIndices(ctx, from, through, func(ctx context.Context, idx Index) (interface{}, error) {
return idx.LabelNames(ctx, userID, from, through, matchers...)
})
if err != nil {
@@ -189,7 +202,7 @@ func (i *MultiIndex) LabelNames(ctx context.Context, userID string, from, throug
}

func (i *MultiIndex) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error) {
groups, err := i.forIndices(ctx, from, through, func(ctx context.Context, idx *TSDBIndex) (interface{}, error) {
groups, err := i.forIndices(ctx, from, through, func(ctx context.Context, idx Index) (interface{}, error) {
return idx.LabelValues(ctx, userID, from, through, name, matchers...)
})
if err != nil {
2 changes: 1 addition & 1 deletion pkg/storage/stores/tsdb/multi_file_index_test.go
@@ -58,7 +58,7 @@ func TestMultiIndex(t *testing.T) {

// group 5 indices together, all with duplicate data
n := 5
var indices []*TSDBIndex
var indices []Index
dir := t.TempDir()
for i := 0; i < n; i++ {
indices = append(indices, BuildIndex(t, dir, "fake", cases))
74 changes: 74 additions & 0 deletions pkg/storage/stores/tsdb/multitenant.go
@@ -0,0 +1,74 @@
package tsdb

import (
"context"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

// TenantLabel is part of the reserved label namespace (__ prefix)
// It's used to create multi-tenant TSDBs (which do not have a tenancy concept)
// These labels are stripped out during compaction to single-tenant TSDBs
const TenantLabel = "__loki_tenant__"

// MultiTenantIndex will inject a tenant label into its queries
// This works with pre-compacted TSDBs which aren't yet per tenant.
type MultiTenantIndex struct {
idx Index
}

func NewMultiTenantIndex(idx Index) *MultiTenantIndex {
return &MultiTenantIndex{idx: idx}
}

func withTenantLabel(userID string, matchers []*labels.Matcher) []*labels.Matcher {
cpy := make([]*labels.Matcher, len(matchers))
copy(cpy, matchers)
cpy = append(cpy, labels.MustNewMatcher(labels.MatchEqual, TenantLabel, userID))
return cpy
}

func withoutTenantLabel(ls labels.Labels) labels.Labels {
for i, l := range ls {
if l.Name == TenantLabel {
ls = append(ls[:i], ls[i+1:]...)
break
}
}
return ls
}
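
In effect, a tenant1 query against a multitenant TSDB is expanded as sketched below, and the injected label is stripped from any returned series:

	ms := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
	expanded := withTenantLabel("tenant1", ms) // {foo="bar", __loki_tenant__="tenant1"}
	_ = expanded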

func (m *MultiTenantIndex) Bounds() (model.Time, model.Time) { return m.idx.Bounds() }

func (m *MultiTenantIndex) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
m.idx.SetChunkFilterer(chunkFilter)
}

func (m *MultiTenantIndex) Close() error { return m.idx.Close() }

func (m *MultiTenantIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
return m.idx.GetChunkRefs(ctx, userID, from, through, res, shard, withTenantLabel(userID, matchers)...)
}

func (m *MultiTenantIndex) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) {
xs, err := m.idx.Series(ctx, userID, from, through, res, shard, withTenantLabel(userID, matchers)...)
if err != nil {
return nil, err
}
for i := range xs {
xs[i].Labels = withoutTenantLabel(xs[i].Labels)
}
return xs, nil
}

func (m *MultiTenantIndex) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
return m.idx.LabelNames(ctx, userID, from, through, withTenantLabel(userID, matchers)...)
}

func (m *MultiTenantIndex) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error) {
return m.idx.LabelValues(ctx, userID, from, through, name, withTenantLabel(userID, matchers)...)
}
17 changes: 13 additions & 4 deletions pkg/storage/stores/tsdb/querier_test.go
@@ -4,6 +4,7 @@ import (
"context"
"testing"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

@@ -21,7 +22,7 @@ func mustParseLabels(s string) labels.Labels {

func TestQueryIndex(t *testing.T) {
dir := t.TempDir()
b := index.NewBuilder()
b := NewBuilder()
cases := []struct {
labels labels.Labels
chunks []index.ChunkMeta
@@ -85,13 +86,21 @@ func TestQueryIndex(t *testing.T) {
},
}
for _, s := range cases {
b.AddSeries(s.labels, s.chunks)
b.AddSeries(s.labels, model.Fingerprint(s.labels.Hash()), s.chunks)
}

dst, err := b.Build(context.Background(), dir, "fake")
dst, err := b.Build(context.Background(), dir, func(from, through model.Time, checksum uint32) Identifier {
id := SingleTenantTSDBIdentifier{
Tenant: "fake",
From: from,
Through: through,
Checksum: checksum,
}
return newPrefixedIdentifier(id, dir, dir)
})
require.Nil(t, err)

reader, err := index.NewFileReader(dst.FilePath(dir))
reader, err := index.NewFileReader(dst.Path())
require.Nil(t, err)

p, err := PostingsForMatchers(reader, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))