Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a DeletionMode config variable #5481

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
929cbbe
Add deletionEnabled setting and remove delete request manager
MichelHollands Feb 25, 2022
42bfebf
Rebase
MichelHollands Feb 25, 2022
a6ed435
Remove deletion handling from delete requests manager
MichelHollands Feb 25, 2022
3ce76d6
change store so it stores a logql statement
MichelHollands Feb 25, 2022
bb2b3eb
Add validation code for logql statement
MichelHollands Feb 25, 2022
9397b50
Run deleteRequestsManager when deletion is enabled
MichelHollands Feb 25, 2022
6f676aa
Remove unused variables
MichelHollands Feb 25, 2022
e1afcc3
Revert "Remove deletion handling from delete requests manager"
MichelHollands Mar 1, 2022
f6eb37e
Re-add IsDeleted method
MichelHollands Mar 1, 2022
0fbdbe1
Re-add tests for IsDeleted
MichelHollands Mar 1, 2022
5495668
Fix delete request store test
MichelHollands Mar 1, 2022
027adfd
Fix linting issue
MichelHollands Mar 1, 2022
587f6d2
Revert compactor changes
MichelHollands Mar 1, 2022
b10e121
Add deletion mode
MichelHollands Mar 1, 2022
b90811a
Add v1 mode
MichelHollands Mar 1, 2022
6a7ca3a
Rename LogQLRequest to Query
MichelHollands Mar 1, 2022
815c617
Fix linting issues
MichelHollands Mar 1, 2022
a9b8f73
Use DeleteMode in compactor module
MichelHollands Mar 1, 2022
65504d0
Rename logql to query
MichelHollands Mar 1, 2022
e75c36b
Put cancel under delete verb
MichelHollands Mar 2, 2022
1a5069b
Update documentation
MichelHollands Mar 2, 2022
dc4ab08
Update changelog
MichelHollands Mar 2, 2022
b00482c
Revert only the API surface area while keeping everything else
MasslessParticle Mar 11, 2022
38d819f
Use moved code in syntax package
MichelHollands Mar 15, 2022
3ae0aea
Remove duplicte import
MichelHollands Mar 15, 2022
b6b4734
Use renamed field in tests
MichelHollands Mar 15, 2022
beddf81
Remove duplicates and empty lines in changelog
MichelHollands Mar 15, 2022
e3dbe38
Update changelog description
MichelHollands Mar 15, 2022
1ba285a
Update pkg/storage/stores/shipper/compactor/deletion/delete_request.go
MichelHollands Mar 17, 2022
8152916
Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_…
MichelHollands Mar 17, 2022
eb46a59
Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_…
MichelHollands Mar 17, 2022
197e08b
Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_…
MichelHollands Mar 17, 2022
ff27a1b
Update CHANGELOG.md
MichelHollands Mar 17, 2022
d249f18
Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_…
MichelHollands Mar 17, 2022
0b35252
Make DeletionMode struct member non public
MichelHollands Mar 17, 2022
0b49512
Revert change to docs re cancellation
MichelHollands Mar 17, 2022
1db21ca
Use same variable names
MichelHollands Mar 17, 2022
92aa117
Add parameter validation to changelog
MichelHollands Mar 17, 2022
a574d62
Rename v1 to WholeStreamDeletion
MichelHollands Mar 17, 2022
8098e1b
Fix default value of deletion mode config setting
MichelHollands Mar 18, 2022
86a2a53
reimplement new api
MasslessParticle Mar 18, 2022
16e9ab2
Add delete request handler when delete mode is set
MichelHollands Apr 4, 2022
9bec8a1
Remove unused variable
MichelHollands Apr 4, 2022
a10466b
Add comment to change the code when other deletion modes are available
MichelHollands Apr 5, 2022
f9addc8
create expirationChecker if deletionMode is set
MichelHollands Apr 5, 2022
85a32ea
Address review comments
MichelHollands Apr 6, 2022
a12b157
Update pkg/storage/stores/shipper/compactor/compactor.go
MichelHollands Apr 6, 2022
9d6dc0d
Rename AddQuery to SetQuery
MichelHollands Apr 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Re-add IsDeleted method
Signed-off-by: Michel Hollands <michel.hollands@grafana.com>
  • Loading branch information
