Skip to content

Tenant deletion: ruler support. #3750

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
* `cortex_ha_tracker_replicas_cleanup_marked_for_deletion_total`
* `cortex_ha_tracker_replicas_cleanup_deleted_total`
* `cortex_ha_tracker_replicas_cleanup_delete_failed_total`
* [ENHANCEMENT] Tenant deletion endpoints now support deletion of ruler groups. This only works when using a rule store that supports deletion. #3750
* [BUGFIX] Cortex: Fixed issue where fatal errors and various log messages were not logged. #3778
* [BUGFIX] HA Tracker: don't track as error in the `cortex_kv_request_duration_seconds` metric a CAS operation intentionally aborted. #3745
* [BUGFIX] Querier / ruler: do not log "error removing stale clients" if the ring is empty. #3761
Expand Down
19 changes: 19 additions & 0 deletions pkg/chunk/inmemory_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,25 @@ func NewMockStorage() *MockStorage {
}
}

// GetSortedObjectKeys returns the keys of every object currently held by the
// mock storage, sorted in increasing order. The read lock is held while the
// keys are collected and sorted.
func (m *MockStorage) GetSortedObjectKeys() []string {
	m.mtx.RLock()
	defer m.mtx.RUnlock()

	// The number of keys is known up front, so fill the slice by index.
	sorted := make([]string, len(m.objects))
	next := 0
	for name := range m.objects {
		sorted[next] = name
		next++
	}

	sort.Strings(sorted)
	return sorted
}

// GetObjectCount reports how many objects the mock storage currently holds.
// The count is read while holding the read lock.
func (m *MockStorage) GetObjectCount() int {
	m.mtx.RLock()
	count := len(m.objects)
	m.mtx.RUnlock()

	return count
}

