Skip to content

Commit

Permalink
Improve standby task processing (uber#4695)
Browse files Browse the repository at this point in the history
* Refactor standby task filter logic

* Improve standby task processing
  • Loading branch information
Shaddoll committed Jan 13, 2022
1 parent 028c444 commit a0be592
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 8 deletions.
10 changes: 10 additions & 0 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,16 @@ func (entry *DomainCacheEntry) GetDomainNotActiveErr() error {
)
}

// HasReplicationCluster returns true if the domain has replication in the cluster
func (entry *DomainCacheEntry) HasReplicationCluster(clusterName string) bool {
for _, cluster := range entry.GetReplicationConfig().Clusters {
if cluster.ClusterName == clusterName {
return true
}
}
return false
}

// Len return length
func (t DomainCacheEntries) Len() int {
return len(t)
Expand Down
19 changes: 19 additions & 0 deletions service/history/queue/timer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,25 @@ func newTimerQueueStandbyProcessor(
if !ok {
return false, errUnexpectedQueueTask
}
if timer.TaskType == persistence.TaskTypeWorkflowTimeout ||
timer.TaskType == persistence.TaskTypeDeleteHistoryEvent {
domainEntry, err := shard.GetDomainCache().GetDomainByID(timer.DomainID)
if err == nil {
if domainEntry.HasReplicationCluster(clusterName) {
// guarantee the processing of workflow execution history deletion
return true, nil
}
} else {
if _, ok := err.(*types.EntityNotExistsError); !ok {
// retry the task if failed to find the domain
logger.Warn("Cannot find domain", tag.WorkflowDomainID(timer.DomainID))
return false, err
} else {
logger.Warn("Cannot find domain, default to not process task.", tag.WorkflowDomainID(timer.DomainID), tag.Value(timer))
return false, nil
}
}
}
return taskAllocator.VerifyStandbyTask(clusterName, timer.DomainID, timer)
}

Expand Down
19 changes: 19 additions & 0 deletions service/history/queue/transfer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,25 @@ func newTransferQueueStandbyProcessor(
if !ok {
return false, errUnexpectedQueueTask
}
if task.TaskType == persistence.TransferTaskTypeCloseExecution ||
task.TaskType == persistence.TransferTaskTypeRecordWorkflowClosed {
domainEntry, err := shard.GetDomainCache().GetDomainByID(task.DomainID)
if err == nil {
if domainEntry.HasReplicationCluster(clusterName) {
// guarantee the processing of workflow execution close
return true, nil
}
} else {
if _, ok := err.(*types.EntityNotExistsError); !ok {
// retry the task if failed to find the domain
logger.Warn("Cannot find domain", tag.WorkflowDomainID(task.DomainID))
return false, err
} else {
logger.Warn("Cannot find domain, default to not process task.", tag.WorkflowDomainID(task.DomainID), tag.Value(task))
return false, nil
}
}
}
return taskAllocator.VerifyStandbyTask(clusterName, task.DomainID, task)
}

Expand Down
5 changes: 1 addition & 4 deletions service/history/task/timer_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,7 @@ func (t *timerStandbyTaskExecutor) Execute(
return errUnexpectedTask
}

if !shouldProcessTask &&
timerTask.TaskType != persistence.TaskTypeWorkflowTimeout &&
timerTask.TaskType != persistence.TaskTypeDeleteHistoryEvent {
// guarantee the processing of workflow execution history deletion
if !shouldProcessTask {
return nil
}

Expand Down
5 changes: 1 addition & 4 deletions service/history/task/transfer_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,7 @@ func (t *transferStandbyTaskExecutor) Execute(
return errUnexpectedTask
}

if !shouldProcessTask &&
transferTask.TaskType != persistence.TransferTaskTypeCloseExecution &&
transferTask.TaskType != persistence.TransferTaskTypeRecordWorkflowClosed {
// guarantee the processing of workflow execution close
if !shouldProcessTask {
return nil
}

Expand Down

0 comments on commit a0be592

Please sign in to comment.