Skip to content

Commit

Permalink
Merge branch 'master' into db_setup
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored Jan 6, 2021
2 parents 558478b + 6a8e9a3 commit b41c0d2
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 40 deletions.
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ func WorkflowTaskListType(taskListType int) Tag {
return newInt("wf-task-list-type", taskListType)
}

// WorkflowTaskListKind returns tag for WorkflowTaskListKind
func WorkflowTaskListKind(taskListKind int32) Tag {
return newInt32("wf-task-list-kind", taskListKind)
}

// WorkflowTaskListName returns tag for WorkflowTaskListName
func WorkflowTaskListName(taskListName string) Tag {
return newStringTag("wf-task-list-name", taskListName)
Expand Down
108 changes: 68 additions & 40 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ type (
pollerIDCtxKey string
identityCtxKey string

queryResult struct {
workerResponse *types.MatchingRespondQueryTaskCompletedRequest
internalError error
}

// lockableQueryTaskMap maps query TaskID (which is a UUID generated in QueryWorkflow() call) to a channel
// that QueryWorkflow() will block on. The channel is unblocked either by worker sending response through
// RespondQueryTaskCompleted() or through an internal service error causing cadence to be unable to dispatch
Expand Down Expand Up @@ -176,21 +181,29 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *taskListID,
e.taskListsLock.Unlock()
return result, nil
}
e.logger.Info("Task list manager state changed", tag.LifeCycleStarting, tag.WorkflowTaskListName(taskList.name), tag.WorkflowTaskListType(taskList.taskType))

// common tagged logger
logger := e.logger.WithTags(
tag.WorkflowTaskListName(taskList.name),
tag.WorkflowTaskListType(taskList.taskType),
tag.WorkflowDomainID(taskList.domainID),
)

logger.Info("Task list manager state changed", tag.LifeCycleStarting)
mgr, err := newTaskListManager(e, taskList, taskListKind, e.config)
if err != nil {
e.taskListsLock.Unlock()
e.logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.WorkflowTaskListName(taskList.name), tag.WorkflowTaskListType(taskList.taskType), tag.Error(err))
logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.Error(err))
return nil, err
}
e.taskLists[*taskList] = mgr
e.taskListsLock.Unlock()
err = mgr.Start()
if err != nil {
e.logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.WorkflowTaskListName(taskList.name), tag.WorkflowTaskListType(taskList.taskType), tag.Error(err))
logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.Error(err))
return nil, err
}
e.logger.Info("Task list manager state changed", tag.LifeCycleStarted, tag.WorkflowTaskListName(taskList.name), tag.WorkflowTaskListType(taskList.taskType))
logger.Info("Task list manager state changed", tag.LifeCycleStarted)
return mgr, nil
}

Expand All @@ -215,15 +228,19 @@ func (e *matchingEngineImpl) AddDecisionTask(
domainID := request.GetDomainUUID()
taskListName := request.TaskList.GetName()
taskListKind := request.TaskList.Kind
taskListType := persistence.TaskListTypeDecision

e.logger.Debug(
fmt.Sprintf("Received AddDecisionTask for taskList=%v, WorkflowID=%v, RunID=%v, ScheduleToStartTimeout=%v",
request.TaskList.GetName(),
request.Execution.GetWorkflowID(),
request.Execution.GetRunID(),
request.GetScheduleToStartTimeoutSeconds()))
e.logger.Debug("Received AddDecisionTask",
tag.WorkflowTaskListName(request.TaskList.GetName()),
tag.WorkflowID(request.Execution.GetWorkflowID()),
tag.WorkflowRunID(request.Execution.GetRunID()),
tag.WorkflowDomainID(domainID),
tag.WorkflowTaskListType(taskListType),
tag.WorkflowScheduleID(request.GetScheduleID()),
tag.WorkflowTaskListKind(int32(request.GetTaskList().GetKind())),
)

taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision)
taskList, err := newTaskListID(domainID, taskListName, taskListType)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -255,28 +272,31 @@ func (e *matchingEngineImpl) AddActivityTask(
request *types.AddActivityTaskRequest,
) (bool, error) {
domainID := request.GetDomainUUID()
sourceDomainID := request.GetSourceDomainUUID()
taskListName := request.TaskList.GetName()
taskListKind := request.TaskList.Kind

e.logger.Debug(
fmt.Sprintf("Received AddActivityTask for taskList=%v WorkflowID=%v, RunID=%v",
taskListName,
request.Execution.WorkflowID,
request.Execution.RunID))
taskListType := persistence.TaskListTypeActivity

e.logger.Debug("Received AddActivityTask",
tag.WorkflowTaskListName(taskListName),
tag.WorkflowID(request.Execution.GetWorkflowID()),
tag.WorkflowRunID(request.Execution.GetRunID()),
tag.WorkflowDomainID(domainID),
tag.WorkflowTaskListType(taskListType),
tag.WorkflowScheduleID(request.GetScheduleID()),
tag.WorkflowTaskListKind(int32(request.GetTaskList().GetKind())),
)

taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeActivity)
taskList, err := newTaskListID(domainID, taskListName, taskListType)
if err != nil {
return false, err
}

