Skip to content

Commit

Permalink
undo extraction to testutils
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts committed Mar 19, 2024
1 parent 00361e2 commit b3de24c
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 166 deletions.
11 changes: 11 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk/client/testutils"
"github.com/grafana/loki/pkg/storage/config"
)

func parseTime(s string) model.Time {
Expand All @@ -25,6 +26,16 @@ func parseTime(s string) model.Time {
return model.TimeFromUnix(t.Unix())
}

func parseDayTime(s string) config.DayTime {
t, err := time.Parse("2006-01-02", s)
if err != nil {
panic(err)
}
return config.DayTime{
Time: model.TimeFromUnix(t.Unix()),
}
}

func newMockBloomClient(t *testing.T) (*BloomClient, string) {
oc := testutils.NewInMemoryObjectClient()
dir := t.TempDir()
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/bloomshipper/shipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestBloomShipper_ForEach(t *testing.T) {

store, _, _ := newMockBloomStore(t)
for i := 0; i < len(blockRefs); i++ {
block, err := CreateBlockInStorage(t, store, "tenant", model.Time(i*24*int(time.Hour)), 0x0000, 0x00ff)
block, err := createBlockInStorage(t, store, "tenant", model.Time(i*24*int(time.Hour)), 0x0000, 0x00ff)
require.NoError(t, err)
blockRefs = append(blockRefs, block.BlockRef)
}
Expand Down
167 changes: 141 additions & 26 deletions pkg/storage/stores/shipper/bloomshipper/store_test.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,158 @@
package bloomshipper

import (
"bytes"
"context"
"encoding/json"
"math/rand"
"os"
"path/filepath"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk/cache"
storageconfig "github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
)

func newMockBloomStore(t *testing.T) (*BloomStore, string, error) {
workDir := t.TempDir()
return newMockBloomStoreWithWorkDir(t, workDir)
}

func newMockBloomStoreWithWorkDir(t *testing.T, workDir string) (*BloomStore, string, error) {
periodicConfigs := []storageconfig.PeriodConfig{
{
ObjectType: storageconfig.StorageTypeInMemory,
From: parseDayTime("2024-01-01"),
IndexTables: storageconfig.IndexPeriodicTableConfig{
PeriodicTableConfig: storageconfig.PeriodicTableConfig{
Period: 24 * time.Hour,
Prefix: "schema_a_table_",
}},
},
{
ObjectType: storageconfig.StorageTypeInMemory,
From: parseDayTime("2024-02-01"),
IndexTables: storageconfig.IndexPeriodicTableConfig{
PeriodicTableConfig: storageconfig.PeriodicTableConfig{
Period: 24 * time.Hour,
Prefix: "schema_b_table_",
}},
},
}

storageConfig := storage.Config{
BloomShipperConfig: config.Config{
WorkingDirectory: workDir,
BlocksDownloadingQueue: config.DownloadingQueueConfig{
WorkersCount: 1,
},
BlocksCache: cache.EmbeddedCacheConfig{
MaxSizeItems: 1000,
TTL: 1 * time.Hour,
},
},
}

reg := prometheus.NewPedanticRegistry()
metrics := storage.NewClientMetrics()
t.Cleanup(metrics.Unregister)
logger := log.NewLogfmtLogger(os.Stderr)

metasCache := cache.NewMockCache()
blocksCache := NewBlocksCache(storageConfig.BloomShipperConfig.BlocksCache, prometheus.NewPedanticRegistry(), logger)

store, err := NewBloomStore(periodicConfigs, storageConfig, metrics, metasCache, blocksCache, reg, logger)
if err == nil {
t.Cleanup(store.Stop)
}

return store, workDir, err
}

func createMetaInStorage(store *BloomStore, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Meta, error) {
meta := Meta{
MetaRef: MetaRef{
Ref: Ref{
TenantID: tenant,
Bounds: v1.NewBounds(minFp, maxFp),
// Unused
// StartTimestamp: start,
// EndTimestamp: start.Add(12 * time.Hour),
},
},
Blocks: []BlockRef{},
}
err := store.storeDo(start, func(s *bloomStoreEntry) error {
raw, _ := json.Marshal(meta)
meta.MetaRef.Ref.TableName = tablesForRange(s.cfg, NewInterval(start, start.Add(12*time.Hour)))[0]
return s.objectClient.PutObject(context.Background(), s.Meta(meta.MetaRef).Addr(), bytes.NewReader(raw))
})
return meta, err
}

func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Block, error) {
tmpDir := t.TempDir()
fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz")

blockWriter := v1.NewDirectoryBlockWriter(tmpDir)
err := blockWriter.Init()
require.NoError(t, err)

err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir))
require.NoError(t, err)

_, _ = fp.Seek(0, 0)

block := Block{
BlockRef: BlockRef{
Ref: Ref{
TenantID: tenant,
Bounds: v1.NewBounds(minFp, maxFp),
StartTimestamp: start,
EndTimestamp: start.Add(12 * time.Hour),
},
},
Data: fp,
}
err = store.storeDo(start, func(s *bloomStoreEntry) error {
block.BlockRef.Ref.TableName = tablesForRange(s.cfg, NewInterval(start, start.Add(12*time.Hour)))[0]
return s.objectClient.PutObject(context.Background(), s.Block(block.BlockRef).Addr(), block.Data)
})
return block, err
}

