From 5dfacc83498082ebe8268d139f19b01632da0031 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Tue, 12 Dec 2023 17:12:48 +0200 Subject: [PATCH] receive: upload compacted blocks if OOO enabled (#6974) --- cmd/thanos/receive.go | 6 ++++++ pkg/receive/handler_test.go | 1 + pkg/receive/multitsdb.go | 6 +++++- pkg/receive/multitsdb_test.go | 10 ++++++++++ pkg/receive/receive_test.go | 1 + pkg/receive/writer_test.go | 2 ++ 6 files changed, 25 insertions(+), 1 deletion(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 509516debf..b478e69219 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -164,6 +164,8 @@ func runReceive( // Has this thanos receive instance been configured to ingest metrics into a local TSDB? enableIngestion := receiveMode == receive.IngestorOnly || receiveMode == receive.RouterIngestor + isOOOEnabled := tsdbOpts.OutOfOrderTimeWindow > 0 + upload := len(confContentYaml) > 0 if enableIngestion { if upload { @@ -173,6 +175,9 @@ func runReceive( "Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration) } level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.") + if isOOOEnabled { + level.Warn(logger).Log("msg", "out-of-order support is also enabled which means that Receiver will now upload compacted blocks to not lose any data. Vertical compaction needs to be enabled on Compactor! See https://github.com/prometheus/prometheus/issues/13112") + } } // The background shipper continuously scans the data directory and uploads // new blocks to object storage service. @@ -210,6 +215,7 @@ func runReceive( conf.tenantLabelName, bkt, conf.allowOutOfOrderUpload, + isOOOEnabled, hashFunc, ) writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, &receive.WriterOptions{ diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 7d40d83136..53f86ccc61 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -948,6 +948,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { "tenant_id", nil, false, + false, metadata.NoneFunc, ) defer func() { testutil.Ok(b, m.Close()) }() diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 1364dfdee0..97e6fbec80 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -61,6 +61,7 @@ type MultiTSDB struct { allowOutOfOrderUpload bool hashFunc metadata.HashFunc hashringConfigs []HashringConfig + uploadCompactedBlocks bool } // NewMultiTSDB creates new MultiTSDB. @@ -74,6 +75,7 @@ func NewMultiTSDB( tenantLabelName string, bucket objstore.Bucket, allowOutOfOrderUpload bool, + allowCompactedUpload bool, hashFunc metadata.HashFunc, ) *MultiTSDB { if l == nil { @@ -91,6 +93,7 @@ func NewMultiTSDB( tenantLabelName: tenantLabelName, bucket: bucket, allowOutOfOrderUpload: allowOutOfOrderUpload, + uploadCompactedBlocks: allowCompactedUpload, hashFunc: hashFunc, } } @@ -605,6 +608,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant return err } var ship *shipper.Shipper + if t.bucket != nil { ship = shipper.New( logger, @@ -613,7 +617,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant t.bucket, func() labels.Labels { return lset }, metadata.ReceiveSource, - nil, + func() bool { return t.uploadCompactedBlocks }, t.allowOutOfOrderUpload, t.hashFunc, ) diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index cc13b5c6ab..1b6bb39af8 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -51,6 +51,7 @@ func TestMultiTSDB(t *testing.T) { "tenant_id", nil, false, + false, metadata.NoneFunc, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -135,6 +136,7 @@ func TestMultiTSDB(t *testing.T) { "tenant_id", nil, false, + false, metadata.NoneFunc, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -178,6 +180,7 @@ func TestMultiTSDB(t *testing.T) { "tenant_id", nil, false, + false, metadata.NoneFunc, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -445,6 +448,7 @@ func TestMultiTSDBPrune(t *testing.T) { "tenant_id", test.bucket, false, + false, metadata.NoneFunc, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -506,6 +510,7 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) { "tenant_id", objstore.NewInMemBucket(), false, + false, metadata.NoneFunc, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -567,6 +572,7 @@ func TestAlignedHeadFlush(t *testing.T) { "tenant_id", test.bucket, false, + false, metadata.NoneFunc, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -641,6 +647,7 @@ func TestMultiTSDBStats(t *testing.T) { "tenant_id", nil, false, + false, metadata.NoneFunc, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -670,6 +677,7 @@ func TestMultiTSDBWithNilStore(t *testing.T) { "tenant_id", nil, false, + false, metadata.NoneFunc, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -711,6 +719,7 @@ func TestProxyLabelValues(t *testing.T) { "tenant_id", nil, false, + false, metadata.NoneFunc, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -801,6 +810,7 @@ func BenchmarkMultiTSDB(b *testing.B) { "tenant_id", nil, false, + false, metadata.NoneFunc, ) defer func() { testutil.Ok(b, m.Close()) }() diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index 55b59d182a..0266923584 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -814,6 +814,7 @@ func initializeMultiTSDB(dir string) *MultiTSDB { "tenant_id", bucket, false, + false, metadata.NoneFunc, ) diff --git a/pkg/receive/writer_test.go b/pkg/receive/writer_test.go index 34613794b8..9c2ea60490 100644 --- a/pkg/receive/writer_test.go +++ b/pkg/receive/writer_test.go @@ -343,6 +343,7 @@ func TestWriter(t *testing.T) { "tenant_id", nil, false, + false, metadata.NoneFunc, ) t.Cleanup(func() { testutil.Ok(t, m.Close()) }) @@ -435,6 +436,7 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr "tenant_id", nil, false, + false, metadata.NoneFunc, ) b.Cleanup(func() { testutil.Ok(b, m.Close()) })