Skip to content

Commit

Permalink
consolidate status on RunResults
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Ellis <email@steveell.is>
  • Loading branch information
dimroc authored and se3000 committed Mar 23, 2018
1 parent 4e1483c commit 3eb0036
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 52 deletions.
7 changes: 5 additions & 2 deletions adapters/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ type Bridge struct {
// If the Perform is resumed with a pending RunResult, the RunResult is marked
// not pending and the RunResult is returned.
func (ba *Bridge) Perform(input models.RunResult, _ *store.Store) models.RunResult {
if input.Pending {
if input.Pending() {
return markNotPending(input)
}
return ba.handleNewRun(input)
}

func markNotPending(input models.RunResult) models.RunResult {
input.Pending = false
input.Status = models.StatusInProgress
return input
}

Expand Down Expand Up @@ -63,6 +63,9 @@ func (ba *Bridge) handleNewRun(input models.RunResult) models.RunResult {
if err != nil {
return baRunResultError(input, "unmarshaling JSON", err)
}
if rr.ExternalPending {
return rr.MarkPending()
}
return rr
}

Expand Down
6 changes: 3 additions & 3 deletions adapters/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestBridge_Perform_FromUnstarted(t *testing.T) {
assert.Equal(t, test.want, val.String())
assert.Equal(t, test.wantExists, val.Exists())
assert.Equal(t, test.wantErrored, result.HasError())
assert.Equal(t, test.wantPending, result.Pending)
assert.Equal(t, test.wantPending, result.Pending())
})
}
}
Expand Down Expand Up @@ -83,14 +83,14 @@ func TestBridge_Perform_FromPending(t *testing.T) {
input := models.RunResult{
Data: cltest.JSONFromString(test.input),
ErrorMessage: test.errorMessage,
Pending: true,
Status: models.StatusPending,
}

result := ba.Perform(input, store)

assert.Equal(t, test.want, result.Data.String())
assert.Equal(t, test.errorMessage, result.ErrorMessage)
assert.Equal(t, false, result.Pending)
assert.Equal(t, false, result.Pending())
})
}
}
2 changes: 1 addition & 1 deletion adapters/eth_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type EthTx struct {
// is not currently pending. Then it confirms the transaction was confirmed on
// the blockchain.
func (etx *EthTx) Perform(input models.RunResult, store *store.Store) models.RunResult {
if !input.Pending {
if !input.Pending() {
return createTxRunResult(etx, input, store)
} else {
return ensureTxRunResult(input, store)
Expand Down
6 changes: 3 additions & 3 deletions adapters/eth_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestEthTxAdapter_Perform_FromPending(t *testing.T) {
output := adapter.Perform(input, store)

assert.False(t, output.HasError())
assert.True(t, output.Pending)
assert.True(t, output.Pending())
assert.Nil(t, store.One("ID", tx.ID, tx))
attempts, _ := store.AttemptsFor(tx.ID)
assert.Equal(t, 1, len(attempts))
Expand Down Expand Up @@ -126,7 +126,7 @@ func TestEthTxAdapter_Perform_FromPendingBumpGas(t *testing.T) {
output := adapter.Perform(input, store)

assert.False(t, output.HasError())
assert.True(t, output.Pending)
assert.True(t, output.Pending())
assert.Nil(t, store.One("ID", tx.ID, tx))
attempts, _ := store.AttemptsFor(tx.ID)
assert.Equal(t, 2, len(attempts))
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestEthTxAdapter_Perform_FromPendingConfirm(t *testing.T) {

output := adapter.Perform(input, store)

assert.False(t, output.Pending)
assert.False(t, output.Pending())
assert.False(t, output.HasError())

assert.Nil(t, store.One("ID", tx.ID, tx))
Expand Down
4 changes: 2 additions & 2 deletions adapters/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestHttpGet_Perform(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, test.want, val)
assert.Equal(t, test.wantErrored, result.HasError())
assert.Equal(t, false, result.Pending)
assert.Equal(t, false, result.Pending())
})
}
}
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestHttpPost_Perform(t *testing.T) {
assert.Equal(t, test.want, val.String())
assert.Equal(t, true, val.Exists())
assert.Equal(t, test.wantErrored, result.HasError())
assert.Equal(t, false, result.Pending)
assert.Equal(t, false, result.Pending())
})
}
}
2 changes: 1 addition & 1 deletion adapters/no_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (noa *NoOp) Perform(input models.RunResult, _ *store.Store) models.RunResul
type NoOpPend struct{}

