From 4809285c916d3df02402630f16077420ccb58b7a Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Wed, 17 Jun 2020 08:20:05 -0600 Subject: [PATCH] fix: remove urm create from task create request. (#18519) * fix: remove urm create from task create request. This will greatly reduce the amount of urms created in the system. To make this change to the system we need to also remove our direct reliance on the urm's in tasks. Remove the create and delete portion of the task actions that are creating and deleting urms Remove reliance on urm's when we are doing FindTaskByUser and instead rely on the user filter matching the task.OwnerID One test had to be changed because the test was explicitly hacking the task to remove the owner ID and then trying to successfully look up the task by ownerID * fix: add in feature flag * fix: apply cursor cleanliness and permission error handling --- flags.yml | 7 ++++ kit/feature/list.go | 16 ++++++++ kv/task.go | 95 ++++++++++++++++++++++++++++++++++++++++++--- kv/task_test.go | 4 +- 4 files changed, 114 insertions(+), 8 deletions(-) diff --git a/flags.yml b/flags.yml index 6b53cc13f5e..34a207aefad 100644 --- a/flags.yml +++ b/flags.yml @@ -123,3 +123,10 @@ default: false contact: Query Team lifetime: temporary + +- name: Urm Free Tasks + description: allow task system to operate without creating additional urms + key: urmFreeTasks + default: false + contact: Lyon Hill + lifetime: temporary diff --git a/kit/feature/list.go b/kit/feature/list.go index 2a656b008c7..68090e9fce4 100644 --- a/kit/feature/list.go +++ b/kit/feature/list.go @@ -226,6 +226,20 @@ func MemoryOptimizedFill() BoolFlag { return memoryOptimizedFill } +var urmFreeTasks = MakeBoolFlag( + "Urm Free Tasks", + "urmFreeTasks", + "Lyon Hill", + false, + Temporary, + false, +) + +// UrmFreeTasks - allow task system to operate without creating additional urms +func UrmFreeTasks() BoolFlag { + return urmFreeTasks +} + var all = []Flag{ appMetrics, backendExample, @@ -243,6 +257,7 @@ var all = []Flag{ newLabels, hydratevars, memoryOptimizedFill, + urmFreeTasks, } var byKey = map[string]Flag{ @@ -262,4 +277,5 @@ var byKey = map[string]Flag{ "newLabels": newLabels, "hydratevars": hydratevars, "memoryOptimizedFill": memoryOptimizedFill, + "urmFreeTasks": urmFreeTasks, } diff --git a/kv/task.go b/kv/task.go index 1fd6a140d5c..d61cbe1c9b7 100644 --- a/kv/task.go +++ b/kv/task.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/influxdb/v2" icontext "github.com/influxdata/influxdb/v2/context" + "github.com/influxdata/influxdb/v2/kit/feature" "github.com/influxdata/influxdb/v2/resource" "github.com/influxdata/influxdb/v2/task/options" "go.uber.org/zap" @@ -230,7 +231,6 @@ func (s *Service) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([] } func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { - var org *influxdb.Organization var err error if filter.OrganizationID != nil { @@ -262,7 +262,9 @@ func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilt userAuth, err := icontext.GetAuthorizer(ctx) if err == nil { userID := userAuth.GetUserID() - filter.User = &userID + if userID.Valid() { + filter.User = &userID + } } } @@ -278,6 +280,14 @@ func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilt // findTasksByUser is a subset of the find tasks function. Used for cleanliness func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { + if feature.UrmFreeTasks().Enabled(ctx) { + return s.findTasksByUserUrmFree(ctx, tx, filter) + } + return s.findTasksByUserWithURM(ctx, tx, filter) +} + +// findTasksByUser is a subset of the find tasks function. Used for cleanliness +func (s *Service) findTasksByUserWithURM(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { if filter.User == nil { return nil, 0, influxdb.ErrTaskNotFound } @@ -352,6 +362,70 @@ func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.Ta return ts, len(ts), nil } +func (s *Service) findTasksByUserUrmFree(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { + var ts []*influxdb.Task + + taskBucket, err := tx.Bucket(taskBucket) + if err != nil { + return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err) + } + + var ( + seek []byte + opts []CursorOption + ) + + if filter.After != nil { + seek, err = taskKey(*filter.After) + if err != nil { + return nil, 0, err + } + + opts = append(opts, WithCursorSkipFirstItem()) + } + + c, err := taskBucket.ForwardCursor(seek, opts...) + if err != nil { + return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err) + } + + ps, err := s.maxPermissions(ctx, tx, *filter.User) + if err != nil { + return nil, 0, err + } + + matchFn := newTaskMatchFn(filter, nil) + + for k, v := c.Next(); k != nil; k, v = c.Next() { + kvTask := &kvTask{} + if err := json.Unmarshal(v, kvTask); err != nil { + return nil, 0, influxdb.ErrInternalTaskServiceError(err) + } + + t := kvToInfluxTask(kvTask) + if matchFn == nil || matchFn(t) { + t.Authorization = &influxdb.Authorization{ + Status: influxdb.Active, + UserID: t.OwnerID, + ID: influxdb.ID(1), + OrgID: t.OrganizationID, + Permissions: ps, + } + + ts = append(ts, t) + + if len(ts) >= filter.Limit { + break + } + } + } + if err := c.Err(); err != nil { + return nil, 0, err + } + + return ts, len(ts), c.Close() +} + // findTasksByOrg is a subset of the find tasks function. Used for cleanliness func (s *Service) findTasksByOrg(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { var org *influxdb.Organization @@ -488,6 +562,14 @@ func newTaskMatchFn(f influxdb.TaskFilter, org *influxdb.Organization) func(t *i } } + if f.User != nil { + prevFn := fn + fn = func(t *influxdb.Task) bool { + res := prevFn == nil || prevFn(t) + return res && t.OwnerID == *f.User + } + } + return fn } @@ -496,7 +578,6 @@ func newTaskMatchFn(f influxdb.TaskFilter, org *influxdb.Organization) func(t *i // Enforcing filters should be done in a validation layer. func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { var ts []*influxdb.Task - taskBucket, err := tx.Bucket(taskBucket) if err != nil { return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err) @@ -667,8 +748,10 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } - if err := s.createTaskURM(ctx, tx, task); err != nil { - s.log.Info("Error creating user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err)) + if !feature.UrmFreeTasks().Enabled(ctx) { + if err := s.createTaskURM(ctx, tx, task); err != nil { + s.log.Info("Error creating user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err)) + } } // populate permissions so the task can be used immediately @@ -937,7 +1020,7 @@ func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error { if err := s.deleteUserResourceMapping(ctx, tx, influxdb.UserResourceMappingFilter{ ResourceID: task.ID, }); err != nil { - s.log.Info("Error deleting user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err)) + s.log.Debug("Error deleting user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err)) } uid, _ := icontext.GetUserID(ctx) diff --git a/kv/task_test.go b/kv/task_test.go index 012446f652e..f7c8868e4bc 100644 --- a/kv/task_test.go +++ b/kv/task_test.go @@ -175,7 +175,7 @@ func TestRetrieveTaskWithBadAuth(t *testing.T) { t.Fatal("miss matching taskID's") } - tasks, _, err := ts.Service.FindTasks(ctx, influxdb.TaskFilter{}) + tasks, _, err := ts.Service.FindTasks(context.Background(), influxdb.TaskFilter{}) if err != nil { t.Fatal(err) } @@ -185,7 +185,7 @@ func TestRetrieveTaskWithBadAuth(t *testing.T) { // test status filter active := string(influxdb.TaskActive) - tasksWithActiveFilter, _, err := ts.Service.FindTasks(ctx, influxdb.TaskFilter{Status: &active}) + tasksWithActiveFilter, _, err := ts.Service.FindTasks(context.Background(), influxdb.TaskFilter{Status: &active}) if err != nil { t.Fatal("could not find tasks") }