Skip to content

Commit

Permalink
fix: improve operation ordering to fix snapshots indexed before forge…
Browse files Browse the repository at this point in the history
…t operation (#21)
  • Loading branch information
garethgeorge authored Dec 2, 2023
1 parent 38bc107 commit b513b08
Show file tree
Hide file tree
Showing 11 changed files with 209 additions and 69 deletions.
2 changes: 1 addition & 1 deletion internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (s *Server) Backup(ctx context.Context, req *types.StringValue) (*emptypb.E
if err != nil {
return nil, fmt.Errorf("failed to get plan %q: %w", req.Value, err)
}
s.orchestrator.ScheduleTask(orchestrator.NewOneofBackupTask(s.orchestrator, plan, time.Now()))
s.orchestrator.ScheduleTask(orchestrator.NewOneofBackupTask(s.orchestrator, plan, time.Now()), orchestrator.TaskPriorityInteractive)
return &emptypb.Empty{}, nil
}

Expand Down
15 changes: 3 additions & 12 deletions internal/orchestrator/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,6 @@ func (t *BackupTask) Cancel(status v1.OperationStatus) error {
if t.op == nil {
return nil
}

cancel := t.cancel.Load()
if cancel != nil && status == v1.OperationStatus_STATUS_USER_CANCELLED {
(*cancel)() // try to interrupt the running operation.
}

t.op.Status = status
t.op.UnixTimeEndMs = curTimeMillis()
return t.orchestrator.OpLog.Update(t.op)
Expand Down Expand Up @@ -154,14 +148,11 @@ func backupHelper(ctx context.Context, orchestrator *Orchestrator, plan *v1.Plan
return fmt.Errorf("backup operation: %w", err)
}

// this could alternatively be scheduled as a separate task, but it probably makes sense to index snapshots immediately after a backup.
if err := indexSnapshotsHelper(ctx, orchestrator, plan); err != nil {
return fmt.Errorf("reindexing snapshots after backup operation: %w", err)
}

if plan.Retention != nil {
orchestrator.ScheduleTask(NewOneofForgetTask(orchestrator, plan, op.SnapshotId, time.Now()))
orchestrator.ScheduleTask(NewOneofForgetTask(orchestrator, plan, op.SnapshotId, time.Now()), taskPriorityForget)
}

orchestrator.ScheduleTask(NewOneofIndexSnapshotsTask(orchestrator, plan, time.Now()), taskPriorityIndexSnapshots)

return nil
}
12 changes: 0 additions & 12 deletions internal/orchestrator/forget.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"

v1 "github.com/garethgeorge/resticui/gen/go/v1"
Expand All @@ -21,7 +20,6 @@ type ForgetTask struct {
linkSnapshot string // snapshot to link the task to.
op *v1.Operation
at *time.Time
cancel atomic.Pointer[context.CancelFunc] // nil unless operation is running.
}

var _ Task = &ForgetTask{}
Expand Down Expand Up @@ -56,10 +54,6 @@ func (t *ForgetTask) Next(now time.Time) *time.Time {
}

func (t *ForgetTask) Run(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
t.cancel.Store(&cancel)
defer t.cancel.Store(nil)

if t.plan.Retention == nil {
return errors.New("plan does not have a retention policy")
}
Expand Down Expand Up @@ -111,12 +105,6 @@ func (t *ForgetTask) Cancel(status v1.OperationStatus) error {
if t.op == nil {
return nil
}

cancel := t.cancel.Load()
if cancel != nil && status == v1.OperationStatus_STATUS_USER_CANCELLED {
(*cancel)() // try to interrupt the running operation.
}

t.op.Status = status
t.op.UnixTimeEndMs = curTimeMillis()
return t.orchestrator.OpLog.Update(t.op)
Expand Down
37 changes: 37 additions & 0 deletions internal/orchestrator/indexsnapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,43 @@ import (
"go.uber.org/zap"
)

// IndexSnapshotsTask is a one-shot task that indexes the snapshots of a plan.
// Its Run method delegates to indexSnapshotsHelper.
type IndexSnapshotsTask struct {
	orchestrator *Orchestrator // owning orchestrator
	plan         *v1.Plan      // plan whose snapshots are indexed
	at           *time.Time    // pending run time; set to nil by Next once it has been returned
}

// Compile-time check that *IndexSnapshotsTask implements Task.
var _ Task = &IndexSnapshotsTask{}

// NewOneofIndexSnapshotsTask builds an IndexSnapshotsTask for plan that is
// owned by orchestrator and carries at as its single scheduled run time.
func NewOneofIndexSnapshotsTask(orchestrator *Orchestrator, plan *v1.Plan, at time.Time) *IndexSnapshotsTask {
	task := new(IndexSnapshotsTask)
	task.orchestrator = orchestrator
	task.plan = plan
	// at is a by-value parameter, so taking its address yields a pointer
	// private to this task.
	task.at = &at
	return task
}

// Name returns a human-readable label for the task that includes the
// quoted ID of the plan being indexed.
func (t *IndexSnapshotsTask) Name() string {
	planID := t.plan.Id
	return fmt.Sprintf("index snapshots for plan %q", planID)
}

// Next returns the task's pending run time, or nil if there is none.
// The pending time is handed out at most once: it is cleared as it is
// returned, so subsequent calls yield nil. The now argument is not consulted.
func (t *IndexSnapshotsTask) Next(now time.Time) *time.Time {
	if t.at == nil {
		return nil
	}
	scheduled := t.at
	t.at = nil
	return scheduled
}

// Run indexes the plan's snapshots by delegating to indexSnapshotsHelper
// and returns its error unchanged.
func (t *IndexSnapshotsTask) Run(ctx context.Context) error {
	orch, plan := t.orchestrator, t.plan
	return indexSnapshotsHelper(ctx, orch, plan)
}

// Cancel is a no-op for IndexSnapshotsTask: it ignores withStatus and always
// returns nil. The task struct holds no operation record to update.
func (t *IndexSnapshotsTask) Cancel(withStatus v1.OperationStatus) error {
return nil
}

// indexSnapshotsHelper indexes all snapshots for a plan.
// - If the snapshot is already indexed, it is skipped.
// - If the snapshot is not indexed, an index snapshot operation with its metadata is added.
Expand Down
38 changes: 20 additions & 18 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ var ErrRepoNotFound = errors.New("repo not found")
var ErrRepoInitializationFailed = errors.New("repo initialization failed")
var ErrPlanNotFound = errors.New("plan not found")

// Task priorities passed to Orchestrator.ScheduleTask. The values are assigned
// by iota in declaration order (0 through 3).
// NOTE(review): the task queue's ordering rule is not visible here; the
// comments below assume a larger value is served first — confirm.
const (
// TaskPriorityDefault is the priority of regular scheduled operations.
TaskPriorityDefault = iota
// TaskPriorityInteractive is a higher priority than default scheduled
// operations, e.g. the user clicked to run an operation.
TaskPriorityInteractive
// taskPriorityForget is the priority used for forget tasks; it ranks above
// interactive operations and below snapshot indexing.
taskPriorityForget
// taskPriorityIndexSnapshots is a higher priority than interactive
// operations. It and taskPriorityForget are for system operations such as a
// forget or an index, typically scheduled by another task that wants the
// work done immediately after its completion.
taskPriorityIndexSnapshots
)

// Orchestrator is responsible for managing repos and backups.
type Orchestrator struct {
mu sync.Mutex
Expand All @@ -39,10 +46,7 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog) (*Orc
repoPool: newResticRepoPool(resticBin, &config.MemoryStore{Config: cfg}),
taskQueue: taskQueue{
Now: func() time.Time {
if o.now != nil {
return o.now()
}
return time.Now()
return o.curTime()
},
},
}
Expand All @@ -52,6 +56,13 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog) (*Orc
return o, nil
}

// curTime returns the orchestrator's notion of the current time: the result
// of the o.now hook when one is set, otherwise the wall clock via time.Now.
func (o *Orchestrator) curTime() time.Time {
	if o.now == nil {
		return time.Now()
	}
	return o.now()
}

func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error {
o.mu.Lock()
defer o.mu.Unlock()
Expand All @@ -64,7 +75,7 @@ func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error {
return fmt.Errorf("failed to update repo pool config: %w", err)
}

// reset queued tasks, this may loose any ephemeral operations scheduled by RPC. Tasks in progress are not cancelled.
// reset queued tasks, this may lose any ephemeral operations scheduled by RPC. Tasks in progress aren't returned by Reset() so they will not be cancelled.
removedTasks := o.taskQueue.Reset()
for _, t := range removedTasks {
if err := t.task.Cancel(v1.OperationStatus_STATUS_SYSTEM_CANCELLED); err != nil {
Expand All @@ -80,7 +91,7 @@ func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error {
if err != nil {
return fmt.Errorf("schedule backup task for plan %q: %w", plan.Id, err)
}
o.ScheduleTask(t)
o.ScheduleTask(t, 0)
}

return nil
Expand Down Expand Up @@ -136,12 +147,7 @@ func (o *Orchestrator) Run(mainCtx context.Context) {
zap.L().Info("task finished", zap.String("task", t.task.Name()))
}

curTime := time.Now()
if o.now != nil {
curTime = o.now()
}

if nextTime := t.task.Next(curTime); nextTime != nil {
if nextTime := t.task.Next(o.curTime()); nextTime != nil {
o.taskQueue.Push(scheduledTask{
task: t.task,
runAt: *nextTime,
Expand All @@ -150,12 +156,8 @@ func (o *Orchestrator) Run(mainCtx context.Context) {
}
}

func (o *Orchestrator) ScheduleTask(t Task) {
curTime := time.Now()
if o.now != nil {
curTime = o.now()
}
nextRun := t.Next(curTime)
func (o *Orchestrator) ScheduleTask(t Task, priority int) {
nextRun := t.Next(o.curTime())
if nextRun == nil {
return
}
Expand Down
8 changes: 4 additions & 4 deletions internal/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestTaskScheduling(t *testing.T) {
}()

// Act
orch.ScheduleTask(task)
orch.ScheduleTask(task, TaskPriorityDefault)

// Assert passes if all tasks run and the orchestrator exists when cancelled.
wg.Wait()
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestTaskRescheduling(t *testing.T) {
}
return nil
},
})
}, TaskPriorityDefault)

wg.Wait()

Expand Down Expand Up @@ -164,7 +164,7 @@ func TestSchedulerWait(t *testing.T) {
close(ran)
return nil
},
})
}, TaskPriorityDefault)

// Act
go orch.Run(context.Background())
Expand All @@ -188,7 +188,7 @@ func TestSchedulerWait(t *testing.T) {
t.Fatalf("should never run")
return nil
},
})
}, TaskPriorityDefault)

select {
case <-time.NewTimer(1000 * time.Millisecond).C:
Expand Down
Loading

0 comments on commit b513b08

Please sign in to comment.