Skip to content

Commit

Permalink
fix: remove urm create from task create request. (#18519)
Browse files Browse the repository at this point in the history
* fix: remove urm create from task create request.

This will greatly reduce the amount of urms created in the system. To make this change to the system we need to also remove our direct reliance on the urm's in tasks.
Remove the create and delete portion of the task actions that are creating and deleting urms
Remove reliance on urm's when we are doing FindTaskByUser and instead rely on the user filter matching the task.OwnerID

One test had to be changed because the test was explicitly hacking the task to remove the owner ID and then trying to successfully look up the task by ownerID

* fix: add in feature flag

* fix: apply cursor cleanliness and permission error handling
  • Loading branch information
lyondhill authored Jun 17, 2020
1 parent 5d1a759 commit 4809285
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 8 deletions.
7 changes: 7 additions & 0 deletions flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,10 @@
default: false
contact: Query Team
lifetime: temporary

- name: Urm Free Tasks
description: allow task system to operate without creating additional urms
key: urmFreeTasks
default: false
contact: Lyon Hill
lifetime: temporary
16 changes: 16 additions & 0 deletions kit/feature/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

95 changes: 89 additions & 6 deletions kv/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/influxdata/influxdb/v2"
icontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/resource"
"github.com/influxdata/influxdb/v2/task/options"
"go.uber.org/zap"
Expand Down Expand Up @@ -230,7 +231,6 @@ func (s *Service) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]
}

func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {

var org *influxdb.Organization
var err error
if filter.OrganizationID != nil {
Expand Down Expand Up @@ -262,7 +262,9 @@ func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilt
userAuth, err := icontext.GetAuthorizer(ctx)
if err == nil {
userID := userAuth.GetUserID()
filter.User = &userID
if userID.Valid() {
filter.User = &userID
}
}
}

Expand All @@ -278,6 +280,14 @@ func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilt

// findTasksByUser is a subset of the find tasks function. Used for cleanliness
func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
if feature.UrmFreeTasks().Enabled(ctx) {
return s.findTasksByUserUrmFree(ctx, tx, filter)
}
return s.findTasksByUserWithURM(ctx, tx, filter)
}

// findTasksByUser is a subset of the find tasks function. Used for cleanliness
func (s *Service) findTasksByUserWithURM(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
if filter.User == nil {
return nil, 0, influxdb.ErrTaskNotFound
}
Expand Down Expand Up @@ -352,6 +362,70 @@ func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.Ta
return ts, len(ts), nil
}

func (s *Service) findTasksByUserUrmFree(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
var ts []*influxdb.Task

taskBucket, err := tx.Bucket(taskBucket)
if err != nil {
return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err)
}

var (
seek []byte
opts []CursorOption
)

if filter.After != nil {
seek, err = taskKey(*filter.After)
if err != nil {
return nil, 0, err
}

opts = append(opts, WithCursorSkipFirstItem())
}

c, err := taskBucket.ForwardCursor(seek, opts...)
if err != nil {
return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err)
}

ps, err := s.maxPermissions(ctx, tx, *filter.User)
if err != nil {
return nil, 0, err
}

matchFn := newTaskMatchFn(filter, nil)

for k, v := c.Next(); k != nil; k, v = c.Next() {
kvTask := &kvTask{}
if err := json.Unmarshal(v, kvTask); err != nil {
return nil, 0, influxdb.ErrInternalTaskServiceError(err)
}

t := kvToInfluxTask(kvTask)
if matchFn == nil || matchFn(t) {
t.Authorization = &influxdb.Authorization{
Status: influxdb.Active,
UserID: t.OwnerID,
ID: influxdb.ID(1),
OrgID: t.OrganizationID,
Permissions: ps,
}

ts = append(ts, t)

if len(ts) >= filter.Limit {
break
}
}
}
if err := c.Err(); err != nil {
return nil, 0, err
}

return ts, len(ts), c.Close()
}

// findTasksByOrg is a subset of the find tasks function. Used for cleanliness
func (s *Service) findTasksByOrg(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
var org *influxdb.Organization
Expand Down Expand Up @@ -488,6 +562,14 @@ func newTaskMatchFn(f influxdb.TaskFilter, org *influxdb.Organization) func(t *i
}
}

if f.User != nil {
prevFn := fn
fn = func(t *influxdb.Task) bool {
res := prevFn == nil || prevFn(t)
return res && t.OwnerID == *f.User
}
}

return fn
}

Expand All @@ -496,7 +578,6 @@ func newTaskMatchFn(f influxdb.TaskFilter, org *influxdb.Organization) func(t *i
// Enforcing filters should be done in a validation layer.
func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
var ts []*influxdb.Task

taskBucket, err := tx.Bucket(taskBucket)
if err != nil {
return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err)
Expand Down Expand Up @@ -667,8 +748,10 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
}

if err := s.createTaskURM(ctx, tx, task); err != nil {
s.log.Info("Error creating user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err))
if !feature.UrmFreeTasks().Enabled(ctx) {
if err := s.createTaskURM(ctx, tx, task); err != nil {
s.log.Info("Error creating user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err))
}
}

// populate permissions so the task can be used immediately
Expand Down Expand Up @@ -937,7 +1020,7 @@ func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error {
if err := s.deleteUserResourceMapping(ctx, tx, influxdb.UserResourceMappingFilter{
ResourceID: task.ID,
}); err != nil {
s.log.Info("Error deleting user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err))
s.log.Debug("Error deleting user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err))
}

uid, _ := icontext.GetUserID(ctx)
Expand Down
4 changes: 2 additions & 2 deletions kv/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func TestRetrieveTaskWithBadAuth(t *testing.T) {
t.Fatal("miss matching taskID's")
}

tasks, _, err := ts.Service.FindTasks(ctx, influxdb.TaskFilter{})
tasks, _, err := ts.Service.FindTasks(context.Background(), influxdb.TaskFilter{})
if err != nil {
t.Fatal(err)
}
Expand All @@ -185,7 +185,7 @@ func TestRetrieveTaskWithBadAuth(t *testing.T) {

// test status filter
active := string(influxdb.TaskActive)
tasksWithActiveFilter, _, err := ts.Service.FindTasks(ctx, influxdb.TaskFilter{Status: &active})
tasksWithActiveFilter, _, err := ts.Service.FindTasks(context.Background(), influxdb.TaskFilter{Status: &active})
if err != nil {
t.Fatal("could not find tasks")
}
Expand Down

0 comments on commit 4809285

Please sign in to comment.