Fix query workflow high latency after a long inactive time (uber#4871)
* Fix query workflow high latency after a long inactive time

* Add new client integration test
Shaddoll authored Jul 28, 2022
1 parent 43a17d2 commit a4d77f5
Showing 11 changed files with 258 additions and 21 deletions.
4 changes: 4 additions & 0 deletions common/metrics/defs.go
@@ -1919,6 +1919,7 @@ const (
CadenceErrUnauthorizedPerTaskListCounter
CadenceErrAuthorizeFailedPerTaskListCounter
CadenceErrRemoteSyncMatchFailedPerTaskListCounter
CadenceErrStickyWorkerUnavailablePerTaskListCounter

CadenceShardSuccessGauge
CadenceShardFailureGauge
@@ -2478,6 +2479,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
CadenceErrRemoteSyncMatchFailedPerTaskListCounter: {
metricName: "cadence_errors_remote_syncmatch_failed_per_tl", metricRollupName: "cadence_errors_remote_syncmatch_failed", metricType: Counter,
},
CadenceErrStickyWorkerUnavailablePerTaskListCounter: {
metricName: "cadence_errors_sticky_worker_unavailable_per_tl", metricRollupName: "cadence_errors_sticky_worker_unavailable_per_tl", metricType: Counter,
},
CadenceShardSuccessGauge: {metricName: "cadence_shard_success", metricType: Gauge},
CadenceShardFailureGauge: {metricName: "cadence_shard_failure", metricType: Gauge},
DomainReplicationQueueSizeGauge: {metricName: "domain_replication_queue_size", metricType: Gauge},
158 changes: 150 additions & 8 deletions host/client_integration_test.go
@@ -39,13 +39,16 @@ import (
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/client"
"go.uber.org/cadence/compatibility"
"go.uber.org/cadence/encoded"
"go.uber.org/cadence/worker"
"go.uber.org/cadence/workflow"
"go.uber.org/yarpc"
"go.uber.org/yarpc/transport/tchannel"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/zap"

apiv1 "github.com/uber/cadence-idl/go/proto/api/v1"

"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/service"
)
@@ -101,14 +104,14 @@ func (s *ClientIntegrationSuite) TearDownSuite() {

func (s *ClientIntegrationSuite) buildServiceClient() (workflowserviceclient.Interface, error) {
cadenceClientName := "cadence-client"
hostPort := "127.0.0.1:7104"
hostPort := "127.0.0.1:7114" // use grpc port
if TestFlags.FrontendAddr != "" {
hostPort = TestFlags.FrontendAddr
}
ch, err := tchannel.NewChannelTransport(tchannel.ServiceName(cadenceClientName))
if err != nil {
s.Logger.Fatal("Failed to create transport channel", tag.Error(err))
}
ch := grpc.NewTransport(
grpc.ServerMaxRecvMsgSize(32*1024*1024),
grpc.ClientMaxRecvMsgSize(32*1024*1024),
)

dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: cadenceClientName,
@@ -122,8 +125,13 @@
if err := dispatcher.Start(); err != nil {
s.Logger.Fatal("Failed to create outbound transport channel", tag.Error(err))
}

return workflowserviceclient.New(dispatcher.ClientConfig(service.Frontend)), nil
cc := dispatcher.ClientConfig(service.Frontend)
return compatibility.NewThrift2ProtoAdapter(
apiv1.NewDomainAPIYARPCClient(cc),
apiv1.NewWorkflowAPIYARPCClient(cc),
apiv1.NewWorkerAPIYARPCClient(cc),
apiv1.NewVisibilityAPIYARPCClient(cc),
), nil
}

func (s *ClientIntegrationSuite) SetupTest() {
@@ -372,3 +380,137 @@ func (s *ClientIntegrationSuite) TestClientDataConverter_WithChild() {
s.Equal(3, d.NumOfCallToData)
s.Equal(2, d.NumOfCallFromData)
}

func (s *ClientIntegrationSuite) Test_StickyWorkerRestartDecisionTask() {
testCases := []struct {
name string
waitTime time.Duration
doQuery bool
doSignal bool
delayCheck func(duration time.Duration) bool
}{
{
name: "new_query_after_10s_no_delay",
waitTime: 10 * time.Second,
doQuery: true,
delayCheck: func(duration time.Duration) bool {
return duration < 5*time.Second
},
},
{
name: "new_query_immediately_expect_5s_delay",
waitTime: 0,
doQuery: true,
delayCheck: func(duration time.Duration) bool {
return duration > 5*time.Second
},
},
{
name: "new_workflow_task_after_10s_no_delay",
waitTime: 10 * time.Second,
doSignal: true,
delayCheck: func(duration time.Duration) bool {
return duration < 5*time.Second
},
},
{
name: "new_workflow_task_immediately_expect_5s_delay",
waitTime: 0,
doSignal: true,
delayCheck: func(duration time.Duration) bool {
return duration > 5*time.Second
},
},
}
for _, tt := range testCases {
s.Run(tt.name, func() {
workflowFn := func(ctx workflow.Context) (string, error) {
workflow.SetQueryHandler(ctx, "test", func() (string, error) {
return "query works", nil
})

signalCh := workflow.GetSignalChannel(ctx, "test")
var msg string
signalCh.Receive(ctx, &msg)
return msg, nil
}

taskList := "task-list-" + tt.name

oldWorker := worker.New(s.wfService, s.domainName, taskList, worker.Options{})
oldWorker.RegisterWorkflow(workflowFn)
if err := oldWorker.Start(); err != nil {
s.Logger.Fatal("Error when start worker", tag.Error(err))
}

id := "test-sticky-delay" + tt.name
workflowOptions := client.StartWorkflowOptions{
ID: id,
TaskList: taskList,
ExecutionStartToCloseTimeout: 20 * time.Second,
}
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second)
defer cancel()
workflowRun, err := s.wfClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn)
if err != nil {
s.Logger.Fatal("Start workflow failed with err", tag.Error(err))
}

s.NotNil(workflowRun)
s.True(workflowRun.GetRunID() != "")

findFirstWorkflowTaskCompleted := false
WaitForFirstWorkflowTaskComplete:
for i := 0; i < 10; i++ {
// wait until first workflow task completed (so we know sticky is set on workflow)
iter := s.wfClient.GetWorkflowHistory(ctx, id, "", false, 0)
for iter.HasNext() {
evt, err := iter.Next()
s.NoError(err)
if evt.GetEventType() == shared.EventTypeDecisionTaskCompleted {
findFirstWorkflowTaskCompleted = true
break WaitForFirstWorkflowTaskComplete
}
}
time.Sleep(time.Second)
}
s.True(findFirstWorkflowTaskCompleted)

// stop old worker
oldWorker.Stop()

// wait if configured (10s cases), which lets matching notice that the old sticky worker is unavailable
time.Sleep(tt.waitTime)

// start a new worker
newWorker := worker.New(s.wfService, s.domainName, taskList, worker.Options{})
newWorker.RegisterWorkflow(workflowFn)
if err := newWorker.Start(); err != nil {
s.Logger.Fatal("Error when start worker", tag.Error(err))
}
defer newWorker.Stop()

startTime := time.Now()
// send a signal, and workflow should complete immediately, there should not be 5s delay
if tt.doSignal {
err = s.wfClient.SignalWorkflow(ctx, id, "", "test", "test")
s.NoError(err)

err = workflowRun.Get(ctx, nil)
s.NoError(err)
} else if tt.doQuery {
// send a query, and the workflow should respond immediately, without the 5s delay
queryResult, err := s.wfClient.QueryWorkflow(ctx, id, "", "test", "test")
s.NoError(err)

var queryResultStr string
err = queryResult.Get(&queryResultStr)
s.NoError(err)
s.Equal("query works", queryResultStr)
}
endTime := time.Now()
duration := endTime.Sub(startTime)
s.True(tt.delayCheck(duration), "delay check failed: %s", duration)
})
}
}
1 change: 1 addition & 0 deletions host/query_workflow_test.go
@@ -148,6 +148,7 @@ func (s *IntegrationSuite) TestQueryWorkflow_Sticky() {
}
queryResultCh := make(chan QueryResult)
queryWorkflowFn := func(queryType string) {
time.Sleep(100 * time.Millisecond) // sleep for a short time, otherwise sticky tasklist will not have workers available
queryResp, err := s.engine.QueryWorkflow(createContext(), &types.QueryWorkflowRequest{
Domain: s.domainName,
Execution: &types.WorkflowExecution{
17 changes: 15 additions & 2 deletions service/history/historyEngine.go
@@ -1306,7 +1306,19 @@ func (e *historyEngineImpl) queryDirectlyThroughMatching(
scope.IncCounter(metrics.DirectQueryDispatchStickySuccessCount)
return &types.HistoryQueryWorkflowResponse{Response: matchingResp}, nil
}
if yarpcError, ok := err.(*yarpcerrors.Status); !ok || yarpcError.Code() != yarpcerrors.CodeDeadlineExceeded {
switch v := err.(type) {
case *types.StickyWorkerUnavailableError:
case *yarpcerrors.Status:
if v.Code() != yarpcerrors.CodeDeadlineExceeded {
e.logger.Error("query directly through matching on sticky failed, will not attempt query on non-sticky",
tag.WorkflowDomainName(queryRequest.GetDomain()),
tag.WorkflowID(queryRequest.Execution.GetWorkflowID()),
tag.WorkflowRunID(queryRequest.Execution.GetRunID()),
tag.WorkflowQueryType(queryRequest.Query.GetQueryType()),
tag.Error(err))
return nil, err
}
default:
e.logger.Error("query directly through matching on sticky failed, will not attempt query on non-sticky",
tag.WorkflowDomainName(queryRequest.GetDomain()),
tag.WorkflowID(queryRequest.Execution.GetWorkflowID()),
@@ -1320,7 +1332,8 @@
tag.WorkflowDomainName(queryRequest.GetDomain()),
tag.WorkflowID(queryRequest.Execution.GetWorkflowID()),
tag.WorkflowRunID(queryRequest.Execution.GetRunID()),
tag.WorkflowQueryType(queryRequest.Query.GetQueryType()))
tag.WorkflowQueryType(queryRequest.Query.GetQueryType()),
tag.Error(err))
resetContext, cancel := context.WithTimeout(context.Background(), 5*time.Second)
clearStickinessStopWatch := scope.StartTimer(metrics.DirectQueryDispatchClearStickinessLatency)
_, err := e.ResetStickyTaskList(resetContext, &types.HistoryResetStickyTaskListRequest{
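
The empty case for *types.StickyWorkerUnavailableError above is deliberate: for that error, as for a yarpc deadline-exceeded status, the query is not failed; execution falls through to the code below, which clears stickiness and retries the query on the non-sticky tasklist. Any other error is returned to the caller. A minimal, self-contained sketch of that classification follows; the error type, the deadline sentinel, and shouldFallBackToNonSticky are illustrative stand-ins rather than the actual Cadence or yarpc types.

// Sketch only: classify a sticky-dispatch error the way the switch above does.
package main

import (
	"errors"
	"fmt"
)

type stickyWorkerUnavailableError struct{ Message string }

func (e *stickyWorkerUnavailableError) Error() string { return e.Message }

// errDeadlineExceeded stands in for yarpcerrors.CodeDeadlineExceeded.
var errDeadlineExceeded = errors.New("deadline exceeded")

// shouldFallBackToNonSticky mirrors the switch above: fall back (instead of
// failing the query) only when the sticky dispatch timed out or matching
// reported that no sticky worker is available.
func shouldFallBackToNonSticky(err error) bool {
	var unavailable *stickyWorkerUnavailableError
	if errors.As(err, &unavailable) {
		return true
	}
	return errors.Is(err, errDeadlineExceeded)
}

func main() {
	for _, err := range []error{
		&stickyWorkerUnavailableError{Message: "sticky worker is unavailable"},
		errDeadlineExceeded,
		errors.New("workflow not found"),
	} {
		fmt.Printf("%-35v fall back to non-sticky: %v\n", err, shouldFallBackToNonSticky(err))
	}
}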
16 changes: 15 additions & 1 deletion service/history/task/transfer_active_task_executor.go
@@ -261,7 +261,21 @@ func (t *transferActiveTaskExecutor) processDecisionTask(
// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release(nil)
return t.pushDecision(ctx, task, taskList, decisionTimeout)
err = t.pushDecision(ctx, task, taskList, decisionTimeout)
if _, ok := err.(*types.StickyWorkerUnavailableError); ok {
// sticky worker is unavailable, switch to non-sticky task list
taskList = &types.TaskList{
Name: mutableState.GetExecutionInfo().TaskList,
}

// Continue to use the sticky schedule_to_start timeout as the TTL for the matching task, because the
// schedule_to_start timeout timer task has already been created and will time out this task anyway if
// no worker picks it up within 5s. There is no need to reset stickiness here: if the task is picked up
// by a new worker, that worker will reset the sticky queue to a new one; and if the worker is completely
// down, the schedule_to_start timeout task will re-create a non-sticky task and reset stickiness.
err = t.pushDecision(ctx, task, taskList, decisionTimeout)
}
return err
}

func (t *transferActiveTaskExecutor) processCloseExecution(
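
The change above first pushes the decision to the sticky tasklist and, only when matching answers with StickyWorkerUnavailableError, pushes it again to the workflow's normal tasklist, keeping the sticky schedule_to_start timeout as the task TTL. A rough sketch of that push-then-fall-back flow, assuming hypothetical pushDecision and taskList stand-ins rather than the real executor API:

// Sketch only: retry the push on the non-sticky tasklist when matching
// reports that the sticky worker is gone.
package main

import (
	"errors"
	"fmt"
)

type taskList struct{ Name string }

type stickyWorkerUnavailableError struct{}

func (e *stickyWorkerUnavailableError) Error() string { return "sticky worker is unavailable" }

// pushDecision is a stand-in for the matching call made by the executor.
func pushDecision(tl taskList, timeoutSeconds int32) error {
	if tl.Name == "sticky-tl" {
		return &stickyWorkerUnavailableError{} // simulate matching rejecting the sticky dispatch
	}
	fmt.Printf("decision pushed to %q with schedule-to-start TTL %ds\n", tl.Name, timeoutSeconds)
	return nil
}

func pushWithStickyFallback(sticky, normal taskList, stickyTimeout int32) error {
	err := pushDecision(sticky, stickyTimeout)
	var unavailable *stickyWorkerUnavailableError
	if errors.As(err, &unavailable) {
		// Keep the sticky schedule_to_start timeout as TTL: the timeout timer
		// task already exists and would re-create a non-sticky task anyway.
		return pushDecision(normal, stickyTimeout)
	}
	return err
}

func main() {
	_ = pushWithStickyFallback(taskList{Name: "sticky-tl"}, taskList{Name: "normal-tl"}, 5)
}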
8 changes: 7 additions & 1 deletion service/history/task/transfer_standby_task_executor.go
@@ -177,6 +177,12 @@ func (t *transferStandbyTaskExecutor) processDecisionTask(
executionInfo := mutableState.GetExecutionInfo()
workflowTimeout := executionInfo.WorkflowTimeout
decisionTimeout := common.MinInt32(workflowTimeout, common.MaxTaskTimeout)
if executionInfo.TaskList != transferTask.TaskList {
// Experimental: try to push the sticky task as a regular task, using the sticky timeout as TTL.
// The workflow might have been sticky before the namespace became standby,
// and a schedule_to_start timeout timer should already have been created.
decisionTimeout = executionInfo.StickyScheduleToStartTimeout
}

ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, decisionInfo.Version, transferTask.Version, transferTask)
if err != nil || !ok {
@@ -186,7 +192,7 @@
if decisionInfo.StartedID == common.EmptyEventID {
return newPushDecisionToMatchingInfo(
decisionTimeout,
types.TaskList{Name: transferTask.TaskList},
types.TaskList{Name: executionInfo.TaskList}, // at standby, always use non-sticky tasklist
), nil
}

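
On the standby side, a transfer task whose tasklist differs from the workflow's base tasklist is assumed to target a sticky tasklist, so the decision is pushed to the non-sticky tasklist but with the sticky schedule_to_start timeout as TTL. A small sketch of that timeout selection, using illustrative field names and a placeholder value for common.MaxTaskTimeout:

// Sketch only: choose the decision TTL the way the standby executor does.
package main

import "fmt"

type executionInfo struct {
	TaskList                     string
	WorkflowTimeout              int32 // seconds
	StickyScheduleToStartTimeout int32 // seconds
}

// maxTaskTimeout is a placeholder for common.MaxTaskTimeout.
const maxTaskTimeout int32 = 366 * 24 * 60 * 60

func decisionTimeout(info executionInfo, transferTaskList string) int32 {
	timeout := info.WorkflowTimeout
	if timeout > maxTaskTimeout {
		timeout = maxTaskTimeout
	}
	if info.TaskList != transferTaskList {
		// The transfer task targeted a sticky tasklist: use the sticky
		// schedule_to_start timeout as TTL for the non-sticky push.
		timeout = info.StickyScheduleToStartTimeout
	}
	return timeout
}

func main() {
	info := executionInfo{TaskList: "base-tl", WorkflowTimeout: 60, StickyScheduleToStartTimeout: 5}
	fmt.Println(decisionTimeout(info, "sticky-tl")) // 5: sticky transfer task
	fmt.Println(decisionTimeout(info, "base-tl"))   // 60: regular transfer task
}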
3 changes: 3 additions & 0 deletions service/matching/context.go
@@ -121,6 +121,9 @@ func (reqCtx *handlerContext) handleErr(err error) error {
case *types.RemoteSyncMatchedError:
scope.IncCounter(metrics.CadenceErrRemoteSyncMatchFailedPerTaskListCounter)
return err
case *types.StickyWorkerUnavailableError:
scope.IncCounter(metrics.CadenceErrStickyWorkerUnavailablePerTaskListCounter)
return err
default:
scope.IncCounter(metrics.CadenceFailuresPerTaskList)
reqCtx.logger.Error("Uncategorized error", tag.Error(err))
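
handleErr maps known error types to dedicated per-tasklist counters; the new case gives StickyWorkerUnavailableError its own counter instead of landing in the generic failure bucket, which also logs "Uncategorized error". A tiny sketch of that mapping, using the metric names from this diff but an illustrative metricFor helper and stand-in error types:

// Sketch only: route a handler error to the counter it should increment.
package main

import (
	"errors"
	"fmt"
)

type stickyWorkerUnavailableError struct{}

func (e *stickyWorkerUnavailableError) Error() string { return "sticky worker is unavailable" }

type remoteSyncMatchedError struct{}

func (e *remoteSyncMatchedError) Error() string { return "remote sync match failed" }

// metricFor picks the counter a handler error should increment; anything
// unrecognized falls into the generic bucket (the real code increments
// CadenceFailuresPerTaskList and logs "Uncategorized error").
func metricFor(err error) string {
	switch {
	case errors.As(err, new(*stickyWorkerUnavailableError)):
		return "cadence_errors_sticky_worker_unavailable_per_tl"
	case errors.As(err, new(*remoteSyncMatchedError)):
		return "cadence_errors_remote_syncmatch_failed_per_tl"
	default:
		return "uncategorized"
	}
}

func main() {
	fmt.Println(metricFor(&stickyWorkerUnavailableError{}))
	fmt.Println(metricFor(errors.New("boom")))
}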
21 changes: 21 additions & 0 deletions service/matching/matchingEngine.go
@@ -48,6 +48,10 @@ import (
"github.com/uber/cadence/common/types"
)

// If the sticky poller has not been seen in the last 10s, we treat the sticky worker as unavailable.
// This may seem aggressive, but the default sticky schedule_to_start timeout is 5s, so 10s is reasonable.
const _stickyPollerUnavailableWindow = 10 * time.Second

// Implements matching.Engine
// TODO: Switch implementation from lock/channel based to a partitioned agent
// to simplify code and reduce possibility of synchronization errors.
@@ -101,6 +105,8 @@

pollerIDKey pollerIDCtxKey = "pollerID"
identityKey identityCtxKey = "identity"

_stickyPollerUnavailableError = &types.StickyWorkerUnavailableError{Message: "sticky worker is unavailable, please use non-sticky task list."}
)

var _ Engine = (*matchingEngineImpl)(nil) // Asserts that interface is indeed implemented
@@ -290,6 +296,13 @@ func (e *matchingEngineImpl) AddDecisionTask(
return false, err
}

if taskListKind != nil && *taskListKind == types.TaskListKindSticky {
// check if the sticky worker is still available, if not, fail this request early
if !tlMgr.HasPollerAfter(time.Now().Add(-_stickyPollerUnavailableWindow)) {
return false, _stickyPollerUnavailableError
}
}

taskInfo := &persistence.TaskInfo{
DomainID: domainID,
RunID: request.Execution.GetRunID(),
@@ -597,6 +610,14 @@ func (e *matchingEngineImpl) QueryWorkflow(
if err != nil {
return nil, err
}

if taskListKind != nil && *taskListKind == types.TaskListKindSticky {
// check if the sticky worker is still available, if not, fail this request early
if !tlMgr.HasPollerAfter(time.Now().Add(-_stickyPollerUnavailableWindow)) {
return nil, _stickyPollerUnavailableError
}
}

taskID := uuid.New()
resp, err := tlMgr.DispatchQueryTask(hCtx.Context, taskID, queryRequest)

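
This is the core of the fix: for a sticky tasklist, AddDecisionTask and QueryWorkflow now fail fast with StickyWorkerUnavailableError when no poller has been seen within _stickyPollerUnavailableWindow (10s), instead of leaving the task to sit until its 5s schedule_to_start timeout. A minimal sketch of such a poller-recency guard, assuming a simple in-memory record of last poll times; tasklistPollers and its methods are illustrative, not the actual taskListManager:

// Sketch only: fail sticky dispatch fast when no poller has been seen recently.
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

const stickyPollerUnavailableWindow = 10 * time.Second

var errStickyWorkerUnavailable = errors.New("sticky worker is unavailable, please use non-sticky task list")

type tasklistPollers struct {
	mu       sync.Mutex
	lastPoll map[string]time.Time // tasklist name -> time of most recent poll
}

func (p *tasklistPollers) RecordPoll(tasklist string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.lastPoll == nil {
		p.lastPoll = map[string]time.Time{}
	}
	p.lastPoll[tasklist] = time.Now()
}

// HasPollerAfter reports whether some poller polled the tasklist after the
// cutoff, mirroring the tlMgr.HasPollerAfter(...) check used above.
func (p *tasklistPollers) HasPollerAfter(tasklist string, cutoff time.Time) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.lastPoll[tasklist].After(cutoff)
}

// guardStickyDispatch fails fast when the sticky tasklist has had no recent poller.
func guardStickyDispatch(p *tasklistPollers, stickyTasklist string) error {
	if !p.HasPollerAfter(stickyTasklist, time.Now().Add(-stickyPollerUnavailableWindow)) {
		return errStickyWorkerUnavailable
	}
	return nil
}

func main() {
	pollers := &tasklistPollers{}
	fmt.Println(guardStickyDispatch(pollers, "sticky-tl")) // error: no recent poller
	pollers.RecordPoll("sticky-tl")
	fmt.Println(guardStickyDispatch(pollers, "sticky-tl")) // <nil>
}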
15 changes: 14 additions & 1 deletion service/matching/matchingEngine_test.go
@@ -230,12 +230,25 @@ func (s *matchingEngineSuite) PollForDecisionTasksResultTest() {
}

_, err := s.matchingEngine.AddDecisionTask(s.handlerContext, &addRequest)
s.Error(err)
s.Contains(err.Error(), "sticky worker is unavailable")
// poll the sticky tasklist, should get no result
resp, err := s.matchingEngine.PollForDecisionTask(s.handlerContext, &types.MatchingPollForDecisionTaskRequest{
DomainUUID: domainID,
PollRequest: &types.PollForDecisionTaskRequest{
TaskList: stickyTaskList,
Identity: identity},
})
s.NoError(err)
s.Equal(emptyPollForDecisionTaskResponse, resp)
// add task to sticky tasklist again, this time it should pass
_, err = s.matchingEngine.AddDecisionTask(s.handlerContext, &addRequest)
s.NoError(err)

taskList := &types.TaskList{}
taskList.Name = tl

resp, err := s.matchingEngine.PollForDecisionTask(s.handlerContext, &types.MatchingPollForDecisionTaskRequest{
resp, err = s.matchingEngine.PollForDecisionTask(s.handlerContext, &types.MatchingPollForDecisionTaskRequest{
DomainUUID: domainID,
PollRequest: &types.PollForDecisionTaskRequest{
TaskList: stickyTaskList,