Skip to content

Write block deletion marks in the global location too #3561

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Dec 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"testing"
"time"

Expand All @@ -20,6 +19,7 @@ import (

"github.com/cortexproject/cortex/pkg/storage/bucket/filesystem"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util/services"
)

Expand Down Expand Up @@ -49,30 +49,31 @@ func testBlocksCleanerWithConcurrency(t *testing.T, concurrency int) {
// Create a bucket client on the local storage.
bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)

// Create blocks.
ctx := context.Background()
now := time.Now()
deletionDelay := 12 * time.Hour
block1 := createTSDBBlock(t, filepath.Join(storageDir, "user-1"), 10, 20, nil)
block2 := createTSDBBlock(t, filepath.Join(storageDir, "user-1"), 20, 30, nil)
block3 := createTSDBBlock(t, filepath.Join(storageDir, "user-1"), 30, 40, nil)
block1 := createTSDBBlock(t, bucketClient, "user-1", 10, 20, nil)
block2 := createTSDBBlock(t, bucketClient, "user-1", 20, 30, nil)
block3 := createTSDBBlock(t, bucketClient, "user-1", 30, 40, nil)
block4 := ulid.MustNew(4, rand.Reader)
block5 := ulid.MustNew(5, rand.Reader)
block6 := createTSDBBlock(t, filepath.Join(storageDir, "user-1"), 40, 50, nil)
block7 := createTSDBBlock(t, filepath.Join(storageDir, "user-2"), 10, 20, nil)
block8 := createTSDBBlock(t, filepath.Join(storageDir, "user-2"), 40, 50, nil)
createDeletionMark(t, filepath.Join(storageDir, "user-1"), block2, now.Add(-deletionDelay).Add(time.Hour)) // Block hasn't reached the deletion threshold yet.
createDeletionMark(t, filepath.Join(storageDir, "user-1"), block3, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold.
createDeletionMark(t, filepath.Join(storageDir, "user-1"), block4, now.Add(-deletionDelay).Add(time.Hour)) // Partial block hasn't reached the deletion threshold yet.
createDeletionMark(t, filepath.Join(storageDir, "user-1"), block5, now.Add(-deletionDelay).Add(-time.Hour)) // Partial block reached the deletion threshold.
require.NoError(t, bucketClient.Delete(ctx, path.Join("user-1", block6.String(), metadata.MetaFilename))) // Partial block without deletion mark.
createDeletionMark(t, filepath.Join(storageDir, "user-2"), block7, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold.
block6 := createTSDBBlock(t, bucketClient, "user-1", 40, 50, nil)
block7 := createTSDBBlock(t, bucketClient, "user-2", 10, 20, nil)
block8 := createTSDBBlock(t, bucketClient, "user-2", 40, 50, nil)
createDeletionMark(t, bucketClient, "user-1", block2, now.Add(-deletionDelay).Add(time.Hour)) // Block hasn't reached the deletion threshold yet.
createDeletionMark(t, bucketClient, "user-1", block3, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold.
createDeletionMark(t, bucketClient, "user-1", block4, now.Add(-deletionDelay).Add(time.Hour)) // Partial block hasn't reached the deletion threshold yet.
createDeletionMark(t, bucketClient, "user-1", block5, now.Add(-deletionDelay).Add(-time.Hour)) // Partial block reached the deletion threshold.
require.NoError(t, bucketClient.Delete(ctx, path.Join("user-1", block6.String(), metadata.MetaFilename))) // Partial block without deletion mark.
createDeletionMark(t, bucketClient, "user-2", block7, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold.

// Blocks for user-3, marked for deletion.
require.NoError(t, tsdb.WriteTenantDeletionMark(context.Background(), bucketClient, "user-3"))
block9 := createTSDBBlock(t, filepath.Join(storageDir, "user-3"), 10, 30, nil)
block10 := createTSDBBlock(t, filepath.Join(storageDir, "user-3"), 30, 50, nil)
block9 := createTSDBBlock(t, bucketClient, "user-3", 10, 30, nil)
block10 := createTSDBBlock(t, bucketClient, "user-3", 30, 50, nil)

cfg := BlocksCleanerConfig{
DataDir: dataDir,
Expand All @@ -96,14 +97,18 @@ func testBlocksCleanerWithConcurrency(t *testing.T, concurrency int) {
// Check the storage to ensure only the block which has reached the deletion threshold
// has been effectively deleted.
{path: path.Join("user-1", block1.String(), metadata.MetaFilename), expectedExists: true},
{path: path.Join("user-1", block2.String(), metadata.MetaFilename), expectedExists: true},
{path: path.Join("user-1", block3.String(), metadata.MetaFilename), expectedExists: false},
{path: path.Join("user-2", block7.String(), metadata.MetaFilename), expectedExists: false},
{path: path.Join("user-2", block8.String(), metadata.MetaFilename), expectedExists: true},
// Should not delete a block with deletion mark which hasn't reached the deletion threshold yet.
{path: path.Join("user-1", block2.String(), metadata.MetaFilename), expectedExists: true},
{path: path.Join("user-1", bucketindex.BlockDeletionMarkFilepath(block2)), expectedExists: true},
// Should delete a partial block with deletion mark which hasn't reached the deletion threshold yet.
{path: path.Join("user-1", block4.String(), metadata.DeletionMarkFilename), expectedExists: false},
{path: path.Join("user-1", bucketindex.BlockDeletionMarkFilepath(block4)), expectedExists: false},
// Should delete a partial block with deletion mark which has reached the deletion threshold.
{path: path.Join("user-1", block5.String(), metadata.DeletionMarkFilename), expectedExists: false},
{path: path.Join("user-1", bucketindex.BlockDeletionMarkFilepath(block5)), expectedExists: false},
// Should not delete a partial block without deletion mark.
{path: path.Join("user-1", block6.String(), "index"), expectedExists: true},
// Should completely delete blocks for user-3, marked for deletion
Expand Down
4 changes: 4 additions & 0 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -270,6 +271,9 @@ func (c *Compactor) starting(ctx context.Context) error {
return errors.Wrap(err, "failed to initialize compactor objects")
}

// Wrap the bucket client to write block deletion marks in the global location too.
c.bucketClient = bucketindex.BucketWithGlobalMarkers(c.bucketClient)

// Create the users scanner.
c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUser, c.parentLogger)

Expand Down
43 changes: 30 additions & 13 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package compactor

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -538,6 +538,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
bucketClient.MockIter("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", []string{"user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json"}, nil)
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", nil)
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", nil)
bucketClient.MockDelete("user-1/markers/01DTW0ZCPDDNV4BV83Q2SV4QAZ-deletion-mark.json", nil)
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil)

c, _, tsdbPlanner, logs, registry, cleanup := prepare(t, cfg, bucketClient)
Expand Down Expand Up @@ -863,7 +864,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
}
}

func createTSDBBlock(t *testing.T, dir string, minT, maxT int64, externalLabels map[string]string) ulid.ULID {
func createTSDBBlock(t *testing.T, bkt objstore.Bucket, userID string, minT, maxT int64, externalLabels map[string]string) ulid.ULID {
// Create a temporary dir for TSDB.
tempDir, err := ioutil.TempDir(os.TempDir(), "tsdb")
require.NoError(t, err)
Expand Down Expand Up @@ -916,24 +917,40 @@ func createTSDBBlock(t *testing.T, dir string, minT, maxT int64, externalLabels
_, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(snapshotDir, blockID.String()), meta, nil)
require.NoError(t, err)

// Ensure the output directory exists.
require.NoError(t, os.MkdirAll(dir, os.ModePerm))
// Copy the block files to the bucket.
srcRoot := filepath.Join(snapshotDir, blockID.String())
require.NoError(t, filepath.Walk(srcRoot, func(file string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}

// Copy the block files to the storage dir.
require.NoError(t, exec.Command("cp", "-r", filepath.Join(snapshotDir, blockID.String()), dir).Run())
// Read the file content in memory.
content, err := ioutil.ReadFile(file)
if err != nil {
return err
}

// Upload it to the bucket.
relPath, err := filepath.Rel(srcRoot, file)
if err != nil {
return err
}

return bkt.Upload(context.Background(), path.Join(userID, blockID.String(), relPath), bytes.NewReader(content))
}))

return blockID
}

func createDeletionMark(t *testing.T, dir string, blockID ulid.ULID, deletionTime time.Time) {
func createDeletionMark(t *testing.T, bkt objstore.Bucket, userID string, blockID ulid.ULID, deletionTime time.Time) {
content := mockDeletionMarkJSON(blockID.String(), deletionTime)
blockPath := filepath.Join(dir, blockID.String())
markPath := filepath.Join(blockPath, metadata.DeletionMarkFilename)

// Ensure the block directory exists.
require.NoError(t, os.MkdirAll(blockPath, os.ModePerm))
blockPath := path.Join(userID, blockID.String())
markPath := path.Join(blockPath, metadata.DeletionMarkFilename)

require.NoError(t, ioutil.WriteFile(markPath, []byte(content), os.ModePerm))
require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content)))
}

