-
Notifications
You must be signed in to change notification settings - Fork 48
/
Copy pathcmd_workflow.go
256 lines (224 loc) · 7.8 KB
/
cmd_workflow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
package sparta
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
////////////////////////////////////////////////////////////////////////////////
// CONSTANTS
////////////////////////////////////////////////////////////////////////////////
var (
// SpartaTagBuildIDKey is the keyname used in the CloudFormation Output
// that stores the user-supplied or automatically generated BuildID
// for this run
SpartaTagBuildIDKey = spartaTagName("buildId")
// SpartaTagBuildTagsKey is the keyname used in the CloudFormation Output
// that stores the optional user-supplied golang build tags
SpartaTagBuildTagsKey = spartaTagName("buildTags")
)
const (
// MetadataParamCloudFormationStackPath is the path to the template
MetadataParamCloudFormationStackPath = "CloudFormationStackPath"
// MetadataParamServiceName is the name of the stack to use
MetadataParamServiceName = "ServiceName"
// MetadataParamS3Bucket is the Metadata param we use for the bucket
MetadataParamS3Bucket = "CodeArtifactS3Bucket"
// MetadataParamCodeArchivePath is the intemediate local path to the code
MetadataParamCodeArchivePath = "CodeArchivePath"
// MetadataParamS3SiteArchivePath is the intemediate local path to the S3 site contents
MetadataParamS3SiteArchivePath = "S3SiteArtifactPath"
)
const (
// StackParamS3CodeKeyName is the Stack Parameter to the S3 key of the uploaded asset
StackParamS3CodeKeyName = "CodeArtifactS3Key"
// StackParamS3CodeBucketName is where we uploaded the artifact to
StackParamS3CodeBucketName = MetadataParamS3Bucket
// StackParamS3CodeVersion is the object version to use for the S3 item
StackParamS3CodeVersion = "CodeArtifactS3ObjectVersion"
// StackParamS3SiteArchiveKey is the param to the S3 archive for a static website.
StackParamS3SiteArchiveKey = "SiteArtifactS3Key"
// StackParamS3SiteArchiveVersion is the version of the S3 artifact to use
StackParamS3SiteArchiveVersion = "SiteArtifactS3ObjectVersion"
)
const (
// StackOutputBuildTime is the Output param for when this template was built
StackOutputBuildTime = "TemplateCreationTime"
// StackOutputBuildID is the Output tag that holds the build id
StackOutputBuildID = "BuildID"
)
func showOptionalAWSUsageInfo(err error, logger *logrus.Logger) {
if err == nil {
return
}
userAWSErr, userAWSErrOk := err.(awserr.Error)
if userAWSErrOk {
if strings.Contains(userAWSErr.Error(), "could not find region configuration") {
logger.Error("")
logger.Error("Consider setting env.AWS_REGION, env.AWS_DEFAULT_REGION, or env.AWS_SDK_LOAD_CONFIG to resolve this issue.")
logger.Error("See the documentation at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html for more information.")
logger.Error("")
}
}
}
// // logFilesize outputs a friendly filesize for the given filepath
// func logFilesize(message string, filePath string, logger *logrus.Logger) {
// // Binary size
// stat, err := os.Stat(filePath)
// if err == nil {
// logger.WithFields(logrus.Fields{
// "Path": filePath,
// "Size": humanize.Bytes(uint64(stat.Size())),
// }).Info(message)
// }
// }
func spartaTagName(baseKey string) string {
return fmt.Sprintf("io:gosparta:%s", baseKey)
}
// Sanitize the provided input by replacing illegal characters with underscores
func sanitizedName(input string) string {
return reSanitize.ReplaceAllString(input, "_")
}
type pipelineBaseOp interface {
Invoke(context.Context, *logrus.Logger) error
Rollback(context.Context, *logrus.Logger) error
}
type pipelineStageBase interface {
Run(context.Context, *logrus.Logger) error
Append(string, pipelineBaseOp) error
Rollback(context.Context, *logrus.Logger) error
}
type pipelineStageOpEntry struct {
opName string
op pipelineBaseOp
}
type pipelineStage struct {
ops []*pipelineStageOpEntry
}
func (ps *pipelineStage) Append(opName string, op pipelineBaseOp) error {
ps.ops = append(ps.ops, &pipelineStageOpEntry{
opName: opName,
op: op,
})
return nil
}
func (ps *pipelineStage) Run(ctx context.Context, logger *logrus.Logger) error {
var wg sync.WaitGroup
var mapErr sync.Map
for eachIndex, eachEntry := range ps.ops {
wg.Add(1)
go func(opIndex int, opEntry *pipelineStageOpEntry, goLogger *logrus.Logger) {
defer wg.Done()
opErr := opEntry.op.Invoke(ctx, goLogger)
if opErr != nil {
mapKey := fmt.Sprintf("%sErr%d", opEntry.opName, opIndex)
mapErr.Store(mapKey, fmt.Sprintf("Operation (%s) error: %s",
opEntry.opName,
opErr))
}
}(eachIndex, eachEntry, logger)
}
wg.Wait()
// Were there any errors?
errorText := []string{}
mapErr.Range(func(key interface{}, value interface{}) bool {
keyName := key
valueErr := value
errorText = append(errorText, fmt.Sprintf("%s:%#v", keyName, valueErr))
return true
})
if len(errorText) != 0 {
return errors.New(strings.Join(errorText, ", "))
}
return nil
}
func (ps *pipelineStage) Rollback(ctx context.Context, logger *logrus.Logger) error {
// Ok, another wg to async cleanup everything. Operations
// need to be a bit stateful for this...
var wgRollback sync.WaitGroup
logger.Debugf("Rolling back %T due to errors", ps)
for _, eachEntry := range ps.ops {
wgRollback.Add(1)
go func(opEntry *pipelineStageOpEntry, goLogger *logrus.Logger) {
defer wgRollback.Done()
opErr := opEntry.op.Rollback(ctx, goLogger)
if opErr != nil {
goLogger.Warnf("Operation (%s) rollback failed: %s", opEntry.opName, opErr)
}
}(eachEntry, logger)
}
wgRollback.Wait()
return nil
}
type pipelineStageEntry struct {
stageName string
stage pipelineStageBase
duration time.Duration
}
type pipeline struct {
stages []*pipelineStageEntry
startTime time.Time
}
func (p *pipeline) Append(stageName string, stage pipelineStageBase) {
p.stages = append(p.stages, &pipelineStageEntry{
stageName: stageName,
stage: stage,
})
}
func (p *pipeline) Run(ctx context.Context,
name string,
logger *logrus.Logger) error {
p.startTime = time.Now()
// Run the stages, if there is an error, rollback
for stageIndex, curStage := range p.stages {
startTime := time.Now()
stageErr := curStage.stage.Run(ctx, logger)
if stageErr != nil {
logger.Warnf("Pipeline stage %s failed", curStage.stageName)
for index := stageIndex; index >= 0; index-- {
rollbackErr := p.stages[index].stage.Rollback(ctx, logger)
if rollbackErr != nil {
logger.Warnf("Pipeline stage %s failed to Rollback", curStage.stageName)
}
}
return stageErr
}
curStage.duration = time.Since(startTime)
}
// pipelineTotalDuration := time.Since(pipelineStartTime)
// summaryLine := fmt.Sprintf("Pipeline complete: %s", name)
// logger.Info(headerDivider)
// logger.Info(summaryLine)
// logger.Info(headerDivider)
// for eachIndex, eachStageEntry := range p.stages {
// logger.Infof("Stage %d: %s", eachIndex)
// for _, eachOp := range eachStage.ops {
// logger.WithFields(logrus.Fields{
// "Duration (s)": fmt.Sprintf("%.f", eachOp.duration.Seconds()),
// }).Info(eachOp.opName)
// }
// }
// for _, eachEntry := range ctx.transaction.stepDurations {
// ctx.logger.WithFields(logrus.Fields{
// "Duration (s)": fmt.Sprintf("%.f", eachEntry.duration.Seconds()),
// }).Info(eachEntry.name)
// }
// elapsed := time.Since(startTime)
// ctx.logger.WithFields(logrus.Fields{
// "Duration (s)": fmt.Sprintf("%.f", elapsed.Seconds()),
// }).Info("Total elapsed time")
// curTime := time.Now()
// ctx.logger.WithFields(logrus.Fields{
// "Time (UTC)": curTime.UTC().Format(time.RFC3339),
// "Time (Local)": curTime.Format(time.RFC822),
// }).Info("Complete")
// ctx.logger.Info(headerDivider)
return nil
}
////////////////////////////////////////////////////////////////////////////////
// Interfaces
////////////////////////////////////////////////////////////////////////////////