Skip to content

Commit

Permalink
Add JitterDelay option when creating workflows. (cadence-workflow#4886)
Browse files Browse the repository at this point in the history
* Add JitterDelay option when creating workflows.

* Update idls

* getting jitter to work with cron

* Update IDL

Co-authored-by: David Porter <david.porter@uber.com>
  • Loading branch information
2 people authored and abhishekj720 committed Jul 21, 2022
1 parent 8e7448f commit d6f7e45
Show file tree
Hide file tree
Showing 27 changed files with 1,723 additions and 1,172 deletions.
276 changes: 266 additions & 10 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

920 changes: 462 additions & 458 deletions .gen/proto/history/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

920 changes: 462 additions & 458 deletions .gen/proto/matching/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

451 changes: 226 additions & 225 deletions .gen/proto/shared/v1/history.pb.yarpc.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ go test -v <path> -run <TestSuite> -testify.m <TestSpercificTaskName>
go test -v github.com/uber/cadence/common/persistence/persistence-tests -run TestVisibilitySamplingSuite -testify.m TestListClosedWorkflowExecutions
```

## IDL Changes

If you make changes in the idls submodule and want to test them locally, you can easily do that by using go mod to use the local idls directory instead of github.com/uber/cadence-idl. Temporarily add the following to the bottom of go.mod:

```replace github.com/uber/cadence-idl => ./idls```

## Pull Requests
After all the preparation you are about to write code and make a Pull Request for the issue.

Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,8 @@ cadence-bench: $(BUILD)/lint

.PHONY: go-generate bins tools release clean

bins: $(BINS)
bins: $(BINS) ## Make all binaries

tools: $(TOOLS)

go-generate: $(BIN)/mockgen $(BIN)/enumer $(BIN)/mockery $(BIN)/gowrap
Expand Down
25 changes: 21 additions & 4 deletions common/backoff/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package backoff

import (
"math"
"math/rand"
"time"

"github.com/robfig/cron"
Expand All @@ -45,7 +46,12 @@ func ValidateSchedule(cronSchedule string) error {

// GetBackoffForNextSchedule calculates the backoff time for the next run given
// a cronSchedule, workflow start time and workflow close time
func GetBackoffForNextSchedule(cronSchedule string, startTime time.Time, closeTime time.Time) time.Duration {
func GetBackoffForNextSchedule(
cronSchedule string,
startTime time.Time,
closeTime time.Time,
jitterStartSeconds int32,
) time.Duration {
if len(cronSchedule) == 0 {
return NoBackoff
}
Expand All @@ -63,13 +69,24 @@ func GetBackoffForNextSchedule(cronSchedule string, startTime time.Time, closeTi
}
backoffInterval := nextScheduleTime.Sub(closeUTCTime)
roundedInterval := time.Second * time.Duration(math.Ceil(backoffInterval.Seconds()))
return roundedInterval

var jitter time.Duration
if jitterStartSeconds > 0 {
jitter = time.Duration(rand.Int31n(jitterStartSeconds+1)) * time.Second
}

return roundedInterval + jitter
}

// GetBackoffForNextScheduleInSeconds calculates the backoff time in seconds for the
// next run given a cronSchedule and current time
func GetBackoffForNextScheduleInSeconds(cronSchedule string, startTime time.Time, closeTime time.Time) int32 {
backoffDuration := GetBackoffForNextSchedule(cronSchedule, startTime, closeTime)
func GetBackoffForNextScheduleInSeconds(
cronSchedule string,
startTime time.Time,
closeTime time.Time,
jitterStartSeconds int32,
) int32 {
backoffDuration := GetBackoffForNextSchedule(cronSchedule, startTime, closeTime, jitterStartSeconds)
if backoffDuration == NoBackoff {
return 0
}
Expand Down
86 changes: 85 additions & 1 deletion common/backoff/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package backoff

import (
"fmt"
"math/rand"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -61,8 +63,90 @@ func TestCron(t *testing.T) {
if tt.result != NoBackoff {
assert.NoError(t, err)
}
backoff := GetBackoffForNextSchedule(tt.cron, start, end)
backoff := GetBackoffForNextSchedule(tt.cron, start, end, 0)
assert.Equal(t, tt.result, backoff, "The cron spec is %s and the expected result is %s", tt.cron, tt.result)
})
}
}

func TestCronWithJitterStart(t *testing.T) {
var cronWithJitterStartTests = []struct {
cron string
startTime string
jitterStartSeconds int32
expectedResultSeconds time.Duration
expectedResultSeconds2 time.Duration
}{
// Note that the cron scheduler we use (robfig) schedules differently depending on the syntax:
// 1) * * * syntax : next run is scheduled on the first second of each minute, starting at next minute
// 2) @every X syntax: next run is scheduled X seconds from the time passed in to Next() call.
{"* * * * *", "2018-12-17T08:00:00+00:00", 10, time.Second * 60, time.Second * 60},
{"* * * * *", "2018-12-17T08:00:10+00:00", 30, time.Second * 50, time.Second * 60},
{"* * * * *", "2018-12-17T08:00:25+00:00", 15, time.Second * 35, time.Second * 60},
{"* * * * *", "2018-12-17T08:00:45+00:00", 0, time.Second * 15, time.Second * 60},
{"@every 60s", "2018-12-17T08:00:45+00:00", 0, time.Second * 60, time.Second * 60},
{"* * * * *", "2018-12-17T08:00:45+00:00", 45, time.Second * 15, time.Second * 60},
{"@every 60s", "2018-12-17T08:00:45+00:00", 45, time.Second * 60, time.Second * 60},
{"* * * * *", "2018-12-17T08:00:00+00:00", 70, time.Second * 60, time.Second * 60},
{"@every 20s", "2018-12-17T08:00:00+00:00", 15, time.Second * 20, time.Second * 20},
{"@every 10s", "2018-12-17T08:00:09+00:00", 0, time.Second * 10, time.Second * 10},
{"@every 20s", "2018-12-17T08:00:09+00:00", 15, time.Second * 20, time.Second * 20},
{"* * * * *", "0001-01-01T00:00:00+00:00", 0, time.Second * 60, time.Second * 60},
{"@every 20s", "0001-01-01T00:00:00+00:00", 0, time.Second * 20, time.Second * 20},
}

rand.Seed(int64(time.Now().Nanosecond()))
for idx, tt := range cronWithJitterStartTests {
t.Run(strconv.Itoa(idx), func(t *testing.T) {
exactCount := 0

start, _ := time.Parse(time.RFC3339, tt.startTime)
end := start
err := ValidateSchedule(tt.cron)
if tt.expectedResultSeconds != NoBackoff {
assert.NoError(t, err)
}
backoff := GetBackoffForNextSchedule(tt.cron, start, end, tt.jitterStartSeconds)
fmt.Printf("Backoff time for test %d = %v\n", idx, backoff)
delta := time.Duration(tt.jitterStartSeconds) * time.Second
expectedResultTime := start.Add(tt.expectedResultSeconds)
backoffTime := start.Add(backoff)
assert.WithinDuration(t, expectedResultTime, backoffTime, delta,
"The test specs are %v and the expected result in seconds is between %s and %s",
tt, tt.expectedResultSeconds, tt.expectedResultSeconds+delta)
if expectedResultTime == backoffTime {
exactCount++
}

// Also check next X cron times
caseCount := 5
for i := 1; i < caseCount; i++ {
startTime := expectedResultTime

backoff := GetBackoffForNextSchedule(tt.cron, startTime, startTime, tt.jitterStartSeconds)
expectedResultTime := startTime.Add(tt.expectedResultSeconds2)
backoffTime := startTime.Add(backoff)
assert.WithinDuration(t, expectedResultTime, backoffTime, delta,
"Iteration %d: The test specs are %v and the expected result in seconds is between %s and %s",
i, tt, tt.expectedResultSeconds, tt.expectedResultSeconds+delta)
if expectedResultTime == backoffTime {
exactCount++
}

}

// If jitter is > 0, we want to detect whether jitter is being applied - BUT we don't want the test
// to be flaky if the code randomly chooses a jitter of 0, so we try to have enough data points by
// checking the next X cron times AND by choosing a jitter thats not super low.

if tt.jitterStartSeconds > 0 && exactCount == caseCount {
// Test to make sure a jitter test case sometimes doesn't get exact values
t.Fatalf("FAILED to jitter properly? Test specs = %v\n", tt)
} else if tt.jitterStartSeconds == 0 && exactCount != caseCount {
// Test to make sure a non-jitter test case always gets exact values
t.Fatalf("Jittered when we weren't supposed to? Test specs = %v\n", tt)
}

})
}
}
8 changes: 8 additions & 0 deletions common/types/mapper/proto/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,7 @@ func FromContinueAsNewWorkflowExecutionDecisionAttributes(t *types.ContinueAsNew
Header: FromHeader(t.Header),
Memo: FromMemo(t.Memo),
SearchAttributes: FromSearchAttributes(t.SearchAttributes),
JitterStart: secondsToDuration(t.JitterStartSeconds),
}
}

Expand All @@ -698,6 +699,7 @@ func ToContinueAsNewWorkflowExecutionDecisionAttributes(t *apiv1.ContinueAsNewWo
Header: ToHeader(t.Header),
Memo: ToMemo(t.Memo),
SearchAttributes: ToSearchAttributes(t.SearchAttributes),
JitterStartSeconds: durationToSeconds(t.JitterStart),
}
}

Expand Down Expand Up @@ -3338,6 +3340,7 @@ func FromSignalWithStartWorkflowExecutionRequest(t *types.SignalWithStartWorkflo
SearchAttributes: FromSearchAttributes(t.SearchAttributes),
Header: FromHeader(t.Header),
DelayStart: secondsToDuration(t.DelayStartSeconds),
JitterStart: secondsToDuration(t.JitterStartSeconds),
},
SignalName: t.SignalName,
SignalInput: FromPayload(t.SignalInput),
Expand Down Expand Up @@ -3369,6 +3372,7 @@ func ToSignalWithStartWorkflowExecutionRequest(t *apiv1.SignalWithStartWorkflowE
SearchAttributes: ToSearchAttributes(t.StartRequest.SearchAttributes),
Header: ToHeader(t.StartRequest.Header),
DelayStartSeconds: durationToSeconds(t.StartRequest.DelayStart),
JitterStartSeconds: durationToSeconds(t.StartRequest.JitterStart),
}
}

Expand Down Expand Up @@ -3518,6 +3522,7 @@ func FromStartChildWorkflowExecutionInitiatedEventAttributes(t *types.StartChild
Memo: FromMemo(t.Memo),
SearchAttributes: FromSearchAttributes(t.SearchAttributes),
DelayStart: secondsToDuration(t.DelayStartSeconds),
JitterStart: secondsToDuration(t.JitterStartSeconds),
}
}

Expand All @@ -3543,6 +3548,7 @@ func ToStartChildWorkflowExecutionInitiatedEventAttributes(t *apiv1.StartChildWo
Memo: ToMemo(t.Memo),
SearchAttributes: ToSearchAttributes(t.SearchAttributes),
DelayStartSeconds: durationToSeconds(t.DelayStart),
JitterStartSeconds: durationToSeconds(t.JitterStart),
}
}

Expand Down Expand Up @@ -3607,6 +3613,7 @@ func FromStartWorkflowExecutionRequest(t *types.StartWorkflowExecutionRequest) *
SearchAttributes: FromSearchAttributes(t.SearchAttributes),
Header: FromHeader(t.Header),
DelayStart: secondsToDuration(t.DelayStartSeconds),
JitterStart: secondsToDuration(t.JitterStartSeconds),
}
}

Expand All @@ -3631,6 +3638,7 @@ func ToStartWorkflowExecutionRequest(t *apiv1.StartWorkflowExecutionRequest) *ty
SearchAttributes: ToSearchAttributes(t.SearchAttributes),
Header: ToHeader(t.Header),
DelayStartSeconds: durationToSeconds(t.DelayStart),
JitterStartSeconds: durationToSeconds(t.JitterStart),
}
}

Expand Down
2 changes: 2 additions & 0 deletions common/types/mapper/thrift/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -5238,6 +5238,7 @@ func FromStartWorkflowExecutionRequest(t *types.StartWorkflowExecutionRequest) *
SearchAttributes: FromSearchAttributes(t.SearchAttributes),
Header: FromHeader(t.Header),
DelayStartSeconds: t.DelayStartSeconds,
JitterStartSeconds: t.JitterStartSeconds,
}
}

Expand All @@ -5263,6 +5264,7 @@ func ToStartWorkflowExecutionRequest(t *shared.StartWorkflowExecutionRequest) *t
SearchAttributes: ToSearchAttributes(t.SearchAttributes),
Header: ToHeader(t.Header),
DelayStartSeconds: t.DelayStartSeconds,
JitterStartSeconds: t.JitterStartSeconds,
}
}

Expand Down
29 changes: 29 additions & 0 deletions common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ type ContinueAsNewWorkflowExecutionDecisionAttributes struct {
Header *Header `json:"header,omitempty"`
Memo *Memo `json:"memo,omitempty"`
SearchAttributes *SearchAttributes `json:"searchAttributes,omitempty"`
JitterStartSeconds *int32 `json:"jitterStartSeconds,omitempty"`
}

// GetExecutionStartToCloseTimeoutSeconds is an internal getter (TBD...)
Expand All @@ -756,6 +757,14 @@ func (v *ContinueAsNewWorkflowExecutionDecisionAttributes) GetBackoffStartInterv
return
}

// GetJitterStartSeconds is an internal getter (TBD...)
func (v *ContinueAsNewWorkflowExecutionDecisionAttributes) GetJitterStartSeconds() (o int32) {
if v != nil && v.JitterStartSeconds != nil {
return *v.JitterStartSeconds
}
return
}

// GetInitiator is an internal getter (TBD...)
func (v *ContinueAsNewWorkflowExecutionDecisionAttributes) GetInitiator() (o ContinueAsNewInitiator) {
if v != nil && v.Initiator != nil {
Expand Down Expand Up @@ -5549,6 +5558,7 @@ type SignalWithStartWorkflowExecutionRequest struct {
SearchAttributes *SearchAttributes `json:"searchAttributes,omitempty"`
Header *Header `json:"header,omitempty"`
DelayStartSeconds *int32 `json:"delayStartSeconds,omitempty"`
JitterStartSeconds *int32 `json:"jitterStartSeconds,omitempty"`
}

// GetDomain is an internal getter (TBD...)
Expand Down Expand Up @@ -5819,6 +5829,7 @@ type StartChildWorkflowExecutionInitiatedEventAttributes struct {
Memo *Memo `json:"memo,omitempty"`
SearchAttributes *SearchAttributes `json:"searchAttributes,omitempty"`
DelayStartSeconds *int32 `json:"delayStartSeconds,omitempty"`
JitterStartSeconds *int32 `json:"jitterStartSeconds,omitempty"`
}

// GetDomain is an internal getter (TBD...)
Expand Down Expand Up @@ -5915,6 +5926,7 @@ type StartWorkflowExecutionRequest struct {
SearchAttributes *SearchAttributes `json:"searchAttributes,omitempty"`
Header *Header `json:"header,omitempty"`
DelayStartSeconds *int32 `json:"delayStartSeconds,omitempty"`
JitterStartSeconds *int32 `json:"jitterStartSeconds,omitempty"`
}

// GetDomain is an internal getter (TBD...)
Expand Down Expand Up @@ -5957,6 +5969,14 @@ func (v *StartWorkflowExecutionRequest) GetDelayStartSeconds() (o int32) {
return
}

// GetJitterStartSeconds is an internal getter (TBD...)
func (v *StartWorkflowExecutionRequest) GetJitterStartSeconds() (o int32) {
if v != nil && v.JitterStartSeconds != nil {
return *v.JitterStartSeconds
}
return
}

// GetRequestID is an internal getter (TBD...)
func (v *StartWorkflowExecutionRequest) GetRequestID() (o string) {
if v != nil {
Expand Down Expand Up @@ -6753,6 +6773,7 @@ type WorkflowExecutionContinuedAsNewEventAttributes struct {
Header *Header `json:"header,omitempty"`
Memo *Memo `json:"memo,omitempty"`
SearchAttributes *SearchAttributes `json:"searchAttributes,omitempty"`
JitterStartSeconds *int32 `json:"jitterStartSeconds,omitempty"`
}

// GetNewExecutionRunID is an internal getter (TBD...)
Expand Down Expand Up @@ -6951,6 +6972,7 @@ type WorkflowExecutionStartedEventAttributes struct {
SearchAttributes *SearchAttributes `json:"searchAttributes,omitempty"`
PrevAutoResetPoints *ResetPoints `json:"prevAutoResetPoints,omitempty"`
Header *Header `json:"header,omitempty"`
JitterStartSeconds *int32 `json:"jitterStartSeconds,omitempty"`
}

// GetParentWorkflowDomain is an internal getter (TBD...)
Expand Down Expand Up @@ -7041,6 +7063,13 @@ func (v *WorkflowExecutionStartedEventAttributes) GetFirstDecisionTaskBackoffSec
return
}

func (v *WorkflowExecutionStartedEventAttributes) GetJitterStartSeconds() (o int32) {
if v != nil && v.JitterStartSeconds != nil {
return *v.JitterStartSeconds
}
return
}

// GetMemo is an internal getter (TBD...)
func (v *WorkflowExecutionStartedEventAttributes) GetMemo() (o *Memo) {
if v != nil && v.Memo != nil {
Expand Down
1 change: 1 addition & 0 deletions common/types/testdata/decision.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ var (
Header: &Header,
Memo: &Memo,
SearchAttributes: &SearchAttributes,
JitterStartSeconds: &Duration1,
}
FailWorkflowExecutionDecisionAttributes = types.FailWorkflowExecutionDecisionAttributes{
Reason: &FailureReason,
Expand Down
6 changes: 5 additions & 1 deletion common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,15 +519,19 @@ func CreateHistoryStartWorkflowRequest(
}

delayStartSeconds := startRequest.GetDelayStartSeconds()
jitterStartSeconds := startRequest.GetJitterStartSeconds()
firstDecisionTaskBackoffSeconds := delayStartSeconds
if len(startRequest.GetCronSchedule()) > 0 {
delayedStartTime := now.Add(time.Second * time.Duration(delayStartSeconds))
firstDecisionTaskBackoffSeconds = backoff.GetBackoffForNextScheduleInSeconds(
startRequest.GetCronSchedule(), delayedStartTime, delayedStartTime)
startRequest.GetCronSchedule(), delayedStartTime, delayedStartTime, jitterStartSeconds)

// backoff seconds was calculated based on delayed start time, so we need to
// add the delayStartSeconds to that backoff.
firstDecisionTaskBackoffSeconds += delayStartSeconds
} else if jitterStartSeconds > 0 {
// Add a random jitter to start time, if requested.
firstDecisionTaskBackoffSeconds += rand.Int31n(jitterStartSeconds + 1)
}

histRequest.FirstDecisionTaskBackoffSeconds = Int32Ptr(firstDecisionTaskBackoffSeconds)
Expand Down
Loading

0 comments on commit d6f7e45

Please sign in to comment.