MichelHollands committed Apr 6, 2022
commit f6eb37e3cb92a2a8a4f65fe73482e51191b4f89b
65 changes: 63 additions & 2 deletions pkg/storage/stores/shipper/compactor/deletion/delete_request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package deletion

import (
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)
Expand All @@ -13,6 +14,66 @@ type DeleteRequest struct {
Status DeleteRequestStatus `json:"status"`
CreatedAt model.Time `json:"created_at"`

UserID string `json:"-"`
Matchers [][]*labels.Matcher `json:"-"`
UserID string `json:"-"`
matchers []*labels.Matcher `json:"-"`
}

func (d *DeleteRequest) AddLogQL(logQL string) error {
d.LogQLRequest = logQL
matchers, err := parseLogQLExpressionForDeletion(logQL)
if err != nil {
return err
}
d.matchers = matchers
return nil
}

func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []model.Interval) {
if d.UserID != unsafeGetString(entry.UserID) {
return false, nil
}

if !intervalsOverlap(model.Interval{
Start: entry.From,
End: entry.Through,
}, model.Interval{
Start: d.StartTime,
End: d.EndTime,
}) {
return false, nil
}

if !labels.Selector(d.matchers).Matches(entry.Labels) {
return false, nil
}

if d.StartTime <= entry.From && d.EndTime >= entry.Through {
return true, nil
}

intervals := make([]model.Interval, 0, 2)

if d.StartTime > entry.From {
intervals = append(intervals, model.Interval{
Start: entry.From,
End: d.StartTime - 1,
})
}

if d.EndTime < entry.Through {
intervals = append(intervals, model.Interval{
Start: d.EndTime + 1,
End: entry.Through,
})
}

return true, intervals
}