func findCompactorByUserID(compactors []*Compactor, logs []*concurrency.SyncBuffer, userID string) (*Compactor, *concurrency.SyncBuffer, error) {
Expand Down
99 changes: 25 additions & 74 deletions pkg/querier/blocks_scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package querier

import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"io/ioutil"
"os"
Expand All @@ -21,13 +19,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/bucket/filesystem"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
"github.com/cortexproject/cortex/pkg/util/services"
)

Expand All @@ -36,10 +34,10 @@ func TestBlocksScanner_InitialScan(t *testing.T) {
s, bucket, _, reg, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
defer cleanup()

user1Block1 := mockStorageBlock(t, bucket, "user-1", 10, 20)
user1Block2 := mockStorageBlock(t, bucket, "user-1", 20, 30)
user2Block1 := mockStorageBlock(t, bucket, "user-2", 10, 20)
user2Mark1 := bucketindex.BlockDeletionMarkFromThanosMarker(mockStorageDeletionMark(t, bucket, "user-2", user2Block1))
user1Block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
user1Block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
user2Block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-2", 10, 20)
user2Mark1 := bucketindex.BlockDeletionMarkFromThanosMarker(cortex_testutil.MockStorageDeletionMark(t, bucket, "user-2", user2Block1))

require.NoError(t, services.StartAndAwaitRunning(ctx, s))

Expand Down Expand Up @@ -224,9 +222,9 @@ func TestBlocksScanner_PeriodicScanFindsNewUser(t *testing.T) {
require.Equal(t, 0, len(blocks))
assert.Empty(t, deletionMarks)

block1 := mockStorageBlock(t, bucket, "user-1", 10, 20)
block2 := mockStorageBlock(t, bucket, "user-1", 20, 30)
mark2 := bucketindex.BlockDeletionMarkFromThanosMarker(mockStorageDeletionMark(t, bucket, "user-1", block2))
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
mark2 := bucketindex.BlockDeletionMarkFromThanosMarker(cortex_testutil.MockStorageDeletionMark(t, bucket, "user-1", block2))

// Trigger a periodic sync
require.NoError(t, s.scan(ctx))
Expand All @@ -248,7 +246,7 @@ func TestBlocksScanner_PeriodicScanFindsNewBlock(t *testing.T) {
s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
defer cleanup()

block1 := mockStorageBlock(t, bucket, "user-1", 10, 20)
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)

require.NoError(t, services.StartAndAwaitRunning(ctx, s))

Expand All @@ -259,7 +257,7 @@ func TestBlocksScanner_PeriodicScanFindsNewBlock(t *testing.T) {
assert.WithinDuration(t, time.Now(), blocks[0].GetUploadedAt(), 5*time.Second)
assert.Empty(t, deletionMarks)

block2 := mockStorageBlock(t, bucket, "user-1", 20, 30)
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)

// Trigger a periodic sync
require.NoError(t, s.scan(ctx))
Expand All @@ -279,8 +277,8 @@ func TestBlocksScanner_PeriodicScanFindsBlockMarkedForDeletion(t *testing.T) {
s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
defer cleanup()

block1 := mockStorageBlock(t, bucket, "user-1", 10, 20)
block2 := mockStorageBlock(t, bucket, "user-1", 20, 30)
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)

require.NoError(t, services.StartAndAwaitRunning(ctx, s))

Expand All @@ -291,7 +289,7 @@ func TestBlocksScanner_PeriodicScanFindsBlockMarkedForDeletion(t *testing.T) {
assert.Equal(t, block1.ULID, blocks[1].ID)
assert.Empty(t, deletionMarks)

mark1 := bucketindex.BlockDeletionMarkFromThanosMarker(mockStorageDeletionMark(t, bucket, "user-1", block1))
mark1 := bucketindex.BlockDeletionMarkFromThanosMarker(cortex_testutil.MockStorageDeletionMark(t, bucket, "user-1", block1))

// Trigger a periodic sync
require.NoError(t, s.scan(ctx))
Expand All @@ -311,8 +309,8 @@ func TestBlocksScanner_PeriodicScanFindsDeletedBlock(t *testing.T) {
s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
defer cleanup()

block1 := mockStorageBlock(t, bucket, "user-1", 10, 20)
block2 := mockStorageBlock(t, bucket, "user-1", 20, 30)
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)

require.NoError(t, services.StartAndAwaitRunning(ctx, s))

Expand Down Expand Up @@ -340,8 +338,8 @@ func TestBlocksScanner_PeriodicScanFindsDeletedUser(t *testing.T) {
s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
defer cleanup()

block1 := mockStorageBlock(t, bucket, "user-1", 10, 20)
block2 := mockStorageBlock(t, bucket, "user-1", 20, 30)
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)

require.NoError(t, services.StartAndAwaitRunning(ctx, s))

Expand All @@ -368,8 +366,8 @@ func TestBlocksScanner_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t *testing
s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
defer cleanup()

block1 := mockStorageBlock(t, bucket, "user-1", 10, 20)
block2 := mockStorageBlock(t, bucket, "user-1", 20, 30)
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)

require.NoError(t, services.StartAndAwaitRunning(ctx, s))

Expand All @@ -390,7 +388,7 @@ func TestBlocksScanner_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t *testing
require.Equal(t, 0, len(blocks))
assert.Empty(t, deletionMarks)

block3 := mockStorageBlock(t, bucket, "user-1", 30, 40)
block3 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 30, 40)

// Trigger a periodic sync
require.NoError(t, s.scan(ctx))
Expand All @@ -407,11 +405,11 @@ func TestBlocksScanner_GetBlocks(t *testing.T) {
s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
defer cleanup()

block1 := mockStorageBlock(t, bucket, "user-1", 10, 15)
block2 := mockStorageBlock(t, bucket, "user-1", 12, 20)
block3 := mockStorageBlock(t, bucket, "user-1", 20, 30)
block4 := mockStorageBlock(t, bucket, "user-1", 30, 40)
mark3 := bucketindex.BlockDeletionMarkFromThanosMarker(mockStorageDeletionMark(t, bucket, "user-1", block3))
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 15)
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 12, 20)
block3 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
block4 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 30, 40)
mark3 := bucketindex.BlockDeletionMarkFromThanosMarker(cortex_testutil.MockStorageDeletionMark(t, bucket, "user-1", block3))

