diff --git a/common/log/tag/tags.go b/common/log/tag/tags.go index c293a672463..e58ec1367a2 100644 --- a/common/log/tag/tags.go +++ b/common/log/tag/tags.go @@ -251,6 +251,11 @@ func WorkflowTaskListType(taskListType int) Tag { return newInt("wf-task-list-type", taskListType) } +// WorkflowTaskListKind returns tag for WorkflowTaskListKind +func WorkflowTaskListKind(taskListKind int32) Tag { + return newInt32("wf-task-list-kind", taskListKind) +} + // WorkflowTaskListName returns tag for WorkflowTaskListName func WorkflowTaskListName(taskListName string) Tag { return newStringTag("wf-task-list-name", taskListName) diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go index ccd4749ca16..0b114aaccf3 100644 --- a/service/matching/matchingEngine.go +++ b/service/matching/matchingEngine.go @@ -52,6 +52,11 @@ type ( pollerIDCtxKey string identityCtxKey string + queryResult struct { + workerResponse *types.MatchingRespondQueryTaskCompletedRequest + internalError error + } + // lockableQueryTaskMap maps query TaskID (which is a UUID generated in QueryWorkflow() call) to a channel // that QueryWorkflow() will block on. The channel is unblocked either by worker sending response through // RespondQueryTaskCompleted() or through an internal service error causing cadence to be unable to dispatch @@ -176,21 +181,29 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *taskListID, e.taskListsLock.Unlock() return result, nil } - e.logger.Info("Task list manager state changed", tag.LifeCycleStarting, tag.WorkflowTaskListName(taskList.name), tag.WorkflowTaskListType(taskList.taskType)) + + // common tagged logger + logger := e.logger.WithTags( + tag.WorkflowTaskListName(taskList.name), + tag.WorkflowTaskListType(taskList.taskType), + tag.WorkflowDomainID(taskList.domainID), + ) + + logger.Info("Task list manager state changed", tag.LifeCycleStarting) mgr, err := newTaskListManager(e, taskList, taskListKind, e.config) if err != nil { e.taskListsLock.Unlock() - e.logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.WorkflowTaskListName(taskList.name), tag.WorkflowTaskListType(taskList.taskType), tag.Error(err)) + logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.Error(err)) return nil, err } e.taskLists[*taskList] = mgr e.taskListsLock.Unlock() err = mgr.Start() if err != nil { - e.logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.WorkflowTaskListName(taskList.name), tag.WorkflowTaskListType(taskList.taskType), tag.Error(err)) + logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.Error(err)) return nil, err } - e.logger.Info("Task list manager state changed", tag.LifeCycleStarted, tag.WorkflowTaskListName(taskList.name), tag.WorkflowTaskListType(taskList.taskType)) + logger.Info("Task list manager state changed", tag.LifeCycleStarted) return mgr, nil } @@ -215,15 +228,19 @@ func (e *matchingEngineImpl) AddDecisionTask( domainID := request.GetDomainUUID() taskListName := request.TaskList.GetName() taskListKind := request.TaskList.Kind + taskListType := persistence.TaskListTypeDecision - e.logger.Debug( - fmt.Sprintf("Received AddDecisionTask for taskList=%v, WorkflowID=%v, RunID=%v, ScheduleToStartTimeout=%v", - request.TaskList.GetName(), - request.Execution.GetWorkflowID(), - request.Execution.GetRunID(), - request.GetScheduleToStartTimeoutSeconds())) + e.logger.Debug("Received AddDecisionTask", + tag.WorkflowTaskListName(request.TaskList.GetName()), + tag.WorkflowID(request.Execution.GetWorkflowID()), + tag.WorkflowRunID(request.Execution.GetRunID()), + tag.WorkflowDomainID(domainID), + tag.WorkflowTaskListType(taskListType), + tag.WorkflowScheduleID(request.GetScheduleID()), + tag.WorkflowTaskListKind(int32(request.GetTaskList().GetKind())), + ) - taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision) + taskList, err := newTaskListID(domainID, taskListName, taskListType) if err != nil { return false, err } @@ -255,28 +272,31 @@ func (e *matchingEngineImpl) AddActivityTask( request *types.AddActivityTaskRequest, ) (bool, error) { domainID := request.GetDomainUUID() - sourceDomainID := request.GetSourceDomainUUID() taskListName := request.TaskList.GetName() - taskListKind := request.TaskList.Kind - - e.logger.Debug( - fmt.Sprintf("Received AddActivityTask for taskList=%v WorkflowID=%v, RunID=%v", - taskListName, - request.Execution.WorkflowID, - request.Execution.RunID)) + taskListType := persistence.TaskListTypeActivity + + e.logger.Debug("Received AddActivityTask", + tag.WorkflowTaskListName(taskListName), + tag.WorkflowID(request.Execution.GetWorkflowID()), + tag.WorkflowRunID(request.Execution.GetRunID()), + tag.WorkflowDomainID(domainID), + tag.WorkflowTaskListType(taskListType), + tag.WorkflowScheduleID(request.GetScheduleID()), + tag.WorkflowTaskListKind(int32(request.GetTaskList().GetKind())), + ) - taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeActivity) + taskList, err := newTaskListID(domainID, taskListName, taskListType) if err != nil { return false, err } - tlMgr, err := e.getTaskListManager(taskList, taskListKind) + tlMgr, err := e.getTaskListManager(taskList, request.TaskList.Kind) if err != nil { return false, err } taskInfo := &persistence.TaskInfo{ - DomainID: sourceDomainID, + DomainID: request.GetSourceDomainUUID(), RunID: request.Execution.GetRunID(), WorkflowID: request.Execution.GetWorkflowID(), ScheduleID: request.GetScheduleID(), @@ -300,23 +320,26 @@ func (e *matchingEngineImpl) PollForDecisionTask( pollerID := req.GetPollerID() request := req.PollRequest taskListName := request.TaskList.GetName() - e.logger.Debug("Received PollForDecisionTask for taskList", tag.WorkflowTaskListName(taskListName)) + e.logger.Debug("Received PollForDecisionTask for taskList", + tag.WorkflowTaskListName(taskListName), + tag.WorkflowDomainID(domainID), + ) pollLoop: for { - err := common.IsValidContext(hCtx.Context) + if err := common.IsValidContext(hCtx.Context); err != nil { + return nil, err + } + + taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision) if err != nil { return nil, err } + // Add frontend generated pollerID to context so tasklistMgr can support cancellation of // long-poll when frontend calls CancelOutstandingPoll API pollerCtx := context.WithValue(hCtx.Context, pollerIDKey, pollerID) pollerCtx = context.WithValue(pollerCtx, identityKey, request.GetIdentity()) - taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision) - if err != nil { - return nil, err - } - taskListKind := request.TaskList.Kind - task, err := e.getTask(pollerCtx, taskList, nil, taskListKind) + task, err := e.getTask(pollerCtx, taskList, nil, request.TaskList.Kind) if err != nil { // TODO: Is empty poll the best reply for errPumpClosed? if err == ErrNoTasks || err == errPumpClosed { @@ -367,8 +390,11 @@ pollLoop: if err != nil { switch err.(type) { case *types.EntityNotExistsError, *types.EventAlreadyStartedError: - e.logger.Debug(fmt.Sprintf("Duplicated decision task taskList=%v, taskID=%v", - taskListName, task.event.TaskID)) + e.logger.Debug( + "Duplicated decision task", + tag.TaskID(task.event.TaskID), + tag.WorkflowTaskListName(taskListName), + ) task.finish(nil) default: task.finish(err) @@ -392,7 +418,11 @@ func (e *matchingEngineImpl) PollForActivityTask( pollerID := req.GetPollerID() request := req.PollRequest taskListName := request.TaskList.GetName() - e.logger.Debug(fmt.Sprintf("Received PollForActivityTask for taskList=%v", taskListName)) + e.logger.Debug("Received PollForActivityTask", + tag.WorkflowTaskListName(taskListName), + tag.WorkflowDomainID(domainID), + ) + pollLoop: for { err := common.IsValidContext(hCtx.Context) @@ -434,8 +464,11 @@ pollLoop: if err != nil { switch err.(type) { case *types.EntityNotExistsError, *types.EventAlreadyStartedError: - e.logger.Debug(fmt.Sprintf("Duplicated activity task taskList=%v, taskID=%v", - taskListName, task.event.TaskID)) + e.logger.Debug( + "Duplicated activity task", + tag.TaskID(task.event.TaskID), + tag.WorkflowTaskListName(taskListName), + ) task.finish(nil) default: task.finish(err) @@ -448,11 +481,6 @@ pollLoop: } } -type queryResult struct { - workerResponse *types.MatchingRespondQueryTaskCompletedRequest - internalError error -} - // QueryWorkflow creates a DecisionTask with query data, send it through sync match channel, wait for that DecisionTask // to be processed by worker, and then return the query result. func (e *matchingEngineImpl) QueryWorkflow(