func TestBloomStore_ResolveMetas(t *testing.T) {
store, _, err := NewMockBloomStore(t)
store, _, err := newMockBloomStore(t)
require.NoError(t, err)

// schema 1
// outside of interval, outside of bounds
_, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00010000, 0x0001ffff)
_, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00010000, 0x0001ffff)
// outside of interval, inside of bounds
_, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00000000, 0x0000ffff)
_, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00000000, 0x0000ffff)
// inside of interval, outside of bounds
_, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00010000, 0x0001ffff)
_, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00010000, 0x0001ffff)
// inside of interval, inside of bounds
m1, _ := CreateMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff)
m1, _ := createMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff)

// schema 2
// inside of interval, inside of bounds
m2, _ := CreateMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0000ffff)
m2, _ := createMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0000ffff)
// inside of interval, outside of bounds
_, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00010000, 0x0001ffff)
_, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00010000, 0x0001ffff)
// outside of interval, inside of bounds
_, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00000000, 0x0000ffff)
_, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00000000, 0x0000ffff)
// outside of interval, outside of bounds
_, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00010000, 0x0001ffff)
_, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00010000, 0x0001ffff)

t.Run("tenant matches", func(t *testing.T) {
ctx := context.Background()
Expand Down Expand Up @@ -72,28 +187,28 @@ func TestBloomStore_ResolveMetas(t *testing.T) {
}

func TestBloomStore_FetchMetas(t *testing.T) {
store, _, err := NewMockBloomStore(t)
store, _, err := newMockBloomStore(t)
require.NoError(t, err)

// schema 1
// outside of interval, outside of bounds
_, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00010000, 0x0001ffff)
_, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00010000, 0x0001ffff)
// outside of interval, inside of bounds
_, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00000000, 0x0000ffff)
_, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00000000, 0x0000ffff)
// inside of interval, outside of bounds
_, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00010000, 0x0001ffff)
_, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00010000, 0x0001ffff)
// inside of interval, inside of bounds
m1, _ := CreateMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff)
m1, _ := createMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff)

// schema 2
// inside of interval, inside of bounds
m2, _ := CreateMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0000ffff)
m2, _ := createMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0000ffff)
// inside of interval, outside of bounds
_, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00010000, 0x0001ffff)
_, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00010000, 0x0001ffff)
// outside of interval, inside of bounds
_, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00000000, 0x0000ffff)
_, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00000000, 0x0000ffff)
// outside of interval, outside of bounds
_, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00010000, 0x0001ffff)
_, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00010000, 0x0001ffff)

t.Run("tenant matches", func(t *testing.T) {
ctx := context.Background()
Expand Down Expand Up @@ -126,15 +241,15 @@ func TestBloomStore_FetchMetas(t *testing.T) {
}

func TestBloomStore_FetchBlocks(t *testing.T) {
store, _, err := NewMockBloomStore(t)
store, _, err := newMockBloomStore(t)
require.NoError(t, err)

// schema 1
b1, _ := CreateBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff)
b2, _ := CreateBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00010000, 0x0001ffff)
b1, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff)
b2, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00010000, 0x0001ffff)
// schema 2
b3, _ := CreateBlockInStorage(t, store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0000ffff)
b4, _ := CreateBlockInStorage(t, store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0001ffff)
b3, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0000ffff)
b4, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0001ffff)

ctx := context.Background()

Expand Down Expand Up @@ -165,7 +280,7 @@ func TestBloomShipper_WorkingDir(t *testing.T) {
fi, _ := os.Stat(wd)
t.Log("working directory", wd, fi.Mode())

_, _, err = NewMockBloomStoreWithWorkDir(t, wd)
_, _, err = newMockBloomStoreWithWorkDir(t, wd)
require.ErrorContains(t, err, "insufficient permissions")
})

Expand All @@ -175,9 +290,9 @@ func TestBloomShipper_WorkingDir(t *testing.T) {
wd := filepath.Join(base, "doesnotexist")
t.Log("working directory", wd)

store, _, err := NewMockBloomStoreWithWorkDir(t, wd)
store, _, err := newMockBloomStoreWithWorkDir(t, wd)
require.NoError(t, err)
b, err := CreateBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff)
b, err := createBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff)
require.NoError(t, err)

ctx := context.Background()
Expand Down
Loading

0 comments on commit b3de24c

Please sign in to comment.