Skip to content

Commit 653159c

Browse files
authored
Edits data gathering process to retry dataGatherer (#185)
* Edits datagatherer and posting data to retry fails datagatherer uses backoff package to retry datagatherers These are retried for 5 minutes post data also retried for 5 minutes depending on the error given Signed-off-by: Jamie Leppard <JammyL@users.noreply.github.com> * Changes param1 to AlwaysFail in dummy datagatherer param1 is no longer needed. To keep a way of testing the exponential backoff this has been replaced with a boolean value which allows the datagathering process to always fail Signed-off-by: Jamie Leppard <JammyL@users.noreply.github.com> Co-authored-by: Jamie Leppard <JammyL@users.noreply.github.com>
1 parent b29f34a commit 653159c

File tree

7 files changed

+112
-46
lines changed

7 files changed

+112
-46
lines changed

agent.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,8 @@ data-gatherers:
88
- kind: "dummy"
99
name: "dummy"
1010
config:
11-
param-1: "bar"
11+
failed-attempts: 5
12+
- kind: "dummy"
13+
name: "dummy-fail"
14+
config:
15+
always-fail: true

cmd/agent.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cmd
22

33
import (
44
"fmt"
5+
"time"
56

67
"github.com/jetstack/preflight/pkg/agent"
78
"github.com/spf13/cobra"
@@ -43,12 +44,12 @@ func init() {
4344
"",
4445
"Authorization token. If used, it will override the authorization token in the configuration file.",
4546
)
46-
agentCmd.PersistentFlags().UintVarP(
47+
agentCmd.PersistentFlags().DurationVarP(
4748
&agent.Period,
4849
"period",
4950
"p",
50-
3600,
51-
"Time between scans, in seconds.",
51+
time.Hour,
52+
"Time between scans (given as XhYmZs).",
5253
)
5354
agentCmd.PersistentFlags().StringVarP(
5455
&agent.CredentialsPath,
@@ -78,5 +79,12 @@ func init() {
7879
"",
7980
"Input file path, if used, it will read data from a local file instead of gathering data from clusters",
8081
)
82+
agentCmd.PersistentFlags().DurationVarP(
83+
&agent.BackoffMaxTime,
84+
"backoff-max-time",
85+
"",
86+
5*time.Minute,
87+
"Max time for retrying failed data gatherers (given as XhYmZs).",
88+
)
8189

8290
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/Azure/azure-storage-blob-go v0.8.0
1010
github.com/aws/aws-sdk-go v1.25.30
1111
github.com/blang/semver v3.5.1+incompatible
12+
github.com/cenkalti/backoff v2.0.0+incompatible
1213
github.com/d4l3k/messagediff v1.2.1 // indirect
1314
github.com/gomarkdown/markdown v0.0.0-20191104174740-4d42851d4d5a
1415
github.com/google/go-cmp v0.3.0

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ github.com/bouk/httprouter v0.0.0-20160817010721-ee8b3818a7f5/go.mod h1:CDReaxg1
9797
github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
9898
github.com/caarlos0/ctrlc v1.0.0/go.mod h1:CdXpj4rmq0q/1Eb44M9zi2nKB0QraNKuRGYGrrHhcQw=
9999
github.com/campoy/unique v0.0.0-20180121183637-88950e537e7e/go.mod h1:9IOqJGCPMSc6E5ydlp5NIonxObaeu/Iub/X03EKPVYo=
100+
github.com/cenkalti/backoff v2.0.0+incompatible h1:5IIPUHhlnUZbcHQsQou5k1Tn58nJkeJL9U+ig5CHJbY=
100101
github.com/cenkalti/backoff v2.0.0+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
101102
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
102103
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=

pkg/agent/config_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func TestValidConfigLoad(t *testing.T) {
1717
- name: d1
1818
kind: dummy
1919
config:
20-
param-1: "bar"
20+
always-fail: false
2121
input-path: "/home"
2222
output-path: "/nothome"
2323
`
@@ -35,7 +35,7 @@ func TestValidConfigLoad(t *testing.T) {
3535
Name: "d1",
3636
Kind: "dummy",
3737
Config: &dummyConfig{
38-
Param1: "bar",
38+
AlwaysFail: false,
3939
},
4040
},
4141
},
@@ -59,7 +59,7 @@ func TestValidConfigWithEndpointLoad(t *testing.T) {
5959
- name: d1
6060
kind: dummy
6161
config:
62-
param-1: "bar"
62+
always-fail: false
6363
`
6464

6565
loadedConfig, err := ParseConfig([]byte(configFileContents))
@@ -80,7 +80,7 @@ func TestValidConfigWithEndpointLoad(t *testing.T) {
8080
Name: "d1",
8181
Kind: "dummy",
8282
Config: &dummyConfig{
83-
Param1: "bar",
83+
AlwaysFail: false,
8484
},
8585
},
8686
},

pkg/agent/dummy_data_gatherer.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ import (
88
)
99

1010
type dummyConfig struct {
11-
Param1 string `yaml:"param-1"`
11+
AlwaysFail bool `yaml:"always-fail"`
12+
FailedAttempts int `yaml:"failed-attempts"`
1213
wantOnCreationErr bool
1314
}
1415

@@ -17,14 +18,25 @@ func (c *dummyConfig) NewDataGatherer(ctx context.Context) (datagatherer.DataGat
1718
return nil, fmt.Errorf("an error")
1819
}
1920
return &dummyDataGatherer{
20-
Param1: c.Param1,
21+
AlwaysFail: c.AlwaysFail,
22+
FailedAttempts: c.FailedAttempts,
2123
}, nil
2224
}
2325

2426
type dummyDataGatherer struct {
25-
Param1 string
27+
AlwaysFail bool
28+
attemptNumber int
29+
FailedAttempts int
2630
}
2731

2832
func (c *dummyDataGatherer) Fetch() (interface{}, error) {
29-
return nil, nil
33+
var err error
34+
if c.attemptNumber < c.FailedAttempts {
35+
err = fmt.Errorf("First %d attempts will fail", c.FailedAttempts)
36+
}
37+
if c.AlwaysFail {
38+
err = fmt.Errorf("This data gatherer will always fail")
39+
}
40+
c.attemptNumber++
41+
return nil, err
3042
}

pkg/agent/run.go

Lines changed: 74 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"strings"
1313
"time"
1414

15+
"github.com/cenkalti/backoff"
16+
"github.com/hashicorp/go-multierror"
1517
"github.com/jetstack/preflight/api"
1618
"github.com/jetstack/preflight/pkg/client"
1719
"github.com/jetstack/preflight/pkg/datagatherer"
@@ -25,8 +27,8 @@ var ConfigFilePath string
2527
// AuthToken is the authorization token that will be used for API calls
2628
var AuthToken string
2729

28-
// Period is the number of seconds between scans
29-
var Period uint
30+
// Period is the time waited between scans
31+
var Period time.Duration
3032

3133
// OneShot flag causes agent to run once
3234
var OneShot bool
@@ -40,6 +42,9 @@ var OutputPath string
4042
// InputPath is where the agent will read data from instead of gathering from clusters if specified
4143
var InputPath string
4244

45+
// BackoffMaxTime is the maximum time for which data gatherers will be retried
46+
var BackoffMaxTime time.Duration
47+
4348
// Run starts the agent process
4449
func Run(cmd *cobra.Command, args []string) {
4550
ctx := context.Background()
@@ -49,7 +54,7 @@ func Run(cmd *cobra.Command, args []string) {
4954
if OneShot {
5055
break
5156
}
52-
time.Sleep(time.Duration(Period) * time.Second)
57+
time.Sleep(Period)
5358
}
5459
}
5560

@@ -170,7 +175,16 @@ func gatherAndOutputData(ctx context.Context, config Config, preflightClient *cl
170175
}
171176
log.Println("Data saved locally to", OutputPath)
172177
} else {
173-
postData(config, preflightClient, readings)
178+
backOff := backoff.NewExponentialBackOff()
179+
backOff.MaxElapsedTime = BackoffMaxTime
180+
post := func() error {
181+
return postData(config, preflightClient, readings)
182+
}
183+
err := backoff.RetryNotify(post, backOff, notify)
184+
if err != nil {
185+
log.Fatalf("%v", err)
186+
}
187+
174188
}
175189
}
176190

@@ -193,39 +207,67 @@ func gatherData(ctx context.Context, config Config) []*api.DataReading {
193207
dataGatherers[dgConfig.Name] = dg
194208
}
195209

196-
// Fetch from all datagatherers
197-
now := time.Now()
210+
//TODO Change backoff parameters to those desired
211+
backOff := backoff.NewExponentialBackOff()
212+
backOff.MaxElapsedTime = BackoffMaxTime
198213
readings := []*api.DataReading{}
199-
failedDataGatherers := []string{}
200-
for k, dg := range dataGatherers {
201-
i, err := dg.Fetch()
202-
if err != nil {
203-
log.Printf("Error fetching with DataGatherer %q: %s", k, err)
204-
failedDataGatherers = append(failedDataGatherers, k)
205-
continue
206-
}
214+
completedDataGatherers := make(map[string]bool, len(dataGatherers))
207215

208-
log.Printf("Gathered data for %q:\n", k)
209-
210-
readings = append(readings, &api.DataReading{
211-
ClusterID: config.ClusterID,
212-
DataGatherer: k,
213-
Timestamp: api.Time{Time: now},
214-
Data: i,
215-
})
216+
// Fetch from all datagatherers
217+
getReadings := func() error {
218+
var dgError *multierror.Error
219+
for k, dg := range dataGatherers {
220+
if completedDataGatherers[k] {
221+
continue
222+
}
223+
dgData, err := dg.Fetch()
224+
if err != nil {
225+
err = fmt.Errorf("%s: %v", k, err)
226+
dgError = multierror.Append(dgError, err)
227+
continue
228+
} else {
229+
completedDataGatherers[k] = true
230+
231+
log.Printf("Successfully gathered data for %q", k)
232+
now := time.Now()
233+
234+
readings = append(readings, &api.DataReading{
235+
ClusterID: config.ClusterID,
236+
DataGatherer: k,
237+
Timestamp: api.Time{Time: now},
238+
Data: dgData,
239+
})
240+
}
241+
}
242+
dgError.ErrorFormat = func(es []error) string {
243+
points := make([]string, len(es))
244+
for i, err := range es {
245+
points[i] = fmt.Sprintf("* %s", err)
246+
}
247+
return fmt.Sprintf(
248+
"The following %d data gatherer(s) have failed:\n\t%s",
249+
len(es), strings.Join(points, "\n\t"))
250+
}
251+
return dgError
216252
}
217253

218-
if len(failedDataGatherers) > 0 {
219-
log.Printf(
220-
"Warning, the following DataGatherers failed, %s. Their data is not being sent.",
221-
strings.Join(failedDataGatherers, ", "),
222-
)
254+
err := backoff.RetryNotify(getReadings, backOff, notify)
255+
if err != nil {
256+
log.Println(err)
257+
log.Printf("This will not be retried")
258+
} else {
259+
log.Printf("All data gatherers successfull")
223260
}
224261
return readings
225262
}
226263

227-
func postData(config Config, preflightClient *client.PreflightClient, readings []*api.DataReading) {
264+
func notify(err error, t time.Duration) {
265+
log.Println(err, "\nRetrying...")
266+
}
267+
268+
func postData(config Config, preflightClient *client.PreflightClient, readings []*api.DataReading) error {
228269
baseURL := config.Server
270+
var err error
229271

230272
log.Println("Running Agent...")
231273
log.Println("Posting data to ", baseURL)
@@ -241,7 +283,7 @@ func postData(config Config, preflightClient *client.PreflightClient, readings [
241283
res, err := preflightClient.Post(path, bytes.NewBuffer(data))
242284

243285
if err != nil {
244-
log.Fatalf("Failed to post data: %+v", err)
286+
return fmt.Errorf("Failed to post data: %+v", err)
245287
}
246288
if code := res.StatusCode; code < 200 || code >= 300 {
247289
errorContent := ""
@@ -251,15 +293,13 @@ func postData(config Config, preflightClient *client.PreflightClient, readings [
251293
}
252294
defer res.Body.Close()
253295

254-
log.Fatalf("Received response with status code %d. Body: %s", code, errorContent)
296+
return fmt.Errorf("Received response with status code %d. Body: %s", code, errorContent)
255297
}
256298
} else {
257299
err := preflightClient.PostDataReadings(config.OrganizationID, readings)
258-
// TODO: handle errors gracefully: e.g. handle retries when it is possible
259300
if err != nil {
260-
log.Fatalf("Post to server failed: %+v", err)
301+
return fmt.Errorf("Post to server failed: %+v", err)
261302
}
262303
}
263-
264-
log.Println("Data sent successfully.")
304+
return err
265305
}

0 commit comments

Comments
 (0)