func intervalsOverlap(interval1, interval2 model.Interval) bool {
if interval1.Start > interval2.End || interval2.Start > interval1.End {
return false
}

return true
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import (
util_log "github.com/grafana/loki/pkg/util/log"
)

const (
statusSuccess = "success"
statusFail = "fail"
)

type DeleteRequestsManager struct {
deleteRequestsStore DeleteRequestsStore
deleteRequestCancelPeriod time.Duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (m mockDeleteRequestsStore) UpdateStatus(ctx context.Context, userID, reque
return nil
}

func (m mockDeleteRequestsStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, selectors []string) error {
func (m mockDeleteRequestsStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, LogQLRequest string) error {
panic("implement me")
}

Expand Down Expand Up @@ -89,10 +89,10 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
name: "no relevant delete requests",
deleteRequestsFromStore: []DeleteRequest{
{
UserID: "different-user",
Selectors: []string{lblFoo.String()},
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
UserID: "different-user",
LogQLRequest: lblFoo.String(),
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
},
},
expectedResp: resp{
Expand All @@ -104,10 +104,10 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
name: "whole chunk deleted by single request",
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
UserID: testUserID,
LogQLRequest: lblFoo.String(),
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
},
},
expectedResp: resp{
Expand All @@ -119,10 +119,10 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
name: "deleted interval out of range",
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
StartTime: now.Add(-48 * time.Hour),
EndTime: now.Add(-24 * time.Hour),
UserID: testUserID,
LogQLRequest: lblFoo.String(),
StartTime: now.Add(-48 * time.Hour),
EndTime: now.Add(-24 * time.Hour),
},
},
expectedResp: resp{
Expand All @@ -134,16 +134,16 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
name: "multiple delete requests with one deleting the whole chunk",
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
StartTime: now.Add(-48 * time.Hour),
EndTime: now.Add(-24 * time.Hour),
UserID: testUserID,
LogQLRequest: lblFoo.String(),
StartTime: now.Add(-48 * time.Hour),
EndTime: now.Add(-24 * time.Hour),
},
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
StartTime: now.Add(-12 * time.Hour),
EndTime: now,
UserID: testUserID,
LogQLRequest: lblFoo.String(),
StartTime: now.Add(-12 * time.Hour),
EndTime: now,
},
},
expectedResp: resp{
Expand All @@ -155,28 +155,28 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
name: "multiple delete requests causing multiple holes",
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
StartTime: now.Add(-13 * time.Hour),
EndTime: now.Add(-11 * time.Hour),
UserID: testUserID,
LogQLRequest: lblFoo.String(),
StartTime: now.Add(-13 * time.Hour),
EndTime: now.Add(-11 * time.Hour),
},
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
StartTime: now.Add(-10 * time.Hour),
EndTime: now.Add(-8 * time.Hour),
UserID: testUserID,
LogQLRequest: lblFoo.String(),
StartTime: now.Add(-10 * time.Hour),
EndTime: now.Add(-8 * time.Hour),
},
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
StartTime: now.Add(-6 * time.Hour),
EndTime: now.Add(-5 * time.Hour),
UserID: testUserID,
LogQLRequest: lblFoo.String(),
StartTime: now.Add(-6 * time.Hour),
EndTime: now.Add(-5 * time.Hour),
},
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
StartTime: now.Add(-2 * time.Hour),
EndTime: now,
UserID: testUserID,
LogQLRequest: lblFoo.String(),
StartTime: now.Add(-2 * time.Hour),
EndTime: now,
},
},
expectedResp: resp{
Expand All @@ -201,16 +201,16 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
name: "multiple overlapping requests deleting the whole chunk",
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
StartTime: now.Add(-13 * time.Hour),
EndTime: now.Add(-6 * time.Hour),
UserID: testUserID,
LogQLRequest: lblFoo.String(),
StartTime: now.Add(-13 * time.Hour),
EndTime: now.Add(-6 * time.Hour),
},
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
StartTime: now.Add(-8 * time.Hour),
EndTime: now,
UserID: testUserID,
LogQLRequest: lblFoo.String(),
StartTime: now.Add(-8 * time.Hour),
EndTime: now,
},
},
expectedResp: resp{
Expand All @@ -222,22 +222,22 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
name: "multiple non-overlapping requests deleting the whole chunk",
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
StartTime: now.Add(-12 * time.Hour),
EndTime: now.Add(-6*time.Hour) - 1,
UserID: testUserID,
LogQLRequest: lblFoo.String(),
StartTime: now.Add(-12 * time.Hour),
EndTime: now.Add(-6*time.Hour) - 1,
},
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
StartTime: now.Add(-6 * time.Hour),
EndTime: now.Add(-4*time.Hour) - 1,
UserID: testUserID,
LogQLRequest: lblFoo.String(),
StartTime: now.Add(-6 * time.Hour),
EndTime: now.Add(-4*time.Hour) - 1,
},
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
StartTime: now.Add(-4 * time.Hour),
EndTime: now,
UserID: testUserID,
LogQLRequest: lblFoo.String(),
StartTime: now.Add(-4 * time.Hour),
EndTime: now,
},
},
expectedResp: resp{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strconv"
"strings"
"time"
"unsafe"

"github.com/prometheus/common/model"

Expand Down Expand Up @@ -200,7 +201,7 @@ func (ds *deleteRequestsStore) queryDeleteRequests(ctx context.Context, deleteQu
return false
}

deleteRequest.LogQLRequest = string(itr.Value())
deleteRequest.AddLogQL(string(itr.Value()))
deleteRequests[i] = deleteRequest

return true
Expand Down Expand Up @@ -289,3 +290,8 @@ func splitUserIDAndRequestID(rangeValue string) (userID, requestID string) {

return
}

// unsafeGetString is like yolostring but with a meaningful name
func unsafeGetString(buf []byte) string {
return *((*string)(unsafe.Pointer(&buf)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r
return
}

err = checkLogQLExpressionForDeletion(logQLStatement)
_, err = parseLogQLExpressionForDeletion(logQLStatement)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand Down
22 changes: 7 additions & 15 deletions pkg/storage/stores/shipper/compactor/deletion/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,24 @@ import (
"errors"

"github.com/grafana/loki/pkg/logql"
"github.com/prometheus/prometheus/model/labels"
)

var (
errInvalidLogQL = errors.New("invalid LogQL expression")
errUnsupportedLogQL = errors.New("unsupported LogQL expression")
)

// checkLogQLExpressionForDeletion checks if the given logQL is valid for deletions
func checkLogQLExpressionForDeletion(logQL string) error {
// parseLogQLExpressionForDeletion checks if the given logQL is valid for deletions
func parseLogQLExpressionForDeletion(logQL string) ([]*labels.Matcher, error) {
expr, err := logql.ParseExpr(logQL)
if err != nil {
return errInvalidLogQL
return nil, errInvalidLogQL
}

if _, ok := expr.(*logql.MatchersExpr); ok {
return nil
}
if pipelineExpr, ok := expr.(*logql.PipelineExpr); ok {
// Only support line filters for now
for _, stage := range pipelineExpr.MultiStages {
if _, ok := stage.(*logql.LineFilterExpr); !ok {
return errUnsupportedLogQL
}
}
return nil
if matchersExpr, ok := expr.(*logql.MatchersExpr); ok {
return matchersExpr.Matchers(), nil
}

return errUnsupportedLogQL
return nil, errUnsupportedLogQL
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,34 @@ import (
"github.com/stretchr/testify/require"
)

func TestInvalidLogQLExpressionForDeletion(t *testing.T) {
func TestParseLogQLExpressionForDeletion(t *testing.T) {
t.Run("invalid logql", func(t *testing.T) {
err := checkLogQLExpressionForDeletion("gjgjg ggj")
matchers, err := parseLogQLExpressionForDeletion("gjgjg ggj")
require.Nil(t, matchers)
require.ErrorIs(t, err, errInvalidLogQL)
})

t.Run("matcher expression", func(t *testing.T) {
err := checkLogQLExpressionForDeletion(`{env="dev", secret="true"}`)
matchers, err := parseLogQLExpressionForDeletion(`{env="dev", secret="true"}`)
require.NotNil(t, matchers)
require.NoError(t, err)
})

t.Run("pipeline expression with line filter", func(t *testing.T) {
err := checkLogQLExpressionForDeletion(`{env="dev", secret="true"} |= "social sec number"`)
require.NoError(t, err)
matchers, err := parseLogQLExpressionForDeletion(`{env="dev", secret="true"} |= "social sec number"`)
require.Nil(t, matchers)
require.ErrorIs(t, err, errUnsupportedLogQL)
})

t.Run("pipeline expression with label filter ", func(t *testing.T) {
err := checkLogQLExpressionForDeletion(`{env="dev", secret="true"} | json bob="top.params[0]"`)
matchers, err := parseLogQLExpressionForDeletion(`{env="dev", secret="true"} | json bob="top.params[0]"`)
require.Nil(t, matchers)
require.ErrorIs(t, err, errUnsupportedLogQL)
})

t.Run("pipeline expression with ", func(t *testing.T) {
err := checkLogQLExpressionForDeletion(`count_over_time({job="mysql"}[5m])`)
t.Run("metrics query", func(t *testing.T) {
matchers, err := parseLogQLExpressionForDeletion(`count_over_time({job="mysql"}[5m])`)
require.Nil(t, matchers)
require.ErrorIs(t, err, errUnsupportedLogQL)
})
}