diff --git a/gen/go/v1/operations.pb.go b/gen/go/v1/operations.pb.go index 172b80b2..9b88c83a 100644 --- a/gen/go/v1/operations.pb.go +++ b/gen/go/v1/operations.pb.go @@ -201,7 +201,7 @@ type Operation struct { Status OperationStatus `protobuf:"varint,4,opt,name=status,proto3,enum=v1.OperationStatus" json:"status,omitempty"` // required, unix time in milliseconds of the operation's creation (ID is derived from this) UnixTimeStartMs int64 `protobuf:"varint,5,opt,name=unix_time_start_ms,json=unixTimeStartMs,proto3" json:"unix_time_start_ms,omitempty"` - // optional, unix time in milliseconds of the operation's completion + // ptional, unix time in milliseconds of the operation's completion UnixTimeEndMs int64 `protobuf:"varint,6,opt,name=unix_time_end_ms,json=unixTimeEndMs,proto3" json:"unix_time_end_ms,omitempty"` // optional, human readable context message, typically an error message. DisplayMessage string `protobuf:"bytes,7,opt,name=display_message,json=displayMessage,proto3" json:"display_message,omitempty"` @@ -889,6 +889,7 @@ type OperationRunHook struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + ParentOp int64 `protobuf:"varint,4,opt,name=parent_op,json=parentOp,proto3" json:"parent_op,omitempty"` // ID of the operation that ran the hook. Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` // description of the hook that was run. typically repo/hook_idx or plan/hook_idx. OutputLogref string `protobuf:"bytes,2,opt,name=output_logref,json=outputLogref,proto3" json:"output_logref,omitempty"` // logref of the hook's output. DEPRECATED. Condition Hook_Condition `protobuf:"varint,3,opt,name=condition,proto3,enum=v1.Hook_Condition" json:"condition,omitempty"` // triggering condition of the hook. @@ -926,6 +927,13 @@ func (*OperationRunHook) Descriptor() ([]byte, []int) { return file_v1_operations_proto_rawDescGZIP(), []int{10} } +func (x *OperationRunHook) GetParentOp() int64 { + if x != nil { + return x.ParentOp + } + return 0 +} + func (x *OperationRunHook) GetName() string { if x != nil { return x.Name @@ -1061,36 +1069,38 @@ var file_v1_operations_proto_rawDesc = []byte{ 0x0e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x23, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x05, 0x73, - 0x74, 0x61, 0x74, 0x73, 0x22, 0x7d, 0x0a, 0x10, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x52, 0x75, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, - 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x72, 0x65, 0x66, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x0c, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x4c, 0x6f, 0x67, 0x72, 0x65, - 0x66, 0x12, 0x30, 0x0a, 0x09, 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x0e, 0x32, 0x12, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x6f, 0x6f, 0x6b, 0x2e, 0x43, - 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x2a, 0x60, 0x0a, 0x12, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x11, 0x0a, 0x0d, 0x45, 0x56, 0x45, - 0x4e, 0x54, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x11, 0x0a, 0x0d, - 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, - 0x11, 0x0a, 0x0d, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x44, - 0x10, 0x02, 0x12, 0x11, 0x0a, 0x0d, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x44, 0x45, 0x4c, 0x45, - 0x54, 0x45, 0x44, 0x10, 0x03, 0x2a, 0xc2, 0x01, 0x0a, 0x0f, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, 0x41, - 0x54, 0x55, 0x53, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x12, 0x0a, - 0x0e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, - 0x01, 0x12, 0x15, 0x0a, 0x11, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x49, 0x4e, 0x50, 0x52, - 0x4f, 0x47, 0x52, 0x45, 0x53, 0x53, 0x10, 0x02, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, 0x41, 0x54, - 0x55, 0x53, 0x5f, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x03, 0x12, 0x12, 0x0a, 0x0e, - 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x57, 0x41, 0x52, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x07, - 0x12, 0x10, 0x0a, 0x0c, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, - 0x10, 0x04, 0x12, 0x1b, 0x0a, 0x17, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x53, 0x59, 0x53, - 0x54, 0x45, 0x4d, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x4c, 0x45, 0x44, 0x10, 0x05, 0x12, - 0x19, 0x0a, 0x15, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, 0x53, 0x45, 0x52, 0x5f, 0x43, - 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x4c, 0x45, 0x44, 0x10, 0x06, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x61, 0x72, 0x65, 0x74, 0x68, 0x67, - 0x65, 0x6f, 0x72, 0x67, 0x65, 0x2f, 0x62, 0x61, 0x63, 0x6b, 0x72, 0x65, 0x73, 0x74, 0x2f, 0x67, - 0x65, 0x6e, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x61, 0x74, 0x73, 0x22, 0x9a, 0x01, 0x0a, 0x10, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x52, 0x75, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x1b, 0x0a, 0x09, 0x70, 0x61, 0x72, + 0x65, 0x6e, 0x74, 0x5f, 0x6f, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x70, 0x61, + 0x72, 0x65, 0x6e, 0x74, 0x4f, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x6f, 0x75, + 0x74, 0x70, 0x75, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x72, 0x65, 0x66, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x0c, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x4c, 0x6f, 0x67, 0x72, 0x65, 0x66, 0x12, + 0x30, 0x0a, 0x09, 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x12, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x6f, 0x6f, 0x6b, 0x2e, 0x43, 0x6f, 0x6e, + 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x2a, 0x60, 0x0a, 0x12, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x76, + 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x11, 0x0a, 0x0d, 0x45, 0x56, 0x45, 0x4e, 0x54, + 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x11, 0x0a, 0x0d, 0x45, 0x56, + 0x45, 0x4e, 0x54, 0x5f, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x11, 0x0a, + 0x0d, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x44, 0x10, 0x02, + 0x12, 0x11, 0x0a, 0x0d, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, + 0x44, 0x10, 0x03, 0x2a, 0xc2, 0x01, 0x0a, 0x0f, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, 0x41, 0x54, 0x55, + 0x53, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x12, 0x0a, 0x0e, 0x53, + 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, + 0x15, 0x0a, 0x11, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x49, 0x4e, 0x50, 0x52, 0x4f, 0x47, + 0x52, 0x45, 0x53, 0x53, 0x10, 0x02, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, + 0x5f, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x03, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, + 0x41, 0x54, 0x55, 0x53, 0x5f, 0x57, 0x41, 0x52, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x07, 0x12, 0x10, + 0x0a, 0x0c, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x04, + 0x12, 0x1b, 0x0a, 0x17, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x53, 0x59, 0x53, 0x54, 0x45, + 0x4d, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x4c, 0x45, 0x44, 0x10, 0x05, 0x12, 0x19, 0x0a, + 0x15, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, 0x53, 0x45, 0x52, 0x5f, 0x43, 0x41, 0x4e, + 0x43, 0x45, 0x4c, 0x4c, 0x45, 0x44, 0x10, 0x06, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, + 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x61, 0x72, 0x65, 0x74, 0x68, 0x67, 0x65, 0x6f, + 0x72, 0x67, 0x65, 0x2f, 0x62, 0x61, 0x63, 0x6b, 0x72, 0x65, 0x73, 0x74, 0x2f, 0x67, 0x65, 0x6e, + 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/internal/api/backresthandler_test.go b/internal/api/backresthandler_test.go index ed72ce99..970e7a5a 100644 --- a/internal/api/backresthandler_test.go +++ b/internal/api/backresthandler_test.go @@ -20,6 +20,7 @@ import ( "github.com/garethgeorge/backrest/internal/config" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/orchestrator" + "github.com/garethgeorge/backrest/internal/orchestrator/tasks" "github.com/garethgeorge/backrest/internal/resticinstaller" "github.com/garethgeorge/backrest/internal/rotatinglog" "golang.org/x/sync/errgroup" @@ -334,6 +335,98 @@ func TestHookExecution(t *testing.T) { } } +func TestHookCancellation(t *testing.T) { + t.Parallel() + + if runtime.GOOS == "windows" { + t.Skip("skipping test on windows") + } + + sut := createSystemUnderTest(t, &config.MemoryStore{ + Config: &v1.Config{ + Modno: 1234, + Instance: "test", + Repos: []*v1.Repo{ + { + Id: "local", + Uri: t.TempDir(), + Password: "test", + }, + }, + Plans: []*v1.Plan{ + { + Id: "test", + Repo: "local", + Paths: []string{ + t.TempDir(), + }, + Schedule: &v1.Schedule{ + Schedule: &v1.Schedule_Disabled{Disabled: true}, + }, + Hooks: []*v1.Hook{ + { + Conditions: []v1.Hook_Condition{ + v1.Hook_CONDITION_SNAPSHOT_START, + }, + Action: &v1.Hook_ActionCommand{ + ActionCommand: &v1.Hook_Command{ + Command: "exit 123", + }, + }, + OnError: v1.Hook_ON_ERROR_CANCEL, + }, + }, + }, + }, + }, + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + sut.orch.Run(ctx) + }() + + _, err := sut.handler.Backup(context.Background(), connect.NewRequest(&types.StringValue{Value: "test"})) + if !errors.Is(err, tasks.ErrTaskCancelled) { + t.Fatalf("Backup() error = %v, want errors.Is(err, tasks.ErrTaskCancelled)", err) + } + + // Wait for a hook operation to appear in the oplog + if err := retry(t, 10, 2*time.Second, func() error { + hookOps := slices.DeleteFunc(getOperations(t, sut.oplog), func(op *v1.Operation) bool { + _, ok := op.GetOp().(*v1.Operation_OperationRunHook) + return !ok + }) + if len(hookOps) != 1 { + return fmt.Errorf("expected 1 hook operations, got %d", len(hookOps)) + } + if hookOps[0].Status != v1.OperationStatus_STATUS_ERROR { + return fmt.Errorf("expected hook operation error status, got %v", hookOps[0].Status) + } + return nil + }); err != nil { + t.Fatalf("Couldn't find hooks in oplog: %v", err) + } + + // assert that the backup operation is in the log and is cancelled + if err := retry(t, 10, 2*time.Second, func() error { + backupOps := slices.DeleteFunc(getOperations(t, sut.oplog), func(op *v1.Operation) bool { + _, ok := op.GetOp().(*v1.Operation_OperationBackup) + return !ok + }) + if len(backupOps) != 1 { + return fmt.Errorf("expected 1 backup operation, got %d", len(backupOps)) + } + if backupOps[0].Status != v1.OperationStatus_STATUS_USER_CANCELLED { + return fmt.Errorf("expected backup operation cancelled status, got %v", backupOps[0].Status) + } + return nil + }); err != nil { + t.Fatalf("Couldn't find hooks in oplog: %v", err) + } +} + func TestCancelBackup(t *testing.T) { t.Parallel() diff --git a/internal/config/configutil.go b/internal/config/configutil.go new file mode 100644 index 00000000..70eb9023 --- /dev/null +++ b/internal/config/configutil.go @@ -0,0 +1,21 @@ +package config + +import v1 "github.com/garethgeorge/backrest/gen/go/v1" + +func FindPlan(cfg *v1.Config, planID string) *v1.Plan { + for _, plan := range cfg.Plans { + if plan.Id == planID { + return plan + } + } + return nil +} + +func FindRepo(cfg *v1.Config, repoID string) *v1.Repo { + for _, repo := range cfg.Repos { + if repo.Id == repoID { + return repo + } + } + return nil +} diff --git a/internal/hook/hook.go b/internal/hook/hook.go index 2bf73411..d95b331f 100644 --- a/internal/hook/hook.go +++ b/internal/hook/hook.go @@ -1,116 +1,114 @@ package hook import ( - "bytes" + "context" "errors" "fmt" - "io" + "reflect" "slices" - "strings" - "text/template" "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" - "github.com/garethgeorge/backrest/internal/oplog" - "github.com/garethgeorge/backrest/internal/rotatinglog" - "go.uber.org/zap" - "google.golang.org/protobuf/proto" + cfg "github.com/garethgeorge/backrest/internal/config" + "github.com/garethgeorge/backrest/internal/hook/types" + "github.com/garethgeorge/backrest/internal/orchestrator/tasks" ) -var ( - defaultTemplate = `{{ .Summary }}` -) - -type HookExecutor struct { - config *v1.Config - oplog *oplog.OpLog - logStore *rotatinglog.RotatingLog -} +func TasksTriggeredByEvent(config *v1.Config, repoID string, planID string, parentOp *v1.Operation, events []v1.Hook_Condition, vars interface{}) ([]tasks.Task, error) { + var taskSet []tasks.Task -func NewHookExecutor(config *v1.Config, oplog *oplog.OpLog, bigOutputStore *rotatinglog.RotatingLog) *HookExecutor { - return &HookExecutor{ - config: config, - oplog: oplog, - logStore: bigOutputStore, + repo := cfg.FindRepo(config, repoID) + if repo == nil { + return nil, fmt.Errorf("repo %v not found", repoID) } -} - -// ExecuteHooks schedules tasks for the hooks subscribed to the given event. The vars map is used to substitute variables -// Hooks are pulled both from the provided plan and from the repo config. -func (e *HookExecutor) ExecuteHooks(flowID int64, repo *v1.Repo, plan *v1.Plan, events []v1.Hook_Condition, vars HookVars) error { - planId := plan.GetId() - if planId == "" { - planId = "_system_" // TODO: clean this up when refactoring hook execution - } - - operationBase := v1.Operation{ - Status: v1.OperationStatus_STATUS_INPROGRESS, - PlanId: planId, - RepoId: repo.GetId(), - InstanceId: e.config.Instance, - FlowId: flowID, + plan := cfg.FindPlan(config, planID) + if plan == nil && planID != "" { + return nil, fmt.Errorf("plan %v not found", planID) } - vars.Repo = repo - vars.Plan = plan - vars.CurTime = time.Now() - for idx, hook := range repo.GetHooks() { - h := (*Hook)(hook) - event := firstMatchingCondition(h, events) + event := firstMatchingCondition(hook, events) if event == v1.Hook_CONDITION_UNKNOWN { continue } name := fmt.Sprintf("repo/%v/hook/%v", repo.Id, idx) - operation := proto.Clone(&operationBase).(*v1.Operation) - operation.DisplayMessage = "running " + name - operation.UnixTimeStartMs = curTimeMs() - operation.Op = &v1.Operation_OperationRunHook{ - OperationRunHook: &v1.OperationRunHook{ - Name: name, - Condition: event, - }, - } - zap.L().Info("running hook", zap.String("plan", repo.Id), zap.Int64("opId", operation.Id), zap.String("hook", name)) - if err := e.executeHook(operation, h, event, vars); err != nil { - zap.S().Errorf("error on repo hook %v on condition %v: %v", idx, event.String(), err) - if isHaltingError(err) { - return fmt.Errorf("repo hook %v on condition %v: %w", idx, event.String(), err) - } + task, err := newOneoffRunHookTask(name, config.Instance, repoID, planID, parentOp, time.Now(), hook, event, vars) + if err != nil { + return nil, err } + taskSet = append(taskSet, task) } for idx, hook := range plan.GetHooks() { - h := (*Hook)(hook) - event := firstMatchingCondition(h, events) + event := firstMatchingCondition(hook, events) if event == v1.Hook_CONDITION_UNKNOWN { continue } name := fmt.Sprintf("plan/%v/hook/%v", plan.Id, idx) - operation := proto.Clone(&operationBase).(*v1.Operation) - operation.DisplayMessage = "running " + name - operation.UnixTimeStartMs = curTimeMs() - operation.Op = &v1.Operation_OperationRunHook{ - OperationRunHook: &v1.OperationRunHook{ - Name: name, - Condition: event, - }, + task, err := newOneoffRunHookTask(name, config.Instance, repoID, planID, parentOp, time.Now(), hook, event, vars) + if err != nil { + return nil, err } + taskSet = append(taskSet, task) + } - zap.L().Info("running hook", zap.String("plan", plan.Id), zap.Int64("opId", operation.Id), zap.String("hook", name)) - if err := e.executeHook(operation, h, event, vars); err != nil { - zap.S().Errorf("error on plan hook %v on condition %v: %v", idx, event.String(), err) - if isHaltingError(err) { - return fmt.Errorf("plan hook %v on condition %v: %w", idx, event.String(), err) - } - } + return taskSet, nil +} + +func newOneoffRunHookTask(title, instanceID, repoID, planID string, parentOp *v1.Operation, at time.Time, hook *v1.Hook, event v1.Hook_Condition, vars interface{}) (tasks.Task, error) { + h, err := types.DefaultRegistry().GetHandler(hook) + if err != nil { + return nil, fmt.Errorf("no handler for hook type %T", hook.Action) } - return nil + + title = h.Name() + " hook " + title + + return &tasks.GenericOneoffTask{ + OneoffTask: tasks.OneoffTask{ + BaseTask: tasks.BaseTask{ + TaskName: fmt.Sprintf("run hook %v", title), + TaskRepoID: repoID, + TaskPlanID: planID, + }, + FlowID: parentOp.GetFlowId(), + RunAt: at, + ProtoOp: &v1.Operation{ + InstanceId: instanceID, + RepoId: repoID, + PlanId: planID, + FlowId: parentOp.GetFlowId(), + + DisplayMessage: fmt.Sprintf("running %v triggered by %v", title, event.String()), + Op: &v1.Operation_OperationRunHook{ + OperationRunHook: &v1.OperationRunHook{ + Name: title, + Condition: event, + ParentOp: parentOp.GetId(), + }, + }, + }, + }, + Do: func(ctx context.Context, st tasks.ScheduledTask, taskRunner tasks.TaskRunner) error { + // TODO: this is a hack to get around the fact that vars is an interface{} . + v := reflect.ValueOf(&vars).Elem() + clone := reflect.New(v.Elem().Type()).Elem() + clone.Set(v.Elem()) // copy vars to clone + if field := v.Elem().FieldByName("Event"); field.IsValid() { + clone.FieldByName("Event").Set(reflect.ValueOf(event)) + } + + if err := h.Execute(ctx, hook, clone, taskRunner); err != nil { + err = applyHookErrorPolicy(hook.OnError, err) + return err + } + return nil + }, + }, nil } -func firstMatchingCondition(hook *Hook, events []v1.Hook_Condition) v1.Hook_Condition { +func firstMatchingCondition(hook *v1.Hook, events []v1.Hook_Condition) v1.Hook_Condition { for _, event := range events { if slices.Contains(hook.Conditions, event) { return event @@ -119,95 +117,12 @@ func firstMatchingCondition(hook *Hook, events []v1.Hook_Condition) v1.Hook_Cond return v1.Hook_CONDITION_UNKNOWN } -func (e *HookExecutor) executeHook(op *v1.Operation, hook *Hook, event v1.Hook_Condition, vars HookVars) error { - if err := e.oplog.Add(op); err != nil { - zap.S().Errorf("execute hook: add operation: %v", err) - return errors.New("couldn't create operation") - } - - output := &bytes.Buffer{} - fmt.Fprintf(output, "triggering condition: %v\n", event.String()) - - var retErr error - if err := hook.Do(event, vars, io.MultiWriter(output)); err != nil { - output.Write([]byte(fmt.Sprintf("Error: %v", err))) - err = applyHookErrorPolicy(hook.OnError, err) - var cancelErr *HookErrorRequestCancel - if errors.As(err, &cancelErr) { - // if it was a cancel then it successfully indicated it's intent to the caller - // no error should be displayed in the UI. - op.Status = v1.OperationStatus_STATUS_SUCCESS - } else { - op.Status = v1.OperationStatus_STATUS_ERROR - } - retErr = err - } else { - op.Status = v1.OperationStatus_STATUS_SUCCESS - } - - outputRef, err := e.logStore.Write(output.Bytes()) - if err != nil { - retErr = errors.Join(retErr, fmt.Errorf("write logstore: %w", err)) - } - op.Logref = outputRef - - op.UnixTimeEndMs = curTimeMs() - if err := e.oplog.Update(op); err != nil { - retErr = errors.Join(retErr, fmt.Errorf("update oplog: %w", err)) - } - return retErr -} - func curTimeMs() int64 { return time.Now().UnixNano() / 1000000 } type Hook v1.Hook -func (h *Hook) Do(event v1.Hook_Condition, vars HookVars, output io.Writer) error { - if !slices.Contains(h.Conditions, event) { - return nil - } - - vars.Event = event - - switch action := h.Action.(type) { - case *v1.Hook_ActionCommand: - return h.doCommand(action, vars, output) - case *v1.Hook_ActionDiscord: - return h.doDiscord(action, vars, output) - case *v1.Hook_ActionGotify: - return h.doGotify(action, vars, output) - case *v1.Hook_ActionSlack: - return h.doSlack(action, vars, output) - case *v1.Hook_ActionShoutrrr: - return h.doShoutrrr(action, vars, output) - default: - return fmt.Errorf("unknown hook action: %v", action) - } -} - -func (h *Hook) renderTemplate(text string, vars HookVars) (string, error) { - template, err := template.New("template").Parse(text) - if err != nil { - return "", fmt.Errorf("parse template: %w", err) - } - - buf := &bytes.Buffer{} - if err := template.Execute(buf, vars); err != nil { - return "", fmt.Errorf("execute template: %w", err) - } - - return buf.String(), nil -} - -func (h *Hook) renderTemplateOrDefault(template string, defaultTmpl string, vars HookVars) (string, error) { - if strings.Trim(template, " ") == "" { - return h.renderTemplate(defaultTmpl, vars) - } - return h.renderTemplate(template, vars) -} - func applyHookErrorPolicy(onError v1.Hook_OnError, err error) error { if err == nil || errors.As(err, &HookErrorFatal{}) || errors.As(err, &HookErrorRequestCancel{}) { return err @@ -221,8 +136,8 @@ func applyHookErrorPolicy(onError v1.Hook_OnError, err error) error { return err } -// isHaltingError returns true if the error is a fatal error or a request to cancel the operation -func isHaltingError(err error) bool { +// IsHaltingError returns true if the error is a fatal error or a request to cancel the operation +func IsHaltingError(err error) bool { var fatalErr *HookErrorFatal var cancelErr *HookErrorRequestCancel return errors.As(err, &fatalErr) || errors.As(err, &cancelErr) diff --git a/internal/hook/hook_test.go b/internal/hook/hook_test.go deleted file mode 100644 index 79f6fe96..00000000 --- a/internal/hook/hook_test.go +++ /dev/null @@ -1,86 +0,0 @@ -package hook - -import ( - "bytes" - "errors" - "os/exec" - "runtime" - "testing" - - v1 "github.com/garethgeorge/backrest/gen/go/v1" -) - -func TestHookCommandInDefaultShell(t *testing.T) { - hook := Hook(v1.Hook{ - Conditions: []v1.Hook_Condition{v1.Hook_CONDITION_SNAPSHOT_START}, - Action: &v1.Hook_ActionCommand{ - ActionCommand: &v1.Hook_Command{ - Command: "exit 2", - }, - }, - }) - - err := hook.Do(v1.Hook_CONDITION_SNAPSHOT_START, HookVars{}, &bytes.Buffer{}) - if err == nil { - t.Fatal("expected error") - } - if err.(*exec.ExitError).ExitCode() != 2 { - t.Fatalf("expected exit code 2, got %v", err.(*exec.ExitError).ExitCode()) - } -} - -func TestHookCommandInBashShell(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping test on windows") - } - - hook := Hook(v1.Hook{ - Conditions: []v1.Hook_Condition{v1.Hook_CONDITION_SNAPSHOT_START}, - Action: &v1.Hook_ActionCommand{ - ActionCommand: &v1.Hook_Command{ - Command: `#!/bin/bash -counter=0 -# Start a while loop that will run until the counter is equal to 10 -while [ $counter -lt 10 ]; do - ((counter++)) -done -exit $counter`, - }, - }, - }) - - err := hook.Do(v1.Hook_CONDITION_SNAPSHOT_START, HookVars{}, &bytes.Buffer{}) - if err == nil { - t.Fatal("expected error") - } - if err.(*exec.ExitError).ExitCode() != 10 { - t.Fatalf("expected exit code 3, got %v", err.(*exec.ExitError).ExitCode()) - } -} - -func TestCommandHookErrorHandling(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping test on windows") - } - - hook := Hook(v1.Hook{ - Conditions: []v1.Hook_Condition{ - v1.Hook_CONDITION_SNAPSHOT_START, - }, - Action: &v1.Hook_ActionCommand{ - ActionCommand: &v1.Hook_Command{ - Command: "exit 1", - }, - }, - OnError: v1.Hook_ON_ERROR_CANCEL, - }) - - err := applyHookErrorPolicy(hook.OnError, hook.Do(v1.Hook_CONDITION_SNAPSHOT_START, HookVars{}, &bytes.Buffer{})) - if err == nil { - t.Fatal("expected error") - } - var cancelErr *HookErrorRequestCancel - if !errors.As(err, &cancelErr) { - t.Fatalf("expected HookErrorRequestCancel, got %v", err) - } -} diff --git a/internal/hook/hookcommand.go b/internal/hook/hookcommand.go deleted file mode 100644 index e677a9de..00000000 --- a/internal/hook/hookcommand.go +++ /dev/null @@ -1,40 +0,0 @@ -package hook - -import ( - "fmt" - "io" - "os/exec" - "strings" - - v1 "github.com/garethgeorge/backrest/gen/go/v1" -) - -func (h *Hook) doCommand(cmd *v1.Hook_ActionCommand, vars HookVars, output io.Writer) error { - command, err := h.renderTemplate(cmd.ActionCommand.Command, vars) - if err != nil { - return fmt.Errorf("template rendering: %w", err) - } - - // Parse out the shell to use if a #! prefix is present - shell := "sh" - if len(command) > 2 && command[0:2] == "#!" { - nextLine := strings.Index(command, "\n") - if nextLine == -1 { - nextLine = len(command) - } - shell = strings.Trim(command[2:nextLine], " ") - command = command[nextLine+1:] - } - - output.Write([]byte(fmt.Sprintf("------- script -------\n#! %v\n%v\n", shell, command))) - output.Write([]byte("------- output -------\n")) - - // Run the command in the specified shell - execCmd := exec.Command(shell) - execCmd.Stdin = strings.NewReader(command) - - execCmd.Stderr = output - execCmd.Stdout = output - - return execCmd.Run() -} diff --git a/internal/hook/hookdiscord.go b/internal/hook/hookdiscord.go deleted file mode 100644 index a53e7c11..00000000 --- a/internal/hook/hookdiscord.go +++ /dev/null @@ -1,33 +0,0 @@ -package hook - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - - v1 "github.com/garethgeorge/backrest/gen/go/v1" -) - -func (h *Hook) doDiscord(cmd *v1.Hook_ActionDiscord, vars HookVars, output io.Writer) error { - payload, err := h.renderTemplateOrDefault(cmd.ActionDiscord.GetTemplate(), defaultTemplate, vars) - if err != nil { - return fmt.Errorf("template rendering: %w", err) - } - - type Message struct { - Content string `json:"content"` - } - - request := Message{ - Content: payload, // leading newline looks better in discord. - } - - requestBytes, _ := json.Marshal(request) - - fmt.Fprintf(output, "Sending Discord message to %s\n---- payload ----\n", cmd.ActionDiscord.GetWebhookUrl()) - output.Write(requestBytes) - - _, err = post(cmd.ActionDiscord.GetWebhookUrl(), "application/json", bytes.NewReader(requestBytes)) - return err -} diff --git a/internal/hook/hookgotify.go b/internal/hook/hookgotify.go deleted file mode 100644 index bf8f2808..00000000 --- a/internal/hook/hookgotify.go +++ /dev/null @@ -1,62 +0,0 @@ -package hook - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "net/url" - "strings" - - v1 "github.com/garethgeorge/backrest/gen/go/v1" -) - -func (h *Hook) doGotify(cmd *v1.Hook_ActionGotify, vars HookVars, output io.Writer) error { - payload, err := h.renderTemplateOrDefault(cmd.ActionGotify.GetTemplate(), defaultTemplate, vars) - if err != nil { - return fmt.Errorf("template rendering: %w", err) - } - - title, err := h.renderTemplateOrDefault(cmd.ActionGotify.GetTitleTemplate(), "Backrest Event", vars) - if err != nil { - return fmt.Errorf("title template rendering: %w", err) - } - - message := struct { - Message string `json:"message"` - Title string `json:"title"` - Priority int `json:"priority"` - }{ - Title: title, - Priority: 5, - Message: payload, - } - - b, err := json.Marshal(message) - if err != nil { - return fmt.Errorf("json marshal: %w", err) - } - - baseUrl := strings.Trim(cmd.ActionGotify.GetBaseUrl(), "/") - - postUrl := fmt.Sprintf( - "%s/message?token=%s", - baseUrl, - url.QueryEscape(cmd.ActionGotify.GetToken())) - - fmt.Fprintf(output, "Sending gotify message to %s\n", postUrl) - fmt.Fprintf(output, "---- payload ----\n") - output.Write(b) - - body, err := post(postUrl, "application/json", bytes.NewReader(b)) - - if err != nil { - return fmt.Errorf("send gotify message: %w", err) - } - - if body != "" { - output.Write([]byte(body)) - } - - return nil -} diff --git a/internal/hook/hookshoutrrr.go b/internal/hook/hookshoutrrr.go deleted file mode 100644 index 36f20af1..00000000 --- a/internal/hook/hookshoutrrr.go +++ /dev/null @@ -1,24 +0,0 @@ -package hook - -import ( - "fmt" - "io" - - "github.com/containrrr/shoutrrr" - v1 "github.com/garethgeorge/backrest/gen/go/v1" -) - -func (h *Hook) doShoutrrr(cmd *v1.Hook_ActionShoutrrr, vars HookVars, output io.Writer) error { - payload, err := h.renderTemplateOrDefault(cmd.ActionShoutrrr.GetTemplate(), defaultTemplate, vars) - if err != nil { - return fmt.Errorf("template rendering: %w", err) - } - - fmt.Fprintf(output, "Sending notification to %s\nContents:\n", cmd.ActionShoutrrr.GetShoutrrrUrl()) - output.Write([]byte(payload)) - - if err := shoutrrr.Send(cmd.ActionShoutrrr.GetShoutrrrUrl(), payload); err != nil { - return fmt.Errorf("send notification to %q: %w", cmd.ActionShoutrrr.GetShoutrrrUrl(), err) - } - return nil -} diff --git a/internal/hook/hookslack.go b/internal/hook/hookslack.go deleted file mode 100644 index 075115f0..00000000 --- a/internal/hook/hookslack.go +++ /dev/null @@ -1,33 +0,0 @@ -package hook - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - - v1 "github.com/garethgeorge/backrest/gen/go/v1" -) - -func (h *Hook) doSlack(cmd *v1.Hook_ActionSlack, vars HookVars, output io.Writer) error { - payload, err := h.renderTemplateOrDefault(cmd.ActionSlack.GetTemplate(), defaultTemplate, vars) - if err != nil { - return fmt.Errorf("template rendering: %w", err) - } - - type Message struct { - Text string `json:"text"` - } - - request := Message{ - Text: "Backrest Notification\n" + payload, // leading newline looks better in discord. - } - - requestBytes, _ := json.Marshal(request) - - fmt.Fprintf(output, "Sending Slack message to %s\n---- payload ----\n", cmd.ActionSlack.GetWebhookUrl()) - output.Write(requestBytes) - - _, err = post(cmd.ActionSlack.GetWebhookUrl(), "application/json", bytes.NewReader(requestBytes)) - return err -} diff --git a/internal/hook/httputil.go b/internal/hook/hookutil/httputil.go similarity index 82% rename from internal/hook/httputil.go rename to internal/hook/hookutil/httputil.go index 0ee68850..9d006388 100644 --- a/internal/hook/httputil.go +++ b/internal/hook/hookutil/httputil.go @@ -1,4 +1,4 @@ -package hook +package hookutil import ( "fmt" @@ -6,7 +6,7 @@ import ( "net/http" ) -func post(url string, contentType string, body io.Reader) (string, error) { +func PostRequest(url string, contentType string, body io.Reader) (string, error) { r, err := http.Post(url, contentType, body) if err != nil { return "", fmt.Errorf("send request %v: %w", url, err) diff --git a/internal/hook/hookutil/templateutil.go b/internal/hook/hookutil/templateutil.go new file mode 100644 index 00000000..eee8c81f --- /dev/null +++ b/internal/hook/hookutil/templateutil.go @@ -0,0 +1,33 @@ +package hookutil + +import ( + "bytes" + "fmt" + "strings" + "text/template" +) + +var ( + DefaultTemplate = `{{ .Summary }}` +) + +func RenderTemplate(text string, vars interface{}) (string, error) { + template, err := template.New("template").Parse(text) + if err != nil { + return "", fmt.Errorf("parse template: %w", err) + } + + buf := &bytes.Buffer{} + if err := template.Execute(buf, vars); err != nil { + return "", fmt.Errorf("execute template: %w", err) + } + + return buf.String(), nil +} + +func RenderTemplateOrDefault(template string, defaultTmpl string, vars interface{}) (string, error) { + if strings.Trim(template, " ") == "" { + return RenderTemplate(defaultTmpl, vars) + } + return RenderTemplate(template, vars) +} diff --git a/internal/hook/types/command.go b/internal/hook/types/command.go new file mode 100644 index 00000000..6db05533 --- /dev/null +++ b/internal/hook/types/command.go @@ -0,0 +1,77 @@ +package types + +import ( + "context" + "errors" + "fmt" + "os/exec" + "reflect" + "runtime" + "strings" + + v1 "github.com/garethgeorge/backrest/gen/go/v1" + "github.com/garethgeorge/backrest/internal/hook/hookutil" + "github.com/garethgeorge/backrest/internal/ioutil" + "github.com/garethgeorge/backrest/internal/orchestrator/logging" + "github.com/garethgeorge/backrest/internal/orchestrator/tasks" + "github.com/google/shlex" +) + +type commandHandler struct{} + +func (commandHandler) Name() string { + return "command" +} + +func (commandHandler) Execute(ctx context.Context, h *v1.Hook, vars interface{}, runner tasks.TaskRunner) error { + command, err := hookutil.RenderTemplate(h.GetActionCommand().GetCommand(), vars) + if err != nil { + return fmt.Errorf("template rendering: %w", err) + } + + writer := logging.WriterFromContext(ctx) + + // Parse out the shell to use if a #! prefix is present + shell := []string{"sh"} + if runtime.GOOS == "windows" { + shell = []string{"powershell", "-nologo", "-noprofile"} + } + + if len(command) > 2 && command[0:2] == "#!" { + nextLine := strings.Index(command, "\n") + if nextLine == -1 { + nextLine = len(command) + } + shell, err = shlex.Split(strings.Trim(command[2:nextLine], " ")) + if err != nil { + return fmt.Errorf("parsing shell for command: %w", err) + } else if len(shell) == 0 { + return errors.New("must specify shell for command") + } + command = command[nextLine+1:] + } + + scriptWriter := &ioutil.LinePrefixer{W: writer, Prefix: []byte("[script] ")} + fmt.Fprintf(scriptWriter, "%v\n%v\n", shell, command) + scriptWriter.Close() + outputWriter := &ioutil.LinePrefixer{W: writer, Prefix: []byte("[output] ")} + defer outputWriter.Close() + + // Run the command in the specified shell + execCmd := exec.Command(shell[0], shell[1:]...) + execCmd.Stdin = strings.NewReader(command) + + stdout := &ioutil.SynchronizedWriter{W: outputWriter} + execCmd.Stderr = stdout + execCmd.Stdout = stdout + + return execCmd.Run() +} + +func (commandHandler) ActionType() reflect.Type { + return reflect.TypeOf(&v1.Hook_ActionCommand{}) +} + +func init() { + DefaultRegistry().RegisterHandler(&commandHandler{}) +} diff --git a/internal/hook/types/discord.go b/internal/hook/types/discord.go new file mode 100644 index 00000000..1c755ff4 --- /dev/null +++ b/internal/hook/types/discord.go @@ -0,0 +1,50 @@ +package types + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "reflect" + + v1 "github.com/garethgeorge/backrest/gen/go/v1" + "github.com/garethgeorge/backrest/internal/hook/hookutil" + "github.com/garethgeorge/backrest/internal/orchestrator/tasks" +) + +type discordHandler struct{} + +func (discordHandler) Name() string { + return "discord" +} + +func (discordHandler) Execute(ctx context.Context, h *v1.Hook, vars interface{}, runner tasks.TaskRunner) error { + payload, err := hookutil.RenderTemplateOrDefault(h.GetActionDiscord().GetTemplate(), hookutil.DefaultTemplate, vars) + if err != nil { + return fmt.Errorf("template rendering: %w", err) + } + + writer := runner.RawLogWriter(ctx) + fmt.Fprintf(writer, "Sending discord message to %s\n", h.GetActionDiscord().GetWebhookUrl()) + fmt.Fprintf(writer, "---- payload ----\n%s\n", payload) + + type Message struct { + Content string `json:"content"` + } + + request := Message{ + Content: payload, // leading newline looks better in discord. + } + + requestBytes, _ := json.Marshal(request) + _, err = hookutil.PostRequest(h.GetActionDiscord().GetWebhookUrl(), "application/json", bytes.NewReader(requestBytes)) + return err +} + +func (discordHandler) ActionType() reflect.Type { + return reflect.TypeOf(&v1.Hook_ActionDiscord{}) +} + +func init() { + DefaultRegistry().RegisterHandler(&discordHandler{}) +} diff --git a/internal/hook/types/gotify.go b/internal/hook/types/gotify.go new file mode 100644 index 00000000..54ce0e04 --- /dev/null +++ b/internal/hook/types/gotify.go @@ -0,0 +1,83 @@ +package types + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/url" + "reflect" + "strings" + + v1 "github.com/garethgeorge/backrest/gen/go/v1" + "github.com/garethgeorge/backrest/internal/hook/hookutil" + "github.com/garethgeorge/backrest/internal/orchestrator/tasks" +) + +type gotifyHandler struct{} + +func (gotifyHandler) Name() string { + return "gotify" +} + +func (gotifyHandler) Execute(ctx context.Context, h *v1.Hook, vars interface{}, runner tasks.TaskRunner) error { + g := h.GetActionGotify() + + payload, err := hookutil.RenderTemplateOrDefault(g.GetTemplate(), hookutil.DefaultTemplate, vars) + if err != nil { + return fmt.Errorf("template rendering: %w", err) + } + + title, err := hookutil.RenderTemplateOrDefault(g.GetTitleTemplate(), "Backrest Event", vars) + if err != nil { + return fmt.Errorf("title template rendering: %w", err) + } + + output := runner.RawLogWriter(ctx) + + message := struct { + Message string `json:"message"` + Title string `json:"title"` + Priority int `json:"priority"` + }{ + Title: title, + Priority: 5, + Message: payload, + } + + b, err := json.Marshal(message) + if err != nil { + return fmt.Errorf("json marshal: %w", err) + } + + baseUrl := strings.Trim(g.GetBaseUrl(), "/") + + postUrl := fmt.Sprintf( + "%s/message?token=%s", + baseUrl, + url.QueryEscape(g.GetToken())) + + fmt.Fprintf(output, "Sending gotify message to %s\n", postUrl) + fmt.Fprintf(output, "---- payload ----\n") + output.Write(b) + + body, err := hookutil.PostRequest(postUrl, "application/json", bytes.NewReader(b)) + + if err != nil { + return fmt.Errorf("send gotify message: %w", err) + } + + if body != "" { + output.Write([]byte(body)) + } + + return nil +} + +func (gotifyHandler) ActionType() reflect.Type { + return reflect.TypeOf(&v1.Hook_ActionGotify{}) +} + +func init() { + DefaultRegistry().RegisterHandler(&gotifyHandler{}) +} diff --git a/internal/hook/types/registry.go b/internal/hook/types/registry.go new file mode 100644 index 00000000..cb62fbe5 --- /dev/null +++ b/internal/hook/types/registry.go @@ -0,0 +1,45 @@ +package types + +import ( + "context" + "errors" + "fmt" + "reflect" + + v1 "github.com/garethgeorge/backrest/gen/go/v1" + "github.com/garethgeorge/backrest/internal/orchestrator/tasks" +) + +var ErrHandlerNotFound = errors.New("handler not found") + +// defaultRegistry is the default handler registry. +var defaultRegistry = &HandlerRegistry{ + actionHandlers: make(map[reflect.Type]Handler), +} + +func DefaultRegistry() *HandlerRegistry { + return defaultRegistry +} + +type HandlerRegistry struct { + actionHandlers map[reflect.Type]Handler +} + +// RegisterHandler registers a handler with the default registry. +func (r *HandlerRegistry) RegisterHandler(handler Handler) { + r.actionHandlers[handler.ActionType()] = handler +} + +func (r *HandlerRegistry) GetHandler(hook *v1.Hook) (Handler, error) { + handler, ok := r.actionHandlers[reflect.TypeOf(hook.Action)] + if !ok { + return nil, fmt.Errorf("hook type %T: %w", hook.Action, ErrHandlerNotFound) + } + return handler, nil +} + +type Handler interface { + Name() string + Execute(ctx context.Context, hook *v1.Hook, vars interface{}, runner tasks.TaskRunner) error + ActionType() reflect.Type +} diff --git a/internal/hook/types/shoutrrr.go b/internal/hook/types/shoutrrr.go new file mode 100644 index 00000000..98eddfab --- /dev/null +++ b/internal/hook/types/shoutrrr.go @@ -0,0 +1,43 @@ +package types + +import ( + "context" + "fmt" + "reflect" + + "github.com/containrrr/shoutrrr" + v1 "github.com/garethgeorge/backrest/gen/go/v1" + "github.com/garethgeorge/backrest/internal/hook/hookutil" + "github.com/garethgeorge/backrest/internal/orchestrator/tasks" +) + +type shoutrrrHandler struct{} + +func (shoutrrrHandler) Name() string { + return "shoutrrr" +} + +func (shoutrrrHandler) Execute(ctx context.Context, h *v1.Hook, vars interface{}, runner tasks.TaskRunner) error { + payload, err := hookutil.RenderTemplateOrDefault(h.GetActionShoutrrr().GetTemplate(), hookutil.DefaultTemplate, vars) + if err != nil { + return fmt.Errorf("template rendering: %w", err) + } + + writer := runner.RawLogWriter(ctx) + fmt.Fprintf(writer, "Sending shoutrrr message to %s\n", h.GetActionShoutrrr().GetShoutrrrUrl()) + fmt.Fprintf(writer, "---- payload ----\n%s\n", payload) + + if err := shoutrrr.Send(h.GetActionShoutrrr().GetShoutrrrUrl(), payload); err != nil { + return fmt.Errorf("sending shoutrrr message to %q: %w", h.GetActionShoutrrr().GetShoutrrrUrl(), err) + } + + return nil +} + +func (shoutrrrHandler) ActionType() reflect.Type { + return reflect.TypeOf(&v1.Hook_ActionShoutrrr{}) +} + +func init() { + DefaultRegistry().RegisterHandler(&shoutrrrHandler{}) +} diff --git a/internal/hook/types/slack.go b/internal/hook/types/slack.go new file mode 100644 index 00000000..995aa6bb --- /dev/null +++ b/internal/hook/types/slack.go @@ -0,0 +1,51 @@ +package types + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "reflect" + + v1 "github.com/garethgeorge/backrest/gen/go/v1" + "github.com/garethgeorge/backrest/internal/hook/hookutil" + "github.com/garethgeorge/backrest/internal/orchestrator/tasks" +) + +type slackHandler struct{} + +func (slackHandler) Name() string { + return "slack" +} + +func (slackHandler) Execute(ctx context.Context, cmd *v1.Hook, vars interface{}, runner tasks.TaskRunner) error { + payload, err := hookutil.RenderTemplateOrDefault(cmd.GetActionSlack().GetTemplate(), hookutil.DefaultTemplate, vars) + if err != nil { + return fmt.Errorf("template rendering: %w", err) + } + + writer := runner.RawLogWriter(ctx) + fmt.Fprintf(writer, "Sending slack message to %s\n", cmd.GetActionSlack().GetWebhookUrl()) + fmt.Fprintf(writer, "---- payload ----\n%s\n", payload) + + type Message struct { + Text string `json:"text"` + } + + request := Message{ + Text: "Backrest Notification\n" + payload, // leading newline looks better in discord. + } + + requestBytes, _ := json.Marshal(request) + + _, err = hookutil.PostRequest(cmd.GetActionSlack().GetWebhookUrl(), "application/json", bytes.NewReader(requestBytes)) + return err +} + +func (slackHandler) ActionType() reflect.Type { + return reflect.TypeOf(&v1.Hook_ActionSlack{}) +} + +func init() { + DefaultRegistry().RegisterHandler(&slackHandler{}) +} diff --git a/internal/oplog/oplog.go b/internal/oplog/oplog.go index 36f94b2d..ead79e6d 100644 --- a/internal/oplog/oplog.go +++ b/internal/oplog/oplog.go @@ -222,7 +222,7 @@ func (o *OpLog) notifyHelper(old *v1.Operation, new *v1.Operation) { o.subscribersMu.RUnlock() for _, sub := range subscribers { - (*sub)(old, new) + go (*sub)(old, new) } } diff --git a/internal/orchestrator/logging/logging.go b/internal/orchestrator/logging/logging.go index 03c98f60..7554bf11 100644 --- a/internal/orchestrator/logging/logging.go +++ b/internal/orchestrator/logging/logging.go @@ -39,7 +39,7 @@ func Logger(ctx context.Context) *zap.Logger { fe := zapcore.NewConsoleEncoder(p) l := zap.New(zapcore.NewTee( zap.L().Core(), - zapcore.NewCore(fe, zapcore.AddSync(&ioutil.LinePrefixer{W: writer, Prefix: []byte("[tasklog]")}), zapcore.DebugLevel), + zapcore.NewCore(fe, zapcore.AddSync(&ioutil.LinePrefixer{W: writer, Prefix: []byte("[tasklog] ")}), zapcore.DebugLevel), )) return l } diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index d62af28d..30cfe6b3 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -10,7 +10,7 @@ import ( "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" - "github.com/garethgeorge/backrest/internal/hook" + "github.com/garethgeorge/backrest/internal/config" "github.com/garethgeorge/backrest/internal/ioutil" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/orchestrator/logging" @@ -28,13 +28,13 @@ var ErrPlanNotFound = errors.New("plan not found") // Orchestrator is responsible for managing repos and backups. type Orchestrator struct { - mu sync.Mutex - config *v1.Config - OpLog *oplog.OpLog - repoPool *resticRepoPool - taskQueue *queue.TimePriorityQueue[stContainer] - hookExecutor *hook.HookExecutor - logStore *rotatinglog.RotatingLog + mu sync.Mutex + config *v1.Config + OpLog *oplog.OpLog + repoPool *resticRepoPool + taskQueue *queue.TimePriorityQueue[stContainer] + readyTaskQueues map[string]chan tasks.Task + logStore *rotatinglog.RotatingLog // cancelNotify is a list of channels that are notified when a task should be cancelled. cancelNotify []chan int64 @@ -43,6 +43,8 @@ type Orchestrator struct { now func() time.Time } +var _ tasks.TaskExecutor = &Orchestrator{} + type stContainer struct { tasks.ScheduledTask configModno int32 @@ -191,30 +193,26 @@ func (o *Orchestrator) GetRepoOrchestrator(repoId string) (repo *repo.RepoOrches return r, nil } -func (o *Orchestrator) GetRepo(repoId string) (*v1.Repo, error) { +func (o *Orchestrator) GetRepo(repoID string) (*v1.Repo, error) { o.mu.Lock() defer o.mu.Unlock() - for _, r := range o.config.Repos { - if r.GetId() == repoId { - return r, nil - } + repo := config.FindRepo(o.config, repoID) + if repo == nil { + return nil, fmt.Errorf("get repo %q: %w", repoID, ErrRepoNotFound) } - - return nil, fmt.Errorf("get repo %q: %w", repoId, ErrRepoNotFound) + return repo, nil } -func (o *Orchestrator) GetPlan(planId string) (*v1.Plan, error) { +func (o *Orchestrator) GetPlan(planID string) (*v1.Plan, error) { o.mu.Lock() defer o.mu.Unlock() - for _, p := range o.config.Plans { - if p.Id == planId { - return p, nil - } + plan := config.FindPlan(o.config, planID) + if plan == nil { + return nil, fmt.Errorf("get plan %q: %w", planID, ErrPlanNotFound) } - - return nil, fmt.Errorf("get plan %q: %w", planId, ErrPlanNotFound) + return plan, nil } func (o *Orchestrator) CancelOperation(operationId int64, status v1.OperationStatus) error { @@ -285,12 +283,7 @@ func (o *Orchestrator) Run(ctx context.Context) { continue } - zap.L().Info("running task", zap.String("task", t.Task.Name())) - - logs := bytes.NewBuffer(nil) taskCtx, cancelTaskCtx := context.WithCancel(ctx) - taskCtx = logging.ContextWithWriter(taskCtx, &ioutil.SynchronizedWriter{W: logs}) - go func() { for { select { @@ -304,85 +297,95 @@ func (o *Orchestrator) Run(ctx context.Context) { } }() - start := time.Now() - runner := newTaskRunnerImpl(o, t.Task, t.Op) + err := o.RunTask(taskCtx, t.ScheduledTask) - op := t.Op - if op != nil { - op.UnixTimeStartMs = time.Now().UnixMilli() - if op.Status == v1.OperationStatus_STATUS_PENDING || op.Status == v1.OperationStatus_STATUS_UNKNOWN { - op.Status = v1.OperationStatus_STATUS_INPROGRESS - } - if op.Id != 0 { - if err := o.OpLog.Update(op); err != nil { - zap.S().Errorf("failed to add operation to oplog: %w", err) - } - } else { - if err := o.OpLog.Add(op); err != nil { - zap.S().Errorf("failed to add operation to oplog: %w", err) - } + o.mu.Lock() + curCfgModno := o.config.Modno + o.mu.Unlock() + if t.configModno == curCfgModno { + // Only reschedule tasks if the config hasn't changed since the task was scheduled. + if err := o.ScheduleTask(t.Task, tasks.TaskPriorityDefault); err != nil { + zap.L().Error("reschedule task", zap.String("task", t.Task.Name()), zap.Error(err)) } } + cancelTaskCtx() - err := t.Task.Run(taskCtx, t.ScheduledTask, runner) - if err != nil { - fmt.Fprintf(logs, "\ntask %q returned error: %v\n", t.Task.Name(), err) - } else { - fmt.Fprintf(logs, "\ntask %q completed successfully\n", t.Task.Name()) + for _, cb := range t.callbacks { + go cb(err) } + } +} - if op != nil { - // write logs to log storage for this task. - if logs.Len() > 0 { - ref, err := o.logStore.Write(logs.Bytes()) - if err != nil { - zap.S().Errorf("failed to write logs for task %q to log store: %v", t.Task.Name(), err) - } else { - op.Logref = ref - } - } +func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) error { + logs := bytes.NewBuffer(nil) + ctx = logging.ContextWithWriter(ctx, &ioutil.SynchronizedWriter{W: logs}) - if err != nil { - if taskCtx.Err() != nil { - // task was cancelled - op.Status = v1.OperationStatus_STATUS_USER_CANCELLED - } else { - op.Status = v1.OperationStatus_STATUS_ERROR - } - op.DisplayMessage = err.Error() - } - op.UnixTimeEndMs = time.Now().UnixMilli() - if op.Status == v1.OperationStatus_STATUS_INPROGRESS { - op.Status = v1.OperationStatus_STATUS_SUCCESS + runner := newTaskRunnerImpl(o, st.Task, st.Op) + + zap.L().Info("running task", zap.String("task", st.Task.Name()), zap.String("runAt", st.RunAt.Format(time.RFC3339))) + + op := st.Op + if op != nil { + op.UnixTimeStartMs = time.Now().UnixMilli() + if op.Status == v1.OperationStatus_STATUS_PENDING || op.Status == v1.OperationStatus_STATUS_UNKNOWN { + op.Status = v1.OperationStatus_STATUS_INPROGRESS + } + if op.Id != 0 { + if err := o.OpLog.Update(op); err != nil { + zap.S().Errorf("failed to add operation to oplog: %w", err) } - if e := o.OpLog.Update(op); e != nil { - zap.S().Errorf("failed to update operation in oplog: %v", e) + } else { + if err := o.OpLog.Add(op); err != nil { + zap.S().Errorf("failed to add operation to oplog: %w", err) } } + } - if err != nil { - zap.L().Error("task failed", zap.String("task", t.Task.Name()), zap.Error(err), zap.Duration("duration", time.Since(start))) - } else { - zap.L().Info("task finished", zap.String("task", t.Task.Name()), zap.Duration("duration", time.Since(start))) - } + start := time.Now() + err := st.Task.Run(ctx, st, runner) + if err != nil { + zap.L().Error("task failed", zap.String("task", st.Task.Name()), zap.Error(err), zap.Duration("duration", time.Since(start))) + fmt.Fprintf(logs, "\ntask %q returned error: %v\n", st.Task.Name(), err) + } else { + zap.L().Info("task finished", zap.String("task", st.Task.Name()), zap.Duration("duration", time.Since(start))) + fmt.Fprintf(logs, "\ntask %q completed successfully\n", st.Task.Name()) + } - o.mu.Lock() - curCfgModno := o.config.Modno - o.mu.Unlock() - if t.configModno == curCfgModno { - // Only reschedule tasks if the config hasn't changed since the task was scheduled. - if err := o.ScheduleTask(t.Task, tasks.TaskPriorityDefault); err != nil { - zap.L().Error("reschedule task", zap.String("task", t.Task.Name()), zap.Error(err)) + if op != nil { + // write logs to log storage for this task. + if logs.Len() > 0 { + ref, err := o.logStore.Write(logs.Bytes()) + if err != nil { + zap.S().Errorf("failed to write logs for task %q to log store: %v", st.Task.Name(), err) + } else { + op.Logref = ref } } - cancelTaskCtx() + if err != nil { + if ctx.Err() != nil || errors.Is(err, tasks.ErrTaskCancelled) { + // task was cancelled + op.Status = v1.OperationStatus_STATUS_USER_CANCELLED + } else if err != nil { + op.Status = v1.OperationStatus_STATUS_ERROR + } - go func() { - for _, cb := range t.callbacks { - cb(err) + // prepend the error to the display + if op.DisplayMessage != "" { + op.DisplayMessage = err.Error() + "\n\n" + op.DisplayMessage + } else { + op.DisplayMessage = err.Error() } - }() + } + op.UnixTimeEndMs = time.Now().UnixMilli() + if op.Status == v1.OperationStatus_STATUS_INPROGRESS { + op.Status = v1.OperationStatus_STATUS_SUCCESS + } + if e := o.OpLog.Update(op); e != nil { + zap.S().Errorf("failed to update operation in oplog: %v", e) + } } + + return err } // ScheduleTask schedules a task to run at the next available time. @@ -418,8 +421,8 @@ func (o *Orchestrator) scheduleTaskHelper(t tasks.Task, priority int, curTime ti } } - zap.L().Info("scheduling task", zap.String("task", t.Name()), zap.String("runAt", nextRun.RunAt.Format(time.RFC3339))) o.taskQueue.Enqueue(nextRun.RunAt, priority, stc) + zap.L().Info("scheduled task", zap.String("task", t.Name()), zap.String("runAt", nextRun.RunAt.Format(time.RFC3339))) return nil } @@ -474,8 +477,3 @@ func (rp *resticRepoPool) GetRepo(repoId string) (*repo.RepoOrchestrator, error) rp.repos[repoId] = r return r, nil } - -type taskExecutionInfo struct { - operationId int64 - cancel func() -} diff --git a/internal/orchestrator/taskrunnerimpl.go b/internal/orchestrator/taskrunnerimpl.go index 396aa143..0a8e3b3f 100644 --- a/internal/orchestrator/taskrunnerimpl.go +++ b/internal/orchestrator/taskrunnerimpl.go @@ -2,6 +2,9 @@ package orchestrator import ( "context" + "errors" + "fmt" + "io" "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" @@ -25,7 +28,15 @@ type taskRunnerImpl struct { var _ tasks.TaskRunner = &taskRunnerImpl{} -func (t *taskRunnerImpl) FindRepo() (*v1.Repo, error) { +func newTaskRunnerImpl(orchestrator *Orchestrator, task tasks.Task, op *v1.Operation) *taskRunnerImpl { + return &taskRunnerImpl{ + orchestrator: orchestrator, + t: task, + op: op, + } +} + +func (t *taskRunnerImpl) findRepo() (*v1.Repo, error) { if t.repo != nil { return t.repo, nil } @@ -34,7 +45,7 @@ func (t *taskRunnerImpl) FindRepo() (*v1.Repo, error) { return t.repo, err } -func (t *taskRunnerImpl) FindPlan() (*v1.Plan, error) { +func (t *taskRunnerImpl) findPlan() (*v1.Plan, error) { if t.plan != nil { return t.plan, nil } @@ -43,14 +54,6 @@ func (t *taskRunnerImpl) FindPlan() (*v1.Plan, error) { return t.plan, err } -func newTaskRunnerImpl(orchestrator *Orchestrator, task tasks.Task, op *v1.Operation) *taskRunnerImpl { - return &taskRunnerImpl{ - orchestrator: orchestrator, - t: task, - op: op, - } -} - func (t *taskRunnerImpl) CreateOperation(op *v1.Operation) error { op.InstanceId = t.orchestrator.config.Instance return t.orchestrator.OpLog.Add(op) @@ -69,39 +72,64 @@ func (t *taskRunnerImpl) OpLog() *oplog.OpLog { return t.orchestrator.OpLog } -func (t *taskRunnerImpl) ExecuteHooks(events []v1.Hook_Condition, vars hook.HookVars) error { +func (t *taskRunnerImpl) ExecuteHooks(ctx context.Context, events []v1.Hook_Condition, vars tasks.HookVars) error { vars.Task = t.t.Name() if t.op != nil { vars.Duration = time.Since(time.UnixMilli(t.op.UnixTimeStartMs)) } + vars.CurTime = time.Now() + repoID := t.t.RepoID() planID := t.t.PlanID() var repo *v1.Repo var plan *v1.Plan if repoID != "" { var err error - repo, err = t.FindRepo() + repo, err = t.findRepo() if err != nil { return err } + vars.Repo = repo } if planID != "" { - plan, _ = t.FindPlan() + plan, _ = t.findPlan() + vars.Plan = plan } - var flowID int64 - if t.op != nil { - flowID = t.op.FlowId + + hookTasks, err := hook.TasksTriggeredByEvent(t.Config(), repoID, planID, t.op, events, vars) + if err != nil { + return err } - executor := hook.NewHookExecutor(t.Config(), t.orchestrator.OpLog, t.orchestrator.logStore) - return executor.ExecuteHooks(flowID, repo, plan, events, vars) + + for _, task := range hookTasks { + st, _ := task.Next(time.Now(), t) + st.Task = task + if err := t.OpLog().Add(st.Op); err != nil { + return fmt.Errorf("%v: %w", task.Name(), err) + } + if err := t.orchestrator.RunTask(ctx, st); hook.IsHaltingError(err) { + var cancelErr *hook.HookErrorRequestCancel + if errors.As(err, &cancelErr) { + return fmt.Errorf("%w: %w", tasks.ErrTaskCancelled, err) + } + return fmt.Errorf("%v: %w", task.Name(), err) + } + } + return nil } func (t *taskRunnerImpl) GetRepo(repoID string) (*v1.Repo, error) { + if repoID == t.t.RepoID() { + return t.findRepo() // optimization for the common case of the current repo + } return t.orchestrator.GetRepo(repoID) } func (t *taskRunnerImpl) GetPlan(planID string) (*v1.Plan, error) { + if planID == t.t.PlanID() { + return t.findPlan() // optimization for the common case of the current plan + } return t.orchestrator.GetPlan(planID) } @@ -124,3 +152,7 @@ func (t *taskRunnerImpl) Config() *v1.Config { func (t *taskRunnerImpl) Logger(ctx context.Context) *zap.Logger { return logging.Logger(ctx).Named(t.t.Name()) } + +func (t *taskRunnerImpl) RawLogWriter(ctx context.Context) io.Writer { + return logging.WriterFromContext(ctx) +} diff --git a/internal/orchestrator/tasks/errors.go b/internal/orchestrator/tasks/errors.go new file mode 100644 index 00000000..ca02b9f4 --- /dev/null +++ b/internal/orchestrator/tasks/errors.go @@ -0,0 +1,8 @@ +package tasks + +import "errors" + +// ErrTaskCancelled signals that the task is beign cancelled gracefully. +// This error is handled by marking the task as user cancelled. +// By default a task returning an error will be marked as failed otherwise. +var ErrTaskCancelled = errors.New("cancel task") diff --git a/internal/hook/hookvars.go b/internal/orchestrator/tasks/hookvars.go similarity index 99% rename from internal/hook/hookvars.go rename to internal/orchestrator/tasks/hookvars.go index f6f0b5a7..cc80d982 100644 --- a/internal/hook/hookvars.go +++ b/internal/orchestrator/tasks/hookvars.go @@ -1,4 +1,4 @@ -package hook +package tasks import ( "bytes" diff --git a/internal/orchestrator/tasks/task.go b/internal/orchestrator/tasks/task.go index c3b904cd..c7fe9c01 100644 --- a/internal/orchestrator/tasks/task.go +++ b/internal/orchestrator/tasks/task.go @@ -2,10 +2,10 @@ package tasks import ( "context" + "io" "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" - "github.com/garethgeorge/backrest/internal/hook" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/orchestrator/repo" "go.uber.org/zap" @@ -35,7 +35,7 @@ type TaskRunner interface { // UpdateOperation updates the operation in storage. It must be called after CreateOperation. UpdateOperation(*v1.Operation) error // ExecuteHooks - ExecuteHooks(events []v1.Hook_Condition, vars hook.HookVars) error + ExecuteHooks(ctx context.Context, events []v1.Hook_Condition, vars HookVars) error // OpLog returns the oplog for the operations. OpLog() *oplog.OpLog // GetRepo returns the repo with the given ID. @@ -50,6 +50,12 @@ type TaskRunner interface { Config() *v1.Config // Logger returns the logger. Logger(ctx context.Context) *zap.Logger + // RawLogWriter returns a writer for raw logs. + RawLogWriter(ctx context.Context) io.Writer +} + +type TaskExecutor interface { + RunTask(ctx context.Context, st ScheduledTask) error } // ScheduledTask is a task that is scheduled to run at a specific time. @@ -114,8 +120,8 @@ func (o *OneoffTask) Next(now time.Time, runner TaskRunner) (ScheduledTask, erro var op *v1.Operation if o.ProtoOp != nil { op = proto.Clone(o.ProtoOp).(*v1.Operation) - op.PlanId = o.PlanID() op.RepoId = o.RepoID() + op.PlanId = o.PlanID() op.FlowId = o.FlowID op.UnixTimeStartMs = timeToUnixMillis(o.RunAt) // TODO: this should be updated before Run is called. op.Status = v1.OperationStatus_STATUS_PENDING @@ -128,7 +134,6 @@ func (o *OneoffTask) Next(now time.Time, runner TaskRunner) (ScheduledTask, erro } type GenericOneoffTask struct { - BaseTask OneoffTask Do func(ctx context.Context, st ScheduledTask, runner TaskRunner) error } diff --git a/internal/orchestrator/tasks/taskbackup.go b/internal/orchestrator/tasks/taskbackup.go index bd44c0f5..5201de0a 100644 --- a/internal/orchestrator/tasks/taskbackup.go +++ b/internal/orchestrator/tasks/taskbackup.go @@ -9,7 +9,6 @@ import ( "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" - "github.com/garethgeorge/backrest/internal/hook" "github.com/garethgeorge/backrest/internal/protoutil" "github.com/garethgeorge/backrest/pkg/restic" "go.uber.org/zap" @@ -106,16 +105,10 @@ func (t *BackupTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunne return err } - if err := runner.ExecuteHooks([]v1.Hook_Condition{ + if err := runner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_SNAPSHOT_START, - }, hook.HookVars{}); err != nil { - var cancelErr *hook.HookErrorRequestCancel - if errors.As(err, &cancelErr) { - op.Status = v1.OperationStatus_STATUS_USER_CANCELLED // user visible cancelled status - op.DisplayMessage = err.Error() - return nil - } - return fmt.Errorf("hook failed: %w", err) + }, HookVars{}); err != nil { + return fmt.Errorf("snapshot start hook: %w", err) } var sendWg sync.WaitGroup @@ -169,7 +162,7 @@ func (t *BackupTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunne summary = &restic.BackupProgressEntry{} } - vars := hook.HookVars{ + vars := HookVars{ Task: t.Name(), SnapshotStats: summary, SnapshotId: summary.SnapshotId, @@ -178,7 +171,7 @@ func (t *BackupTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunne if err != nil { vars.Error = err.Error() if !errors.Is(err, restic.ErrPartialBackup) { - runner.ExecuteHooks([]v1.Hook_Condition{ + runner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_SNAPSHOT_ERROR, v1.Hook_CONDITION_ANY_ERROR, v1.Hook_CONDITION_SNAPSHOT_END, @@ -190,12 +183,12 @@ func (t *BackupTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunne op.Status = v1.OperationStatus_STATUS_WARNING op.DisplayMessage = "Partial backup, some files may not have been read completely." - runner.ExecuteHooks([]v1.Hook_Condition{ + runner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_SNAPSHOT_WARNING, v1.Hook_CONDITION_SNAPSHOT_END, }, vars) } else { - runner.ExecuteHooks([]v1.Hook_Condition{ + runner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_SNAPSHOT_SUCCESS, v1.Hook_CONDITION_SNAPSHOT_END, }, vars) diff --git a/internal/orchestrator/tasks/taskcheck.go b/internal/orchestrator/tasks/taskcheck.go index e0243c02..b753bc1d 100644 --- a/internal/orchestrator/tasks/taskcheck.go +++ b/internal/orchestrator/tasks/taskcheck.go @@ -9,7 +9,6 @@ import ( "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" - "github.com/garethgeorge/backrest/internal/hook" "github.com/garethgeorge/backrest/internal/ioutil" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/oplog/indexutil" @@ -75,8 +74,6 @@ func (t *CheckTask) Next(now time.Time, runner TaskRunner) (ScheduledTask, error lastRan = time.Now() } - zap.L().Debug("last check time", zap.Time("time", lastRan), zap.String("repo", t.RepoID())) - runAt, err := protoutil.ResolveSchedule(repo.CheckPolicy.GetSchedule(), lastRan) if errors.Is(err, protoutil.ErrScheduleDisabled) { return NeverScheduledTask, nil @@ -101,18 +98,10 @@ func (t *CheckTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner return fmt.Errorf("couldn't get repo %q: %w", t.RepoID(), err) } - if err := runner.ExecuteHooks([]v1.Hook_Condition{ + if err := runner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_CHECK_START, - }, hook.HookVars{}); err != nil { - // TODO: generalize this logic - op.DisplayMessage = err.Error() - var cancelErr *hook.HookErrorRequestCancel - if errors.As(err, &cancelErr) { - op.Status = v1.OperationStatus_STATUS_USER_CANCELLED // user visible cancelled status - return nil - } - op.Status = v1.OperationStatus_STATUS_ERROR - return fmt.Errorf("execute check start hooks: %w", err) + }, HookVars{}); err != nil { + return fmt.Errorf("check start hook: %w", err) } err = repo.UnlockIfAutoEnabled(ctx) @@ -157,10 +146,10 @@ func (t *CheckTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner if err := repo.Check(ctx, bufWriter); err != nil { cancel() - runner.ExecuteHooks([]v1.Hook_Condition{ + runner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_CHECK_ERROR, v1.Hook_CONDITION_ANY_ERROR, - }, hook.HookVars{ + }, HookVars{ Error: err.Error(), }) @@ -176,9 +165,9 @@ func (t *CheckTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner zap.L().Error("schedule stats task", zap.Error(err)) } - if err := runner.ExecuteHooks([]v1.Hook_Condition{ + if err := runner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_CHECK_SUCCESS, - }, hook.HookVars{}); err != nil { + }, HookVars{}); err != nil { return fmt.Errorf("execute prune success hooks: %w", err) } diff --git a/internal/orchestrator/tasks/taskforget.go b/internal/orchestrator/tasks/taskforget.go index bee1cfdd..9335726d 100644 --- a/internal/orchestrator/tasks/taskforget.go +++ b/internal/orchestrator/tasks/taskforget.go @@ -6,7 +6,6 @@ import ( "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" - "github.com/garethgeorge/backrest/internal/hook" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/oplog/indexutil" "github.com/garethgeorge/backrest/internal/orchestrator/repo" @@ -16,12 +15,12 @@ import ( func NewOneoffForgetTask(repoID, planID string, flowID int64, at time.Time) Task { return &GenericOneoffTask{ - BaseTask: BaseTask{ - TaskName: fmt.Sprintf("forget for plan %q in repo %q", repoID, planID), - TaskRepoID: repoID, - TaskPlanID: planID, - }, OneoffTask: OneoffTask{ + BaseTask: BaseTask{ + TaskName: fmt.Sprintf("forget for plan %q in repo %q", repoID, planID), + TaskRepoID: repoID, + TaskPlanID: planID, + }, FlowID: flowID, RunAt: at, ProtoOp: &v1.Operation{ @@ -36,9 +35,9 @@ func NewOneoffForgetTask(repoID, planID string, flowID int64, at time.Time) Task } if err := forgetHelper(ctx, st, taskRunner); err != nil { - taskRunner.ExecuteHooks([]v1.Hook_Condition{ + taskRunner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_ANY_ERROR, - }, hook.HookVars{ + }, HookVars{ Error: err.Error(), }) return err diff --git a/internal/orchestrator/tasks/taskforgetsnapshot.go b/internal/orchestrator/tasks/taskforgetsnapshot.go index 8d565c9b..46c4351d 100644 --- a/internal/orchestrator/tasks/taskforgetsnapshot.go +++ b/internal/orchestrator/tasks/taskforgetsnapshot.go @@ -6,17 +6,16 @@ import ( "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" - "github.com/garethgeorge/backrest/internal/hook" ) func NewOneoffForgetSnapshotTask(repoID, planID string, flowID int64, at time.Time, snapshotID string) Task { return &GenericOneoffTask{ - BaseTask: BaseTask{ - TaskName: fmt.Sprintf("forget snapshot %q for plan %q in repo %q", snapshotID, planID, repoID), - TaskRepoID: repoID, - TaskPlanID: planID, - }, OneoffTask: OneoffTask{ + BaseTask: BaseTask{ + TaskName: fmt.Sprintf("forget snapshot %q for plan %q in repo %q", snapshotID, planID, repoID), + TaskRepoID: repoID, + TaskPlanID: planID, + }, FlowID: flowID, RunAt: at, ProtoOp: &v1.Operation{ @@ -31,9 +30,9 @@ func NewOneoffForgetSnapshotTask(repoID, planID string, flowID int64, at time.Ti } if err := forgetSnapshotHelper(ctx, st, taskRunner, snapshotID); err != nil { - taskRunner.ExecuteHooks([]v1.Hook_Condition{ + taskRunner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_ANY_ERROR, - }, hook.HookVars{ + }, HookVars{ Error: err.Error(), }) return err diff --git a/internal/orchestrator/tasks/taskindexsnapshots.go b/internal/orchestrator/tasks/taskindexsnapshots.go index 40a80d53..3c9ec155 100644 --- a/internal/orchestrator/tasks/taskindexsnapshots.go +++ b/internal/orchestrator/tasks/taskindexsnapshots.go @@ -8,7 +8,6 @@ import ( "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" - "github.com/garethgeorge/backrest/internal/hook" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/oplog/indexutil" "github.com/garethgeorge/backrest/internal/orchestrator/repo" @@ -19,19 +18,19 @@ import ( func NewOneoffIndexSnapshotsTask(repoID string, at time.Time) Task { return &GenericOneoffTask{ - BaseTask: BaseTask{ - TaskName: fmt.Sprintf("index snapshots for repo %q", repoID), - TaskRepoID: repoID, - }, OneoffTask: OneoffTask{ + BaseTask: BaseTask{ + TaskName: fmt.Sprintf("index snapshots for repo %q", repoID), + TaskRepoID: repoID, + }, RunAt: at, ProtoOp: nil, }, Do: func(ctx context.Context, st ScheduledTask, taskRunner TaskRunner) error { if err := indexSnapshotsHelper(ctx, st, taskRunner); err != nil { - taskRunner.ExecuteHooks([]v1.Hook_Condition{ + taskRunner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_ANY_ERROR, - }, hook.HookVars{ + }, HookVars{ Task: st.Task.Name(), Error: err.Error(), }) diff --git a/internal/orchestrator/tasks/taskprune.go b/internal/orchestrator/tasks/taskprune.go index 526aca99..574a2320 100644 --- a/internal/orchestrator/tasks/taskprune.go +++ b/internal/orchestrator/tasks/taskprune.go @@ -9,7 +9,6 @@ import ( "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" - "github.com/garethgeorge/backrest/internal/hook" "github.com/garethgeorge/backrest/internal/ioutil" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/oplog/indexutil" @@ -75,8 +74,6 @@ func (t *PruneTask) Next(now time.Time, runner TaskRunner) (ScheduledTask, error lastRan = time.Now() } - zap.L().Debug("last prune time", zap.Time("time", lastRan), zap.String("repo", t.RepoID())) - runAt, err := protoutil.ResolveSchedule(repo.PrunePolicy.GetSchedule(), lastRan) if errors.Is(err, protoutil.ErrScheduleDisabled) { return NeverScheduledTask, nil @@ -101,18 +98,10 @@ func (t *PruneTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner return fmt.Errorf("couldn't get repo %q: %w", t.RepoID(), err) } - if err := runner.ExecuteHooks([]v1.Hook_Condition{ + if err := runner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_PRUNE_START, - }, hook.HookVars{}); err != nil { - op.DisplayMessage = err.Error() - // TODO: generalize this logic - var cancelErr *hook.HookErrorRequestCancel - if errors.As(err, &cancelErr) { - op.Status = v1.OperationStatus_STATUS_USER_CANCELLED // user visible cancelled status - return nil - } - op.Status = v1.OperationStatus_STATUS_ERROR - return fmt.Errorf("execute prune start hooks: %w", err) + }, HookVars{}); err != nil { + return fmt.Errorf("prune start hook: %w", err) } err = repo.UnlockIfAutoEnabled(ctx) @@ -157,9 +146,9 @@ func (t *PruneTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner if err := repo.Prune(ctx, bufWriter); err != nil { cancel() - runner.ExecuteHooks([]v1.Hook_Condition{ + runner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_ANY_ERROR, - }, hook.HookVars{ + }, HookVars{ Error: err.Error(), }) @@ -175,9 +164,9 @@ func (t *PruneTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner zap.L().Error("schedule stats task", zap.Error(err)) } - if err := runner.ExecuteHooks([]v1.Hook_Condition{ + if err := runner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_PRUNE_SUCCESS, - }, hook.HookVars{}); err != nil { + }, HookVars{}); err != nil { return fmt.Errorf("execute prune end hooks: %w", err) } diff --git a/internal/orchestrator/tasks/taskrestore.go b/internal/orchestrator/tasks/taskrestore.go index 478eb89f..a10c0b1d 100644 --- a/internal/orchestrator/tasks/taskrestore.go +++ b/internal/orchestrator/tasks/taskrestore.go @@ -8,18 +8,17 @@ import ( "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" - "github.com/garethgeorge/backrest/internal/hook" "go.uber.org/zap" ) func NewOneoffRestoreTask(repoID, planID string, flowID int64, at time.Time, snapshotID, path, target string) Task { return &GenericOneoffTask{ - BaseTask: BaseTask{ - TaskName: fmt.Sprintf("restore snapshot %q in repo %q", snapshotID, repoID), - TaskRepoID: repoID, - TaskPlanID: planID, - }, OneoffTask: OneoffTask{ + BaseTask: BaseTask{ + TaskName: fmt.Sprintf("restore snapshot %q in repo %q", snapshotID, repoID), + TaskRepoID: repoID, + TaskPlanID: planID, + }, FlowID: flowID, RunAt: at, ProtoOp: &v1.Operation{ @@ -34,9 +33,9 @@ func NewOneoffRestoreTask(repoID, planID string, flowID int64, at time.Time, sna }, Do: func(ctx context.Context, st ScheduledTask, taskRunner TaskRunner) error { if err := restoreHelper(ctx, st, taskRunner, snapshotID, path, target); err != nil { - taskRunner.ExecuteHooks([]v1.Hook_Condition{ + taskRunner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_ANY_ERROR, - }, hook.HookVars{ + }, HookVars{ Task: st.Task.Name(), Error: err.Error(), }) diff --git a/internal/orchestrator/tasks/taskstats.go b/internal/orchestrator/tasks/taskstats.go index 3dcb405f..f1cd8aac 100644 --- a/internal/orchestrator/tasks/taskstats.go +++ b/internal/orchestrator/tasks/taskstats.go @@ -6,7 +6,6 @@ import ( "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" - "github.com/garethgeorge/backrest/internal/hook" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/oplog/indexutil" ) @@ -70,9 +69,9 @@ func (t *StatsTask) Next(now time.Time, runner TaskRunner) (ScheduledTask, error func (t *StatsTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner) error { if err := statsHelper(ctx, st, runner); err != nil { - runner.ExecuteHooks([]v1.Hook_Condition{ + runner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_ANY_ERROR, - }, hook.HookVars{ + }, HookVars{ Task: st.Task.Name(), Error: err.Error(), }) diff --git a/internal/queue/genheap.go b/internal/queue/genheap.go index dde1c23e..4e46e2e6 100644 --- a/internal/queue/genheap.go +++ b/internal/queue/genheap.go @@ -1,23 +1,23 @@ package queue // genericHeap is a generic heap implementation that can be used with any type that satisfies the constraints.Ordered interface. -type genericHeap[T comparable[T]] []T +type GenericHeap[T Comparable[T]] []T -func (h genericHeap[T]) Len() int { +func (h GenericHeap[T]) Len() int { return len(h) } -func (h genericHeap[T]) Swap(i, j int) { +func (h GenericHeap[T]) Swap(i, j int) { h[i], h[j] = h[j], h[i] } // Push pushes an element onto the heap. Do not call directly, use heap.Push -func (h *genericHeap[T]) Push(x interface{}) { +func (h *GenericHeap[T]) Push(x interface{}) { *h = append(*h, x.(T)) } // Pop pops an element from the heap. Do not call directly, use heap.Pop -func (h *genericHeap[T]) Pop() interface{} { +func (h *GenericHeap[T]) Pop() interface{} { old := *h n := len(old) x := old[n-1] @@ -25,7 +25,7 @@ func (h *genericHeap[T]) Pop() interface{} { return x } -func (h genericHeap[T]) Peek() T { +func (h GenericHeap[T]) Peek() T { if len(h) == 0 { var zero T return zero @@ -33,10 +33,10 @@ func (h genericHeap[T]) Peek() T { return h[0] } -func (h genericHeap[T]) Less(i, j int) bool { +func (h GenericHeap[T]) Less(i, j int) bool { return h[i].Less(h[j]) } -type comparable[T any] interface { +type Comparable[T any] interface { Less(other T) bool } diff --git a/internal/queue/genheap_test.go b/internal/queue/genheap_test.go index 61bee966..e7167a62 100644 --- a/internal/queue/genheap_test.go +++ b/internal/queue/genheap_test.go @@ -19,7 +19,7 @@ func (v val) Eq(other val) bool { func TestGenericHeapInit(t *testing.T) { t.Parallel() - genHeap := genericHeap[val]{{v: 3}, {v: 2}, {v: 1}} + genHeap := GenericHeap[val]{{v: 3}, {v: 2}, {v: 1}} heap.Init(&genHeap) if genHeap.Len() != 3 { @@ -36,7 +36,7 @@ func TestGenericHeapInit(t *testing.T) { func TestGenericHeapPushPop(t *testing.T) { t.Parallel() - genHeap := genericHeap[val]{} // empty heap + genHeap := GenericHeap[val]{} // empty heap heap.Push(&genHeap, val{v: 3}) heap.Push(&genHeap, val{v: 2}) heap.Push(&genHeap, val{v: 1}) diff --git a/internal/queue/timepriorityqueue.go b/internal/queue/timepriorityqueue.go index de511ac0..a3f60fe7 100644 --- a/internal/queue/timepriorityqueue.go +++ b/internal/queue/timepriorityqueue.go @@ -9,13 +9,13 @@ import ( // TimePriorityQueue is a priority queue that dequeues elements at (or after) a specified time, and prioritizes elements based on a priority value. It is safe for concurrent use. type TimePriorityQueue[T equals[T]] struct { tqueue TimeQueue[priorityEntry[T]] - ready genericHeap[priorityEntry[T]] + ready GenericHeap[priorityEntry[T]] } func NewTimePriorityQueue[T equals[T]]() *TimePriorityQueue[T] { return &TimePriorityQueue[T]{ tqueue: TimeQueue[priorityEntry[T]]{}, - ready: genericHeap[priorityEntry[T]]{}, + ready: GenericHeap[priorityEntry[T]]{}, } } diff --git a/internal/queue/timequeue.go b/internal/queue/timequeue.go index 0e9b409c..5674103b 100644 --- a/internal/queue/timequeue.go +++ b/internal/queue/timequeue.go @@ -10,7 +10,7 @@ import ( // TimeQueue is a priority queue that dequeues elements at (or after) a specified time. It is safe for concurrent use. type TimeQueue[T equals[T]] struct { - heap genericHeap[timeQueueEntry[T]] + heap GenericHeap[timeQueueEntry[T]] dequeueMu sync.Mutex mu sync.Mutex @@ -19,7 +19,7 @@ type TimeQueue[T equals[T]] struct { func NewTimeQueue[T equals[T]]() *TimeQueue[T] { return &TimeQueue[T]{ - heap: genericHeap[timeQueueEntry[T]]{}, + heap: GenericHeap[timeQueueEntry[T]]{}, } } diff --git a/proto/v1/operations.proto b/proto/v1/operations.proto index aaa84dda..01a8e62b 100644 --- a/proto/v1/operations.proto +++ b/proto/v1/operations.proto @@ -24,7 +24,7 @@ message Operation { OperationStatus status = 4; // required, unix time in milliseconds of the operation's creation (ID is derived from this) int64 unix_time_start_ms = 5; - // optional, unix time in milliseconds of the operation's completion + // ptional, unix time in milliseconds of the operation's completion int64 unix_time_end_ms = 6; // optional, human readable context message, typically an error message. string display_message = 7; @@ -40,7 +40,7 @@ message Operation { OperationStats operation_stats = 105; OperationRunHook operation_run_hook = 106; OperationCheck operation_check = 107; - } + } } // OperationEvent is used in the wireformat to stream operation changes to clients @@ -110,6 +110,7 @@ message OperationStats { // OperationRunHook tracks a hook that was run. message OperationRunHook { + int64 parent_op = 4; // ID of the operation that ran the hook. string name = 1; // description of the hook that was run. typically repo/hook_idx or plan/hook_idx. string output_logref = 2; // logref of the hook's output. DEPRECATED. Hook.Condition condition = 3; // triggering condition of the hook. diff --git a/webui/gen/ts/v1/operations_pb.ts b/webui/gen/ts/v1/operations_pb.ts index 0b8782dd..fc0ec563 100644 --- a/webui/gen/ts/v1/operations_pb.ts +++ b/webui/gen/ts/v1/operations_pb.ts @@ -206,7 +206,7 @@ export class Operation extends Message { unixTimeStartMs = protoInt64.zero; /** - * optional, unix time in milliseconds of the operation's completion + * ptional, unix time in milliseconds of the operation's completion * * @generated from field: int64 unix_time_end_ms = 6; */ @@ -699,6 +699,13 @@ export class OperationStats extends Message { * @generated from message v1.OperationRunHook */ export class OperationRunHook extends Message { + /** + * ID of the operation that ran the hook. + * + * @generated from field: int64 parent_op = 4; + */ + parentOp = protoInt64.zero; + /** * description of the hook that was run. typically repo/hook_idx or plan/hook_idx. * @@ -728,6 +735,7 @@ export class OperationRunHook extends Message { static readonly runtime: typeof proto3 = proto3; static readonly typeName = "v1.OperationRunHook"; static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 4, name: "parent_op", kind: "scalar", T: 3 /* ScalarType.INT64 */ }, { no: 1, name: "name", kind: "scalar", T: 9 /* ScalarType.STRING */ }, { no: 2, name: "output_logref", kind: "scalar", T: 9 /* ScalarType.STRING */ }, { no: 3, name: "condition", kind: "enum", T: proto3.getEnumType(Hook_Condition) }, diff --git a/webui/src/components/OperationList.tsx b/webui/src/components/OperationList.tsx index 66e8ba5c..0c0be7d2 100644 --- a/webui/src/components/OperationList.tsx +++ b/webui/src/components/OperationList.tsx @@ -10,6 +10,7 @@ import { BackupInfoCollector, getOperations, matchSelector, + shouldHideStatus, subscribeToOperations, unsubscribeFromOperations, } from "../state/oplog"; @@ -23,13 +24,13 @@ import { OperationRow } from "./OperationRow"; export const OperationList = ({ req, useBackups, + useOperations, showPlan, - filter, }: React.PropsWithoutRef<{ req?: GetOperationsRequest; - useBackups?: BackupInfo[]; + useBackups?: BackupInfo[]; // a backup to display; some operations will be filtered out e.g. hook executions. + useOperations?: Operation[]; // exact set of operations to display; no filtering will be applied. showPlan?: boolean; - filter?: (op: Operation) => boolean; // if provided, only operations that pass this filter will be displayed. }>) => { const alertApi = useAlertApi(); @@ -40,7 +41,9 @@ export const OperationList = ({ // track backups for this operation tree view. useEffect(() => { - const backupCollector = new BackupInfoCollector(); + const backupCollector = new BackupInfoCollector( + (op) => !shouldHideStatus(op.status) + ); backupCollector.subscribe( _.debounce( () => { @@ -63,7 +66,7 @@ export const OperationList = ({ backups = [...(useBackups || [])]; } - if (backups.length === 0) { + if (backups.length === 0 && !useOperations) { return ( b.operations); + const hookExecutionsForOperation: Map = new Map(); + let operations: Operation[] = []; + if (useOperations) { + operations = useOperations; + } else { + operations = backups + .flatMap((b) => b.operations) + .filter((op) => { + if (op.op.case === "operationRunHook") { + const parentOp = op.op.value.parentOp; + if (!hookExecutionsForOperation.has(parentOp)) { + hookExecutionsForOperation.set(parentOp, []); + } + hookExecutionsForOperation.get(parentOp)!.push(op); + return false; + } + return true; + }); + } operations.sort((a, b) => { return Number(b.unixTimeStartMs - a.unixTimeStartMs); }); @@ -81,13 +102,14 @@ export const OperationList = ({ itemLayout="horizontal" size="small" dataSource={operations} - renderItem={(op, index) => { + renderItem={(op) => { return ( ); }} diff --git a/webui/src/components/OperationRow.tsx b/webui/src/components/OperationRow.tsx index e41aa639..43e9a871 100644 --- a/webui/src/components/OperationRow.tsx +++ b/webui/src/components/OperationRow.tsx @@ -1,23 +1,21 @@ import React, { useEffect, useState } from "react"; import { Operation, - OperationEvent, - OperationEventType, OperationForget, - OperationRunHook, + OperationRestore, OperationStatus, } from "../../gen/ts/v1/operations_pb"; import { Button, Col, Collapse, - Empty, List, Modal, Progress, Row, Typography, } from "antd"; +import type { ItemType } from "rc-collapse/es/interface"; import { PaperClipOutlined, SaveOutlined, @@ -47,15 +45,19 @@ import { backrestService } from "../api"; import { useShowModal } from "./ModalManager"; import { proto3 } from "@bufbuild/protobuf"; import { Hook_Condition } from "../../gen/ts/v1/config_pb"; +import { useAlertApi } from "./Alerts"; +import { OperationList } from "./OperationList"; export const OperationRow = ({ operation, alertApi, showPlan, + hookOperations, }: React.PropsWithoutRef<{ operation: Operation; alertApi?: MessageInstance; - showPlan: boolean; + showPlan?: boolean; + hookOperations?: Operation[]; }>) => { const showModal = useShowModal(); const details = detailsForOperation(operation); @@ -108,6 +110,38 @@ export const OperationRow = ({ break; } + const doCancel = () => { + backrestService + .cancel({ value: operation.id! }) + .then(() => { + alertApi?.success("Requested to cancel operation"); + }) + .catch((e) => { + alertApi?.error("Failed to cancel operation: " + e.message); + }); + }; + + const doShowLogs = () => { + showModal( + { + showModal(null); + }} + > + + + ); + }; + const opName = displayTypeToString(getTypeForDisplay(operation)); let title = ( <> @@ -125,16 +159,7 @@ export const OperationRow = ({ type="link" size="small" className="backrest operation-details" - onClick={() => { - backrestService - .cancel({ value: operation.id! }) - .then(() => { - alertApi?.success("Requested to cancel operation"); - }) - .catch((e) => { - alertApi?.error("Failed to cancel operation: " + e.message); - }); - }} + onClick={doCancel} > [Cancel Operation] @@ -151,26 +176,7 @@ export const OperationRow = ({ type="link" size="middle" className="backrest operation-details" - onClick={() => { - showModal( - { - showModal(null); - }} - > - - - ); - }} + onClick={doShowLogs} > [View Logs] @@ -179,22 +185,23 @@ export const OperationRow = ({ ); } - let body: React.ReactNode | undefined; let displayMessage = operation.displayMessage; + const bodyItems: ItemType[] = []; + const expandedBodyItems: string[] = []; + if (operation.op.case === "operationBackup") { + expandedBodyItems.push("details"); const backupOp = operation.op.value; - const items: { key: number; label: string; children: React.ReactNode }[] = [ - { - key: 1, - label: "Backup Details", - children: , - }, - ]; + bodyItems.push({ + key: "details", + label: "Backup Details", + children: , + }); if (backupOp.errors.length > 0) { - items.splice(0, 0, { - key: 2, + bodyItems.push({ + key: "errors", label: "Item Errors", children: (
@@ -203,196 +210,188 @@ export const OperationRow = ({
         ),
       });
     }
