diff --git a/config/operator/deployment.yml b/config/operator/deployment.yml index 05c9d67..f5881e0 100644 --- a/config/operator/deployment.yml +++ b/config/operator/deployment.yml @@ -3,7 +3,6 @@ kind: Namespace apiVersion: v1 metadata: name: miniloops - --- kind: Deployment apiVersion: apps/v1 diff --git a/pkg/runner/local/script.go b/pkg/runner/local/script.go index e47c27d..b4265c9 100644 --- a/pkg/runner/local/script.go +++ b/pkg/runner/local/script.go @@ -80,14 +80,15 @@ func (script *Script) Start() { go func() { for range script.ticker.C { start := time.Now() + ctx, _ := context.WithTimeout(context.Background(), script.every) - res, err := script.Exec() + res, err := script.Exec(ctx) script.execCount.Inc() script.execDuration.Add(float64(time.Since(start).Milliseconds())) serr := client. LoopsFor(script.object.GetNamespace()). - SetLastExecution(context.Background(), script.object.GetName(), time.Now(), err == nil) + SetLastExecution(ctx, script.object.GetName(), time.Now(), err == nil) if serr != nil { logrus.WithError(serr).Error("cannot set loop last execution") } @@ -133,7 +134,8 @@ func (script *Script) Stop() { script.ticker.Stop() } -func (script *Script) Exec() (*warp10.Response, error) { +// Exec execute the given script +func (script *Script) Exec(ctx context.Context) (*warp10.Response, error) { script.Lock() defer script.Unlock() @@ -156,7 +158,7 @@ func (script *Script) Exec() (*warp10.Response, error) { Native(). CoreV1(). Secrets(script.object.GetNamespace()). - Get(context.Background(), loopImport.Secret.Name, meta.GetOptions{}) + Get(ctx, loopImport.Secret.Name, meta.GetOptions{}) if err != nil { return nil, err } @@ -175,7 +177,9 @@ func (script *Script) Exec() (*warp10.Response, error) { logrus.Debug("WarpScript\n", ws.String()) - res := warp10.NewRequest(script.object.Spec.Endpoint, ws.String()).Exec() + res := warp10. + NewRequest(script.object.Spec.Endpoint, ws.String()). + Exec(ctx) if res.IsErrored() { return nil, res.Error() } diff --git a/pkg/warp10/warp10.go b/pkg/warp10/warp10.go index 55a34f4..765cc4b 100644 --- a/pkg/warp10/warp10.go +++ b/pkg/warp10/warp10.go @@ -1,6 +1,7 @@ package warp10 import ( + "context" "encoding/json" "fmt" "io/ioutil" @@ -45,8 +46,13 @@ func NewRequest(endpoint, warpscript string) Request { return Request{Endpoint: endpoint, WarpScript: warpscript} } -func (req Request) Exec() *Response { - return Exec(req) +// Exec execute the given WarpScript +func (req Request) Exec(ctx context.Context) *Response { + if ctx == nil { + ctx = context.Background() + } + + return Exec(ctx, req) } func (res Response) IsErrored() bool { @@ -91,10 +97,11 @@ func (err ResponseError) Error() string { return fmt.Sprintf("WarpScript#%d: %s", err.Line, err.Message) } -func Exec(request Request) *Response { +// Exec execute the given WarpScript +func Exec(ctx context.Context, request Request) *Response { response := &Response{request: &request} - req, err := http.NewRequest("POST", request.Endpoint, strings.NewReader(request.WarpScript)) + req, err := http.NewRequestWithContext(ctx, "POST", request.Endpoint, strings.NewReader(request.WarpScript)) if err != nil { return response }