Skip to content

Commit

Permalink
Remove tasklist kind from tasklist id (#4295)
Browse files Browse the repository at this point in the history
* Remove tasklist kind from tasklist id
  • Loading branch information
yux0 authored Jul 2, 2021
1 parent 6a00f35 commit 28bb116
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 24 deletions.
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1322,7 +1322,7 @@ func (e *historyEngineImpl) getMutableState(
NextEventID: mutableState.GetNextEventID(),
PreviousStartedEventID: common.Int64Ptr(mutableState.GetPreviousStartedEventID()),
TaskList: &types.TaskList{Name: executionInfo.TaskList},
StickyTaskList: &types.TaskList{Name: executionInfo.StickyTaskList},
StickyTaskList: &types.TaskList{Name: executionInfo.StickyTaskList, Kind: types.TaskListKindSticky.Ptr()},
ClientLibraryVersion: executionInfo.ClientLibraryVersion,
ClientFeatureVersion: executionInfo.ClientFeatureVersion,
ClientImpl: executionInfo.ClientImpl,
Expand Down
23 changes: 11 additions & 12 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ func (e *matchingEngineImpl) getTaskListByDomainLocked(
domainID string,
) []string {
var taskLists []string
for tl := range e.taskLists {
if tl.taskListKind == types.TaskListKindNormal && tl.domainID == domainID {
for tl, tlm := range e.taskLists {
if tlm.GetTaskListKind() == types.TaskListKindNormal && tl.domainID == domainID {
taskLists = append(taskLists, tl.qualifiedTaskListName.baseName)
}
}
Expand Down Expand Up @@ -267,7 +267,7 @@ func (e *matchingEngineImpl) AddDecisionTask(
tag.WorkflowTaskListKind(int32(request.GetTaskList().GetKind())),
)

taskList, err := newTaskListID(domainID, taskListName, taskListType, taskListKind)
taskList, err := newTaskListID(domainID, taskListName, taskListType)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -315,7 +315,7 @@ func (e *matchingEngineImpl) AddActivityTask(
tag.WorkflowTaskListKind(int32(request.GetTaskList().GetKind())),
)

taskList, err := newTaskListID(domainID, taskListName, taskListType, taskListKind)
taskList, err := newTaskListID(domainID, taskListName, taskListType)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -361,7 +361,7 @@ pollLoop:
return nil, err
}

taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision, taskListKind)
taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision)
if err != nil {
return nil, err
}
Expand All @@ -370,7 +370,7 @@ pollLoop:
// long-poll when frontend calls CancelOutstandingPoll API
pollerCtx := context.WithValue(hCtx.Context, pollerIDKey, pollerID)
pollerCtx = context.WithValue(pollerCtx, identityKey, request.GetIdentity())
task, err := e.getTask(pollerCtx, taskList, nil, request.TaskList.Kind)
task, err := e.getTask(pollerCtx, taskList, nil, taskListKind)
if err != nil {
// TODO: Is empty poll the best reply for errPumpClosed?
if err == ErrNoTasks || err == errPumpClosed {
Expand Down Expand Up @@ -454,7 +454,6 @@ func (e *matchingEngineImpl) PollForActivityTask(
pollerID := req.GetPollerID()
request := req.PollRequest
taskListName := request.GetTaskList().GetName()
taskListKind := request.GetTaskList().Kind
e.logger.Debug("Received PollForActivityTask",
tag.WorkflowTaskListName(taskListName),
tag.WorkflowDomainID(domainID),
Expand All @@ -467,7 +466,7 @@ pollLoop:
return nil, err
}

taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeActivity, taskListKind)
taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeActivity)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -532,7 +531,7 @@ func (e *matchingEngineImpl) QueryWorkflow(
domainID := queryRequest.GetDomainUUID()
taskListName := queryRequest.GetTaskList().GetName()
taskListKind := queryRequest.GetTaskList().Kind
taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision, taskListKind)
taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -612,7 +611,7 @@ func (e *matchingEngineImpl) CancelOutstandingPoll(
taskListKind := request.GetTaskList().Kind
pollerID := request.GetPollerID()

taskList, err := newTaskListID(domainID, taskListName, taskListType, taskListKind)
taskList, err := newTaskListID(domainID, taskListName, taskListType)
if err != nil {
return err
}
Expand All @@ -638,7 +637,7 @@ func (e *matchingEngineImpl) DescribeTaskList(
taskListName := request.GetDescRequest().GetTaskList().GetName()
taskListKind := request.GetDescRequest().GetTaskList().Kind

taskList, err := newTaskListID(domainID, taskListName, taskListType, taskListKind)
taskList, err := newTaskListID(domainID, taskListName, taskListType)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -730,7 +729,7 @@ func (e *matchingEngineImpl) getAllPartitions(
return partitionKeys, err
}
taskList := request.GetTaskList()
taskListID, err := newTaskListID(domainID, taskList.GetName(), taskListType, taskList.Kind)
taskListID, err := newTaskListID(domainID, taskList.GetName(), taskListType)
rootPartition := taskListID.GetRoot()

partitionKeys = append(partitionKeys, rootPartition)
Expand Down
2 changes: 1 addition & 1 deletion service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1719,7 +1719,7 @@ func newTestTaskListManager() *testTaskListManager {
}

func newTestTaskListID(domainID string, name string, taskType int) *taskListID {
result, err := newTaskListID(domainID, name, taskType, nil)
result, err := newTaskListID(domainID, name, taskType)
if err != nil {
panic(fmt.Sprintf("newTaskListID failed with error %v", err))
}
Expand Down
5 changes: 5 additions & 0 deletions service/matching/taskListManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type (
// DescribeTaskList returns information about the target tasklist
DescribeTaskList(includeTaskListStatus bool) *types.DescribeTaskListResponse
String() string
GetTaskListKind() types.TaskListKind
}

// Single task list in memory state
Expand Down Expand Up @@ -407,6 +408,10 @@ func (c *taskListManagerImpl) String() string {
return buf.String()
}

func (c *taskListManagerImpl) GetTaskListKind() types.TaskListKind {
return c.taskListKind
}

// completeTask marks a task as processed. Only tasks created by taskReader (i.e. backlog from db) reach
// here. As part of completion:
// - task is deleted from the database when err is nil
Expand Down
13 changes: 3 additions & 10 deletions service/matching/tasklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,14 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
)

type (
// taskListID is the key that uniquely identifies a task list
taskListID struct {
qualifiedTaskListName
domainID string
taskType int
taskListKind types.TaskListKind
domainID string
taskType int
}
// qualifiedTaskListName refers to the fully qualified task list name
qualifiedTaskListName struct {
Expand Down Expand Up @@ -124,26 +122,21 @@ func (tn *qualifiedTaskListName) init() error {
return nil
}

// newTaskListID returns taskListID which uniquely identfies as task list
// newTaskListID returns taskListID which uniquely identifies as task list
func newTaskListID(
domainID string,
taskListName string,
taskType int,
taskListKind *types.TaskListKind,
) (*taskListID, error) {
name, err := newTaskListName(taskListName)
if err != nil {
return nil, err
}
if taskListKind == nil {
taskListKind = types.TaskListKindNormal.Ptr()
}

return &taskListID{
qualifiedTaskListName: name,
domainID: domainID,
taskType: taskType,
taskListKind: *taskListKind,
}, nil
}

Expand Down

0 comments on commit 28bb116

Please sign in to comment.