diff --git a/internal/api/server.go b/internal/api/server.go
index ee993deb..813f084f 100644
--- a/internal/api/server.go
+++ b/internal/api/server.go
@@ -240,7 +240,7 @@ func (s *Server) Backup(ctx context.Context, req *types.StringValue) (*emptypb.E
 	if err != nil {
 		return nil, fmt.Errorf("failed to get plan %q: %w", req.Value, err)
 	}
-	s.orchestrator.ScheduleTask(orchestrator.NewOneofBackupTask(s.orchestrator, plan, time.Now()))
+	s.orchestrator.ScheduleTask(orchestrator.NewOneofBackupTask(s.orchestrator, plan, time.Now()), orchestrator.TaskPriorityInteractive)
 	return &emptypb.Empty{}, nil
 }
 
diff --git a/internal/orchestrator/backup.go b/internal/orchestrator/backup.go
index b06e253e..8d45bb7d 100644
--- a/internal/orchestrator/backup.go
+++ b/internal/orchestrator/backup.go
@@ -97,12 +97,6 @@ func (t *BackupTask) Cancel(status v1.OperationStatus) error {
 	if t.op == nil {
 		return nil
 	}
-
-	cancel := t.cancel.Load()
-	if cancel != nil && status == v1.OperationStatus_STATUS_USER_CANCELLED {
-		(*cancel)() // try to interrupt the running operation.
-	}
-
 	t.op.Status = status
 	t.op.UnixTimeEndMs = curTimeMillis()
 	return t.orchestrator.OpLog.Update(t.op)
@@ -154,14 +148,11 @@ func backupHelper(ctx context.Context, orchestrator *Orchestrator, plan *v1.Plan
 		return fmt.Errorf("backup operation: %w", err)
 	}
 
-	// this could alternatively be scheduled as a separate task, but it probably makes sense to index snapshots immediately after a backup.
-	if err := indexSnapshotsHelper(ctx, orchestrator, plan); err != nil {
-		return fmt.Errorf("reindexing snapshots after backup operation: %w", err)
-	}
-
 	if plan.Retention != nil {
-		orchestrator.ScheduleTask(NewOneofForgetTask(orchestrator, plan, op.SnapshotId, time.Now()))
+		orchestrator.ScheduleTask(NewOneofForgetTask(orchestrator, plan, op.SnapshotId, time.Now()), taskPriorityForget)
 	}
+	orchestrator.ScheduleTask(NewOneofIndexSnapshotsTask(orchestrator, plan, time.Now()), taskPriorityIndexSnapshots)
+
 	return nil
 }
 
diff --git a/internal/orchestrator/forget.go b/internal/orchestrator/forget.go
index 2c59ecb4..8a5b2145 100644
--- a/internal/orchestrator/forget.go
+++ b/internal/orchestrator/forget.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"sync/atomic"
 	"time"
 
 	v1 "github.com/garethgeorge/resticui/gen/go/v1"
@@ -19,7 +18,6 @@ type ForgetTask struct {
 	linkSnapshot string // snapshot to link the task to.
 	op           *v1.Operation
 	at           *time.Time
-	cancel       atomic.Pointer[context.CancelFunc] // nil unless operation is running.
 }
 
 var _ Task = &ForgetTask{}
@@ -54,10 +52,6 @@ func (t *ForgetTask) Next(now time.Time) *time.Time {
 }
 
 func (t *ForgetTask) Run(ctx context.Context) error {
-	ctx, cancel := context.WithCancel(ctx)
-	t.cancel.Store(&cancel)
-	defer t.cancel.Store(nil)
-
 	if t.plan.Retention == nil {
 		return errors.New("plan does not have a retention policy")
 	}
@@ -101,12 +95,6 @@ func (t *ForgetTask) Cancel(status v1.OperationStatus) error {
 	if t.op == nil {
 		return nil
 	}
-
-	cancel := t.cancel.Load()
-	if cancel != nil && status == v1.OperationStatus_STATUS_USER_CANCELLED {
-		(*cancel)() // try to interrupt the running operation.
-	}
-
 	t.op.Status = status
 	t.op.UnixTimeEndMs = curTimeMillis()
 	return t.orchestrator.OpLog.Update(t.op)
diff --git a/internal/orchestrator/indexsnapshots.go b/internal/orchestrator/indexsnapshots.go
index 8cdcea7b..4b2447ac 100644
--- a/internal/orchestrator/indexsnapshots.go
+++ b/internal/orchestrator/indexsnapshots.go
@@ -12,6 +12,43 @@ import (
 	"go.uber.org/zap"
 )
 
+// IndexSnapshotsTask tracks an index snapshots operation.
+type IndexSnapshotsTask struct {
+	orchestrator *Orchestrator // owning orchestrator
+	plan         *v1.Plan
+	at           *time.Time
+}
+
+var _ Task = &IndexSnapshotsTask{}
+
+func NewOneofIndexSnapshotsTask(orchestrator *Orchestrator, plan *v1.Plan, at time.Time) *IndexSnapshotsTask {
+	return &IndexSnapshotsTask{
+		orchestrator: orchestrator,
+		plan:         plan,
+		at:           &at,
+	}
+}
+
+func (t *IndexSnapshotsTask) Name() string {
+	return fmt.Sprintf("index snapshots for plan %q", t.plan.Id)
+}
+
+func (t *IndexSnapshotsTask) Next(now time.Time) *time.Time {
+	ret := t.at
+	if ret != nil {
+		t.at = nil
+	}
+	return ret
+}
+
+func (t *IndexSnapshotsTask) Run(ctx context.Context) error {
+	return indexSnapshotsHelper(ctx, t.orchestrator, t.plan)
+}
+
+func (t *IndexSnapshotsTask) Cancel(withStatus v1.OperationStatus) error {
+	return nil
+}
+
 // indexSnapshotsHelper indexes all snapshots for a plan.
 // - If the snapshot is already indexed, it is skipped.
 // - If the snapshot is not indexed, an index snapshot operation with it's metadata is added.
diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go
index 558a730a..467303e1 100644
--- a/internal/orchestrator/orchestrator.go
+++ b/internal/orchestrator/orchestrator.go
@@ -19,6 +19,13 @@ var ErrRepoNotFound = errors.New("repo not found")
 var ErrRepoInitializationFailed = errors.New("repo initialization failed")
 var ErrPlanNotFound = errors.New("plan not found")
 
+const (
+	TaskPriorityDefault = iota
+	TaskPriorityInteractive    // higher priority than default scheduled operations, e.g. the user clicked to run an operation.
+	taskPriorityForget
+	taskPriorityIndexSnapshots // higher priority than interactive operations, e.g. a system operation like a forget or index (typically scheduled by another task that wants work done immediately after its completion).
+)
+
 // Orchestrator is responsible for managing repos and backups.
 type Orchestrator struct {
 	mu sync.Mutex
@@ -39,10 +46,7 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog) (*Orc
 		repoPool: newResticRepoPool(resticBin, &config.MemoryStore{Config: cfg}),
 		taskQueue: taskQueue{
 			Now: func() time.Time {
-				if o.now != nil {
-					return o.now()
-				}
-				return time.Now()
+				return o.curTime()
 			},
 		},
 	}
@@ -52,6 +56,13 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog) (*Orc
 	return o, nil
 }
 
+func (o *Orchestrator) curTime() time.Time {
+	if o.now != nil {
+		return o.now()
+	}
+	return time.Now()
+}
+
 func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error {
 	o.mu.Lock()
 	defer o.mu.Unlock()
@@ -64,7 +75,7 @@ func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error {
 		return fmt.Errorf("failed to update repo pool config: %w", err)
 	}
 
-	// reset queued tasks, this may loose any ephemeral operations scheduled by RPC. Tasks in progress are not cancelled.
+	// reset queued tasks, this may lose any ephemeral operations scheduled by RPC. Tasks in progress aren't returned by Reset() so they will not be cancelled.
 	removedTasks := o.taskQueue.Reset()
 	for _, t := range removedTasks {
 		if err := t.task.Cancel(v1.OperationStatus_STATUS_SYSTEM_CANCELLED); err != nil {
@@ -80,7 +91,7 @@ func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error {
 		if err != nil {
 			return fmt.Errorf("schedule backup task for plan %q: %w", plan.Id, err)
 		}
-		o.ScheduleTask(t)
+		o.ScheduleTask(t, 0)
 	}
 
 	return nil
@@ -136,12 +147,7 @@ func (o *Orchestrator) Run(mainCtx context.Context) {
 			zap.L().Info("task finished", zap.String("task", t.task.Name()))
 		}
 
-		curTime := time.Now()
-		if o.now != nil {
-			curTime = o.now()
-		}
-
-		if nextTime := t.task.Next(curTime); nextTime != nil {
+		if nextTime := t.task.Next(o.curTime()); nextTime != nil {
 			o.taskQueue.Push(scheduledTask{
 				task:  t.task,
 				runAt: *nextTime,
@@ -150,12 +156,8 @@
 	}
 }
 
-func (o *Orchestrator) ScheduleTask(t Task) {
-	curTime := time.Now()
-	if o.now != nil {
-		curTime = o.now()
-	}
-	nextRun := t.Next(curTime)
+func (o *Orchestrator) ScheduleTask(t Task, priority int) {
+	nextRun := t.Next(o.curTime())
 	if nextRun == nil {
 		return
 	}
diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go
index 77c86bba..4caddd20 100644
--- a/internal/orchestrator/orchestrator_test.go
+++ b/internal/orchestrator/orchestrator_test.go
@@ -64,7 +64,7 @@ func TestTaskScheduling(t *testing.T) {
 	}()
 
 	// Act
-	orch.ScheduleTask(task)
+	orch.ScheduleTask(task, TaskPriorityDefault)
 
 	// Assert passes if all tasks run and the orchestrator exists when cancelled.
 	wg.Wait()
@@ -109,7 +109,7 @@ func TestTaskRescheduling(t *testing.T) {
 			}
 			return nil
 		},
-	})
+	}, TaskPriorityDefault)
 
 	wg.Wait()
 
@@ -164,7 +164,7 @@ func TestSchedulerWait(t *testing.T) {
 			close(ran)
 			return nil
 		},
-	})
+	}, TaskPriorityDefault)
 
 	// Act
 	go orch.Run(context.Background())
@@ -188,7 +188,7 @@
 			t.Fatalf("should never run")
 			return nil
 		},
-	})
+	}, TaskPriorityDefault)
 
 	select {
 	case <-time.NewTimer(1000 * time.Millisecond).C:
diff --git a/internal/orchestrator/scheduledtaskheap.go b/internal/orchestrator/scheduledtaskheap.go
index 8bb6679c..5322fbd4 100644
--- a/internal/orchestrator/scheduledtaskheap.go
+++ b/internal/orchestrator/scheduledtaskheap.go
@@ -10,17 +10,18 @@ import (
 type taskQueue struct {
 	dequeueMu sync.Mutex
 	mu        sync.Mutex
-	heap      scheduledTaskHeap
+	heap      scheduledTaskHeapByTime
 	notify    chan struct{}
+	ready     scheduledTaskHeapByPriorityThenTime
 
 	Now func() time.Time
 }
 
 func (t *taskQueue) curTime() time.Time {
-	if t.Now == nil {
-		return time.Now()
+	if t.Now != nil {
+		return t.Now()
 	}
-	return t.Now()
+	return time.Now()
 }
 
 func (t *taskQueue) Push(task scheduledTask) {
@@ -42,7 +43,10 @@ func (t *taskQueue) Reset() []*scheduledTask {
 	defer t.mu.Unlock()
 
 	oldTasks := t.heap.tasks
+	oldTasks = append(oldTasks, t.ready.tasks...)
 	t.heap.tasks = nil
+	t.ready.tasks = nil
+
 	if t.notify != nil {
 		t.notify <- struct{}{}
 	}
@@ -62,6 +66,10 @@ func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask {
 	for {
 		first, ok := t.heap.Peek().(*scheduledTask)
 		if !ok { // no tasks in heap.
+			if t.ready.Len() > 0 {
+				t.mu.Unlock()
+				return heap.Pop(&t.ready).(*scheduledTask)
+			}
 			t.mu.Unlock()
 			select {
 			case <-ctx.Done():
@@ -71,22 +79,45 @@
 			t.mu.Lock()
 			continue
 		}
+
+		now := t.curTime()
+
+		// if there's a task in the ready queue AND the first task isn't ready yet then immediately return the ready task.
+		ready, ok := t.ready.Peek().(*scheduledTask)
+		if ok && now.Before(first.runAt) {
+			heap.Pop(&t.ready)
+			t.mu.Unlock()
+			return ready
+		}
+
 		t.mu.Unlock()
-		timer := time.NewTimer(first.runAt.Sub(t.curTime()))
+		timer := time.NewTimer(first.runAt.Sub(now))
+
 		select {
 		case <-timer.C:
 			t.mu.Lock()
 			if t.heap.Len() == 0 {
 				break
 			}
-			first = t.heap.Peek().(*scheduledTask)
-			if first.runAt.After(t.curTime()) {
-				// task is not yet ready to run
+
+			for {
+				first, ok := t.heap.Peek().(*scheduledTask)
+				if !ok {
+					break
+				}
+				if first.runAt.After(t.curTime()) {
+					// task is not yet ready to run
+					break
+				}
+				heap.Pop(&t.heap) // remove the task from the heap
+				heap.Push(&t.ready, first)
+			}
+
+			if t.ready.Len() == 0 {
 				break
 			}
-			heap.Pop(&t.heap) // remove the task from the heap
 			t.mu.Unlock()
-			return first
+			return heap.Pop(&t.ready).(*scheduledTask)
 		case <-t.notify: // new task was added, loop again to ensure we have the earliest task.
 			t.mu.Lock()
 			if !timer.Stop() {
@@ -102,24 +133,20 @@
 }
 
 type scheduledTask struct {
-	task  Task
-	runAt time.Time
+	task     Task
+	runAt    time.Time
+	priority int
 }
 
 type scheduledTaskHeap struct {
-	tasks []*scheduledTask
+	tasks      []*scheduledTask
+	comparator func(i, j *scheduledTask) bool
 }
 
-var _ heap.Interface = &scheduledTaskHeap{}
-
 func (h *scheduledTaskHeap) Len() int {
 	return len(h.tasks)
 }
 
-func (h *scheduledTaskHeap) Less(i, j int) bool {
-	return h.tasks[i].runAt.Before(h.tasks[j].runAt)
-}
-
 func (h *scheduledTaskHeap) Swap(i, j int) {
 	h.tasks[i], h.tasks[j] = h.tasks[j], h.tasks[i]
 }
@@ -142,3 +169,26 @@ func (h *scheduledTaskHeap) Peek() interface{} {
 	}
 	return h.tasks[0]
 }
+
+type scheduledTaskHeapByTime struct {
+	scheduledTaskHeap
+}
+
+var _ heap.Interface = &scheduledTaskHeapByTime{}
+
+func (h *scheduledTaskHeapByTime) Less(i, j int) bool {
+	return h.tasks[i].runAt.Before(h.tasks[j].runAt)
+}
+
+type scheduledTaskHeapByPriorityThenTime struct {
+	scheduledTaskHeap
+}
+
+var _ heap.Interface = &scheduledTaskHeapByPriorityThenTime{}
+
+func (h *scheduledTaskHeapByPriorityThenTime) Less(i, j int) bool {
+	if h.tasks[i].priority != h.tasks[j].priority {
+		return h.tasks[i].priority > h.tasks[j].priority
+	}
+	return h.tasks[i].runAt.Before(h.tasks[j].runAt)
+}
diff --git a/internal/orchestrator/scheduledtaskheap_test.go b/internal/orchestrator/scheduledtaskheap_test.go
index 79ab79c9..aa5a6b21 100644
--- a/internal/orchestrator/scheduledtaskheap_test.go
+++ b/internal/orchestrator/scheduledtaskheap_test.go
@@ -2,7 +2,10 @@ package orchestrator
 
 import (
 	"context"
+	"math/rand"
 	"reflect"
+	"sort"
+	"strconv"
 	"testing"
 	"time"
 
@@ -89,3 +92,69 @@ func TestTaskQueueReset(t *testing.T) {
 		t.Fatal("expected nil task")
 	}
 }
+
+func TestTasksOrderedByPriority(t *testing.T) {
+	h := taskQueue{}
+
+	now := time.Now()
+	h.Push(scheduledTask{runAt: now, task: &heapTestTask{name: "4"}, priority: 1})
+	h.Push(scheduledTask{runAt: now, task: &heapTestTask{name: "3"}, priority: 2})
+	h.Push(scheduledTask{runAt: now.Add(10 * time.Millisecond), task: &heapTestTask{name: "5"}, priority: 3})
+	h.Push(scheduledTask{runAt: now, task: &heapTestTask{name: "2"}, priority: 3})
+	h.Push(scheduledTask{runAt: now.Add(-10 * time.Millisecond), task: &heapTestTask{name: "1"}, priority: 3})
+
+	wantSeq := []string{"1", "2", "3", "4", "5"}
+
+	seq := []string{}
+
+	for i := 0; i < 5; i++ {
+		task := h.Dequeue(context.Background())
+		if task == nil || task.task == nil {
+			t.Fatal("expected task")
+		}
+		seq = append(seq, task.task.Name())
+	}
+
+	if !reflect.DeepEqual(seq, wantSeq) {
+		t.Errorf("got %v, want %v", seq, wantSeq)
+	}
+}
+
+func TestFuzzTaskQueue(t *testing.T) {
+	h := taskQueue{}
+
+	count := 100
+
+	// Setup a bunch of tasks with random priorities and run times.
+	tasks := make([]scheduledTask, count)
+	for i := 0; i < count; i++ {
+		at := time.Now().Add(time.Duration(rand.Intn(200)-50) * time.Millisecond)
+		tasks[i] = scheduledTask{runAt: at, priority: 0, task: &heapTestTask{name: strconv.Itoa(i)}}
+		h.Push(tasks[i])
+	}
+
+	seq := []string{}
+	for i := 0; i < count; i++ {
+		task := h.Dequeue(context.Background())
+		if task == nil || task.task == nil {
+			t.Fatal("expected task")
+		}
+		seq = append(seq, task.task.Name())
+	}
+
+	var expectOrdering []string
+	sort.SliceStable(tasks, func(i, j int) bool {
+		if tasks[i].runAt.Equal(tasks[j].runAt) {
+			return tasks[i].priority < tasks[j].priority
+		}
+		return tasks[i].runAt.Before(tasks[j].runAt)
+	})
+
+	for _, task := range tasks {
+		expectOrdering = append(expectOrdering, task.task.Name())
+	}
+
+	if !reflect.DeepEqual(seq, expectOrdering) {
+		t.Errorf("got %v, want %v", seq, expectOrdering)
+	}
+}
diff --git a/webui/src/components/OperationList.tsx b/webui/src/components/OperationList.tsx
index 42ac9f9d..656b64be 100644
--- a/webui/src/components/OperationList.tsx
+++ b/webui/src/components/OperationList.tsx
@@ -41,6 +41,7 @@ export const OperationList = ({
 
   useEffect(() => {
     const lis = buildOperationListListener(req, (event, operation, opList) => {
+      console.log("got list: ", JSON.stringify(opList, null, 2));
       setOperations(opList);
     });
     subscribeToOperations(lis);
diff --git a/webui/src/state/oplog.ts b/webui/src/state/oplog.ts
index 35f62553..39327095 100644
--- a/webui/src/state/oplog.ts
+++ b/webui/src/state/oplog.ts
@@ -180,7 +180,7 @@ export class BackupInfoCollector {
       existing.status = newInfo.status; // use the latest status
     }
     existing.operations = _.uniqBy(
-      [...existing.operations, ...newInfo.operations],
+      [...newInfo.operations, ...existing.operations],
       (o) => o.id!
     );
     if (newInfo.backupLastStatus) {
diff --git a/webui/src/views/AddPlanModal.tsx b/webui/src/views/AddPlanModal.tsx
index 076e7e28..104d60ae 100644
--- a/webui/src/views/AddPlanModal.tsx
+++ b/webui/src/views/AddPlanModal.tsx
@@ -339,7 +339,7 @@ export const AddPlanModal = ({
         {/* Plan.retention */}
-        
+        
         {() => (
@@ -354,12 +354,14 @@
   );
 };
 
-const RetentionPolicyView = (policy: RetentionPolicy) => {
+const RetentionPolicyView = ({ policy }: { policy?: RetentionPolicy }) => {
   enum PolicyType {
     TimeBased,
     CountBased,
   }
 
+  policy = policy || {};
+
   const [policyType, setPolicyType] = useState(
     policy.keepLastN ? PolicyType.CountBased : PolicyType.TimeBased
   );
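
Reviewer note (not part of the patch): because the new priority constants are declared with iota, a larger value wins in scheduledTaskHeapByPriorityThenTime.Less, so the intended dequeue order among tasks that are ready at the same time is taskPriorityIndexSnapshots > taskPriorityForget > TaskPriorityInteractive > TaskPriorityDefault. The following is a minimal, self-contained Go sketch of that priority-then-time ordering; the item and byPriorityThenTime names are illustrative only and are not types from this repository.

package main

import (
	"container/heap"
	"fmt"
	"time"
)

// item mirrors the fields the patch adds to scheduledTask (runAt plus an int priority).
type item struct {
	name     string
	runAt    time.Time
	priority int
}

// byPriorityThenTime orders items the way scheduledTaskHeapByPriorityThenTime does:
// higher priority value first, earlier runAt breaking ties.
type byPriorityThenTime []item

func (h byPriorityThenTime) Len() int      { return len(h) }
func (h byPriorityThenTime) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h byPriorityThenTime) Less(i, j int) bool {
	if h[i].priority != h[j].priority {
		return h[i].priority > h[j].priority // larger priority value pops first
	}
	return h[i].runAt.Before(h[j].runAt)
}
func (h *byPriorityThenTime) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *byPriorityThenTime) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

func main() {
	now := time.Now()
	h := &byPriorityThenTime{
		{name: "forget", runAt: now, priority: 2},
		{name: "index snapshots", runAt: now, priority: 3},
		{name: "interactive backup", runAt: now, priority: 1},
	}
	heap.Init(h)
	for h.Len() > 0 {
		// Prints: index snapshots, then forget, then interactive backup.
		fmt.Println(heap.Pop(h).(item).name)
	}
}

This matches the behaviour TestTasksOrderedByPriority asserts against the real taskQueue: once tasks land in the ready heap, they are returned by descending priority and then by earliest runAt.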