require.NoError(t, services.StartAndAwaitRunning(ctx, s))

Expand Down Expand Up @@ -523,50 +521,3 @@ func prepareBlocksScannerConfig() BlocksScannerConfig {
IgnoreDeletionMarksDelay: time.Hour,
}
}

// mockStorageBlock uploads a minimal meta.json for a newly generated block to
// the bucket at "<userID>/<blockID>/meta.json" and returns the uploaded meta.
//
// The returned meta has MinTime=minT, MaxTime=maxT, compaction level 1 and the
// block itself as its only compaction source. Any marshalling or upload
// failure fails the calling test immediately.
func mockStorageBlock(t *testing.T, bucket objstore.Bucket, userID string, minT, maxT int64) tsdb.BlockMeta {
	// Attribute failures to the test that called this helper.
	t.Helper()

	// Generate a block ID whose timestamp matches the maxT (for simplicity we assume it
	// has been compacted and shipped in zero time, even if not realistic).
	id := ulid.MustNew(uint64(maxT), rand.Reader)

	meta := tsdb.BlockMeta{
		Version: 1,
		ULID:    id,
		MinTime: minT,
		MaxTime: maxT,
		Compaction: tsdb.BlockMetaCompaction{
			Level:   1,
			Sources: []ulid.ULID{id},
		},
	}

	// Fail the test rather than panic: a panic would abort the whole test
	// binary, and the upload below already reports errors through require.
	metaContent, err := json.Marshal(meta)
	require.NoError(t, err, "failed to marshal mocked block meta")

	metaPath := fmt.Sprintf("%s/%s/meta.json", userID, id.String())
	metaContentReader := strings.NewReader(string(metaContent))
	require.NoError(t, bucket.Upload(context.Background(), metaPath, metaContentReader))

	return meta
}

