forked from tsuru/tsuru
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaction.go
190 lines (165 loc) · 5.66 KB
/
action.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
// Copyright 2013 tsuru authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package action
import (
"fmt"
"sync"
"github.com/pkg/errors"
"github.com/tsuru/tsuru/log"
)
// Result is the value returned by Forward. It is used in the call of the next
// action, and also when rolling back the actions.
type Result interface{}
// Forward is the function called by the pipeline executor in the forward
// phase. It receives a FWContext instance, that contains the list of
// parameters given to the pipeline executor and the result of the previous
// action in the pipeline (which will be nil for the first action in the
// pipeline).
type Forward func(context FWContext) (Result, error)
// Backward is the function called by the pipeline executor when in the
// backward phase. It receives the context instance, that contains the list of
// parameters given to the pipeline executor and the result of the forward
// phase.
type Backward func(context BWContext)
type OnErrorFunc func(FWContext, error)
// FWContext is the context used in calls to Forward functions (forward phase).
type FWContext struct {
// Result of the previous action.
Previous Result
// List of parameters given to the executor.
Params []interface{}
}
// BWContext is the context used in calls to Backward functions (backward
// phase).
type BWContext struct {
// Result of the forward phase (for the current action).
FWResult Result
// List of parameters given to the executor.
Params []interface{}
}
// Action defines actions that should be . It is composed of two functions:
// Forward and Backward.
//
// Each action should do only one thing, and do it well. All information that
// is needed to undo the action should be returned by the Forward function.
type Action struct {
// Name is the action name. Used by the log.
Name string
// Function that will be invoked in the forward phase. This value
// cannot be nil.
Forward Forward
// Function that will be invoked in the backward phase. For actions
// that are not undoable, this attribute should be nil.
Backward Backward
// Minimum number of parameters that this action requires to run.
MinParams int
// Function that will be invoked after some failure occurured in the
// Forward phase of this same action.
OnError OnErrorFunc
// Result of the action. Stored for use in the backward phase.
result Result
// mutex for the result
rMutex sync.Mutex
}
// Pipeline is a list of actions. Each pipeline is atomic: either all actions
// are successfully executed, or none of them are. For that, it's fundamental
// that all actions are really small and atomic.
type Pipeline struct {
actions []*Action
}
var (
ErrPipelineNoActions = errors.New("No actions to execute.")
ErrPipelineForwardMissing = errors.New("All actions must define the forward function.")
ErrPipelineFewParameters = errors.New("Not enough parameters to call Action.Forward.")
)
// NewPipeline creates a new pipeline instance with the given list of actions.
func NewPipeline(actions ...*Action) *Pipeline {
// Actions are usually global functions, copying them
// guarantees each copy has an isolated Result.
newActions := make([]*Action, len(actions))
for i, action := range actions {
newAction := &Action{
Name: action.Name,
Forward: action.Forward,
Backward: action.Backward,
MinParams: action.MinParams,
OnError: action.OnError,
}
newActions[i] = newAction
}
return &Pipeline{actions: newActions}
}
// Result returns the result of the last action.
func (p *Pipeline) Result() Result {
action := p.actions[len(p.actions)-1]
action.rMutex.Lock()
defer action.rMutex.Unlock()
return action.result
}
// Execute executes the pipeline.
//
// The execution starts in the forward phase, calling the Forward function of
// all actions. If none of the Forward calls return error, the pipeline
// execution ends in the forward phase and is "committed".
//
// If any of the Forward calls fails, the executor switches to the backward phase
// (roll back) and call the Backward function for each action completed. It
// does not call the Backward function of the action that has failed.
//
// After rolling back all completed actions, it returns the original error
// returned by the action that failed.
func (p *Pipeline) Execute(params ...interface{}) (err error) {
var (
r Result
)
if len(p.actions) == 0 {
return ErrPipelineNoActions
}
fwCtx := FWContext{Params: params}
var i int
var a *Action
defer func() {
if r := recover(); r != nil {
log.Errorf("[pipeline] PANIC running the Forward for the %s action - %v", a.Name, r)
err = fmt.Errorf("panic running the Forward for the %s action: %v", a.Name, r)
if a.OnError != nil {
a.OnError(fwCtx, err)
}
p.rollback(i-1, params)
}
}()
for i, a = range p.actions {
log.Debugf("[pipeline] running the Forward for the %s action", a.Name)
if a.Forward == nil {
err = ErrPipelineForwardMissing
} else if len(fwCtx.Params) < a.MinParams {
err = ErrPipelineFewParameters
} else {
r, err = a.Forward(fwCtx)
a.rMutex.Lock()
a.result = r
a.rMutex.Unlock()
fwCtx.Previous = r
}
if err != nil {
log.Errorf("[pipeline] error running the Forward for the %s action - %s", a.Name, err)
if a.OnError != nil {
a.OnError(fwCtx, err)
}
p.rollback(i-1, params)
return err
}
}
return nil
}
func (p *Pipeline) rollback(index int, params []interface{}) {
bwCtx := BWContext{Params: params}
for i := index; i >= 0; i-- {
log.Debugf("[pipeline] running Backward for %s action", p.actions[i].Name)
if p.actions[i].Backward != nil {
bwCtx.FWResult = p.actions[i].result
p.actions[i].Backward(bwCtx)
}
}
}