Skip to content

Commit

Permalink
[history] add domain status check in taskfilter (uber#5140)
Browse files Browse the repository at this point in the history
What changed?

add domain status check in taskAllocator verification

Why?

tasks in queues should drop for deprecated or deleted domains
  • Loading branch information
shijiesheng authored Mar 9, 2023
1 parent 9643242 commit 4b5e888
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 0 deletions.
19 changes: 19 additions & 0 deletions service/history/queue/task_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
package queue

import (
"errors"
"sync"

"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/shard"
htask "github.com/uber/cadence/service/history/task"
Expand Down Expand Up @@ -189,3 +191,20 @@ func (t *taskAllocatorImpl) Lock() {
func (t *taskAllocatorImpl) Unlock() {
t.locker.Unlock()
}

// isDomainNotRegistered checks either if domain does not exist or is in deprecated or deleted status
func isDomainNotRegistered(shard shard.Context, domainID string) (bool, error) {
domainEntry, err := shard.GetDomainCache().GetDomainByID(domainID)
if err != nil {
if _, ok := err.(*types.EntityNotExistsError); ok {
return true, nil
}
// unexpected error in finding a domain
return false, err
}
info := domainEntry.GetInfo()
if info == nil {
return false, errors.New("domain info is nil in cache")
}
return info.Status == persistence.DomainStatusDeprecated || info.Status == persistence.DomainStatusDeleted, nil
}
13 changes: 13 additions & 0 deletions service/history/queue/timer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,11 @@ func newTimerQueueActiveProcessor(
if !ok {
return false, errUnexpectedQueueTask
}
if notRegistered, err := isDomainNotRegistered(shard, timer.DomainID); notRegistered && err == nil {
logger.Info("Domain is not in registered status, skip task in active timer queue.", tag.WorkflowDomainID(timer.DomainID), tag.Value(taskInfo))
return false, nil
}

return taskAllocator.VerifyActiveTask(timer.DomainID, timer)
}

Expand Down Expand Up @@ -502,6 +507,10 @@ func newTimerQueueStandbyProcessor(
if !ok {
return false, errUnexpectedQueueTask
}
if notRegistered, err := isDomainNotRegistered(shard, timer.DomainID); notRegistered && err == nil {
logger.Info("Domain is not in registered status, skip task in standby timer queue.", tag.WorkflowDomainID(timer.DomainID), tag.Value(taskInfo))
return false, nil
}
if timer.TaskType == persistence.TaskTypeWorkflowTimeout ||
timer.TaskType == persistence.TaskTypeDeleteHistoryEvent {
domainEntry, err := shard.GetDomainCache().GetDomainByID(timer.DomainID)
Expand Down Expand Up @@ -589,6 +598,10 @@ func newTimerQueueFailoverProcessor(
if !ok {
return false, errUnexpectedQueueTask
}
if notRegistered, err := isDomainNotRegistered(shardContext, timer.DomainID); notRegistered && err == nil {
logger.Info("Domain is not in registered status, skip task in failover timer queue.", tag.WorkflowDomainID(timer.DomainID), tag.Value(taskInfo))
return false, nil
}
return taskAllocator.VerifyFailoverActiveTask(domainIDs, timer.DomainID, timer)
}

Expand Down
12 changes: 12 additions & 0 deletions service/history/queue/transfer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,10 @@ func newTransferQueueActiveProcessor(
if !ok {
return false, errUnexpectedQueueTask
}
if notRegistered, err := isDomainNotRegistered(shard, task.DomainID); notRegistered && err == nil {
logger.Info("Domain is not in registered status, skip task in active transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo))
return false, nil
}
return taskAllocator.VerifyActiveTask(task.DomainID, task)
}

Expand Down Expand Up @@ -514,6 +518,10 @@ func newTransferQueueStandbyProcessor(
if !ok {
return false, errUnexpectedQueueTask
}
if notRegistered, err := isDomainNotRegistered(shard, task.DomainID); notRegistered && err == nil {
logger.Info("Domain is not in registered status, skip task in standby transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo))
return false, nil
}
if task.TaskType == persistence.TransferTaskTypeCloseExecution ||
task.TaskType == persistence.TransferTaskTypeRecordWorkflowClosed {
domainEntry, err := shard.GetDomainCache().GetDomainByID(task.DomainID)
Expand Down Expand Up @@ -596,6 +604,10 @@ func newTransferQueueFailoverProcessor(
if !ok {
return false, errUnexpectedQueueTask
}
if notRegistered, err := isDomainNotRegistered(shardContext, task.DomainID); notRegistered && err == nil {
logger.Info("Domain is not in registered status, skip task in failover transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo))
return false, nil
}
return taskAllocator.VerifyFailoverActiveTask(domainIDs, task.DomainID, task)
}

Expand Down

0 comments on commit 4b5e888

Please sign in to comment.