Skip to content

Commit

Permalink
Improve logs for transfer task validator (uber#5044)
Browse files Browse the repository at this point in the history
Co-authored-by: David Porter <david.porter@uber.com>
  • Loading branch information
Shaddoll and davidporter-id-au authored Dec 12, 2022
1 parent 2060667 commit d5f3f4f
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions service/history/queue/transfer_queue_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ const (

type (
pendingTaskInfo struct {
executionInfo *persistence.WorkflowExecutionInfo
task persistence.Task
persistenceError bool
executionInfo *persistence.WorkflowExecutionInfo
task persistence.Task
potentialFalsePositive bool
}

transferQueueValidator struct {
Expand All @@ -57,6 +57,7 @@ type (

pendingTaskInfos map[int64]pendingTaskInfo
maxReadLevels map[int]task.Key
minReadTaskID int64
lastValidateTime time.Time
validationInterval dynamicconfig.DurationPropertyFn
}
Expand All @@ -77,6 +78,7 @@ func newTransferQueueValidator(

pendingTaskInfos: make(map[int64]pendingTaskInfo),
maxReadLevels: make(map[int]task.Key),
minReadTaskID: 0,
lastValidateTime: timeSource.Now(),
validationInterval: validationInterval,
}
Expand Down Expand Up @@ -108,10 +110,13 @@ func (v *transferQueueValidator) addTasks(
}

for _, task := range info.Tasks[:numTaskToAdd] {
// It is possible that a task is acked before it is added to the validator
// In that case, the lost task could be a potential false positive case
potentialFalsePositive := info.PersistenceError || task.GetTaskID() <= v.minReadTaskID
v.pendingTaskInfos[task.GetTaskID()] = pendingTaskInfo{
executionInfo: info.ExecutionInfo,
task: task,
persistenceError: info.PersistenceError,
executionInfo: info.ExecutionInfo,
task: task,
potentialFalsePositive: potentialFalsePositive,
}
}
}
Expand Down Expand Up @@ -178,10 +183,11 @@ func (v *transferQueueValidator) validatePendingTasks() {
tag.WorkflowDomainID(taskInfo.executionInfo.DomainID),
tag.WorkflowID(taskInfo.executionInfo.WorkflowID),
tag.WorkflowRunID(taskInfo.executionInfo.RunID),
tag.Bool(taskInfo.persistenceError),
tag.Bool(taskInfo.potentialFalsePositive),
)
v.metricsScope.IncCounter(metrics.QueueValidatorLostTaskCounter)
delete(v.pendingTaskInfos, taskID)
}
}
v.minReadTaskID = minReadTaskID
}

0 comments on commit d5f3f4f

Please sign in to comment.