diff --git a/common/cache/domainCache.go b/common/cache/domainCache.go index 3c4e2689adc..c9205e4f0d6 100644 --- a/common/cache/domainCache.go +++ b/common/cache/domainCache.go @@ -862,6 +862,16 @@ func (entry *DomainCacheEntry) GetDomainNotActiveErr() error { ) } +// HasReplicationCluster returns true if the domain has replication in the cluster +func (entry *DomainCacheEntry) HasReplicationCluster(clusterName string) bool { + for _, cluster := range entry.GetReplicationConfig().Clusters { + if cluster.ClusterName == clusterName { + return true + } + } + return false +} + // Len return length func (t DomainCacheEntries) Len() int { return len(t) diff --git a/service/history/queue/timer_queue_processor.go b/service/history/queue/timer_queue_processor.go index b45bffaf259..e06f889db43 100644 --- a/service/history/queue/timer_queue_processor.go +++ b/service/history/queue/timer_queue_processor.go @@ -519,6 +519,25 @@ func newTimerQueueStandbyProcessor( if !ok { return false, errUnexpectedQueueTask } + if timer.TaskType == persistence.TaskTypeWorkflowTimeout || + timer.TaskType == persistence.TaskTypeDeleteHistoryEvent { + domainEntry, err := shard.GetDomainCache().GetDomainByID(timer.DomainID) + if err == nil { + if domainEntry.HasReplicationCluster(clusterName) { + // guarantee the processing of workflow execution history deletion + return true, nil + } + } else { + if _, ok := err.(*types.EntityNotExistsError); !ok { + // retry the task if failed to find the domain + logger.Warn("Cannot find domain", tag.WorkflowDomainID(timer.DomainID)) + return false, err + } else { + logger.Warn("Cannot find domain, default to not process task.", tag.WorkflowDomainID(timer.DomainID), tag.Value(timer)) + return false, nil + } + } + } return taskAllocator.VerifyStandbyTask(clusterName, timer.DomainID, timer) } diff --git a/service/history/queue/transfer_queue_processor.go b/service/history/queue/transfer_queue_processor.go index 02885c3fc04..dfb473ea191 100644 --- a/service/history/queue/transfer_queue_processor.go +++ b/service/history/queue/transfer_queue_processor.go @@ -529,6 +529,25 @@ func newTransferQueueStandbyProcessor( if !ok { return false, errUnexpectedQueueTask } + if task.TaskType == persistence.TransferTaskTypeCloseExecution || + task.TaskType == persistence.TransferTaskTypeRecordWorkflowClosed { + domainEntry, err := shard.GetDomainCache().GetDomainByID(task.DomainID) + if err == nil { + if domainEntry.HasReplicationCluster(clusterName) { + // guarantee the processing of workflow execution close + return true, nil + } + } else { + if _, ok := err.(*types.EntityNotExistsError); !ok { + // retry the task if failed to find the domain + logger.Warn("Cannot find domain", tag.WorkflowDomainID(task.DomainID)) + return false, err + } else { + logger.Warn("Cannot find domain, default to not process task.", tag.WorkflowDomainID(task.DomainID), tag.Value(task)) + return false, nil + } + } + } return taskAllocator.VerifyStandbyTask(clusterName, task.DomainID, task) } diff --git a/service/history/task/timer_standby_task_executor.go b/service/history/task/timer_standby_task_executor.go index cca04d1ed2f..ffc99b44a7a 100644 --- a/service/history/task/timer_standby_task_executor.go +++ b/service/history/task/timer_standby_task_executor.go @@ -87,10 +87,7 @@ func (t *timerStandbyTaskExecutor) Execute( return errUnexpectedTask } - if !shouldProcessTask && - timerTask.TaskType != persistence.TaskTypeWorkflowTimeout && - timerTask.TaskType != persistence.TaskTypeDeleteHistoryEvent { - // guarantee the processing of workflow execution history deletion + if !shouldProcessTask { return nil } diff --git a/service/history/task/transfer_standby_task_executor.go b/service/history/task/transfer_standby_task_executor.go index a338d147ac8..a23762f3ecd 100644 --- a/service/history/task/transfer_standby_task_executor.go +++ b/service/history/task/transfer_standby_task_executor.go @@ -79,10 +79,7 @@ func (t *transferStandbyTaskExecutor) Execute( return errUnexpectedTask } - if !shouldProcessTask && - transferTask.TaskType != persistence.TransferTaskTypeCloseExecution && - transferTask.TaskType != persistence.TransferTaskTypeRecordWorkflowClosed { - // guarantee the processing of workflow execution close + if !shouldProcessTask { return nil }