Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receive: upload compacted blocks if OOO enabled #6974

Merged
merged 2 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ func runReceive(
// Has this thanos receive instance been configured to ingest metrics into a local TSDB?
enableIngestion := receiveMode == receive.IngestorOnly || receiveMode == receive.RouterIngestor

isOOOEnabled := tsdbOpts.OutOfOrderTimeWindow > 0

upload := len(confContentYaml) > 0
if enableIngestion {
if upload {
Expand All @@ -176,6 +178,9 @@ func runReceive(
"Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration)
}
level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.")
if isOOOEnabled {
level.Warn(logger).Log("msg", "out-of-order support is also enabled which means that Receiver will now upload compacted blocks to not lose any data. Vertical compaction needs to be enabled on Compactor! See https://github.com/prometheus/prometheus/issues/13112")
}
}
// The background shipper continuously scans the data directory and uploads
// new blocks to object storage service.
Expand Down Expand Up @@ -213,6 +218,7 @@ func runReceive(
conf.tenantLabelName,
bkt,
conf.allowOutOfOrderUpload,
isOOOEnabled,
hashFunc,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, &receive.WriterOptions{
Expand Down
1 change: 1 addition & 0 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(b, m.Close()) }()
Expand Down
6 changes: 5 additions & 1 deletion pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type MultiTSDB struct {
allowOutOfOrderUpload bool
hashFunc metadata.HashFunc
hashringConfigs []HashringConfig
uploadCompactedBlocks bool
}

// NewMultiTSDB creates new MultiTSDB.
Expand All @@ -74,6 +75,7 @@ func NewMultiTSDB(
tenantLabelName string,
bucket objstore.Bucket,
allowOutOfOrderUpload bool,
allowCompactedUpload bool,
hashFunc metadata.HashFunc,
) *MultiTSDB {
if l == nil {
Expand All @@ -91,6 +93,7 @@ func NewMultiTSDB(
tenantLabelName: tenantLabelName,
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
uploadCompactedBlocks: allowCompactedUpload,
hashFunc: hashFunc,
}
}
Expand Down Expand Up @@ -596,6 +599,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
return err
}
var ship *shipper.Shipper

if t.bucket != nil {
ship = shipper.New(
logger,
Expand All @@ -604,7 +608,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
t.bucket,
func() labels.Labels { return lset },
metadata.ReceiveSource,
nil,
func() bool { return t.uploadCompactedBlocks },
t.allowOutOfOrderUpload,
t.hashFunc,
shipper.DefaultMetaFilename,
Expand Down
10 changes: 10 additions & 0 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func TestMultiTSDB(t *testing.T) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -135,6 +136,7 @@ func TestMultiTSDB(t *testing.T) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -178,6 +180,7 @@ func TestMultiTSDB(t *testing.T) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -445,6 +448,7 @@ func TestMultiTSDBPrune(t *testing.T) {
"tenant_id",
test.bucket,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -506,6 +510,7 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) {
"tenant_id",
objstore.NewInMemBucket(),
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -567,6 +572,7 @@ func TestAlignedHeadFlush(t *testing.T) {
"tenant_id",
test.bucket,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -641,6 +647,7 @@ func TestMultiTSDBStats(t *testing.T) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -670,6 +677,7 @@ func TestMultiTSDBWithNilStore(t *testing.T) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -711,6 +719,7 @@ func TestProxyLabelValues(t *testing.T) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()
Expand Down Expand Up @@ -801,6 +810,7 @@ func BenchmarkMultiTSDB(b *testing.B) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(b, m.Close()) }()
Expand Down
1 change: 1 addition & 0 deletions pkg/receive/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,7 @@ func initializeMultiTSDB(dir string) *MultiTSDB {
"tenant_id",
bucket,
false,
false,
metadata.NoneFunc,
)

Expand Down
2 changes: 2 additions & 0 deletions pkg/receive/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ func TestWriter(t *testing.T) {
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
t.Cleanup(func() { testutil.Ok(t, m.Close()) })
Expand Down Expand Up @@ -435,6 +436,7 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr
"tenant_id",
nil,
false,
false,
metadata.NoneFunc,
)
b.Cleanup(func() { testutil.Ok(b, m.Close()) })
Expand Down
Loading