-
Notifications
You must be signed in to change notification settings - Fork 37
/
cmd_watch.go
321 lines (285 loc) · 10.4 KB
/
cmd_watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
package main
import (
"context"
"fmt"
"os"
"sort"
"strconv"
"strings"
"time"
circleci "github.com/jszwedko/go-circleci"
"github.com/spf13/cobra"
libhoney "github.com/honeycombio/libhoney-go"
)
// numChecks is the number of times to verify that we're finished before
// declaring success in case we enter a transient state with blocked jobs that
// really will start soon. This can be long - wait for up to 2 minutes (5sec *
// 24 = 120sec). It's ok for this to be long because it only covers the time
// when there are existing jobs that are not going to run. Most builds finish
// with all jobs finishing, so this timer will not cause delayed builds in
// those cases.
const numChecks = 24
// watchConfig holds the settings for the watch command. Each field is bound to
// a command line flag and may be pre-populated from an environment variable.
type watchConfig struct {
// timeoutMin is the maximum time, in minutes, that watch should wait before
// timing out (--timeout, BUILDEVENT_TIMEOUT)
timeoutMin int
// circleKey is the CircleCI API token used to poll the API
// (--circlekey, BUILDEVENT_CIRCLE_API_TOKEN)
circleKey string
// workflowID is the CircleCI identifier for the current workflow
// (--workflowid, CIRCLE_WORKFLOW_ID)
workflowID string
// jobName is the CircleCI identifier for the current job; jobs with this
// name are skipped when evaluating the workflow (--jobname, CIRCLE_JOB)
jobName string
}
// commandWatch builds the `watch` subcommand (eg: buildevents watch BUILD_ID).
//
// The command takes exactly one argument, the build ID, which is used as the
// trace ID. It is only valid when the CI provider is CircleCI. When run, it
// blocks in waitCircle until the workflow finishes or times out, then records
// the outcome on an event and prints the trace URL.
//
// The flags registered here write into wcfg; each one takes its value from the
// matching environment variable when that variable is set.
func commandWatch(cfg *libhoney.Config, filename *string, ciProvider *string, wcfg *watchConfig) *cobra.Command {
	// the watch command can only poll CircleCI; refuse every other provider
	circleOnly := func(cmd *cobra.Command, args []string) error {
		if *ciProvider == providerCircle {
			return nil
		}
		return fmt.Errorf("watch command only valid for %s", providerCircle)
	}

	runWatch := func(cmd *cobra.Command, args []string) error {
		traceID := strings.TrimSpace(args[0])

		// the event is sent when this function returns, including on the
		// error path below
		ev := createEvent(cfg, *ciProvider, traceID)
		defer ev.Send()
		providerInfo(*ciProvider, ev)

		passed, began, finished, err := waitCircle(context.Background(), *wcfg)
		if err != nil {
			fmt.Printf("buildevents - Error detected: %s\n", err.Error())
			return err
		}

		var status string
		if passed {
			status = "success"
		} else {
			status = "failed"
		}

		ev.Add(map[string]interface{}{
			"service_name":  ifClassic(cfg, "watch", cfg.Dataset),
			"service.name":  ifClassic(cfg, "watch", cfg.Dataset),
			"command_name":  "watch",
			"trace.span_id": traceID,
			"name":          ifClassic(cfg, "watch "+traceID, "watch"),
			"status":        status,
			"duration_ms":   finished.Sub(began) / time.Millisecond,
			"source":        "buildevents",
		})
		ev.Timestamp = began

		arbitraryFields(*filename, ev) // TODO: consider - move this until after the watch timeout??

		// failing to build the URL is reported but does not fail the command
		if url, urlErr := buildURL(cfg, traceID, began.Unix()); urlErr != nil {
			fmt.Fprintf(os.Stderr, "Unable to create trace URL: %v\n", urlErr)
		} else {
			fmt.Println(url)
		}
		return nil
	}

	watchCmd := &cobra.Command{
		Use:   "watch BUILD_ID",
		Short: "Polls the CircleCI API and waits until all jobs have finished.",
		Long: `
Polls the CircleCI API and waits until all jobs have finished (either
succeeded, failed, or are blocked). It then reports the final status of the
build with the appropriate timers.`,
		Args: cobra.MatchAll(cobra.ExactArgs(1), circleOnly),
		RunE: runWatch,
	}

	// register every flag first, then apply the environment overrides
	flags := watchCmd.Flags()
	flags.IntVarP(&wcfg.timeoutMin, "timeout", "t", 10, "[env.BUILDEVENT_TIMEOUT] maximum time (in minutes) that watch should wait before timing out")
	flags.StringVarP(&wcfg.circleKey, "circlekey", "c", "", "[env.BUILDEVENT_CIRCLE_API_TOKEN] CircleCI API token used for watching builds for private repositories")
	flags.StringVarP(&wcfg.workflowID, "workflowid", "w", "", "[env.CIRCLE_WORKFLOW_ID] CircleCI identifier for the current workflow")
	flags.StringVarP(&wcfg.jobName, "jobname", "j", "", "[env.CIRCLE_JOB] CircleCI identifier for the current job")

	// the timeout is only taken from the environment when it parses as an
	// integer; anything else is silently ignored
	if raw, found := os.LookupEnv("BUILDEVENT_TIMEOUT"); found {
		if _, convErr := strconv.Atoi(raw); convErr == nil {
			flags.Lookup("timeout").Value.Set(raw)
		}
	}

	// the string flags take whatever the environment holds, verbatim
	envOverrides := []struct {
		envVar   string
		flagName string
	}{
		{"BUILDEVENT_CIRCLE_API_TOKEN", "circlekey"},
		{"CIRCLE_WORKFLOW_ID", "workflowid"},
		{"CIRCLE_JOB", "jobname"},
	}
	for _, o := range envOverrides {
		if val, found := os.LookupEnv(o.envVar); found {
			flags.Lookup(o.flagName).Value.Set(val)
		}
	}

	return watchCmd
}
// waitCircle polls the CircleCI API every 5 seconds, checking the status of
// this workflow and the jobs it contains, until the workflow finishes or the
// configured timeout (cfg.timeoutMin minutes) expires.
//
// It returns whether the workflow build succeeded, the time it started (the
// workflow's creation time), and the time it ended (which will be either
// nowish or sometime in the past if we spent time re-checking a build that
// already looked finished). The err returned is for errors polling the
// CircleCI API, not errors in the build itself; it is only non-nil when the
// API token is missing or the initial workflow lookup fails.
func waitCircle(parent context.Context, cfg watchConfig) (passed bool, started, ended time.Time, err error) {
	// we need a token to query anything; give a helpful error if we have no token
	if cfg.circleKey == "" {
		return false, time.Now(), time.Now().Add(time.Second), fmt.Errorf("circle token required to poll the API")
	}

	client := &circleci.Client{Token: cfg.circleKey}
	wf, err := client.GetWorkflowV2(cfg.workflowID)
	if err != nil {
		return false, time.Now(), time.Now().Add(time.Second), err
	}
	started = wf.CreatedAt
	ended = time.Now() // set a default in case we early exit

	// set up cancellation based on the configured timeout duration
	ctx, cxl := context.WithTimeout(parent, time.Duration(cfg.timeoutMin)*time.Minute)
	defer cxl()

	// poll on a fixed interval; stop the ticker on the way out so it isn't leaked
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	announce := func(ok bool) {
		if ok {
			fmt.Println("Build passed!")
		} else {
			fmt.Println("Build failed!")
		}
	}

	// sometimes there's a gap between when a job finishes and the next one starts.
	// In that case there are no jobs running and some jobs blocked that could
	// still run. If we think the build has finished, let's give it a buffer to
	// spin up new jobs before really considering it done. This buffer will check
	// numChecks times (up to 2 minutes).
	checksLeft := numChecks

	for {
		// wake up on the next tick or as soon as the timeout fires, whichever
		// comes first, so a timeout is never noticed a whole tick late
		select {
		case <-ctx.Done():
		case <-ticker.C:
		}
		if ctx.Err() != nil {
			// TODO add the fact that it timed out to the trace to say why it failed
			fmt.Fprintf(os.Stderr, "Timeout reached waiting for the workflow to finish\n")
			ended = time.Now()
			return passed, started, ended, nil
		}

		anyRunning, anyFailed, anyBlocked, pollErr := evalWorkflow(client, cfg.workflowID, cfg.jobName)
		if anyRunning {
			// if we previously thought we were finished but now realize we
			// weren't, reset the check counter so we get the full set of
			// re-checks next time we think we're finished.
			passed = false
			checksLeft = numChecks
			continue
		}

		// if this is the first time we think we're finished store the timestamp;
		// later re-checks must not move it
		if checksLeft == numChecks {
			ended = time.Now()
		}

		if !anyBlocked && pollErr == nil {
			// we are legit done.
			passed = !anyFailed
			announce(passed)
			return passed, started, ended, nil
		}

		// ok, carry on
		checksLeft--
		if checksLeft <= 0 {
			// we're done checking.
			passed = !anyFailed
			announce(passed)
			return passed, started, ended, nil
		}

		// NOTE: evalWorkflow reports anyRunning=true when the API call fails,
		// so polling errors normally take the still-running path above and
		// are retried until the timeout; this branch is a safeguard should
		// that ever change.
		if pollErr != nil {
			// we previously successfully queried for the workflow; this is likely a
			// transient error
			fmt.Printf("Querying the CirlceCI API failed with %s; trying %d more times before giving up.\n", pollErr.Error(), checksLeft)
			continue
		}

		if anyFailed {
			// don't bother rechecking if a job has failed
			fmt.Printf("Build failed!\n")
			ended = time.Now()
			return false, started, ended, nil
		}

		// yay looks like maybe we're done?
		fmt.Printf("Build appears finished; checking %d more times to make sure.\n", checksLeft)
	}
}
// evalWorkflow fetches the list of jobs in this workflow from the CircleCI API
// and reports whether any job (other than the one named jobName) is still
// running or about to run, whether any has failed, and whether any is blocked.
// A progress line is printed to stdout on every call.
//
// A non-nil error means talking to the CircleCI API failed, not that the
// workflow failed; in that case anyRunning and anyFailed are both reported as
// true and anyBlocked as false.
func evalWorkflow(client *circleci.Client, wfID string, jobName string) (anyRunning bool, anyFailed bool, anyBlocked bool, err error) {
	fmt.Printf("%s: polling for jobs: ", time.Now().Format(time.StampMilli))

	jobs, err := getJobs(client, wfID)
	if err != nil {
		fmt.Printf("error polling: %s\n", err.Error())
		return true, true, false, err
	}
	fmt.Println(summarizeJobList(jobs))

	for _, j := range jobs {
		// skip ourself so we don't wait if we're the only job running
		if j.Name == jobName {
			continue
		}

		switch j.Status {
		case "running", "queued", "not_running":
			// queued means a job is due to start running soon, and not_running
			// is the same as queued, so both count as running already
			anyRunning = true
		case "failed":
			anyFailed = true
		case "blocked":
			// blocked means it can't yet run, but that could be because either
			// it's waiting on a running job, depends on a failed job, or
			// it's not configured to run this build (because of a tag or something)
			anyBlocked = true
		}
		// success (finished and passed) and any other status are not tracked
	}

	return anyRunning, anyFailed, anyBlocked, nil
}
// getJobs queries the CircleCI API for a list of all jobs in the current
// workflow, following pagination until the API stops returning a next-page
// token. It returns the jobs from every page in the order received, or the
// first error encountered (in which case no jobs are returned).
func getJobs(client *circleci.Client, wfID string) ([]*circleci.WorkflowJob, error) {
	// get the first page of jobs
	wfJobs, more, err := client.ListWorkflowV2Jobs(wfID, nil)
	if err != nil {
		return nil, err
	}

	for more != nil {
		// TODO only print this in debug mode
		fmt.Printf("getting more jobs! next page is %s\n", *more)

		// hand the token back to the API so we actually advance to the next
		// page; requesting without it returns the first page again, which
		// would duplicate jobs and never terminate
		var moreJobs []*circleci.WorkflowJob
		moreJobs, more, err = client.ListWorkflowV2Jobs(wfID, map[string]string{"page-token": *more})
		if err != nil {
			return nil, err
		}
		wfJobs = append(wfJobs, moreJobs...)
	}
	return wfJobs, nil
}
// summarizeJobList renders a one-line summary of the given jobs: the number of
// jobs in each status, ordered alphabetically by status and separated by
// commas (each entry formatted as "<count> <status>"). An empty list yields
// "no jobs found".
func summarizeJobList(wfJobs []*circleci.WorkflowJob) string {
	if len(wfJobs) == 0 {
		return "no jobs found"
	}

	// tally how many jobs are in each status
	tally := make(map[string]int)
	for _, j := range wfJobs {
		tally[j.Status]++
	}

	// map iteration order is random, so sort the statuses for stable output
	statuses := make([]string, 0, len(tally))
	for s := range tally {
		statuses = append(statuses, s)
	}
	sort.Strings(statuses)

	// one printable "<count> <status>" entry per status
	parts := make([]string, len(statuses))
	for i, s := range statuses {
		parts[i] = fmt.Sprintf("%d %s", tally[s], s)
	}

	return strings.Join(parts, ", ")
}