Skip to content

Commit

Permalink
Add jittered workflow deletion configuration (uber#4789)
Browse files Browse the repository at this point in the history
Added a toggled parameter that spreads out workflow deletions over a day. We still maintain the same retention period as defined in domain configurations. This will randomly add up to the configured int amount in minutes.
  • Loading branch information
allenchen2244 authored Apr 15, 2022
1 parent 91579a1 commit b49002d
Show file tree
Hide file tree
Showing 11 changed files with 1,210 additions and 1,127 deletions.
7 changes: 7 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2199,6 +2199,12 @@ const (
// Default value: false
Lockdown

// WorkflowDeletionJitterRange defines the duration in minutes for workflow close tasks jittering
// KeyName: system.workflowDeletionJitterRange
// Value type: Duration
// Default value: 1 (no jittering)
WorkflowDeletionJitterRange

// LastKeyForTest must be the last one in this const group for testing purpose
LastKeyForTest
)
Expand Down Expand Up @@ -2252,6 +2258,7 @@ var Keys = map[Key]string{
GRPCMaxSizeInByte: "system.grpcMaxSizeInByte",
EnableWatchDog: "system.EnableWatchDog",
Lockdown: "system.Lockdown",
WorkflowDeletionJitterRange: "system.workflowDeletionJitterRange",

// size limit
BlobSizeLimitError: "limit.blobSize.error",
Expand Down
2 changes: 2 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Config struct {
ThrottledLogRPS dynamicconfig.IntPropertyFn
EnableStickyQuery dynamicconfig.BoolPropertyFnWithDomainFilter
ShutdownDrainDuration dynamicconfig.DurationPropertyFn
WorkflowDeletionJitterRange dynamicconfig.IntPropertyFnWithDomainFilter

// HistoryCache settings
// Change of these configs require shard restart
Expand Down Expand Up @@ -388,6 +389,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
StandbyClusterDelay: dc.GetDurationProperty(dynamicconfig.StandbyClusterDelay, 5*time.Minute),
StandbyTaskMissingEventsResendDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsResendDelay, 15*time.Minute),
StandbyTaskMissingEventsDiscardDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsDiscardDelay, 25*time.Minute),
WorkflowDeletionJitterRange: dc.GetIntPropertyFilteredByDomain(dynamicconfig.WorkflowDeletionJitterRange, 1),

TaskProcessRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.TaskProcessRPS, 1000),
TaskSchedulerType: dc.GetIntProperty(dynamicconfig.TaskSchedulerType, int(task.SchedulerTypeWRR)),
Expand Down
12 changes: 6 additions & 6 deletions service/history/execution/mutable_state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2551,7 +2551,7 @@ func (e *mutableStateBuilder) AddCompletedWorkflowEvent(
}
// TODO merge active & passive task generation
if err := e.taskGenerator.GenerateWorkflowCloseTasks(
event,
event, e.config.WorkflowDeletionJitterRange(e.domainEntry.GetInfo().Name),
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -2591,7 +2591,7 @@ func (e *mutableStateBuilder) AddFailWorkflowEvent(
}
// TODO merge active & passive task generation
if err := e.taskGenerator.GenerateWorkflowCloseTasks(
event,
event, e.config.WorkflowDeletionJitterRange(e.domainEntry.GetInfo().Name),
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -2630,7 +2630,7 @@ func (e *mutableStateBuilder) AddTimeoutWorkflowEvent(
}
// TODO merge active & passive task generation
if err := e.taskGenerator.GenerateWorkflowCloseTasks(
event,
event, e.config.WorkflowDeletionJitterRange(e.domainEntry.GetInfo().Name),
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -2709,7 +2709,7 @@ func (e *mutableStateBuilder) AddWorkflowExecutionCanceledEvent(
}
// TODO merge active & passive task generation
if err := e.taskGenerator.GenerateWorkflowCloseTasks(
event,
event, e.config.WorkflowDeletionJitterRange(e.domainEntry.GetInfo().Name),
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -3215,7 +3215,7 @@ func (e *mutableStateBuilder) AddWorkflowExecutionTerminatedEvent(
}
// TODO merge active & passive task generation
if err := e.taskGenerator.GenerateWorkflowCloseTasks(
event,
event, e.config.WorkflowDeletionJitterRange(e.domainEntry.GetInfo().Name),
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -3332,7 +3332,7 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent(
}
// TODO merge active & passive task generation
if err := e.taskGenerator.GenerateWorkflowCloseTasks(
continueAsNewEvent,
continueAsNewEvent, e.config.WorkflowDeletionJitterRange(e.domainEntry.GetInfo().Name),
); err != nil {
return nil, nil, err
}
Expand Down
1,505 changes: 751 additions & 754 deletions service/history/execution/mutable_state_mock.go

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion service/history/execution/mutable_state_task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type (
) error
GenerateWorkflowCloseTasks(
closeEvent *types.HistoryEvent,
workflowDeletionTaskJitterRange int,
) error
GenerateRecordWorkflowStartedTasks(
startEvent *types.HistoryEvent,
Expand Down Expand Up @@ -167,6 +168,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowStartTasks(

func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowCloseTasks(
closeEvent *types.HistoryEvent,
workflowDeletionTaskJitterRange int,
) error {

executionInfo := r.mutableState.GetExecutionInfo()
Expand Down Expand Up @@ -247,7 +249,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowCloseTasks(
}

closeTimestamp := time.Unix(0, closeEvent.GetTimestamp())
retentionDuration := time.Duration(retentionInDays) * time.Hour * 24
retentionDuration := (time.Duration(retentionInDays) * time.Hour * 24) + (time.Duration(rand.Intn(workflowDeletionTaskJitterRange)) * time.Minute)
r.mutableState.AddTimerTasks(&persistence.DeleteHistoryEventTask{
// TaskID is set by shard
VisibilityTimestamp: closeTimestamp.Add(retentionDuration),
Expand Down
Loading

0 comments on commit b49002d

Please sign in to comment.