Logs deletion support for boltdb-shipper store (#3688)
* add support for slicing the chunks for partial chunk deletion

* add delete requests store and handler for requests

* add delete requests manager and a way to check for chunk expiry based on delete requests

* add chunk rewriter for partially deleted chunks

* hook delete requests manager into compactor and update expiration checker

* use chunk client in sweeper for deleting chunks

* upload delete requests table during shutdown and some code refactoring

* add tests and metrics

* fixes post rebase

* lint

* changes suggested from PR review

* avoid allocation when getting userid from chunkid

* use bytes.IndexByte instead
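
The last two commit-message items describe extracting the tenant (user) ID prefix from a chunk ID without allocating. A minimal sketch of that idea, assuming chunk IDs take the form "<userID>/<rest>"; the helper name and the exact key format are illustrative, not taken from this commit:

package chunkid // hypothetical helper package, for illustration only

import "bytes"

// UserIDFromChunkID returns the tenant prefix of a chunk ID of the assumed
// form "<userID>/<fingerprint>:<from>:<through>:<checksum>". bytes.IndexByte
// finds the separator without the intermediate slices that bytes.Split or
// strings.Split would allocate.
func UserIDFromChunkID(chunkID []byte) []byte {
	idx := bytes.IndexByte(chunkID, '/')
	if idx == -1 {
		return chunkID // no separator: treat the whole ID as the user ID
	}
	return chunkID[:idx]
}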
sandeepsukhani authored May 18, 2021
1 parent fa24735 commit 2b7a6f2
Showing 23 changed files with 2,277 additions and 71 deletions.
4 changes: 4 additions & 0 deletions pkg/chunkenc/dumb_chunk.go
@@ -122,6 +122,10 @@ func (c *dumbChunk) Close() error {
return nil
}

func (c *dumbChunk) Rebound(start, end time.Time) (Chunk, error) {
return nil, nil
}

type dumbChunkIterator struct {
direction logproto.Direction
i int
12 changes: 12 additions & 0 deletions pkg/chunkenc/facade.go
@@ -3,6 +3,8 @@ package chunkenc
import (
"io"

"github.com/prometheus/common/model"

"github.com/cortexproject/cortex/pkg/chunk/encoding"
)

@@ -83,6 +85,16 @@ func (f Facade) LokiChunk() Chunk {
return f.c
}

func (f Facade) Rebound(start, end model.Time) (encoding.Chunk, error) {
newChunk, err := f.c.Rebound(start.Time(), end.Time())
if err != nil {
return nil, err
}
return &Facade{
c: newChunk,
}, nil
}

// UncompressedSize is a helper function to hide the type assertion kludge when wanting the uncompressed size of the Cortex interface encoding.Chunk.
func UncompressedSize(c encoding.Chunk) (int, bool) {
f, ok := c.(*Facade)
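
Facade.Rebound above is what lets callers that only hold Cortex's encoding.Chunk interface reach the new Loki-side Rebound. A minimal sketch of that call path; the function and variable names are hypothetical and not part of this commit (sketch assumed to live in the chunkenc package):

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/chunk/encoding"
	"github.com/prometheus/common/model"
)

// reboundThroughFacade narrows a Cortex-level chunk to [from, through] by
// unwrapping it to the chunkenc.Facade added above.
func reboundThroughFacade(ck encoding.Chunk, from, through model.Time) (encoding.Chunk, error) {
	f, ok := ck.(*Facade)
	if !ok {
		return nil, fmt.Errorf("expected *chunkenc.Facade, got %T", ck)
	}
	return f.Rebound(from, through)
}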
1 change: 1 addition & 0 deletions pkg/chunkenc/interface.go
@@ -121,6 +121,7 @@ type Chunk interface {
CompressedSize() int
Close() error
Encoding() Encoding
Rebound(start, end time.Time) (Chunk, error)
}

// Block is a chunk block.
39 changes: 39 additions & 0 deletions pkg/chunkenc/memchunk.go
@@ -16,6 +16,9 @@ import (
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/cortexproject/cortex/pkg/chunk/encoding"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
@@ -31,6 +34,11 @@ const (

blocksPerChunk = 10
maxLineLength = 1024 * 1024 * 1024

// defaultBlockSize is used as the target block size when cutting new chunks out of partially deleted chunks for a delete request.
// This could vary from the block size configured via the `ingester.chunks-block-size` flag or the equivalent yaml config, resulting in
// a different block size in the new chunk, which should be fine.
defaultBlockSize = 256 * 1024
)

var magicNumber = uint32(0x12EE56A)
@@ -756,6 +764,37 @@ func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block {
return blocks
}

// Rebound builds a smaller chunk containing only the logs with timestamps between start and end (both inclusive).
func (c *MemChunk) Rebound(start, end time.Time) (Chunk, error) {
// add a nanosecond to end time because the Chunk.Iterator considers end time to be non-inclusive.
itr, err := c.Iterator(context.Background(), start, end.Add(time.Nanosecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
if err != nil {
return nil, err
}

// Using defaultBlockSize for target block size.
// The alternative here could be to go over all the blocks and use the size of the largest block as the target block size, but I (Sandeep) feel that it is not worth the complexity.
// For the target chunk size I am using the compressed size of the original chunk, since the newChunk should anyway be smaller than that.
newChunk := NewMemChunk(c.Encoding(), defaultBlockSize, c.CompressedSize())

for itr.Next() {
entry := itr.Entry()
if err := newChunk.Append(&entry); err != nil {
return nil, err
}
}

if newChunk.Size() == 0 {
return nil, encoding.ErrSliceNoDataInRange
}

if err := newChunk.Close(); err != nil {
return nil, err
}

return newChunk, nil
}

// encBlock is an internal wrapper for a block, mainly to avoid binding an encoding in a block itself.
// This may seem roundabout, but the encoding is already a field on the parent MemChunk type. encBlock
// then allows us to bind a decoding context to a block when requested, but otherwise helps reduce the
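
A short sketch of how Rebound serves partial chunk deletion: for every time range of a chunk that is not covered by a delete request, a smaller chunk is cut and the original is later removed. The helper and its inputs below are illustrative assumptions; the real rewriting lives in the compactor's chunk rewriter added elsewhere in this commit (sketch assumed to live in the chunkenc package):

import (
	"github.com/cortexproject/cortex/pkg/chunk/encoding"
	"github.com/prometheus/common/model"
)

// rewriteRetainedIntervals cuts one new chunk per interval that survives a
// delete request. c is the original chunk and retain holds the non-deleted
// time ranges; both are hypothetical inputs.
func rewriteRetainedIntervals(c Chunk, retain []model.Interval) ([]Chunk, error) {
	var out []Chunk
	for _, iv := range retain {
		nc, err := c.Rebound(iv.Start.Time(), iv.End.Time())
		if err == encoding.ErrSliceNoDataInRange {
			continue // nothing to keep in this interval
		}
		if err != nil {
			return nil, err
		}
		out = append(out, nc)
	}
	return out, nil
}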
95 changes: 95 additions & 0 deletions pkg/chunkenc/memchunk_test.go
@@ -17,6 +17,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/chunk/encoding"

"github.com/grafana/loki/pkg/chunkenc/testdata"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
@@ -1055,3 +1057,96 @@ func Test_HeadIteratorReverse(t *testing.T) {
require.NoError(t, c.cut())
assertOrder(t, i)
}

func TestMemChunk_Rebound(t *testing.T) {
chkFrom := time.Unix(0, 0)
chkThrough := chkFrom.Add(time.Hour)
originalChunk := buildTestMemChunk(t, chkFrom, chkThrough)

for _, tc := range []struct {
name string
sliceFrom, sliceTo time.Time
err error
}{
{
name: "slice whole chunk",
sliceFrom: chkFrom,
sliceTo: chkThrough,
},
{
name: "slice first half",
sliceFrom: chkFrom,
sliceTo: chkFrom.Add(30 * time.Minute),
},
{
name: "slice second half",
sliceFrom: chkFrom.Add(30 * time.Minute),
sliceTo: chkThrough,
},
{
name: "slice in the middle",
sliceFrom: chkFrom.Add(15 * time.Minute),
sliceTo: chkFrom.Add(45 * time.Minute),
},
{
name: "slice interval not aligned with sample intervals",
sliceFrom: chkFrom.Add(time.Second),
sliceTo: chkThrough.Add(-time.Second),
},
{
name: "slice out of bounds without overlap",
err: encoding.ErrSliceNoDataInRange,
sliceFrom: chkThrough.Add(time.Minute),
sliceTo: chkThrough.Add(time.Hour),
},
{
name: "slice out of bounds with overlap",
sliceFrom: chkFrom.Add(10 * time.Minute),
sliceTo: chkThrough.Add(10 * time.Minute),
},
} {
t.Run(tc.name, func(t *testing.T) {
newChunk, err := originalChunk.Rebound(tc.sliceFrom, tc.sliceTo)
if tc.err != nil {
require.Equal(t, tc.err, err)
return
}
require.NoError(t, err)

// iterate originalChunk from slice start to slice end + nanosecond. Adding a nanosecond here to be inclusive of the sample at the end time.
originalChunkItr, err := originalChunk.Iterator(context.Background(), tc.sliceFrom, tc.sliceTo.Add(time.Nanosecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
require.NoError(t, err)

// iterate newChunk over the whole chunk interval, which should include all the samples in the chunk and hence align it with the expected values.
newChunkItr, err := newChunk.Iterator(context.Background(), chkFrom, chkThrough, logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
require.NoError(t, err)

for {
originalChunksHasMoreSamples := originalChunkItr.Next()
newChunkHasMoreSamples := newChunkItr.Next()

// either both should have samples or none of them
require.Equal(t, originalChunksHasMoreSamples, newChunkHasMoreSamples)
if !originalChunksHasMoreSamples {
break
}

require.Equal(t, originalChunkItr.Entry(), newChunkItr.Entry())
}

})
}
}

func buildTestMemChunk(t *testing.T, from, through time.Time) *MemChunk {
chk := NewMemChunk(EncGZIP, defaultBlockSize, 0)
for ; from.Before(through); from = from.Add(time.Second) {
err := chk.Append(&logproto.Entry{
Line: from.String(),
Timestamp: from,
})
require.NoError(t, err)
}

return chk
}
6 changes: 6 additions & 0 deletions pkg/loki/modules.go
@@ -602,6 +602,12 @@ func (t *Loki) initCompactor() (services.Service, error) {
return nil, err
}

if t.Cfg.CompactorConfig.RetentionEnabled {
t.Server.HTTP.Path("/loki/api/admin/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
t.Server.HTTP.Path("/loki/api/admin/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
t.Server.HTTP.Path("/loki/api/admin/cancel_delete_request").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))
}

return t.compactor, nil
}

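
The routes above expose the compactor's delete-request admin API when retention is enabled. A hedged sketch of calling it from a client; the query parameters (match[], start, end) and their encoding are assumptions modelled on the Cortex purger convention and are not defined in this file:

import (
	"fmt"
	"net/http"
	"net/url"
	"strconv"
)

// submitDeleteRequest posts a new delete request to the compactor. The path
// comes from the routes registered above; the parameter names, timestamp
// encoding, and tenant header are illustrative assumptions.
func submitDeleteRequest(compactorURL, tenantID, selector string, start, end int64) error {
	params := url.Values{}
	params.Add("match[]", selector)
	params.Add("start", strconv.FormatInt(start, 10))
	params.Add("end", strconv.FormatInt(end, 10))

	req, err := http.NewRequest(http.MethodPost, compactorURL+"/loki/api/admin/delete?"+params.Encode(), nil)
	if err != nil {
		return err
	}
	req.Header.Set("X-Scope-OrgID", tenantID) // Loki's multi-tenant org ID header

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("unexpected status %s", resp.Status)
	}
	return nil
}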
93 changes: 75 additions & 18 deletions pkg/storage/stores/shipper/compactor/compactor.go
@@ -11,14 +11,18 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/cortexproject/cortex/pkg/chunk/objectclient"
"github.com/cortexproject/cortex/pkg/chunk/storage"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

loki_storage "github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/storage/stores/util"
@@ -27,13 +31,14 @@ import (
const delimiter = "/"

type Config struct {
WorkingDirectory string `yaml:"working_directory"`
SharedStoreType string `yaml:"shared_store"`
SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
RetentionEnabled bool `yaml:"retention_enabled"`
RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"`
RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"`
WorkingDirectory string `yaml:"working_directory"`
SharedStoreType string `yaml:"shared_store"`
SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
RetentionEnabled bool `yaml:"retention_enabled"`
RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"`
RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"`
DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
}

// RegisterFlags registers flags.
@@ -45,6 +50,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.RetentionDeleteDelay, "boltdb.shipper.compactor.retention-delete-delay", 2*time.Hour, "Delay after which chunks will be fully deleted during retention.")
f.BoolVar(&cfg.RetentionEnabled, "boltdb.shipper.compactor.retention-enabled", false, "(Experimental) Activate custom (per-stream,per-tenant) retention.")
f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.")
f.DurationVar(&cfg.DeleteRequestCancelPeriod, "purger.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete requests for this duration after they are created. Data is deleted only once delete requests are older than this duration. Ideally this should be set to at least 24h.")
}

func (cfg *Config) IsDefaults() bool {
@@ -60,11 +66,14 @@ func (cfg *Config) Validate() error {
type Compactor struct {
services.Service

cfg Config
objectClient chunk.ObjectClient
tableMarker retention.TableMarker
sweeper *retention.Sweeper
metrics *metrics
cfg Config
objectClient chunk.ObjectClient
tableMarker retention.TableMarker
sweeper *retention.Sweeper
deleteRequestsStore deletion.DeleteRequestsStore
DeleteRequestsHandler *deletion.DeleteRequestHandler
deleteRequestsManager *deletion.DeleteRequestsManager
metrics *metrics
}

func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, r prometheus.Registerer) (*Compactor, error) {
@@ -83,20 +92,39 @@ func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_st
}
prefixedClient := util.NewPrefixedObjectClient(objectClient, cfg.SharedStoreKeyPrefix)

var encoder objectclient.KeyEncoder
if _, ok := objectClient.(*local.FSObjectClient); ok {
encoder = objectclient.Base64Encoder
}

chunkClient := objectclient.NewClient(objectClient, encoder)

retentionWorkDir := filepath.Join(cfg.WorkingDirectory, "retention")
sweeper, err := retention.NewSweeper(retentionWorkDir, chunkClient, cfg.RetentionDeleteWorkCount, cfg.RetentionDeleteDelay, r)
if err != nil {
return nil, err
}

sweeper, err := retention.NewSweeper(retentionWorkDir, retention.NewDeleteClient(objectClient), cfg.RetentionDeleteWorkCount, cfg.RetentionDeleteDelay, r)
deletionWorkDir := filepath.Join(cfg.WorkingDirectory, "deletion")

deletesStore, err := deletion.NewDeleteStore(deletionWorkDir, prefixedClient)
if err != nil {
return nil, err
}

compactor := &Compactor{
cfg: cfg,
objectClient: prefixedClient,
metrics: newMetrics(r),
sweeper: sweeper,
cfg: cfg,
objectClient: prefixedClient,
metrics: newMetrics(r),
sweeper: sweeper,
deleteRequestsStore: deletesStore,
DeleteRequestsHandler: deletion.NewDeleteRequestHandler(deletesStore, time.Hour, r),
deleteRequestsManager: deletion.NewDeleteRequestsManager(deletesStore, cfg.DeleteRequestCancelPeriod, r),
}
marker, err := retention.NewMarker(retentionWorkDir, schemaConfig, retention.NewExpirationChecker(limits), r)

expirationChecker := newExpirationChecker(retention.NewExpirationChecker(limits), compactor.deleteRequestsManager)

marker, err := retention.NewMarker(retentionWorkDir, schemaConfig, expirationChecker, chunkClient, r)
if err != nil {
return nil, err
}
@@ -144,6 +172,8 @@ func (c *Compactor) loop(ctx context.Context) error {
}

wg.Wait()
c.deleteRequestsManager.Stop()
c.deleteRequestsStore.Stop()
return nil
}

@@ -166,12 +196,22 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {
status := statusSuccess
start := time.Now()

if c.cfg.RetentionEnabled {
c.deleteRequestsManager.MarkPhaseStarted()
}

defer func() {
c.metrics.compactTablesOperationTotal.WithLabelValues(status).Inc()
dmCallback := c.deleteRequestsManager.MarkPhaseFailed
if status == statusSuccess {
dmCallback = c.deleteRequestsManager.MarkPhaseFinished
c.metrics.compactTablesOperationDurationSeconds.Set(time.Since(start).Seconds())
c.metrics.compactTablesOperationLastSuccess.SetToCurrentTime()
}

if c.cfg.RetentionEnabled {
dmCallback()
}
}()

_, dirs, err := c.objectClient.List(ctx, "", delimiter)
@@ -199,3 +239,20 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {

return nil
}

type expirationChecker struct {
retentionExpiryChecker retention.ExpirationChecker
deletionExpiryChecker retention.ExpirationChecker
}

func newExpirationChecker(retentionExpiryChecker, deletionExpiryChecker retention.ExpirationChecker) retention.ExpirationChecker {
return &expirationChecker{retentionExpiryChecker, deletionExpiryChecker}
}

func (e *expirationChecker) Expired(ref retention.ChunkEntry, now model.Time) (bool, []model.Interval) {
if expired, nonDeletedIntervals := e.retentionExpiryChecker.Expired(ref, now); expired {
return expired, nonDeletedIntervals
}

return e.deletionExpiryChecker.Expired(ref, now)
}
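
The composite checker above asks the retention checker first and falls back to the delete-request checker only when retention does not expire the chunk. A rough sketch of how a marker pass might act on its answer, tying it to the partial-chunk rewriting shown earlier; the action type, helper name, and control flow are simplified assumptions, not the compactor's actual code:

import (
	"github.com/prometheus/common/model"

	"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
)

type chunkAction int

const (
	keepChunk    chunkAction = iota // not expired: leave it alone
	deleteChunk                     // fully expired: hand it to the sweeper
	rewriteChunk                    // partially deleted: Rebound the retained intervals, then drop the original
)

// decideChunkAction classifies one chunk entry during a compaction pass using
// the combined expirationChecker built in NewCompactor.
func decideChunkAction(checker retention.ExpirationChecker, entry retention.ChunkEntry, now model.Time) chunkAction {
	expired, nonDeletedIntervals := checker.Expired(entry, now)
	switch {
	case !expired:
		return keepChunk
	case len(nonDeletedIntervals) == 0:
		return deleteChunk
	default:
		return rewriteChunk
	}
}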