-
-    body = (
-      <>
-        
-      
-    );
   } else if (operation.op.case === "operationIndexSnapshot") {
+    expandedBodyItems.push("details");
     const snapshotOp = operation.op.value;
-    body = (
-      
-    );
+    bodyItems.push({
+      key: "details",
+      label: "Details",
+      children: ,
+    });
+    bodyItems.push({
+      key: "browser",
+      label: "Snapshot Browser",
+      children: (
+        
+      ),
+    });
   } else if (operation.op.case === "operationForget") {
     const forgetOp = operation.op.value;
-    body = ;
+    bodyItems.push({
+      key: "forgot",
+      label: "Removed " + forgetOp.forget?.length + " Snapshots",
+      children: ,
+    });
   } else if (operation.op.case === "operationPrune") {
     const prune = operation.op.value;
-    body = (
-      {prune.output}
, - }, - ]} - /> - ); + bodyItems.push({ + key: "prune", + label: "Prune Output", + children:
{prune.output}
, + }); } else if (operation.op.case === "operationCheck") { const check = operation.op.value; - body = ( - {check.output}, - }, - ]} - /> - ); + bodyItems.push({ + key: "check", + label: "Check Output", + children:
{check.output}
, + }); } else if (operation.op.case === "operationRestore") { - const restore = operation.op.value; - const progress = Math.round((details.percentage || 0) * 10) / 10; - const st = restore.lastStatus! || {}; - - body = ( - <> - Restore {restore.path} to {restore.target} - {details.percentage !== undefined ? ( - - ) : null} - {operation.status == OperationStatus.STATUS_SUCCESS ? ( - <> - - - ) : null} -
- Snapshot ID: {normalizeSnapshotId(operation.snapshotId!)} - - - Bytes Done/Total -
- {formatBytes(Number(st.bytesRestored))}/ - {formatBytes(Number(st.totalBytes))} - - - Files Done/Total -
- {Number(st.filesRestored)}/{Number(st.totalFiles)} - -
- - ); + expandedBodyItems.push("restore"); + bodyItems.push({ + key: "restore", + label: "Restore Details", + children: , + }); } else if (operation.op.case === "operationRunHook") { const hook = operation.op.value; - const triggeringCondition = proto3 - .getEnumType(Hook_Condition) - .findNumber(hook.condition); - if (triggeringCondition !== undefined) { - displayMessage += "\ntriggered by condition: " + triggeringCondition.name; + if (operation.logref) { + bodyItems.push({ + key: "logref", + label: "Hook Output", + children: , + }); } } - const children = []; + if (hookOperations) { + bodyItems.push({ + key: "hookOperations", + label: "Hooks Triggered", + children: , + }); - if (operation.displayMessage) { - children.push( -
-
-          {details.state ? details.state + ": " : null}
-          {displayMessage}
-        
-
- ); + for (const op of hookOperations) { + if (op.status !== OperationStatus.STATUS_SUCCESS) { + expandedBodyItems.push("hookOperations"); + break; + } + } } - children.push(
{body}
); - return ( - + + {operation.displayMessage && ( +
+
+                  {details.state ? details.state + ": " : null}
+                  {displayMessage}
+                
+
+ )} + + + } + />
); }; -const SnapshotInfo = ({ - snapshot, - repoId, - planId, -}: { - snapshot: ResticSnapshot; - repoId: string; - planId?: string; -}) => { +const SnapshotDetails = ({ snapshot }: { snapshot: ResticSnapshot }) => { return ( - - - Snapshot ID: - {normalizeSnapshotId(snapshot.id!)} - - - - Host -
- {snapshot.hostname} - - - Username -
- {snapshot.hostname} - - - Tags -
- {snapshot.tags?.join(", ")} - -
- - ), - }, - { - key: 2, - label: "Browse and Restore Files in Backup", - children: ( - - ), - }, - ]} - /> + <> + + Snapshot ID: + {normalizeSnapshotId(snapshot.id!)} + + + + Host +
+ {snapshot.hostname} + + + Username +
+ {snapshot.hostname} + + + Tags +
+ {snapshot.tags?.join(", ")} + +
+ + ); +}; + +const RestoreOperationStatus = ({ operation }: { operation: Operation }) => { + const restoreOp = operation.op.value as OperationRestore; + const isDone = restoreOp.lastStatus?.messageType === "summary"; + const progress = restoreOp.lastStatus?.percentDone || 0; + const alertApi = useAlertApi(); + const lastStatus = restoreOp.lastStatus; + + return ( + <> + Restore {restoreOp.path} to {restoreOp.target} + {!isDone ? ( + + ) : null} + {operation.status == OperationStatus.STATUS_SUCCESS ? ( + <> + + + ) : null} +
+ Restored Snapshot ID: {normalizeSnapshotId(operation.snapshotId!)} + {lastStatus && ( + + + Bytes Done/Total +
+ {formatBytes(Number(lastStatus.bytesRestored))}/ + {formatBytes(Number(lastStatus.totalBytes))} + + + Files Done/Total +
+ {Number(lastStatus.filesRestored)}/{Number(lastStatus.totalFiles)} + +
+ )} + ); }; @@ -510,38 +509,26 @@ const ForgetOperationDetails = ({ } return ( - - Removed snapshots: -
-                {forgetOp.forget?.map((f) => (
-                  
- {"removed snapshot " + - normalizeSnapshotId(f.id!) + - " taken at " + - formatTime(Number(f.unixTimeMs))}{" "} -
-
- ))} -
- {/* Policy: + <> + Removed snapshots: +
+        {forgetOp.forget?.map((f) => (
+          
+ {"removed snapshot " + + normalizeSnapshotId(f.id!) + + " taken at " + + formatTime(Number(f.unixTimeMs))}{" "} +
+
+ ))} +
+ {/* Policy:
    {policyDesc.map((desc, idx) => (
  • {desc}
  • ))}
*/} - - ), - }, - ]} - /> + ); }; diff --git a/webui/src/components/OperationTree.tsx b/webui/src/components/OperationTree.tsx index 521c6520..f487163b 100644 --- a/webui/src/components/OperationTree.tsx +++ b/webui/src/components/OperationTree.tsx @@ -1,18 +1,13 @@ -import React, { useEffect, useMemo, useRef, useState } from "react"; +import React, { useEffect, useRef, useState } from "react"; import { BackupInfo, BackupInfoCollector, colorForStatus, detailsForOperation, displayTypeToString, - getOperations, getTypeForDisplay, - matchSelector, - shouldHideOperation, - subscribeToOperations, - unsubscribeFromOperations, } from "../state/oplog"; -import { Button, Col, Divider, Empty, Modal, Row, Tooltip, Tree } from "antd"; +import { Col, Empty, Modal, Row, Tooltip, Tree } from "antd"; import _ from "lodash"; import { DataNode } from "antd/es/tree"; import { @@ -28,11 +23,7 @@ import { QuestionOutlined, SaveOutlined, } from "@ant-design/icons"; -import { - OperationEvent, - OperationEventType, - OperationStatus, -} from "../../gen/ts/v1/operations_pb"; +import { OperationStatus } from "../../gen/ts/v1/operations_pb"; import { useAlertApi } from "./Alerts"; import { OperationList } from "./OperationList"; import { @@ -128,10 +119,10 @@ export const OperationTree = ({ } }} titleRender={(node: OpTreeNode): React.ReactNode => { - if (node.title) { + if (node.title !== undefined) { return <>{node.title}; } - if (node.backup) { + if (node.backup !== undefined) { const b = node.backup; const details: string[] = []; @@ -160,7 +151,7 @@ export const OperationTree = ({ ); } else if (b.backupLastStatus.entry.case === "status") { const s = b.backupLastStatus.entry.value; - const percent = Number(s.bytesDone / s.totalBytes) * 100; + const percent = Number(s.percentDone) * 100; details.push( `${percent.toFixed(1)}% processed ${formatBytes( Number(s.bytesDone) diff --git a/webui/src/state/oplog.ts b/webui/src/state/oplog.ts index c06fe17c..28707362 100644 --- a/webui/src/state/oplog.ts +++ b/webui/src/state/oplog.ts @@ -6,7 +6,7 @@ import { } from "../../gen/ts/v1/operations_pb"; import { GetOperationsRequest, OpSelector } from "../../gen/ts/v1/service_pb"; import { BackupProgressEntry, ResticSnapshot, RestoreProgressEntry } from "../../gen/ts/v1/restic_pb"; -import _ from "lodash"; +import _, { flow } from "lodash"; import { formatDuration, formatTime } from "../lib/formatting"; import { backrestService } from "../api"; import { @@ -63,7 +63,7 @@ export const unsubscribeFromOperations = ( export const getStatusForSelector = async (sel: OpSelector) => { const req = new GetOperationsRequest({ selector: sel, - lastN: BigInt(STATUS_OPERATION_HISTORY), + lastN: BigInt(20), }); return await getStatus(req); }; @@ -74,25 +74,21 @@ const getStatus = async (req: GetOperationsRequest) => { ops.sort((a, b) => { return Number(b.unixTimeStartMs - a.unixTimeStartMs); }); - if (ops.length === 0) { - return OperationStatus.STATUS_SUCCESS; - } - const flowId = ops.find((op) => op.status !== OperationStatus.STATUS_PENDING)?.flowId; - if (!flowId) { - return OperationStatus.STATUS_SUCCESS; - } + + let flowID: BigInt | undefined = undefined; for (const op of ops) { - if (op.status === OperationStatus.STATUS_PENDING) { + if (op.status === OperationStatus.STATUS_PENDING || op.status === OperationStatus.STATUS_SYSTEM_CANCELLED) { continue; } - if (op.flowId !== flowId) { + if (op.status !== OperationStatus.STATUS_SUCCESS) { + return op.status; + } + if (!flowID) { + flowID = op.flowId; + } else if (flowID !== op.flowId) { break; } - if ( - op.status !== OperationStatus.STATUS_SUCCESS && - op.status !== OperationStatus.STATUS_USER_CANCELLED && - op.status !== OperationStatus.STATUS_SYSTEM_CANCELLED - ) { + if (op.status !== OperationStatus.STATUS_SUCCESS) { return op.status; } } @@ -196,17 +192,10 @@ export class BackupInfoCollector { }); // use the lowest ID of all operations as the ID of the backup, this will be the first created operation. - const id = operations.reduce((prev, curr) => { - return prev < curr.id ? prev : curr.id; - }, operations[0].id!); - const startTimeMs = Number(operations[0].unixTimeStartMs); const endTimeMs = Number(operations[operations.length - 1].unixTimeEndMs!); const displayTime = new Date(startTimeMs); - let displayType = DisplayType.SNAPSHOT; - if (operations.length === 1) { - displayType = getTypeForDisplay(operations[0]); - } + const displayType = getTypeForDisplay(operations[0]); // use the latest status that is not a hidden status let statusIdx = operations.length - 1; @@ -369,8 +358,6 @@ export class BackupInfoCollector { export const shouldHideOperation = (operation: Operation) => { return ( operation.op.case === "operationStats" || - (operation.op.case === "operationRunHook" && - operation.status === OperationStatus.STATUS_SUCCESS) || shouldHideStatus(operation.status) ); }; @@ -437,7 +424,7 @@ export const colorForStatus = (status: OperationStatus) => { case OperationStatus.STATUS_SUCCESS: return "green"; case OperationStatus.STATUS_USER_CANCELLED: - return "yellow"; + return "orange"; default: return "grey"; } @@ -481,7 +468,7 @@ export const detailsForOperation = ( break; case OperationStatus.STATUS_USER_CANCELLED: state = "cancelled"; - color = "yellow"; + color = "orange"; break; default: state = ""; diff --git a/webui/src/views/App.tsx b/webui/src/views/App.tsx index ac476561..0110f86c 100644 --- a/webui/src/views/App.tsx +++ b/webui/src/views/App.tsx @@ -306,11 +306,14 @@ const IconForResource = ({ setStatus(await getStatusForSelector(new OpSelector({ planId, repoId }))); }; load(); - const refresh = _.debounce(load, 1000, { maxWait: 5000, trailing: true }); + const refresh = _.debounce(load, 1000, { maxWait: 10000, trailing: true }); const callback = (event?: OperationEvent, err?: Error) => { if (!event || !event.operation) return; const operation = event.operation; - if (operation.planId === planId || operation.repoId === repoId) { + if ( + (planId && operation.planId === planId) || + (repoId && operation.repoId === repoId) + ) { refresh(); } }; diff --git a/webui/src/views/PlanView.tsx b/webui/src/views/PlanView.tsx index 82c0f36c..c407e7a6 100644 --- a/webui/src/views/PlanView.tsx +++ b/webui/src/views/PlanView.tsx @@ -128,7 +128,6 @@ export const PlanView = ({ plan }: React.PropsWithChildren<{ plan: Plan }>) => { lastN: BigInt(MAX_OPERATION_HISTORY), }) } - filter={(op) => !shouldHideStatus(op.status)} /> ), diff --git a/webui/src/views/RepoView.tsx b/webui/src/views/RepoView.tsx index 9c9a7fb5..db35b24f 100644 --- a/webui/src/views/RepoView.tsx +++ b/webui/src/views/RepoView.tsx @@ -141,7 +141,6 @@ export const RepoView = ({ repo }: React.PropsWithChildren<{ repo: Repo }>) => { }) } showPlan={true} - filter={(op) => !shouldHideStatus(op.status)} /> ),