// Perform on this adapter type returns an empty RunResult with an
// added field for the status to indicate the task is Pending
// added field for the status to indicate the task is Pending.
func (noa *NoOpPend) Perform(input models.RunResult, _ *store.Store) models.RunResult {
return input.MarkPending()
}
4 changes: 2 additions & 2 deletions internal/cltest/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ func RunResultWithError(err error) models.RunResult {

func MarkJobRunPending(jr models.JobRun, i int) models.JobRun {
jr.Status = models.StatusPending
jr.Result.Pending = true
jr.Result.Status = models.StatusPending
jr.TaskRuns[i].Status = models.StatusPending
jr.TaskRuns[i].Result.Pending = true
jr.TaskRuns[i].Result.Status = models.StatusPending
return jr
}
7 changes: 4 additions & 3 deletions services/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,15 @@ func ExecuteRun(run models.JobRun, store *store.Store, input models.RunResult) (
return run, wrapError(run, err)
}

if prevRun.Result.Pending {
if prevRun.Result.Pending() {
logger.Infow(fmt.Sprintf("Task %v pending", taskRun.Task.Type), taskRun.ForLogger("task", i, "result", prevRun.Result)...)
break
}
logger.Infow(fmt.Sprintf("Task %v finished", taskRun.Task.Type), taskRun.ForLogger("task", i, "result", prevRun.Result)...)
if prevRun.Result.HasError() {
logger.Infow(fmt.Sprintf("Task %v errored", taskRun.Task.Type), taskRun.ForLogger("task", i, "result", prevRun.Result)...)
break
}
logger.Infow(fmt.Sprintf("Task %v completed", taskRun.Task.Type), taskRun.ForLogger("task", i, "result", prevRun.Result)...)
}

run = run.ApplyResult(prevRun.Result)
Expand All @@ -104,7 +105,7 @@ func startTask(
run.Result = adapter.Perform(input, store)
if run.Result.HasError() {
run.Status = models.StatusErrored
} else if run.Result.Pending {
} else if run.Result.Pending() {
run.Status = models.StatusPending
} else {
run.Status = models.StatusCompleted
Expand Down
32 changes: 19 additions & 13 deletions store/models/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (jr JobRun) ApplyResult(result RunResult) JobRun {
jr.Result = result
if jr.Result.HasError() {
jr.Status = StatusErrored
} else if jr.Result.Pending { // update to be enum and support blocking
} else if jr.Result.Pending() {
jr.Status = StatusPending
} else {
jr.Status = StatusCompleted
Expand Down Expand Up @@ -126,35 +126,36 @@ func (tr TaskRun) MergeTaskParams(j JSON) (TaskRun, error) {
// the Data and ErrorMessage, if any of either, and contains
// a Pending field to track the status.
type RunResult struct {
JobRunID string `json:"jobRunId"`
Data JSON `json:"data"`
ErrorMessage null.String `json:"error"`
Pending bool `json:"pending"`
JobRunID string `json:"jobRunId"`
Data JSON `json:"data"`
ErrorMessage null.String `json:"error"`
Status string `json:"status"`
ExternalPending bool `json:"pending"`
}

// WithValue returns a copy of the RunResult, overriding the "value" field of
// Data and setting Pending to false.
// Data and setting the status to in progress.
func (rr RunResult) WithValue(val string) RunResult {
data, err := rr.Data.Add("value", val)
if err != nil {
return rr.WithError(err)
}
rr.Pending = false
rr.Status = StatusCompleted
rr.Data = data
return rr
}

// WithValue returns a copy of the RunResult, setting the error field
// and setting Pending to false.
// and setting the status to in progress.
func (rr RunResult) WithError(err error) RunResult {
rr.ErrorMessage = null.StringFrom(err.Error())
rr.Pending = false
rr.Status = StatusErrored
return rr
}

// MarkPending returns a copy of RunResult but with Pending set to true.
// MarkPending returns a copy of RunResult but with status set to pending.
func (rr RunResult) MarkPending() RunResult {
rr.Pending = true
rr.Status = StatusPending
return rr
}

Expand Down Expand Up @@ -184,6 +185,11 @@ func (rr RunResult) HasError() bool {
return rr.ErrorMessage.Valid
}

// Pending returns true if the status is pending.
func (rr RunResult) Pending() bool {
return rr.Status == StatusPending
}

// Error returns the string value of the ErrorMessage field.
func (rr RunResult) Error() string {
return rr.ErrorMessage.String
Expand Down Expand Up @@ -223,8 +229,8 @@ func (rr RunResult) Merge(in RunResult) (RunResult, error) {
if len(in.JobRunID) == 0 {
in.JobRunID = rr.JobRunID
}
if in.Pending || rr.Pending {
in.Pending = true
if in.Pending() || rr.Pending() {
in = in.MarkPending()
}
return in, nil
}
42 changes: 21 additions & 21 deletions store/models/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,11 @@ func TestRunResult_Merge(t *testing.T) {
name string
originalData string
originalError null.String
originalPending bool
originalStatus string
originalJRID string
inData string
inError null.String
inPending bool
inStatus string
inJRID string
wantData string
wantErrorMessage null.String
Expand All @@ -140,32 +140,32 @@ func TestRunResult_Merge(t *testing.T) {
wantErrored bool
}{
{"merging data",
`{"value":"old&busted","unique":"1"}`, nullString, false, jrID,
`{"value":"newHotness","and":"!"}`, nullString, false, jrID,
`{"value":"old&busted","unique":"1"}`, nullString, models.StatusInProgress, jrID,
`{"value":"newHotness","and":"!"}`, nullString, models.StatusInProgress, jrID,
`{"value":"newHotness","unique":"1","and":"!"}`, nullString, false, jrID, false},
{"original error throws",
`{"value":"old"}`, cltest.NullString("old problem"), false, jrID,
`{}`, nullString, false, jrID,
`{"value":"old"}`, cltest.NullString("old problem"), models.StatusInProgress, jrID,
`{}`, nullString, models.StatusInProgress, jrID,
`{"value":"old"}`, cltest.NullString("old problem"), false, jrID, true},
{"error override",
`{"value":"old"}`, nullString, false, jrID,
`{}`, cltest.NullString("new problem"), false, jrID,
`{"value":"old"}`, nullString, models.StatusInProgress, jrID,
`{}`, cltest.NullString("new problem"), models.StatusInProgress, jrID,
`{"value":"old"}`, cltest.NullString("new problem"), false, jrID, false},
{"original job run ID",
`{"value":"old"}`, nullString, false, jrID,
`{}`, nullString, false, "",
`{"value":"old"}`, nullString, models.StatusInProgress, jrID,
`{}`, nullString, models.StatusInProgress, "",
`{"value":"old"}`, nullString, false, jrID, false},
{"job run ID override",
`{"value":"old"}`, nullString, false, utils.NewBytes32ID(),
`{}`, nullString, false, jrID,
`{"value":"old"}`, nullString, models.StatusInProgress, utils.NewBytes32ID(),
`{}`, nullString, models.StatusInProgress, jrID,
`{"value":"old"}`, nullString, false, jrID, false},
{"original pending",
`{"value":"old"}`, nullString, true, jrID,
`{}`, nullString, false, jrID,
`{"value":"old"}`, nullString, models.StatusPending, jrID,
`{}`, nullString, models.StatusInProgress, jrID,
`{"value":"old"}`, nullString, true, jrID, false},
{"pending override",
`{"value":"old"}`, nullString, false, jrID,
`{}`, nullString, true, jrID,
`{"value":"old"}`, nullString, models.StatusInProgress, jrID,
`{}`, nullString, models.StatusPending, jrID,
`{"value":"old"}`, nullString, true, jrID, false},
}

Expand All @@ -175,31 +175,31 @@ func TestRunResult_Merge(t *testing.T) {
Data: models.JSON{gjson.Parse(test.originalData)},
ErrorMessage: test.originalError,
JobRunID: test.originalJRID,
Pending: test.originalPending,
Status: test.originalStatus,
}
in := models.RunResult{
Data: cltest.JSONFromString(test.inData),
ErrorMessage: test.inError,
JobRunID: test.inJRID,
Pending: test.inPending,
Status: test.inStatus,
}
merged, err := original.Merge(in)
assert.Equal(t, test.wantErrored, err != nil)

assert.JSONEq(t, test.originalData, original.Data.String())
assert.Equal(t, test.originalError, original.ErrorMessage)
assert.Equal(t, test.originalJRID, original.JobRunID)
assert.Equal(t, test.originalPending, original.Pending)
assert.Equal(t, test.originalStatus, original.Status)

assert.JSONEq(t, test.inData, in.Data.String())
assert.Equal(t, test.inError, in.ErrorMessage)
assert.Equal(t, test.inJRID, in.JobRunID)
assert.Equal(t, test.inPending, in.Pending)
assert.Equal(t, test.inStatus, in.Status)

assert.JSONEq(t, test.wantData, merged.Data.String())
assert.Equal(t, test.wantErrorMessage, merged.ErrorMessage)
assert.Equal(t, test.wantJRID, merged.JobRunID)
assert.Equal(t, test.wantPending, merged.Pending)
assert.Equal(t, test.wantPending, merged.Pending())
})
}
}
2 changes: 1 addition & 1 deletion web/job_runs_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (jrc *JobRunsController) Update(c *gin.Context) {
c.JSON(500, gin.H{
"errors": []string{err.Error()},
})
} else if !jr.Result.Pending {
} else if !jr.Result.Pending() {
c.JSON(405, gin.H{
"errors": []string{"Cannot resume a job run that isn't pending"},
})
Expand Down

0 comments on commit 3eb0036

Please sign in to comment.