Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove sticky query #2846

Merged
merged 2 commits into from
Nov 20, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 1 addition & 16 deletions common/client/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,9 @@ const (
// SupportedCLIVersion indicates the highest cli version server will accept requests from
SupportedCLIVersion = "1.4.0"

// GoWorkerStickyQueryVersion indicates the minimum client version of go worker which supports StickyQuery
GoWorkerStickyQueryVersion = "1.0.0"
// JavaWorkerStickyQueryVersion indicates the minimum client version of the java worker which supports StickyQuery
JavaWorkerStickyQueryVersion = "1.0.0"
// GoWorkerConsistentQueryVersion indicates the minimum client version of the go worker which supports ConsistentQuery
GoWorkerConsistentQueryVersion = "1.5.0"

stickyQuery = "sticky-query"
consistentQuery = "consistent-query"
)

Expand All @@ -62,7 +57,6 @@ type (
VersionChecker interface {
ClientSupported(ctx context.Context, enableClientVersionCheck bool) error

SupportsStickyQuery(clientImpl string, clientFeatureVersion string) error
SupportsConsistentQuery(clientImpl string, clientFeatureVersion string) error
}

Expand All @@ -76,12 +70,9 @@ type (
func NewVersionChecker() VersionChecker {
supportedFeatures := map[string]map[string]version.Constraints{
GoSDK: {
stickyQuery: mustNewConstraint(fmt.Sprintf(">=%v", GoWorkerStickyQueryVersion)),
consistentQuery: mustNewConstraint(fmt.Sprintf(">=%v", GoWorkerConsistentQueryVersion)),
},
JavaSDK: {
stickyQuery: mustNewConstraint(fmt.Sprintf(">=%v", JavaWorkerStickyQueryVersion)),
},
JavaSDK: {},
}
supportedClients := map[string]version.Constraints{
GoSDK: mustNewConstraint(fmt.Sprintf("<=%v", SupportedGoSDKVersion)),
Expand Down Expand Up @@ -122,12 +113,6 @@ func (vc *versionChecker) ClientSupported(ctx context.Context, enableClientVersi
return nil
}

// SupportsStickyQuery returns error if sticky query is not supported otherwise nil.
// In case client version lookup fails assume the client does not support feature.
func (vc *versionChecker) SupportsStickyQuery(clientImpl string, clientFeatureVersion string) error {
return vc.featureSupported(clientImpl, clientFeatureVersion, stickyQuery)
}

// SupportsConsistentQuery returns error if consistent query is not supported otherwise nil.
// In case client version lookup fails assume the client does not support feature.
func (vc *versionChecker) SupportsConsistentQuery(clientImpl string, clientFeatureVersion string) error {
Expand Down
68 changes: 0 additions & 68 deletions common/client/versionChecker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,74 +120,6 @@ func (s *VersionCheckerSuite) TestClientSupported() {
}
}

func (s *VersionCheckerSuite) TestSupportsStickyQuery() {
testCases := []struct {
clientImpl string
clientFeatureVersion string
expectErr bool
}{
{
clientImpl: "",
expectErr: true,
},
{
clientImpl: GoSDK,
expectErr: true,
},
{
clientImpl: "unknown",
clientFeatureVersion: "0.0.0",
expectErr: true,
},
{
clientImpl: GoSDK,
clientFeatureVersion: "malformed-feature-version",
expectErr: true,
},
{
clientImpl: GoSDK,
clientFeatureVersion: GoWorkerStickyQueryVersion,
expectErr: false,
},
{
clientImpl: JavaSDK,
clientFeatureVersion: JavaWorkerStickyQueryVersion,
expectErr: false,
},
{
clientImpl: GoSDK,
clientFeatureVersion: "0.9.0",
expectErr: true,
},
{
clientImpl: JavaSDK,
clientFeatureVersion: "0.9.0",
expectErr: true,
},
{
clientImpl: GoSDK,
clientFeatureVersion: "2.0.0",
expectErr: false,
},
{
clientImpl: JavaSDK,
clientFeatureVersion: "2.0.0",
expectErr: false,
},
}

for _, tc := range testCases {
vc := NewVersionChecker()
if tc.expectErr {
err := vc.SupportsStickyQuery(tc.clientImpl, tc.clientFeatureVersion)
s.Error(err)
s.IsType(&shared.ClientVersionNotSupportedError{}, err)
} else {
s.NoError(vc.SupportsStickyQuery(tc.clientImpl, tc.clientFeatureVersion))
}
}
}

func (s *VersionCheckerSuite) TestSupportsConsistentQuery() {
testCases := []struct {
clientImpl string
Expand Down
56 changes: 1 addition & 55 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ import (

"github.com/pborman/uuid"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/yarpc/yarpcerrors"
"golang.org/x/net/context"

h "github.com/uber/cadence/.gen/go/history"
m "github.com/uber/cadence/.gen/go/matching"
Expand All @@ -43,7 +41,6 @@ import (
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/client"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/definition"
Expand Down Expand Up @@ -887,60 +884,9 @@ func (e *historyEngineImpl) queryDirectlyThroughMatching(
matchingRequest := &m.QueryWorkflowRequest{
DomainUUID: common.StringPtr(domainID),
QueryRequest: queryRequest,
TaskList: msResp.TaskList,
}

supportsStickyQuery := client.NewVersionChecker().SupportsStickyQuery(msResp.GetClientImpl(), msResp.GetClientFeatureVersion()) == nil
if len(msResp.GetStickyTaskList().GetName()) != 0 && supportsStickyQuery {
matchingRequest.TaskList = msResp.StickyTaskList
// using a clean new context in case customer provide a context which has
// a really short deadline, causing we clear the stickyness
stickyContext, cancel := context.WithTimeout(context.Background(), time.Duration(msResp.GetStickyTaskListScheduleToStartTimeout())*time.Second)
matchingResp, err := e.rawMatchingClient.QueryWorkflow(stickyContext, matchingRequest)
cancel()
if err == nil {
return &h.QueryWorkflowResponse{Response: matchingResp}, nil
}
if yarpcError, ok := err.(*yarpcerrors.Status); !ok || yarpcError.Code() != yarpcerrors.CodeDeadlineExceeded {
e.logger.Error("Query directly though matching on sticky failed",
tag.WorkflowDomainName(queryRequest.GetDomain()),
tag.WorkflowID(queryRequest.Execution.GetWorkflowId()),
tag.WorkflowRunID(queryRequest.Execution.GetRunId()),
tag.WorkflowQueryType(queryRequest.Query.GetQueryType()),
tag.Error(err))
return nil, err
}
// this means sticky timeout, should try using the normal tasklist
// we should clear the stickyness of this workflow
e.logger.Info("QueryWorkflow timed-out with stickyTaskList, will try nonSticky one",
tag.WorkflowDomainName(queryRequest.GetDomain()),
tag.WorkflowID(queryRequest.Execution.GetWorkflowId()),
tag.WorkflowRunID(queryRequest.Execution.GetRunId()),
tag.WorkflowQueryType(queryRequest.Query.GetQueryType()),
tag.WorkflowTaskListName(msResp.GetStickyTaskList().GetName()),
tag.WorkflowNextEventID(msResp.GetNextEventId()),
tag.Error(err))

resetContext, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, err = e.ResetStickyTaskList(resetContext, &h.ResetStickyTaskListRequest{
DomainUUID: common.StringPtr(domainID),
Execution: queryRequest.Execution,
})
cancel()
if err != nil {
return nil, err
}
}

matchingRequest.TaskList = msResp.TaskList
if err := common.IsValidContext(ctx); err != nil {
e.logger.Error("Context expired before query could be attempted on nonSticky",
tag.WorkflowDomainName(queryRequest.GetDomain()),
tag.WorkflowID(queryRequest.Execution.GetWorkflowId()),
tag.WorkflowRunID(queryRequest.Execution.GetRunId()),
tag.WorkflowQueryType(queryRequest.Query.GetQueryType()),
tag.Error(err))
return nil, err
}
matchingResp, err := e.matchingClient.QueryWorkflow(ctx, matchingRequest)
if err != nil {
e.logger.Error("Query directly though matching on non-sticky failed",
Expand Down
7 changes: 1 addition & 6 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,16 +336,11 @@ pollLoop:
return emptyPollForDecisionTaskResponse, nil
}

isStickyEnabled := false
supportsSticky := client.NewVersionChecker().SupportsStickyQuery(mutableStateResp.GetClientImpl(), mutableStateResp.GetClientFeatureVersion()) == nil
if len(mutableStateResp.StickyTaskList.GetName()) != 0 && supportsSticky {
isStickyEnabled = true
}
resp := &h.RecordDecisionTaskStartedResponse{
PreviousStartedEventId: mutableStateResp.PreviousStartedEventId,
NextEventId: mutableStateResp.NextEventId,
WorkflowType: mutableStateResp.WorkflowType,
StickyExecutionEnabled: common.BoolPtr(isStickyEnabled),
StickyExecutionEnabled: common.BoolPtr(false),
WorkflowExecutionTaskList: mutableStateResp.TaskList,
BranchToken: mutableStateResp.CurrentBranchToken,
}
Expand Down