-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathgcsworker.go
96 lines (80 loc) · 2.29 KB
/
gcsworker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// Package gcsworker implements an http worker with automatic Google Cloud authentication.
package gcsworker
import (
"context"
"net/http"
"bytes"
"errors"
"net/url"
"github.com/dave/blast/blaster"
"github.com/mitchellh/mapstructure"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
)
// New returns a new gcs worker
func New() blaster.Worker {
return &Worker{}
}
// Worker is the worker type
type Worker struct {
client *http.Client
}
// Start satisfies the blaster.Starter interface
func (w *Worker) Start(ctx context.Context, payload map[string]interface{}) error {
// notest
src, err := google.DefaultTokenSource(ctx)
if err != nil {
return err
}
w.client = oauth2.NewClient(ctx, src)
return nil
}
// Send satisfies the blaster.Worker interface
func (w *Worker) Send(ctx context.Context, raw map[string]interface{}) (map[string]interface{}, error) {
var payload def
if err := mapstructure.Decode(raw, &payload); err != nil {
return map[string]interface{}{"status": "Error decoding payload"}, err
}
request, err := http.NewRequest(payload.Method, payload.URL, bytes.NewBufferString(payload.Body))
if err != nil {
return map[string]interface{}{"status": "Error creating request"}, err
}
request = request.WithContext(ctx)
for k, v := range payload.Headers {
request.Header.Add(k, v)
}
response, err := w.client.Do(request)
if err != nil {
var status interface{}
ue, ok := err.(*url.Error)
switch {
case response != nil:
// notest
status = response.StatusCode
case ok && ue.Err == context.DeadlineExceeded:
status = "Timeout"
case ok && ue.Err == context.Canceled:
status = "Cancelled"
case ok:
status = ue.Err.Error()
default:
// notest
status = err.Error()
}
return map[string]interface{}{"status": status}, err
}
if response.StatusCode != 200 {
return map[string]interface{}{"status": response.StatusCode}, errors.New("non 200 status")
}
return map[string]interface{}{"status": 200}, nil
}
type def struct {
// Method sets the http method e.g. `GET`, `POST` etc.
Method string `mapstructure:"method"`
// Url sets the full URL of the http request
URL string `mapstructure:"url"`
// Body sets the full http body
Body string `mapstructure:"body"`
// Headers sets the http headers
Headers map[string]string `mapstructure:"headers"`
}