forked from harness/harness
-
Notifications
You must be signed in to change notification settings - Fork 0
/
executor.go
159 lines (133 loc) · 4.46 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package job
import (
"context"
"errors"
"fmt"
"runtime/debug"
"time"
"github.com/harness/gitness/pubsub"
"github.com/rs/zerolog/log"
)
// Executor holds map of Handler objects per each job type registered.
// The Scheduler uses the Executor to start execution of jobs.
type Executor struct {
handlerMap map[string]Handler
handlerComplete bool
store Store
publisher pubsub.Publisher
}
const (
ProgressMin = 0
ProgressMax = 100
)
// ProgressReporter can be used by a job Handler to report back the execution progress.
type ProgressReporter func(progress int, result string) error
// Handler is a job executor for a specific job type.
// An implementation should try to honor the context and
// try to abort the execution as soon as the context is done.
type Handler interface {
Handle(ctx context.Context, input string, fn ProgressReporter) (result string, err error)
}
var errNoHandlerDefined = errors.New("no handler registered for the job type")
// NewExecutor creates new Executor.
func NewExecutor(store Store, publisher pubsub.Publisher) *Executor {
return &Executor{
handlerMap: make(map[string]Handler),
handlerComplete: false,
store: store,
publisher: publisher,
}
}
// Register registers a job Handler for the provided job type.
// This function is not thread safe. All calls are expected to be made
// in a single thread during the application boot time.
func (e *Executor) Register(jobType string, exec Handler) error {
if jobType == "" {
return errors.New("jobType must not be empty")
}
if e.handlerComplete {
return errors.New("job handler registration is complete")
}
if exec == nil {
return errors.New("provided Handler is nil")
}
if _, ok := e.handlerMap[jobType]; ok {
return fmt.Errorf("a Handler is already defined to run the '%s' job types", jobType)
}
e.handlerMap[jobType] = exec
return nil
}
// finishRegistration forbids further registration of job types.
// It is called by the Scheduler when it starts.
func (e *Executor) finishRegistration() {
e.handlerComplete = true
}
// exec runs a single job. This function is synchronous,
// so the caller is responsible to run it in a separate go-routine.
func (e *Executor) exec(
ctx context.Context,
jobUID, jobType string,
input string,
) (result string, err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf(
"panic while processing job=%s type=%s: %v\n%s",
jobUID, jobType, r, debug.Stack())
}
}()
exec, ok := e.handlerMap[jobType]
if !ok {
return "", errNoHandlerDefined
}
// progressReporter is the function with which the job can update its progress.
// This function will be executed in the job executor's Go-routine.
// It uses the job's context.
progressReporter := func(progress int, result string) error {
if progress < ProgressMin || progress > ProgressMax {
return errors.New("progress must be between 0 and 100")
}
jobDummy := &Job{
UID: jobUID,
Type: jobType,
Updated: time.Now().UnixMilli(),
Result: result,
State: JobStateRunning,
RunProgress: progress,
}
// This doesn't need to be behind the global lock because it only updates the single row.
// While a job is running no other process should touch it.
// Even this call will fail if the context deadline has been exceeded.
// The job parameter is a dummy Job object that just holds fields that should be updated.
if err := e.store.UpdateProgress(ctx, jobDummy); err != nil {
return err
}
// tell everybody that a job progress has been updated
if err := publishStateChange(ctx, e.publisher, jobDummy); err != nil {
log.Err(err).Msg("failed to publish job state change")
}
return nil
}
return exec.Handle(ctx, input, progressReporter) // runs the job
}
func FailProgress() Progress {
return Progress{
State: JobStateFailed,
Progress: ProgressMax,
Result: "",
Failure: "",
}
}