Skip to content

Commit

Permalink
receive: upload compacted blocks if OOO enabled (thanos-io#6974)
Browse files Browse the repository at this point in the history
  • Loading branch information
GiedriusS authored Dec 12, 2023
1 parent 32f227a commit 7b8eb86
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 1 deletion.
6 changes: 6 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ func runReceive(
// Has this thanos receive instance been configured to ingest metrics into a local TSDB?
enableIngestion := receiveMode == receive.IngestorOnly || receiveMode == receive.RouterIngestor

isOOOEnabled := tsdbOpts.OutOfOrderTimeWindow > 0

upload := len(confContentYaml) > 0
if enableIngestion {
if upload {
Expand All @@ -176,6 +178,9 @@ func runReceive(
"Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration)
}
level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.")
if isOOOEnabled {
level.Warn(logger).Log("msg", "out-of-order support is also enabled which means that Receiver will now upload compacted blocks to not lose any data. Vertical compaction needs to be enabled on Compactor! See https://github.com/prometheus/prometheus/issues/13112")
}
}
// The background shipper continuously scans the data directory and uploads
// new blocks to object storage service.
Expand Down Expand Up @@ -213,6 +218,7 @@ func runReceive(
conf.tenantLabelName,
bkt,
conf.allowOutOfOrderUpload,
isOOOEnabled,
hashFunc,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, &receive.WriterOptions{
Expand Down
1 change: 1 addition & 0 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(b, m.Close()) }()
Expand Down
6 changes: 5 additions & 1 deletion pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type MultiTSDB struct {
allowOutOfOrderUpload bool
hashFunc metadata.HashFunc
hashringConfigs []HashringConfig
uploadCompactedBlocks bool
}

// NewMultiTSDB creates new MultiTSDB.
Expand All @@ -74,6 +75,7 @@ func NewMultiTSDB(
tenantLabelName string,
bucket objstore.Bucket,
allowOutOfOrderUpload bool,
allowCompactedUpload bool,
hashFunc metadata.HashFunc,
) *MultiTSDB {
if l == nil {
Expand All @@ -91,6 +93,7 @@ func NewMultiTSDB(
tenantLabelName: tenantLabelName,
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
uploadCompactedBlocks: allowCompactedUpload,
hashFunc: hashFunc,
}
}
Expand Down Expand Up @@ -596,6 +599,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
return err
}
var ship *shipper.Shipper

if t.bucket != nil {
ship = shipper.New(
logger,
Expand All @@ -604,7 +608,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
t.bucket,
func() labels.Labels { return lset },
metadata.ReceiveSource,
nil,
func() bool { return t.uploadCompactedBlocks },
t.allowOutOfOrderUpload,
t.hashFunc,
shipper.DefaultMetaFilename,
Expand Down
10 changes: 10 additions & 0 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func TestMultiTSDB(t *testing.T) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -135,6 +136,7 @@ func TestMultiTSDB(t *testing.T) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -178,6 +180,7 @@ func TestMultiTSDB(t *testing.T) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -445,6 +448,7 @@ func TestMultiTSDBPrune(t *testing.T) {
"tenant_id",
test.bucket,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -506,6 +510,7 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) {
"tenant_id",
objstore.NewInMemBucket(),
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -567,6 +572,7 @@ func TestAlignedHeadFlush(t *testing.T) {
"tenant_id",
test.bucket,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -641,6 +647,7 @@ func TestMultiTSDBStats(t *testing.T) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -670,6 +677,7 @@ func TestMultiTSDBWithNilStore(t *testing.T) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -711,6 +719,7 @@ func TestProxyLabelValues(t *testing.T) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -801,6 +810,7 @@ func BenchmarkMultiTSDB(b *testing.B) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(b, m.Close()) }()
Expand Down
1 change: 1 addition & 0 deletions pkg/receive/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,7 @@ func initializeMultiTSDB(dir string) *MultiTSDB {
"tenant_id",
bucket,
false,
false,
metadata.NoneFunc,
)

Expand Down
2 changes: 2 additions & 0 deletions pkg/receive/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ func TestWriter(t *testing.T) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
t.Cleanup(func() { testutil.Ok(t, m.Close()) })
Expand Down Expand Up @@ -435,6 +436,7 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
b.Cleanup(func() { testutil.Ok(b, m.Close()) })
Expand Down

0 comments on commit 7b8eb86

Please sign in to comment.