Skip to content

Commit

Permalink
Drop stuck close execution transfer task (#3240)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored May 7, 2020
1 parent 6be19bd commit 609149f
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 3 deletions.
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1616,6 +1616,8 @@ const (
TaskProcessingLatency
TaskQueueLatency

TransferTaskMissingEventCounter

TransferTaskThrottledCounter
TimerTaskThrottledCounter

Expand Down Expand Up @@ -1965,6 +1967,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
TaskLimitExceededCounter: {metricName: "task_errors_limit_exceeded_counter", metricType: Counter},
TaskProcessingLatency: {metricName: "task_latency_processing", metricType: Timer},
TaskQueueLatency: {metricName: "task_latency_queue", metricType: Timer},
TransferTaskMissingEventCounter: {metricName: "transfer_task_missing_event_counter", metricType: Counter},
TaskBatchCompleteCounter: {metricName: "task_batch_complete_counter", metricType: Counter},
TransferTaskThrottledCounter: {metricName: "transfer_task_throttled_counter", metricType: Counter},
TimerTaskThrottledCounter: {metricName: "timer_task_throttled_counter", metricType: Counter},
Expand Down
17 changes: 16 additions & 1 deletion common/service/dynamicconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ type MapPropertyFn func(opts ...FilterOption) map[string]interface{}
// StringPropertyFnWithDomainFilter is a wrapper to get string property from dynamic config
type StringPropertyFnWithDomainFilter func(domain string) string

// BoolPropertyFnWithDomainFilter is a wrapper to get string property from dynamic config
// BoolPropertyFnWithDomainFilter is a wrapper to get bool property from dynamic config.
// It takes a single string argument (domain) and returns the bool value of the property.
type BoolPropertyFnWithDomainFilter func(domain string) bool

// BoolPropertyFnWithDomainIDFilter is a wrapper to get bool property from dynamic config.
// The id argument is the domain ID used as the lookup filter (the closure built by
// GetBoolPropertyFnWithDomainIDFilter passes it to DomainIDFilter).
type BoolPropertyFnWithDomainIDFilter func(id string) bool

// BoolPropertyFnWithTaskListInfoFilters is a wrapper to get bool property from
// dynamic config with three filters: domain, taskList, taskType.
// Each argument supplies the value for the filter of the same name.
type BoolPropertyFnWithTaskListInfoFilters func(domain string, taskList string, taskType int) bool

Expand Down Expand Up @@ -289,6 +292,18 @@ func (c *Collection) GetBoolPropertyFnWithDomainFilter(key Key, defaultValue boo
}
}

// GetBoolPropertyFnWithDomainIDFilter builds a lookup function for the bool
// property identified by key, filtered by domain ID.
//
// The returned function asks the client for the bool value using a filter map
// that contains only the given domain ID. A lookup error is reported through
// logError; the value handed back by the client is then logged via logValue
// and returned to the caller in either case.
func (c *Collection) GetBoolPropertyFnWithDomainIDFilter(key Key, defaultValue bool) BoolPropertyFnWithDomainIDFilter {
	return func(id string) bool {
		domainIDFilters := getFilterMap(DomainIDFilter(id))
		result, lookupErr := c.client.GetBoolValue(key, domainIDFilters, defaultValue)
		if lookupErr != nil {
			c.logError(key, lookupErr)
		}
		c.logValue(key, result, defaultValue, boolCompareEquals)
		return result
	}
}

// GetBoolPropertyFilteredByTaskListInfo gets property with taskListInfo as filters and asserts that it's a bool
func (c *Collection) GetBoolPropertyFilteredByTaskListInfo(key Key, defaultValue bool) BoolPropertyFnWithTaskListInfoFilters {
return func(domain string, taskList string, taskType int) bool {
Expand Down
9 changes: 9 additions & 0 deletions common/service/dynamicconfig/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,15 @@ func (s *configSuite) TestGetBoolProperty() {
s.Equal(false, value())
}

// TestGetBoolPropertyFilteredByDomainID exercises the domain-ID-filtered bool
// getter: it expects the default (true) on the first lookup and false once the
// test client has been given false for the same key.
func (s *configSuite) TestGetBoolPropertyFilteredByDomainID() {
	const domainID = "testDomainID"
	key := testGetBoolPropertyFilteredByDomainIDKey
	lookup := s.cln.GetBoolPropertyFnWithDomainIDFilter(key, true)

	// First lookup: expected to yield the default supplied above.
	s.Equal(true, lookup(domainID))

	// After overriding the key on the client, the same closure must see the new value.
	s.client.SetValue(key, false)
	s.Equal(false, lookup(domainID))
}

func (s *configSuite) TestGetBoolPropertyFilteredByTaskListInfo() {
key := testGetBoolPropertyFilteredByTaskListInfoKey
domain := "testDomain"
Expand Down
20 changes: 18 additions & 2 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var keys = map[Key]string{
testGetDurationPropertyFilteredByDomainKey: "testGetDurationPropertyFilteredByDomainKey",
testGetIntPropertyFilteredByTaskListInfoKey: "testGetIntPropertyFilteredByTaskListInfoKey",
testGetDurationPropertyFilteredByTaskListInfoKey: "testGetDurationPropertyFilteredByTaskListInfoKey",
testGetBoolPropertyFilteredByDomainIDKey: "testGetBoolPropertyFilteredByDomainIDKey",
testGetBoolPropertyFilteredByTaskListInfoKey: "testGetBoolPropertyFilteredByTaskListInfoKey",

// system settings
Expand Down Expand Up @@ -223,6 +224,7 @@ var keys = map[Key]string{
MutableStateChecksumVerifyProbability: "history.mutableStateChecksumVerifyProbability",
MutableStateChecksumInvalidateBefore: "history.mutableStateChecksumInvalidateBefore",
ReplicationEventsFromCurrentCluster: "history.ReplicationEventsFromCurrentCluster",
EnableDropStuckTaskByDomainID: "history.DropStuckTaskByDomain",

WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
WorkerPersistenceGlobalMaxQPS: "worker.persistenceGlobalMaxQPS",
Expand Down Expand Up @@ -269,6 +271,7 @@ const (
testGetDurationPropertyFilteredByDomainKey
testGetIntPropertyFilteredByTaskListInfoKey
testGetDurationPropertyFilteredByTaskListInfoKey
testGetBoolPropertyFilteredByDomainIDKey
testGetBoolPropertyFilteredByTaskListInfoKey

// EnableGlobalDomain is key for enable global domain
Expand Down Expand Up @@ -333,7 +336,7 @@ const (

// FrontendPersistenceMaxQPS is the max qps frontend host can query DB
FrontendPersistenceMaxQPS
// FrontendPersistenceMaxGlobalQPS is the max qps frontend cluster can query DB
// FrontendPersistenceGlobalMaxQPS is the max qps frontend cluster can query DB
FrontendPersistenceGlobalMaxQPS
// FrontendVisibilityMaxPageSize is default max size for ListWorkflowExecutions in one page
FrontendVisibilityMaxPageSize
Expand Down Expand Up @@ -382,7 +385,7 @@ const (
MatchingRPS
// MatchingPersistenceMaxQPS is the max qps matching host can query DB
MatchingPersistenceMaxQPS
// MatchingPersistenceMaxQPS is the max qps matching cluster can query DB
// MatchingPersistenceGlobalMaxQPS is the max qps matching cluster can query DB
MatchingPersistenceGlobalMaxQPS
// MatchingMinTaskThrottlingBurstSize is the minimum burst size for task list throttling
MatchingMinTaskThrottlingBurstSize
Expand Down Expand Up @@ -582,6 +585,9 @@ const (
// DecisionHeartbeatTimeout for decision heartbeat
DecisionHeartbeatTimeout

// EnableDropStuckTaskByDomainID is whether stuck timer/transfer task should be dropped for a domain
EnableDropStuckTaskByDomainID

// key for worker

// WorkerPersistenceMaxQPS is the max qps worker host can query DB
Expand Down Expand Up @@ -698,6 +704,7 @@ func (f Filter) String() string {
// filters lists the string names of the filter kinds. The entries are in the
// same order as the Filter constants declared below (unknownFilter, DomainName,
// DomainID, TaskListName, TaskType).
// NOTE(review): presumably indexed by Filter value in Filter.String(), so the two
// lists must stay in sync when a filter is added — confirm against String().
var filters = []string{
"unknownFilter",
"domainName",
"domainID",
"taskListName",
"taskType",
}
Expand All @@ -706,6 +713,8 @@ const (
unknownFilter Filter = iota
// DomainName is the domain name
DomainName
// DomainID is the domainID
DomainID
// TaskListName is the tasklist name
TaskListName
// TaskType is the task type (0:Decision, 1:Activity)
Expand All @@ -732,6 +741,13 @@ func DomainFilter(name string) FilterOption {
}
}

// DomainIDFilter returns a FilterOption that stores id in the filter map
// under the DomainID key.
func DomainIDFilter(id string) FilterOption {
	setDomainID := func(m map[Filter]interface{}) {
		m[DomainID] = id
	}
	return setDomainID
}

// TaskTypeFilter filters by task type
func TaskTypeFilter(taskType int) FilterOption {
return func(filterMap map[Filter]interface{}) {
Expand Down
4 changes: 4 additions & 0 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ type Config struct {

// Cross DC Replication configuration
ReplicationEventsFromCurrentCluster dynamicconfig.BoolPropertyFnWithDomainFilter

EnableDropStuckTaskByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter
}

const (
Expand Down Expand Up @@ -337,6 +339,8 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, storeType strin
MutableStateChecksumInvalidateBefore: dc.GetFloat64Property(dynamicconfig.MutableStateChecksumInvalidateBefore, 0),

ReplicationEventsFromCurrentCluster: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.ReplicationEventsFromCurrentCluster, false),

EnableDropStuckTaskByDomainID: dc.GetBoolPropertyFnWithDomainIDFilter(dynamicconfig.EnableDropStuckTaskByDomainID, false),
}

return cfg
Expand Down
9 changes: 9 additions & 0 deletions service/history/taskProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,15 @@ func (t *taskProcessor) handleTaskError(
return nil
}

if transferTask, ok := task.task.(*persistence.TransferTaskInfo); ok &&
transferTask.TaskType == persistence.TransferTaskTypeCloseExecution &&
err == ErrMissingWorkflowStartEvent &&
t.config.EnableDropStuckTaskByDomainID(task.task.GetDomainID()) { // use domainID here to avoid accessing domainCache
scope.IncCounter(metrics.TransferTaskMissingEventCounter)
task.logger.Error("Drop close execution transfer task due to corrupted workflow history", tag.Error(err), tag.LifeCycleProcessingFailed)
return nil
}

// this is a transient error
if err == ErrTaskRetry {
scope.IncCounter(metrics.TaskStandbyRetryCounter)
Expand Down

0 comments on commit 609149f

Please sign in to comment.