-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathjob_runs_controller.go
138 lines (127 loc) · 4.48 KB
/
job_runs_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package web
import (
"errors"
"fmt"
"io/ioutil"
"net/http"
"github.com/asdine/storm"
"github.com/gin-gonic/gin"
"github.com/manyminds/api2go/jsonapi"
"github.com/smartcontractkit/chainlink/logger"
"github.com/smartcontractkit/chainlink/services"
"github.com/smartcontractkit/chainlink/store"
"github.com/smartcontractkit/chainlink/store/models"
"github.com/smartcontractkit/chainlink/store/presenters"
"github.com/smartcontractkit/chainlink/utils"
)
// JobRunsController manages JobRun requests in the node.
type JobRunsController struct {
App *services.ChainlinkApplication
}
// Index returns paginated JobRuns for a given JobSpec
// Example:
// "<application>/specs/:SpecID/runs?size=1&page=2"
func (jrc *JobRunsController) Index(c *gin.Context) {
id := c.Param("SpecID")
size, page, offset, err := ParsePaginatedRequest(c.Query("size"), c.Query("page"))
if err != nil {
c.AbortWithError(422, err)
return
}
var jrs []models.JobRun
if count, err := jrc.App.Store.JobRunsCountFor(id); err != nil {
c.AbortWithError(500, fmt.Errorf("error getting count of JobRuns: %+v", err))
} else if err := jrc.App.Store.Find("JobID", id, &jrs, storm.Limit(size), storm.Skip(offset), storm.Reverse()); err != nil {
c.AbortWithError(500, fmt.Errorf("error getting JobRuns: %+v", err))
} else if buffer, err := NewPaginatedResponse(*c.Request.URL, size, page, count, jrs); err != nil {
c.AbortWithError(500, fmt.Errorf("failed to marshal document: %+v", err))
} else {
c.Data(200, MediaType, buffer)
}
}
// Create starts a new Run for the requested JobSpec.
// Example:
// "<application>/specs/:SpecID/runs"
func (jrc *JobRunsController) Create(c *gin.Context) {
id := c.Param("SpecID")
if j, err := jrc.App.Store.FindJob(id); err == storm.ErrNotFound {
c.AbortWithError(404, errors.New("Job not found"))
} else if err != nil {
c.AbortWithError(500, err)
} else if !j.WebAuthorized() {
c.AbortWithError(403, errors.New("Job not available on web API, recreate with web initiator"))
} else if data, err := getRunData(c); err != nil {
c.AbortWithError(500, err)
} else if jr, err := startJob(j, jrc.App.Store, data); err != nil {
c.AbortWithError(500, err)
} else if doc, err := jsonapi.Marshal(presenters.JobRun{jr}); err != nil {
c.AbortWithError(500, err)
} else {
c.Data(200, MediaType, doc)
}
}
func getRunData(c *gin.Context) (models.JSON, error) {
b, err := ioutil.ReadAll(c.Request.Body)
if err != nil {
return models.JSON{}, err
}
return models.ParseJSON(b)
}
// Show returns the details of a JobRun.
// Example:
// "<application>/runs/:RunID"
func (jrc *JobRunsController) Show(c *gin.Context) {
id := c.Param("RunID")
if jr, err := jrc.App.Store.FindJobRun(id); err == storm.ErrNotFound {
c.AbortWithError(404, errors.New("Job Run not found"))
} else if err != nil {
c.AbortWithError(500, err)
} else if doc, err := jsonapi.Marshal(presenters.JobRun{jr}); err != nil {
c.AbortWithError(500, err)
} else {
c.Data(200, MediaType, doc)
}
}
// Update allows external adapters to resume a JobRun, reporting the result of
// the task and marking it no longer pending.
// Example:
// "<application>/runs/:RunID"
func (jrc *JobRunsController) Update(c *gin.Context) {
id := c.Param("RunID")
var brr models.BridgeRunResult
if jr, err := jrc.App.Store.FindJobRun(id); err == storm.ErrNotFound {
c.AbortWithError(404, errors.New("Job Run not found"))
} else if err != nil {
c.AbortWithError(500, err)
} else if !jr.Result.Status.PendingBridge() {
c.AbortWithError(405, errors.New("Cannot resume a job run that isn't pending"))
} else if err := c.ShouldBindJSON(&brr); err != nil {
c.AbortWithError(500, err)
} else if bt, err := jrc.App.Store.PendingBridgeType(jr); err != nil {
c.AbortWithError(500, err)
} else if _, err := bt.Authenticate(utils.StripBearer(c.Request.Header.Get("Authorization"))); err != nil {
publicError(c, http.StatusUnauthorized, err)
} else {
executeRun(jr, jrc.App.Store, brr.RunResult)
c.JSON(200, gin.H{"id": jr.ID})
}
}
func startJob(j models.JobSpec, s *store.Store, body models.JSON) (models.JobRun, error) {
i := j.InitiatorsFor(models.InitiatorWeb)[0]
jr, err := services.BuildRun(j, i, s)
if err != nil {
return jr, err
}
if s.Save(&jr); err != nil {
return jr, err
}
executeRun(jr, s, models.RunResult{Data: body})
return jr, nil
}
func executeRun(jr models.JobRun, s *store.Store, rr models.RunResult) {
go func() {
if err := s.RunChannel.Send(jr.ID, rr, nil); err != nil {
logger.Error("Web initiator: ", err.Error())
}
}()
}