-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathjob_runs_controller.go
130 lines (118 loc) · 4.21 KB
/
job_runs_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package web
import (
"errors"
"fmt"
"io/ioutil"
"net/http"
"github.com/gin-gonic/gin"
"github.com/manyminds/api2go/jsonapi"
"github.com/smartcontractkit/chainlink/services"
"github.com/smartcontractkit/chainlink/store/models"
"github.com/smartcontractkit/chainlink/store/orm"
"github.com/smartcontractkit/chainlink/store/presenters"
"github.com/smartcontractkit/chainlink/utils"
)
// JobRunsController manages JobRun requests in the node.
type JobRunsController struct {
App services.Application
}
// Index returns paginated JobRuns for a given JobSpec
// Example:
// "<application>/runs?jobSpecId=:jobSpecId&size=1&page=2"
func (jrc *JobRunsController) Index(c *gin.Context) {
id := c.Query("jobSpecId")
size, page, offset, err := ParsePaginatedRequest(c.Query("size"), c.Query("page"))
if err != nil {
c.AbortWithError(422, err)
return
}
order := orm.Ascending
if c.Query("sort") == "-createdAt" {
order = orm.Descending
}
store := jrc.App.GetStore()
var runs []models.JobRun
var count int
if id == "" {
runs, count, err = store.JobRunsSorted(order, offset, size)
} else {
runs, count, err = store.JobRunsSortedFor(id, order, offset, size)
}
if err == orm.ErrorNotFound {
c.Data(404, MediaType, emptyJSON)
} else if err != nil {
c.AbortWithError(500, fmt.Errorf("error getting paged JobRuns: %+v", err))
} else if buffer, err := NewPaginatedResponse(*c.Request.URL, size, page, count, runs); err != nil {
c.AbortWithError(500, fmt.Errorf("failed to marshal document: %+v", err))
} else {
c.Data(200, MediaType, buffer)
}
}
// Create starts a new Run for the requested JobSpec.
// Example:
// "<application>/specs/:SpecID/runs"
func (jrc *JobRunsController) Create(c *gin.Context) {
id := c.Param("SpecID")
if j, err := jrc.App.GetStore().FindJob(id); err == orm.ErrorNotFound {
c.AbortWithError(404, errors.New("Job not found"))
} else if err != nil {
c.AbortWithError(500, err)
} else if !j.WebAuthorized() {
c.AbortWithError(403, errors.New("Job not available on web API, recreate with web initiator"))
} else if data, err := getRunData(c); err != nil {
c.AbortWithError(500, err)
} else if jr, err := services.ExecuteJob(j, j.InitiatorsFor(models.InitiatorWeb)[0], models.RunResult{Data: data}, nil, jrc.App.GetStore()); err != nil {
c.AbortWithError(500, err)
} else if doc, err := jsonapi.Marshal(presenters.JobRun{JobRun: *jr}); err != nil {
c.AbortWithError(500, err)
} else {
c.Data(200, MediaType, doc)
}
}
func getRunData(c *gin.Context) (models.JSON, error) {
b, err := ioutil.ReadAll(c.Request.Body)
if err != nil {
return models.JSON{}, err
}
return models.ParseJSON(b)
}
// Show returns the details of a JobRun.
// Example:
// "<application>/runs/:RunID"
func (jrc *JobRunsController) Show(c *gin.Context) {
id := c.Param("RunID")
if jr, err := jrc.App.GetStore().FindJobRun(id); err == orm.ErrorNotFound {
c.AbortWithError(404, errors.New("Job Run not found"))
} else if err != nil {
c.AbortWithError(500, err)
} else if doc, err := jsonapi.Marshal(presenters.JobRun{JobRun: jr}); err != nil {
c.AbortWithError(500, err)
} else {
c.Data(200, MediaType, doc)
}
}
// Update allows external adapters to resume a JobRun, reporting the result of
// the task and marking it no longer pending.
// Example:
// "<application>/runs/:RunID"
func (jrc *JobRunsController) Update(c *gin.Context) {
id := c.Param("RunID")
var brr models.BridgeRunResult
if jr, err := jrc.App.GetStore().FindJobRun(id); err == orm.ErrorNotFound {
c.AbortWithError(404, errors.New("Job Run not found"))
} else if err != nil {
c.AbortWithError(500, err)
} else if !jr.Result.Status.PendingBridge() {
c.AbortWithError(405, errors.New("Cannot resume a job run that isn't pending"))
} else if err := c.ShouldBindJSON(&brr); err != nil {
c.AbortWithError(500, err)
} else if bt, err := jrc.App.GetStore().PendingBridgeType(jr); err != nil {
c.AbortWithError(500, err)
} else if _, err := bt.Authenticate(utils.StripBearer(c.Request.Header.Get("Authorization"))); err != nil {
publicError(c, http.StatusUnauthorized, err)
} else if _, err = services.ResumePendingTask(&jr, jrc.App.GetStore(), brr.RunResult); err != nil {
c.AbortWithError(500, err)
} else {
c.JSON(200, gin.H{"id": jr.ID})
}
}