diff --git a/pkg/storage/stores/shipper/bloomshipper/client_test.go b/pkg/storage/stores/shipper/bloomshipper/client_test.go index 4a7a3b9cadfe4..e5bbe3b5b1bf5 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/client_test.go @@ -15,6 +15,7 @@ import ( v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/chunk/client/testutils" + "github.com/grafana/loki/pkg/storage/config" ) func parseTime(s string) model.Time { @@ -25,6 +26,16 @@ func parseTime(s string) model.Time { return model.TimeFromUnix(t.Unix()) } +func parseDayTime(s string) config.DayTime { + t, err := time.Parse("2006-01-02", s) + if err != nil { + panic(err) + } + return config.DayTime{ + Time: model.TimeFromUnix(t.Unix()), + } +} + func newMockBloomClient(t *testing.T) (*BloomClient, string) { oc := testutils.NewInMemoryObjectClient() dir := t.TempDir() diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go index 6b44ed51c6f9f..86e8ed90a174c 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go @@ -144,7 +144,7 @@ func TestBloomShipper_ForEach(t *testing.T) { store, _, _ := newMockBloomStore(t) for i := 0; i < len(blockRefs); i++ { - block, err := CreateBlockInStorage(t, store, "tenant", model.Time(i*24*int(time.Hour)), 0x0000, 0x00ff) + block, err := createBlockInStorage(t, store, "tenant", model.Time(i*24*int(time.Hour)), 0x0000, 0x00ff) require.NoError(t, err) blockRefs = append(blockRefs, block.BlockRef) } diff --git a/pkg/storage/stores/shipper/bloomshipper/store_test.go b/pkg/storage/stores/shipper/bloomshipper/store_test.go index e79f5ec6b2e14..4b57e807195f1 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/store_test.go @@ -1,43 +1,158 @@ package bloomshipper import ( + "bytes" "context" + "encoding/json" "math/rand" "os" "path/filepath" "testing" "time" + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/grafana/loki/pkg/storage" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/grafana/loki/pkg/storage/chunk/cache" storageconfig "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" ) +func newMockBloomStore(t *testing.T) (*BloomStore, string, error) { + workDir := t.TempDir() + return newMockBloomStoreWithWorkDir(t, workDir) +} + +func newMockBloomStoreWithWorkDir(t *testing.T, workDir string) (*BloomStore, string, error) { + periodicConfigs := []storageconfig.PeriodConfig{ + { + ObjectType: storageconfig.StorageTypeInMemory, + From: parseDayTime("2024-01-01"), + IndexTables: storageconfig.IndexPeriodicTableConfig{ + PeriodicTableConfig: storageconfig.PeriodicTableConfig{ + Period: 24 * time.Hour, + Prefix: "schema_a_table_", + }}, + }, + { + ObjectType: storageconfig.StorageTypeInMemory, + From: parseDayTime("2024-02-01"), + IndexTables: storageconfig.IndexPeriodicTableConfig{ + PeriodicTableConfig: storageconfig.PeriodicTableConfig{ + Period: 24 * time.Hour, + Prefix: "schema_b_table_", + }}, + }, + } + + storageConfig := storage.Config{ + BloomShipperConfig: config.Config{ + WorkingDirectory: workDir, + BlocksDownloadingQueue: config.DownloadingQueueConfig{ + WorkersCount: 1, + }, + BlocksCache: cache.EmbeddedCacheConfig{ + MaxSizeItems: 1000, + TTL: 1 * time.Hour, + }, + }, + } + + reg := prometheus.NewPedanticRegistry() + metrics := storage.NewClientMetrics() + t.Cleanup(metrics.Unregister) + logger := log.NewLogfmtLogger(os.Stderr) + + metasCache := cache.NewMockCache() + blocksCache := NewBlocksCache(storageConfig.BloomShipperConfig.BlocksCache, prometheus.NewPedanticRegistry(), logger) + + store, err := NewBloomStore(periodicConfigs, storageConfig, metrics, metasCache, blocksCache, reg, logger) + if err == nil { + t.Cleanup(store.Stop) + } + + return store, workDir, err +} + +func createMetaInStorage(store *BloomStore, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Meta, error) { + meta := Meta{ + MetaRef: MetaRef{ + Ref: Ref{ + TenantID: tenant, + Bounds: v1.NewBounds(minFp, maxFp), + // Unused + // StartTimestamp: start, + // EndTimestamp: start.Add(12 * time.Hour), + }, + }, + Blocks: []BlockRef{}, + } + err := store.storeDo(start, func(s *bloomStoreEntry) error { + raw, _ := json.Marshal(meta) + meta.MetaRef.Ref.TableName = tablesForRange(s.cfg, NewInterval(start, start.Add(12*time.Hour)))[0] + return s.objectClient.PutObject(context.Background(), s.Meta(meta.MetaRef).Addr(), bytes.NewReader(raw)) + }) + return meta, err +} + +func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Block, error) { + tmpDir := t.TempDir() + fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz") + + blockWriter := v1.NewDirectoryBlockWriter(tmpDir) + err := blockWriter.Init() + require.NoError(t, err) + + err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir)) + require.NoError(t, err) + + _, _ = fp.Seek(0, 0) + + block := Block{ + BlockRef: BlockRef{ + Ref: Ref{ + TenantID: tenant, + Bounds: v1.NewBounds(minFp, maxFp), + StartTimestamp: start, + EndTimestamp: start.Add(12 * time.Hour), + }, + }, + Data: fp, + } + err = store.storeDo(start, func(s *bloomStoreEntry) error { + block.BlockRef.Ref.TableName = tablesForRange(s.cfg, NewInterval(start, start.Add(12*time.Hour)))[0] + return s.objectClient.PutObject(context.Background(), s.Block(block.BlockRef).Addr(), block.Data) + }) + return block, err +} + func TestBloomStore_ResolveMetas(t *testing.T) { - store, _, err := NewMockBloomStore(t) + store, _, err := newMockBloomStore(t) require.NoError(t, err) // schema 1 // outside of interval, outside of bounds - _, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00010000, 0x0001ffff) + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00010000, 0x0001ffff) // outside of interval, inside of bounds - _, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00000000, 0x0000ffff) + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00000000, 0x0000ffff) // inside of interval, outside of bounds - _, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00010000, 0x0001ffff) + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00010000, 0x0001ffff) // inside of interval, inside of bounds - m1, _ := CreateMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff) + m1, _ := createMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff) // schema 2 // inside of interval, inside of bounds - m2, _ := CreateMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0000ffff) + m2, _ := createMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0000ffff) // inside of interval, outside of bounds - _, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00010000, 0x0001ffff) + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00010000, 0x0001ffff) // outside of interval, inside of bounds - _, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00000000, 0x0000ffff) + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00000000, 0x0000ffff) // outside of interval, outside of bounds - _, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00010000, 0x0001ffff) + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00010000, 0x0001ffff) t.Run("tenant matches", func(t *testing.T) { ctx := context.Background() @@ -72,28 +187,28 @@ func TestBloomStore_ResolveMetas(t *testing.T) { } func TestBloomStore_FetchMetas(t *testing.T) { - store, _, err := NewMockBloomStore(t) + store, _, err := newMockBloomStore(t) require.NoError(t, err) // schema 1 // outside of interval, outside of bounds - _, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00010000, 0x0001ffff) + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00010000, 0x0001ffff) // outside of interval, inside of bounds - _, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00000000, 0x0000ffff) + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00000000, 0x0000ffff) // inside of interval, outside of bounds - _, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00010000, 0x0001ffff) + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00010000, 0x0001ffff) // inside of interval, inside of bounds - m1, _ := CreateMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff) + m1, _ := createMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff) // schema 2 // inside of interval, inside of bounds - m2, _ := CreateMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0000ffff) + m2, _ := createMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0000ffff) // inside of interval, outside of bounds - _, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00010000, 0x0001ffff) + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00010000, 0x0001ffff) // outside of interval, inside of bounds - _, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00000000, 0x0000ffff) + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00000000, 0x0000ffff) // outside of interval, outside of bounds - _, _ = CreateMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00010000, 0x0001ffff) + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00010000, 0x0001ffff) t.Run("tenant matches", func(t *testing.T) { ctx := context.Background() @@ -126,15 +241,15 @@ func TestBloomStore_FetchMetas(t *testing.T) { } func TestBloomStore_FetchBlocks(t *testing.T) { - store, _, err := NewMockBloomStore(t) + store, _, err := newMockBloomStore(t) require.NoError(t, err) // schema 1 - b1, _ := CreateBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff) - b2, _ := CreateBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00010000, 0x0001ffff) + b1, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff) + b2, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00010000, 0x0001ffff) // schema 2 - b3, _ := CreateBlockInStorage(t, store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0000ffff) - b4, _ := CreateBlockInStorage(t, store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0001ffff) + b3, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0000ffff) + b4, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0001ffff) ctx := context.Background() @@ -165,7 +280,7 @@ func TestBloomShipper_WorkingDir(t *testing.T) { fi, _ := os.Stat(wd) t.Log("working directory", wd, fi.Mode()) - _, _, err = NewMockBloomStoreWithWorkDir(t, wd) + _, _, err = newMockBloomStoreWithWorkDir(t, wd) require.ErrorContains(t, err, "insufficient permissions") }) @@ -175,9 +290,9 @@ func TestBloomShipper_WorkingDir(t *testing.T) { wd := filepath.Join(base, "doesnotexist") t.Log("working directory", wd) - store, _, err := NewMockBloomStoreWithWorkDir(t, wd) + store, _, err := newMockBloomStoreWithWorkDir(t, wd) require.NoError(t, err) - b, err := CreateBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff) + b, err := createBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff) require.NoError(t, err) ctx := context.Background() diff --git a/pkg/storage/stores/shipper/bloomshipper/testutils.go b/pkg/storage/stores/shipper/bloomshipper/testutils.go deleted file mode 100644 index 2209a00bf3c6a..0000000000000 --- a/pkg/storage/stores/shipper/bloomshipper/testutils.go +++ /dev/null @@ -1,139 +0,0 @@ -package bloomshipper - -import ( - "bytes" - "context" - "encoding/json" - "github.com/go-kit/log" - "os" - "testing" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" - "github.com/stretchr/testify/require" - - "github.com/grafana/loki/pkg/storage" - v1 "github.com/grafana/loki/pkg/storage/bloom/v1" - "github.com/grafana/loki/pkg/storage/chunk/cache" - storageconfig "github.com/grafana/loki/pkg/storage/config" - "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" -) - -func NewMockBloomStore(t *testing.T) (*BloomStore, string, error) { - workDir := t.TempDir() - return NewMockBloomStoreWithWorkDir(t, workDir) -} - -func NewMockBloomStoreWithWorkDir(t *testing.T, workDir string) (*BloomStore, string, error) { - periodicConfigs := []storageconfig.PeriodConfig{ - { - ObjectType: storageconfig.StorageTypeInMemory, - From: parseDayTime("2024-01-01"), - IndexTables: storageconfig.IndexPeriodicTableConfig{ - PeriodicTableConfig: storageconfig.PeriodicTableConfig{ - Period: 24 * time.Hour, - Prefix: "schema_a_table_", - }}, - }, - { - ObjectType: storageconfig.StorageTypeInMemory, - From: parseDayTime("2024-02-01"), - IndexTables: storageconfig.IndexPeriodicTableConfig{ - PeriodicTableConfig: storageconfig.PeriodicTableConfig{ - Period: 24 * time.Hour, - Prefix: "schema_b_table_", - }}, - }, - } - - storageConfig := storage.Config{ - BloomShipperConfig: config.Config{ - WorkingDirectory: workDir, - BlocksDownloadingQueue: config.DownloadingQueueConfig{ - WorkersCount: 1, - }, - BlocksCache: cache.EmbeddedCacheConfig{ - MaxSizeItems: 1000, - TTL: 1 * time.Hour, - }, - }, - } - - reg := prometheus.NewPedanticRegistry() - metrics := storage.NewClientMetrics() - t.Cleanup(metrics.Unregister) - logger := log.NewLogfmtLogger(os.Stderr) - - metasCache := cache.NewMockCache() - blocksCache := NewBlocksCache(storageConfig.BloomShipperConfig.BlocksCache, prometheus.NewPedanticRegistry(), logger) - - store, err := NewBloomStore(periodicConfigs, storageConfig, metrics, metasCache, blocksCache, reg, logger) - if err == nil { - t.Cleanup(store.Stop) - } - - return store, workDir, err -} - -func CreateMetaInStorage(store *BloomStore, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Meta, error) { - meta := Meta{ - MetaRef: MetaRef{ - Ref: Ref{ - TenantID: tenant, - Bounds: v1.NewBounds(minFp, maxFp), - // Unused - // StartTimestamp: start, - // EndTimestamp: start.Add(12 * time.Hour), - }, - }, - Blocks: []BlockRef{}, - } - err := store.storeDo(start, func(s *bloomStoreEntry) error { - raw, _ := json.Marshal(meta) - meta.MetaRef.Ref.TableName = tablesForRange(s.cfg, NewInterval(start, start.Add(12*time.Hour)))[0] - return s.objectClient.PutObject(context.Background(), s.Meta(meta.MetaRef).Addr(), bytes.NewReader(raw)) - }) - return meta, err -} - -func CreateBlockInStorage(t *testing.T, store *BloomStore, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Block, error) { - tmpDir := t.TempDir() - fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz") - - blockWriter := v1.NewDirectoryBlockWriter(tmpDir) - err := blockWriter.Init() - require.NoError(t, err) - - err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir)) - require.NoError(t, err) - - _, _ = fp.Seek(0, 0) - - block := Block{ - BlockRef: BlockRef{ - Ref: Ref{ - TenantID: tenant, - Bounds: v1.NewBounds(minFp, maxFp), - StartTimestamp: start, - EndTimestamp: start.Add(12 * time.Hour), - }, - }, - Data: fp, - } - err = store.storeDo(start, func(s *bloomStoreEntry) error { - block.BlockRef.Ref.TableName = tablesForRange(s.cfg, NewInterval(start, start.Add(12*time.Hour)))[0] - return s.objectClient.PutObject(context.Background(), s.Block(block.BlockRef).Addr(), block.Data) - }) - return block, err -} - -func parseDayTime(s string) storageconfig.DayTime { - t, err := time.Parse("2006-01-02", s) - if err != nil { - panic(err) - } - return storageconfig.DayTime{ - Time: model.TimeFromUnix(t.Unix()), - } -}