// Stop is a no-op: the method body is empty.
func (*MockStorage) Stop() {}
Expand Down
63 changes: 55 additions & 8 deletions pkg/chunk/purger/blocks_purger_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/cortexproject/cortex/pkg/ruler/rules"
"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/tenant"
Expand All @@ -21,20 +22,21 @@ import (

// BlocksPurgerAPI holds the dependencies of the tenant deletion HTTP handlers.
type BlocksPurgerAPI struct {
	// bucketClient is the bucket that tenant deletion marks are written to.
	bucketClient objstore.Bucket

	// ruleStore gives access to tenant rule groups. It may be nil, in which
	// case rule groups are neither deleted nor reported as deleted.
	ruleStore rules.RuleStore

	// logger receives the handlers' log messages.
	logger log.Logger
}

func NewBlocksPurgerAPI(storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (*BlocksPurgerAPI, error) {
func NewBlocksPurgerAPI(storageCfg cortex_tsdb.BlocksStorageConfig, ruleStore rules.RuleStore, logger log.Logger, reg prometheus.Registerer) (*BlocksPurgerAPI, error) {
bucketClient, err := createBucketClient(storageCfg, logger, reg)
if err != nil {
return nil, err
}

return newBlocksPurgerAPI(bucketClient, logger), nil
return newBlocksPurgerAPI(bucketClient, ruleStore, logger), nil
}

func newBlocksPurgerAPI(bkt objstore.Bucket, logger log.Logger) *BlocksPurgerAPI {
return &BlocksPurgerAPI{bucketClient: bkt, logger: logger}
func newBlocksPurgerAPI(bkt objstore.Bucket, ruleStore rules.RuleStore, logger log.Logger) *BlocksPurgerAPI {
return &BlocksPurgerAPI{bucketClient: bkt, ruleStore: ruleStore, logger: logger}
}

func (api *BlocksPurgerAPI) DeleteTenant(w http.ResponseWriter, r *http.Request) {
Expand All @@ -47,19 +49,45 @@ func (api *BlocksPurgerAPI) DeleteTenant(w http.ResponseWriter, r *http.Request)

err = cortex_tsdb.WriteTenantDeletionMark(r.Context(), api.bucketClient, userID, cortex_tsdb.NewTenantDeletionMark(time.Now()))
if err != nil {
level.Error(api.logger).Log("msg", "failed to write tenant deletion mark", "user", userID, "err", err)

http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

level.Info(api.logger).Log("msg", "tenant deletion marker created", "user", userID)
level.Info(api.logger).Log("msg", "tenant deletion mark in blocks storage created", "user", userID)

if api.ruleStore != nil {
err := api.deleteRules(r.Context(), userID)
if err != nil {
level.Error(api.logger).Log("msg", "failed to delete tenant rule groups", "user", userID, "err", err)
http.Error(w, errors.Wrapf(err, "failed to delete tenant rule groups").Error(), http.StatusInternalServerError)
return
}
}

w.WriteHeader(http.StatusOK)
}

// deleteRules removes every rule group that belongs to userID from the rule store.
//
// If the rule store does not support modifications, a warning is logged and nil
// is returned without attempting any deletion. A rules.ErrGroupNamespaceNotFound
// error from the store is treated as success; any other error is returned as is.
//
// The caller must make sure api.ruleStore is non-nil.
func (api *BlocksPurgerAPI) deleteRules(ctx context.Context, userID string) error {
	store := api.ruleStore

	if !store.SupportsModifications() {
		level.Warn(api.logger).Log("msg", "cannot delete tenant rule groups, using read-only rule store", "user", userID)
		return nil
	}

	// An empty namespace means "delete all rule groups" of the tenant.
	err := store.DeleteNamespace(ctx, userID, "")
	switch {
	case err == nil, errors.Is(err, rules.ErrGroupNamespaceNotFound):
		level.Info(api.logger).Log("msg", "deleted all tenant rule groups", "user", userID)
		return nil
	default:
		return err
	}
}

type DeleteTenantStatusResponse struct {
TenantID string `json:"tenant_id"`
BlocksDeleted bool `json:"blocks_deleted"`
RuleGroupsDeleted bool `json:"rule_groups_deleted,omitempty"` // Not yet supported.
RuleGroupsDeleted bool `json:"rule_groups_deleted"`
AlertManagerConfigDeleted bool `json:"alert_manager_config_deleted,omitempty"` // Not yet supported.
}

Expand All @@ -73,8 +101,13 @@ func (api *BlocksPurgerAPI) DeleteTenantStatus(w http.ResponseWriter, r *http.Re

result := DeleteTenantStatusResponse{}
result.TenantID = userID
result.BlocksDeleted, err = api.checkBlocksForUser(ctx, userID)
result.BlocksDeleted, err = api.isBlocksForUserDeleted(ctx, userID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

result.RuleGroupsDeleted, err = api.isRulesForUserDeleted(ctx, userID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand All @@ -83,7 +116,21 @@ func (api *BlocksPurgerAPI) DeleteTenantStatus(w http.ResponseWriter, r *http.Re
util.WriteJSONResponse(w, result)
}

func (api *BlocksPurgerAPI) checkBlocksForUser(ctx context.Context, userID string) (bool, error) {
// isRulesForUserDeleted reports whether the rule store holds no rule groups for userID.
//
// It returns (false, nil) when no rule store is configured, and (false, err)
// with a wrapped error when listing the tenant's rule groups fails.
func (api *BlocksPurgerAPI) isRulesForUserDeleted(ctx context.Context, userID string) (bool, error) {
	store := api.ruleStore
	if store == nil {
		// Without access to the rule store we cannot claim that rules have been deleted.
		return false, nil
	}

	// An empty namespace is passed to list across the whole tenant.
	groups, err := store.ListRuleGroupsForUserAndNamespace(ctx, userID, "")
	if err != nil {
		return false, errors.Wrap(err, "failed to list rule groups for tenant")
	}

	noGroupsLeft := len(groups) == 0
	return noGroupsLeft, nil
}

func (api *BlocksPurgerAPI) isBlocksForUserDeleted(ctx context.Context, userID string) (bool, error) {
var errBlockFound = errors.New("block found")

userBucket := bucket.NewUserBucketClient(userID, api.bucketClient)
Expand Down
122 changes: 119 additions & 3 deletions pkg/chunk/purger/blocks_purger_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,27 @@ package purger
import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"path"
"testing"

"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/rulefmt"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ruler/rules"
"github.com/cortexproject/cortex/pkg/ruler/rules/objectclient"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func TestDeleteTenant(t *testing.T) {
bkt := objstore.NewInMemBucket()
api := newBlocksPurgerAPI(bkt, log.NewNopLogger())
api := newBlocksPurgerAPI(bkt, nil, log.NewNopLogger())

{
resp := httptest.NewRecorder()
Expand Down Expand Up @@ -80,11 +85,122 @@ func TestDeleteTenantStatus(t *testing.T) {
require.NoError(t, bkt.Upload(context.Background(), objName, bytes.NewReader(data)))
}

api := newBlocksPurgerAPI(bkt, log.NewNopLogger())
api := newBlocksPurgerAPI(bkt, nil, log.NewNopLogger())

res, err := api.checkBlocksForUser(context.Background(), username)
res, err := api.isBlocksForUserDeleted(context.Background(), username)
require.NoError(t, err)
require.Equal(t, tc.expectedBlocksDeleted, res)
})
}
}

func TestDeleteTenantRuleGroups(t *testing.T) {
ruleGroups := []ruleGroupKey{
{user: "userA", namespace: "namespace", group: "group"},
{user: "userB", namespace: "namespace1", group: "group"},
{user: "userB", namespace: "namespace2", group: "group"},
}

obj, rs := setupRuleGroupsStore(t, ruleGroups)
require.Equal(t, 3, obj.GetObjectCount())

api := newBlocksPurgerAPI(objstore.NewInMemBucket(), rs, log.NewNopLogger())

{
callDeleteTenantAPI(t, api, "user-with-no-rule-groups")
require.Equal(t, 3, obj.GetObjectCount())

verifyExpectedDeletedRuleGroupsForUser(t, api, "user-with-no-rule-groups", true) // Has no rule groups
verifyExpectedDeletedRuleGroupsForUser(t, api, "userA", false)
verifyExpectedDeletedRuleGroupsForUser(t, api, "userB", false)
}

{
callDeleteTenantAPI(t, api, "userA")
require.Equal(t, 2, obj.GetObjectCount())

verifyExpectedDeletedRuleGroupsForUser(t, api, "user-with-no-rule-groups", true) // Has no rule groups
verifyExpectedDeletedRuleGroupsForUser(t, api, "userA", true) // Just deleted.
verifyExpectedDeletedRuleGroupsForUser(t, api, "userB", false)
}

{
callDeleteTenantAPI(t, api, "userB")
require.Equal(t, 0, obj.GetObjectCount())

verifyExpectedDeletedRuleGroupsForUser(t, api, "user-with-no-rule-groups", true) // Has no rule groups
verifyExpectedDeletedRuleGroupsForUser(t, api, "userA", true) // Deleted previously
verifyExpectedDeletedRuleGroupsForUser(t, api, "userB", true) // Just deleted
}
}

func TestDeleteTenantRuleGroupsWithReadOnlyStore(t *testing.T) {
ruleGroups := []ruleGroupKey{
{user: "userA", namespace: "namespace", group: "group"},
{user: "userB", namespace: "namespace1", group: "group"},
{user: "userB", namespace: "namespace2", group: "group"},
}

obj, rs := setupRuleGroupsStore(t, ruleGroups)
require.Equal(t, 3, obj.GetObjectCount())

rs = &readOnlyRuleStore{RuleStore: rs}

api := newBlocksPurgerAPI(objstore.NewInMemBucket(), rs, log.NewNopLogger())

// Make sure there is no error reported.
callDeleteTenantAPI(t, api, "userA")
require.Equal(t, 3, obj.GetObjectCount())

verifyExpectedDeletedRuleGroupsForUser(t, api, "userA", false) // Cannot delete from read-only store.
verifyExpectedDeletedRuleGroupsForUser(t, api, "userB", false)
}

// callDeleteTenantAPI calls the DeleteTenant handler with userID injected into
// the request context as the org ID, and fails the test unless the handler
// responds with HTTP 200.
func callDeleteTenantAPI(t *testing.T, api *BlocksPurgerAPI, userID string) {
	// Mark this function as a test helper so that assertion failures are
	// attributed to the calling test rather than to this function.
	t.Helper()

	ctx := user.InjectOrgID(context.Background(), userID)

	req := &http.Request{}
	resp := httptest.NewRecorder()
	api.DeleteTenant(resp, req.WithContext(ctx))

	require.Equal(t, http.StatusOK, resp.Code)
}

// verifyExpectedDeletedRuleGroupsForUser calls the DeleteTenantStatus handler
// with userID injected into the request context as the org ID. It requires an
// HTTP 200 response whose JSON body decodes into a DeleteTenantStatusResponse
// with RuleGroupsDeleted equal to expected.
func verifyExpectedDeletedRuleGroupsForUser(t *testing.T, api *BlocksPurgerAPI, userID string, expected bool) {
	// Mark this function as a test helper so that assertion failures are
	// attributed to the calling test rather than to this function.
	t.Helper()

	ctx := user.InjectOrgID(context.Background(), userID)

	req := &http.Request{}
	resp := httptest.NewRecorder()
	api.DeleteTenantStatus(resp, req.WithContext(ctx))

	require.Equal(t, http.StatusOK, resp.Code)

	deleteResp := &DeleteTenantStatusResponse{}
	require.NoError(t, json.Unmarshal(resp.Body.Bytes(), deleteResp))
	require.Equal(t, expected, deleteResp.RuleGroupsDeleted)
}

// setupRuleGroupsStore creates a rule store on top of a fresh mock object storage
// and stores one rule group (with only its name set) for every key in ruleGroups.
// It returns both the mock storage and the rule store, so tests can inspect the
// underlying objects directly. The test fails if any rule group cannot be stored.
func setupRuleGroupsStore(t *testing.T, ruleGroups []ruleGroupKey) (*chunk.MockStorage, rules.RuleStore) {
	// Mark this function as a test helper so that failures while storing rule
	// groups are attributed to the calling test rather than to this function.
	t.Helper()

	obj := chunk.NewMockStorage()
	// NOTE(review): the second argument is presumably the load concurrency — confirm against objectclient.NewRuleStore.
	rs := objectclient.NewRuleStore(obj, 5)

	// "upload" rule groups
	for _, key := range ruleGroups {
		desc := rules.ToProto(key.user, key.namespace, rulefmt.RuleGroup{Name: key.group})
		require.NoError(t, rs.SetRuleGroup(context.Background(), key.user, key.namespace, desc))
	}

	return obj, rs
}

// ruleGroupKey names a single rule group by the user it belongs to, its
// namespace and its group name.
type ruleGroupKey struct {
	user      string
	namespace string
	group     string
}

// readOnlyRuleStore embeds a rules.RuleStore and overrides SupportsModifications
// so that the store reports itself as not modifiable. All other methods are
// those of the embedded store.
type readOnlyRuleStore struct {
	rules.RuleStore
}

// SupportsModifications always returns false.
func (*readOnlyRuleStore) SupportsModifications() bool {
	return false
}
14 changes: 11 additions & 3 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,14 @@ func (t *Cortex) initRulerStorage() (serv services.Service, err error) {
// to determine if it's unconfigured. the following check, however, correctly tests this.
// Single binary integration tests will break if this ever drifts
if t.Cfg.isModuleEnabled(All) && t.Cfg.Ruler.StoreConfig.IsDefaults() {
level.Info(util_log.Logger).Log("msg", "RulerStorage is not configured in single binary mode and will not be started.")
level.Info(util_log.Logger).Log("msg", "Ruler storage is not configured in single binary mode and will not be started.")
return
}

// Purger didn't use ruler storage before, but now it does. However, an empty configuration just causes an error,
// so to preserve the previous purger behaviour, we simply disable it.
if t.Cfg.isModuleEnabled(Purger) && t.Cfg.Ruler.StoreConfig.IsDefaults() {
level.Info(util_log.Logger).Log("msg", "Ruler storage is not configured. If you want to use tenant deletion API and delete rule groups, please configure ruler storage.")
return
}

Expand Down Expand Up @@ -780,7 +787,8 @@ func (t *Cortex) initBlocksPurger() (services.Service, error) {
return nil, nil
}

purgerAPI, err := purger.NewBlocksPurgerAPI(t.Cfg.BlocksStorage, util_log.Logger, prometheus.DefaultRegisterer)
// t.RulerStorage can be nil when running in single-binary mode, and rule storage is not configured.
purgerAPI, err := purger.NewBlocksPurgerAPI(t.Cfg.BlocksStorage, t.RulerStorage, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -864,7 +872,7 @@ func (t *Cortex) setupModuleManager() error {
Compactor: {API, MemberlistKV},
StoreGateway: {API, Overrides, MemberlistKV},
ChunksPurger: {Store, DeleteRequestsStore, API},
BlocksPurger: {Store, API},
BlocksPurger: {Store, API, RulerStorage},
Purger: {ChunksPurger, BlocksPurger},
TenantFederation: {Queryable},
All: {QueryFrontend, Querier, Ingester, Distributor, TableManager, Purger, StoreGateway, Ruler},
Expand Down
4 changes: 3 additions & 1 deletion pkg/ruler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,13 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou
if _, exists := ruleGroups[userID]; !exists {
go mngr.Stop()
delete(r.userManagers, userID)

r.mapper.cleanupUser(userID)
r.lastReloadSuccessful.DeleteLabelValues(userID)
r.lastReloadSuccessfulTimestamp.DeleteLabelValues(userID)
r.configUpdatesTotal.DeleteLabelValues(userID)
r.userManagerMetrics.RemoveUserRegistry(userID)
level.Info(r.logger).Log("msg", "deleting rule manager", "user", userID)
level.Info(r.logger).Log("msg", "deleted rule manager and local rule files", "user", userID)
}
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/ruler/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ func TestSyncRuleGroups(t *testing.T) {
return mgr.(*mockRulesManager).running.Load()
})

// Verify that user rule groups are now cached locally.
{
users, err := m.mapper.users()
require.NoError(t, err)
require.Equal(t, []string{user}, users)
}

// Passing empty map / nil stops all managers.
m.SyncRuleGroups(context.Background(), nil)
require.Nil(t, getManager(m, user))
Expand All @@ -59,6 +66,13 @@ func TestSyncRuleGroups(t *testing.T) {
return mgr.(*mockRulesManager).running.Load()
})

// Verify that local rule groups were removed.
{
users, err := m.mapper.users()
require.NoError(t, err)
require.Equal(t, []string(nil), users)
}

// Resync same rules as before. Previously this didn't restart the manager.
m.SyncRuleGroups(context.Background(), userRules)

Expand All @@ -70,6 +84,13 @@ func TestSyncRuleGroups(t *testing.T) {
return newMgr.(*mockRulesManager).running.Load()
})

// Verify that user rule groups are cached locally again.
{
users, err := m.mapper.users()
require.NoError(t, err)
require.Equal(t, []string{user}, users)
}

m.Stop()

test.Poll(t, 1*time.Second, false, func() interface{} {
Expand Down
Loading