tlMgr, err := e.getTaskListManager(taskList, taskListKind)
tlMgr, err := e.getTaskListManager(taskList, request.TaskList.Kind)
if err != nil {
return false, err
}

taskInfo := &persistence.TaskInfo{
DomainID: sourceDomainID,
DomainID: request.GetSourceDomainUUID(),
RunID: request.Execution.GetRunID(),
WorkflowID: request.Execution.GetWorkflowID(),
ScheduleID: request.GetScheduleID(),
Expand All @@ -300,23 +320,26 @@ func (e *matchingEngineImpl) PollForDecisionTask(
pollerID := req.GetPollerID()
request := req.PollRequest
taskListName := request.TaskList.GetName()
e.logger.Debug("Received PollForDecisionTask for taskList", tag.WorkflowTaskListName(taskListName))
e.logger.Debug("Received PollForDecisionTask for taskList",
tag.WorkflowTaskListName(taskListName),
tag.WorkflowDomainID(domainID),
)
pollLoop:
for {
err := common.IsValidContext(hCtx.Context)
if err := common.IsValidContext(hCtx.Context); err != nil {
return nil, err
}

taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision)
if err != nil {
return nil, err
}

// Add frontend generated pollerID to context so tasklistMgr can support cancellation of
// long-poll when frontend calls CancelOutstandingPoll API
pollerCtx := context.WithValue(hCtx.Context, pollerIDKey, pollerID)
pollerCtx = context.WithValue(pollerCtx, identityKey, request.GetIdentity())
taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision)
if err != nil {
return nil, err
}
taskListKind := request.TaskList.Kind
task, err := e.getTask(pollerCtx, taskList, nil, taskListKind)
task, err := e.getTask(pollerCtx, taskList, nil, request.TaskList.Kind)
if err != nil {
// TODO: Is empty poll the best reply for errPumpClosed?
if err == ErrNoTasks || err == errPumpClosed {
Expand Down Expand Up @@ -367,8 +390,11 @@ pollLoop:
if err != nil {
switch err.(type) {
case *types.EntityNotExistsError, *types.EventAlreadyStartedError:
e.logger.Debug(fmt.Sprintf("Duplicated decision task taskList=%v, taskID=%v",
taskListName, task.event.TaskID))
e.logger.Debug(
"Duplicated decision task",
tag.TaskID(task.event.TaskID),
tag.WorkflowTaskListName(taskListName),
)
task.finish(nil)
default:
task.finish(err)
Expand All @@ -392,7 +418,11 @@ func (e *matchingEngineImpl) PollForActivityTask(
pollerID := req.GetPollerID()
request := req.PollRequest
taskListName := request.TaskList.GetName()
e.logger.Debug(fmt.Sprintf("Received PollForActivityTask for taskList=%v", taskListName))
e.logger.Debug("Received PollForActivityTask",
tag.WorkflowTaskListName(taskListName),
tag.WorkflowDomainID(domainID),
)

pollLoop:
for {
err := common.IsValidContext(hCtx.Context)
Expand Down Expand Up @@ -434,8 +464,11 @@ pollLoop:
if err != nil {
switch err.(type) {
case *types.EntityNotExistsError, *types.EventAlreadyStartedError:
e.logger.Debug(fmt.Sprintf("Duplicated activity task taskList=%v, taskID=%v",
taskListName, task.event.TaskID))
e.logger.Debug(
"Duplicated activity task",
tag.TaskID(task.event.TaskID),
tag.WorkflowTaskListName(taskListName),
)
task.finish(nil)
default:
task.finish(err)
Expand All @@ -448,11 +481,6 @@ pollLoop:
}
}

type queryResult struct {
workerResponse *types.MatchingRespondQueryTaskCompletedRequest
internalError error
}

// QueryWorkflow creates a DecisionTask with query data, send it through sync match channel, wait for that DecisionTask
// to be processed by worker, and then return the query result.
func (e *matchingEngineImpl) QueryWorkflow(
Expand Down

0 comments on commit b41c0d2

Please sign in to comment.