// mockStorageDeletionMark uploads a deletion mark for the block described by
// meta to the bucket at "<userID>/<blockID>/<metadata.DeletionMarkFilename>"
// and returns a pointer to the uploaded mark.
//
// The mark's DeletionTime is set to one minute before the current time.
// Any marshalling or upload failure fails the calling test immediately.
func mockStorageDeletionMark(t *testing.T, bucket objstore.Bucket, userID string, meta tsdb.BlockMeta) *metadata.DeletionMark {
	// Attribute failures to the test that called this helper.
	t.Helper()

	mark := metadata.DeletionMark{
		ID:           meta.ULID,
		DeletionTime: time.Now().Add(-time.Minute).Unix(),
		Version:      metadata.DeletionMarkVersion1,
	}

	// Fail the test rather than panic, and name the object actually being
	// marshalled (the previous message wrongly mentioned the block meta).
	markContent, err := json.Marshal(mark)
	require.NoError(t, err, "failed to marshal mocked deletion mark")

	markPath := fmt.Sprintf("%s/%s/%s", userID, meta.ULID.String(), metadata.DeletionMarkFilename)
	markContentReader := strings.NewReader(string(markContent))
	require.NoError(t, bucket.Upload(context.Background(), markPath, markContentReader))

	return &mark
}
5 changes: 3 additions & 2 deletions pkg/storage/tsdb/bucketindex/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (
)

const (
IndexFilename = "bucket-index.json"
IndexVersion1 = 1
IndexFilename = "bucket-index.json"
IndexCompressedFilename = IndexFilename + ".gz"
IndexVersion1 = 1

SegmentsFormatUnknown = ""

Expand Down
Loading