Skip to content

Commit

Permalink
feat(runner): use Context
Browse files Browse the repository at this point in the history
  • Loading branch information
miton18 committed Nov 22, 2020
1 parent 9bfaf44 commit 0408468
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 10 deletions.
1 change: 0 additions & 1 deletion config/operator/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ kind: Namespace
apiVersion: v1
metadata:
name: miniloops

---
kind: Deployment
apiVersion: apps/v1
Expand Down
14 changes: 9 additions & 5 deletions pkg/runner/local/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,15 @@ func (script *Script) Start() {
go func() {
for range script.ticker.C {
start := time.Now()
ctx, _ := context.WithTimeout(context.Background(), script.every)

res, err := script.Exec()
res, err := script.Exec(ctx)
script.execCount.Inc()
script.execDuration.Add(float64(time.Since(start).Milliseconds()))

serr := client.
LoopsFor(script.object.GetNamespace()).
SetLastExecution(context.Background(), script.object.GetName(), time.Now(), err == nil)
SetLastExecution(ctx, script.object.GetName(), time.Now(), err == nil)
if serr != nil {
logrus.WithError(serr).Error("cannot set loop last execution")
}
Expand Down Expand Up @@ -133,7 +134,8 @@ func (script *Script) Stop() {
script.ticker.Stop()
}

func (script *Script) Exec() (*warp10.Response, error) {
// Exec execute the given script
func (script *Script) Exec(ctx context.Context) (*warp10.Response, error) {
script.Lock()
defer script.Unlock()

Expand All @@ -156,7 +158,7 @@ func (script *Script) Exec() (*warp10.Response, error) {
Native().
CoreV1().
Secrets(script.object.GetNamespace()).
Get(context.Background(), loopImport.Secret.Name, meta.GetOptions{})
Get(ctx, loopImport.Secret.Name, meta.GetOptions{})
if err != nil {
return nil, err
}
Expand All @@ -175,7 +177,9 @@ func (script *Script) Exec() (*warp10.Response, error) {

logrus.Debug("WarpScript\n", ws.String())

res := warp10.NewRequest(script.object.Spec.Endpoint, ws.String()).Exec()
res := warp10.
NewRequest(script.object.Spec.Endpoint, ws.String()).
Exec(ctx)
if res.IsErrored() {
return nil, res.Error()
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/warp10/warp10.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package warp10

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -45,8 +46,13 @@ func NewRequest(endpoint, warpscript string) Request {
return Request{Endpoint: endpoint, WarpScript: warpscript}
}

func (req Request) Exec() *Response {
return Exec(req)
// Exec execute the given WarpScript
func (req Request) Exec(ctx context.Context) *Response {
if ctx == nil {
ctx = context.Background()
}

return Exec(ctx, req)
}

func (res Response) IsErrored() bool {
Expand Down Expand Up @@ -91,10 +97,11 @@ func (err ResponseError) Error() string {
return fmt.Sprintf("WarpScript#%d: %s", err.Line, err.Message)
}

func Exec(request Request) *Response {
// Exec execute the given WarpScript
func Exec(ctx context.Context, request Request) *Response {
response := &Response{request: &request}

req, err := http.NewRequest("POST", request.Endpoint, strings.NewReader(request.WarpScript))
req, err := http.NewRequestWithContext(ctx, "POST", request.Endpoint, strings.NewReader(request.WarpScript))
if err != nil {
return response
}
Expand Down

0 comments on commit 0408468

Please sign in to comment.