Skip to content

Commit

Permalink
Change error type on query before first decision task (#3121)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Mar 19, 2020
1 parent 86ae10c commit 1e52f96
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 5 deletions.
2 changes: 1 addition & 1 deletion common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite

func buildRatelimiters(cfg *config.Persistence, maxQPS dynamicconfig.IntPropertyFn) map[string]quotas.Limiter {
result := make(map[string]quotas.Limiter, len(cfg.DataStores))
for dsName, _ := range cfg.DataStores {
for dsName := range cfg.DataStores {
if maxQPS != nil && maxQPS() > 0 {
result[dsName] = quotas.NewDynamicRateLimiter(func() float64 { return float64(maxQPS()) })
}
Expand Down
4 changes: 2 additions & 2 deletions common/task/weightedRoundRobinTaskScheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s *weightedRoundRobinTaskSchedulerSuite) TestDispatcher_SubmitWithNoError(
tasksPerRound := []int{6, 5, 2, 1, 1}
round := 0
mockFn := func(_ Task) error {
numSubmittedTask += 1
numSubmittedTask++
if numSubmittedTask == tasksPerRound[round] {
round++
numSubmittedTask = 0
Expand All @@ -182,7 +182,7 @@ func (s *weightedRoundRobinTaskSchedulerSuite) TestDispatcher_SubmitWithNoError(
return nil
}

for priority, _ := range testSchedulerWeights {
for priority := range testSchedulerWeights {
for i := 0; i != taskPerPriority; i++ {
mockTask := NewMockPriorityTask(s.controller)
mockTask.EXPECT().Priority().Return(priority).AnyTimes()
Expand Down
12 changes: 10 additions & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ const (
activityCancellationMsgActivityIDUnknown = "ACTIVITY_ID_UNKNOWN"
activityCancellationMsgActivityNotStarted = "ACTIVITY_ID_NOT_STARTED"
timerCancellationMsgTimerIDUnknown = "TIMER_ID_UNKNOWN"
queryFirstDecisionTaskWaitTime = time.Second
defaultQueryFirstDecisionTaskWaitTime = time.Second
queryFirstDecisionTaskCheckInterval = 200 * time.Millisecond
)

Expand Down Expand Up @@ -183,7 +183,7 @@ var (
// ErrQueryEnteredInvalidState is error indicating query entered invalid state
ErrQueryEnteredInvalidState = &workflow.BadRequestError{Message: "query entered invalid state, this should be impossible"}
// ErrQueryWorkflowBeforeFirstDecision is error indicating that query was attempted before first decision task completed
ErrQueryWorkflowBeforeFirstDecision = &workflow.BadRequestError{Message: "workflow must handle at least one decision task before it can be queried"}
ErrQueryWorkflowBeforeFirstDecision = &workflow.QueryFailedError{Message: "workflow must handle at least one decision task before it can be queried"}
// ErrConsistentQueryNotEnabled is error indicating that consistent query was requested but either cluster or domain does not enable consistent query
ErrConsistentQueryNotEnabled = &workflow.BadRequestError{Message: "cluster or domain does not enable strongly consistent query but strongly consistent query was requested"}
// ErrConsistentQueryBufferExceeded is error indicating that too many consistent queries have been buffered and until buffered queries are finished new consistent queries cannot be buffered
Expand Down Expand Up @@ -821,6 +821,14 @@ func (e *historyEngineImpl) QueryWorkflow(

// query cannot be processed unless at least one decision task has finished
// if first decision task has not finished wait for up to a second for it to complete
queryFirstDecisionTaskWaitTime := defaultQueryFirstDecisionTaskWaitTime
ctxDeadline, ok := ctx.Deadline()
if ok {
ctxWaitTime := ctxDeadline.Sub(time.Now()) - time.Second
if ctxWaitTime > queryFirstDecisionTaskWaitTime {
queryFirstDecisionTaskWaitTime = ctxWaitTime
}
}
deadline := time.Now().Add(queryFirstDecisionTaskWaitTime)
for mutableStateResp.GetPreviousStartedEventId() <= 0 && time.Now().Before(deadline) {
<-time.After(queryFirstDecisionTaskCheckInterval)
Expand Down

0 comments on commit 1e52f96

Please sign in to comment.