diff --git a/service/history/historyCache.go b/service/history/historyCache.go index 6cebf86804a..8b6cd8d0f4f 100644 --- a/service/history/historyCache.go +++ b/service/history/historyCache.go @@ -21,6 +21,8 @@ package history import ( + "sync/atomic" + workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/cache" @@ -44,6 +46,11 @@ type ( } ) +const ( + cacheNotReleased int32 = 0 + cacheReleased int32 = 1 +) + var ( // ErrTryLock is a temporary error that is thrown by the API // when it loses the race to create workflow execution context @@ -107,9 +114,12 @@ func (c *historyCache) getOrCreateWorkflowExecution(domainID string, // This will create a closure on every request. // Consider revisiting this if it causes too much GC activity + status := cacheNotReleased releaseFunc := func() { - context.Unlock() - c.Release(key) + if atomic.CompareAndSwapInt32(&status, cacheNotReleased, cacheReleased) { + context.Unlock() + c.Release(key) + } } context.Lock() diff --git a/service/history/transferQueueProcessor.go b/service/history/transferQueueProcessor.go index c86ca37ad94..fb1988e18d5 100644 --- a/service/history/transferQueueProcessor.go +++ b/service/history/transferQueueProcessor.go @@ -328,12 +328,12 @@ func (t *transferQueueProcessorImpl) processActivityTask(task *persistence.Trans if err != nil { return err } + defer release() - var mb *mutableStateBuilder - mb, err = context.loadWorkflowExecution() + var msBuilder *mutableStateBuilder + msBuilder, err = context.loadWorkflowExecution() timeout := int32(0) if err != nil { - release() if _, ok := err.(*workflow.EntityNotExistsError); ok { // this could happen if this is a duplicate processing of the task, and the execution has already completed. return nil @@ -341,13 +341,15 @@ func (t *transferQueueProcessorImpl) processActivityTask(task *persistence.Trans return err } - if ai, found := mb.GetActivityInfo(task.ScheduleID); found { + if ai, found := msBuilder.GetActivityInfo(task.ScheduleID); found { timeout = ai.ScheduleToStartTimeout } else { logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TransferTaskTypeActivityTask, task.TaskID, task.ScheduleID) } - release() + // release the context lock since we no longer need mutable state builder and + // the rest of logic is making RPC call, which takes time. + release() if timeout != 0 { err = t.matchingClient.AddActivityTask(nil, &m.AddActivityTaskRequest{ DomainUUID: common.StringPtr(targetDomainID), @@ -382,27 +384,30 @@ func (t *transferQueueProcessorImpl) processDecisionTask(task *persistence.Trans if err != nil { return err } + defer release() - var mb *mutableStateBuilder - mb, err = context.loadWorkflowExecution() + var msBuilder *mutableStateBuilder + msBuilder, err = context.loadWorkflowExecution() if err != nil { - release() if _, ok := err.(*workflow.EntityNotExistsError); ok { // this could happen if this is a duplicate processing of the task, and the execution has already completed. return nil } return err } - timeout := mb.executionInfo.WorkflowTimeout - wfTypeName := mb.executionInfo.WorkflowTypeName - startTimestamp := mb.executionInfo.StartTimestamp - if mb.isStickyTaskListEnabled() { - taskList.Name = common.StringPtr(mb.executionInfo.StickyTaskList) + + timeout := msBuilder.executionInfo.WorkflowTimeout + wfTypeName := msBuilder.executionInfo.WorkflowTypeName + startTimestamp := msBuilder.executionInfo.StartTimestamp + if msBuilder.isStickyTaskListEnabled() { + taskList.Name = common.StringPtr(msBuilder.executionInfo.StickyTaskList) taskList.Kind = common.TaskListKindPtr(workflow.TaskListKindSticky) - timeout = mb.executionInfo.StickyScheduleToStartTimeout + timeout = msBuilder.executionInfo.StickyScheduleToStartTimeout } - release() + // release the context lock since we no longer need mutable state builder and + // the rest of logic is making RPC call, which takes time. + release() err = t.matchingClient.AddDecisionTask(nil, &m.AddDecisionTaskRequest{ DomainUUID: common.StringPtr(domainID), Execution: &execution, @@ -444,8 +449,8 @@ func (t *transferQueueProcessorImpl) processCloseExecution(task *persistence.Tra } defer release() - var mb *mutableStateBuilder - mb, err = context.loadWorkflowExecution() + var msBuilder *mutableStateBuilder + msBuilder, err = context.loadWorkflowExecution() if err != nil { if _, ok := err.(*workflow.EntityNotExistsError); ok { // this could happen if this is a duplicate processing of the task, but the mutable state was @@ -455,16 +460,34 @@ func (t *transferQueueProcessorImpl) processCloseExecution(task *persistence.Tra return err } + replyToParentWorkflow := msBuilder.hasParentExecution() && msBuilder.executionInfo.CloseStatus != persistence.WorkflowCloseStatusContinuedAsNew + var completionEvent *workflow.HistoryEvent + if replyToParentWorkflow { + completionEvent, _ = msBuilder.GetCompletionEvent() + } + parentDomainID := msBuilder.executionInfo.ParentDomainID + parentWorkflowID := msBuilder.executionInfo.ParentWorkflowID + parentRunID := msBuilder.executionInfo.ParentRunID + initiatedID := msBuilder.executionInfo.InitiatedID + + workflowTypeName := msBuilder.executionInfo.WorkflowTypeName + workflowStartTimestamp := msBuilder.executionInfo.StartTimestamp.UnixNano() + workflowCloseTimestamp := msBuilder.getLastUpdatedTimestamp() + workflowCloseStatus := getWorkflowExecutionCloseStatus(msBuilder.executionInfo.CloseStatus) + workflowHistoryLength := msBuilder.GetNextEventID() + + // release the context lock since we no longer need mutable state builder and + // the rest of logic is making RPC call, which takes time. + release() // Communicate the result to parent execution if this is Child Workflow execution - if mb.hasParentExecution() && mb.executionInfo.CloseStatus != persistence.WorkflowCloseStatusContinuedAsNew { - completionEvent, _ := mb.GetCompletionEvent() + if replyToParentWorkflow { err = t.historyClient.RecordChildExecutionCompleted(nil, &history.RecordChildExecutionCompletedRequest{ - DomainUUID: common.StringPtr(mb.executionInfo.ParentDomainID), + DomainUUID: common.StringPtr(parentDomainID), WorkflowExecution: &workflow.WorkflowExecution{ - WorkflowId: common.StringPtr(mb.executionInfo.ParentWorkflowID), - RunId: common.StringPtr(mb.executionInfo.ParentRunID), + WorkflowId: common.StringPtr(parentWorkflowID), + RunId: common.StringPtr(parentRunID), }, - InitiatedId: common.Int64Ptr(mb.executionInfo.InitiatedID), + InitiatedId: common.Int64Ptr(initiatedID), CompletedExecution: &workflow.WorkflowExecution{ WorkflowId: common.StringPtr(task.WorkflowID), RunId: common.StringPtr(task.RunID), @@ -476,11 +499,10 @@ func (t *transferQueueProcessorImpl) processCloseExecution(task *persistence.Tra switch err.(type) { case *workflow.EntityNotExistsError: err = nil + default: + return err } } - if err != nil { - return err - } // Record closing in visibility store retentionSeconds := int64(0) @@ -498,11 +520,11 @@ func (t *transferQueueProcessorImpl) processCloseExecution(task *persistence.Tra return t.visibilityManager.RecordWorkflowExecutionClosed(&persistence.RecordWorkflowExecutionClosedRequest{ DomainUUID: task.DomainID, Execution: execution, - WorkflowTypeName: mb.executionInfo.WorkflowTypeName, - StartTimestamp: mb.executionInfo.StartTimestamp.UnixNano(), - CloseTimestamp: mb.getLastUpdatedTimestamp(), - Status: getWorkflowExecutionCloseStatus(mb.executionInfo.CloseStatus), - HistoryLength: mb.GetNextEventID(), + WorkflowTypeName: workflowTypeName, + StartTimestamp: workflowStartTimestamp, + CloseTimestamp: workflowCloseTimestamp, + Status: workflowCloseStatus, + HistoryLength: workflowHistoryLength, RetentionSeconds: retentionSeconds, }) } @@ -545,6 +567,29 @@ func (t *transferQueueProcessorImpl) processCancelExecution(task *persistence.Tr // No pending request cancellation for this initiatedID, complete this transfer task return nil } + + // handle workflow cancel itself + if domainID == targetDomainID && task.WorkflowID == task.TargetWorkflowID { + // it does not matter if the run ID is a mismatch + cancelRequest := &history.RequestCancelWorkflowExecutionRequest{ + DomainUUID: common.StringPtr(domainID), + CancelRequest: &workflow.RequestCancelWorkflowExecutionRequest{ + Domain: common.StringPtr(targetDomainID), + WorkflowExecution: &workflow.WorkflowExecution{ + WorkflowId: common.StringPtr(task.TargetWorkflowID), + RunId: common.StringPtr(task.TargetRunID), + }, + Identity: common.StringPtr(identityHistoryService), + }, + } + err = t.requestCancelFailed(task, context, cancelRequest) + if _, ok := err.(*workflow.EntityNotExistsError); ok { + // this could happen if this is a duplicate processing of the task, and the execution has already completed. + return nil + } + return err + } + cancelRequest := &history.RequestCancelWorkflowExecutionRequest{ DomainUUID: common.StringPtr(targetDomainID), CancelRequest: &workflow.RequestCancelWorkflowExecutionRequest{ @@ -643,13 +688,14 @@ func (t *transferQueueProcessorImpl) processSignalExecution(task *persistence.Tr // handle workflow signal itself if domainID == targetDomainID && task.WorkflowID == task.TargetWorkflowID { + // it does not matter if the run ID is a mismatch signalRequest := &history.SignalWorkflowExecutionRequest{ - DomainUUID: common.StringPtr(domainID), + DomainUUID: common.StringPtr(targetDomainID), SignalRequest: &workflow.SignalWorkflowExecutionRequest{ - Domain: common.StringPtr(domainID), + Domain: common.StringPtr(targetDomainID), WorkflowExecution: &workflow.WorkflowExecution{ - WorkflowId: common.StringPtr(task.WorkflowID), - RunId: common.StringPtr(task.RunID), + WorkflowId: common.StringPtr(task.TargetWorkflowID), + RunId: common.StringPtr(task.TargetRunID), }, Identity: common.StringPtr(identityHistoryService), Control: ri.Control, @@ -711,6 +757,9 @@ func (t *transferQueueProcessorImpl) processSignalExecution(task *persistence.Tr return nil } + // release the context lock since we no longer need mutable state builder and + // the rest of logic is making RPC call, which takes time. + release() // remove signalRequestedID from target workflow, after Signal detail is removed from source workflow removeRequest := &history.RemoveSignalMutableStateRequest{ DomainUUID: common.StringPtr(targetDomainID), @@ -929,9 +978,9 @@ func (t *transferQueueProcessorImpl) requestCancelCompleted(task *persistence.Tr msBuilder.AddExternalWorkflowExecutionCancelRequested( initiatedEventID, - *request.DomainUUID, - *request.CancelRequest.WorkflowExecution.WorkflowId, - common.StringDefault(request.CancelRequest.WorkflowExecution.RunId), + request.GetDomainUUID(), + request.CancelRequest.WorkflowExecution.GetWorkflowId(), + request.CancelRequest.WorkflowExecution.GetRunId(), ) return nil @@ -983,9 +1032,9 @@ func (t *transferQueueProcessorImpl) requestCancelFailed(task *persistence.Trans msBuilder.AddRequestCancelExternalWorkflowExecutionFailedEvent( emptyEventID, initiatedEventID, - *request.DomainUUID, - *request.CancelRequest.WorkflowExecution.WorkflowId, - common.StringDefault(request.CancelRequest.WorkflowExecution.RunId), + request.GetDomainUUID(), + request.CancelRequest.WorkflowExecution.GetWorkflowId(), + request.CancelRequest.WorkflowExecution.GetRunId(), workflow.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution) return nil