Skip to content

Commit c98c02e

Browse files
committed
Introduce Task Processor optimization
1 parent 6f65aef commit c98c02e

File tree

18 files changed

+257
-42
lines changed

18 files changed

+257
-42
lines changed

common/dynamicconfig/dynamicproperties/constants.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2113,6 +2113,7 @@ const (
21132113
EnableTimerQueueV2
21142114
EnableTransferQueueV2PendingTaskCountAlert
21152115
EnableTimerQueueV2PendingTaskCountAlert
2116+
EnableTimerProcessorInMemoryQueue
21162117

21172118
// LastBoolKey must be the last one in this const group
21182119
LastBoolKey
@@ -4680,6 +4681,12 @@ var BoolKeys = map[BoolKey]DynamicBool{
46804681
Filters: []Filter{ShardID},
46814682
DefaultValue: false,
46824683
},
4684+
EnableTimerProcessorInMemoryQueue: {
4685+
KeyName: "history.enableTimerProcessorInMemoryQueue",
4686+
Description: "EnableTimerProcessorInMemoryQueue is the flag to enable in-memory queue for timer processor",
4687+
Filters: []Filter{ShardID},
4688+
DefaultValue: false,
4689+
},
46834690
}
46844691

46854692
var FloatKeys = map[FloatKey]DynamicFloat{

config/development.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,4 +172,4 @@ shardDistribution:
172172
prefix: "store"
173173
process:
174174
period: 1s
175-
heartbeatTTL: 2s
175+
heartbeatTTL: 2s

service/history/common/type.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,9 @@ type (
3636
Activities map[int64]*persistence.ActivityInfo
3737
History events.PersistedBlobs
3838
PersistenceError bool
39+
40+
// if true, the task will be scheduled in memory for the current execution, otherwise
41+
// it will only be scheduled after the next DB scan
42+
ScheduleInMemory bool
3943
}
4044
)

service/history/config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ type Config struct {
154154
TimerProcessorSplitQueueIntervalJitterCoefficient dynamicproperties.FloatPropertyFn
155155
TimerProcessorMaxRedispatchQueueSize dynamicproperties.IntPropertyFn
156156
TimerProcessorMaxTimeShift dynamicproperties.DurationPropertyFn
157+
EnableTimerProcessorInMemoryQueue dynamicproperties.BoolPropertyFnWithShardIDFilter
157158
TimerProcessorHistoryArchivalSizeLimit dynamicproperties.IntPropertyFn
158159
TimerProcessorArchivalTimeLimit dynamicproperties.DurationPropertyFn
159160
DisableTimerFailoverQueue dynamicproperties.BoolPropertyFn
@@ -453,6 +454,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, i
453454
TimerProcessorSplitQueueIntervalJitterCoefficient: dc.GetFloat64Property(dynamicproperties.TimerProcessorSplitQueueIntervalJitterCoefficient),
454455
TimerProcessorMaxRedispatchQueueSize: dc.GetIntProperty(dynamicproperties.TimerProcessorMaxRedispatchQueueSize),
455456
TimerProcessorMaxTimeShift: dc.GetDurationProperty(dynamicproperties.TimerProcessorMaxTimeShift),
457+
EnableTimerProcessorInMemoryQueue: dc.GetBoolPropertyFilteredByShardID(dynamicproperties.EnableTimerProcessorInMemoryQueue),
456458
TimerProcessorHistoryArchivalSizeLimit: dc.GetIntProperty(dynamicproperties.TimerProcessorHistoryArchivalSizeLimit),
457459
TimerProcessorArchivalTimeLimit: dc.GetDurationProperty(dynamicproperties.TimerProcessorArchivalTimeLimit),
458460
DisableTimerFailoverQueue: dc.GetBoolProperty(dynamicproperties.DisableTimerFailoverQueue),

service/history/config/config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ func TestNewConfig(t *testing.T) {
278278
"QueueMaxVirtualQueueCount": {dynamicproperties.QueueMaxVirtualQueueCount, 101},
279279
"VirtualSliceForceAppendInterval": {dynamicproperties.VirtualSliceForceAppendInterval, time.Second},
280280
"ReplicationTaskProcessorLatencyLogThreshold": {dynamicproperties.ReplicationTaskProcessorLatencyLogThreshold, time.Duration(0)},
281+
"EnableTimerProcessorInMemoryQueue": {dynamicproperties.EnableTimerProcessorInMemoryQueue, false},
281282
}
282283
client := dynamicconfig.NewInMemoryClient()
283284
for fieldName, expected := range fields {

service/history/execution/context.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -992,6 +992,7 @@ func notifyTasks(
992992
ExecutionInfo: executionInfo,
993993
Tasks: tasksByCategory[persistence.HistoryTaskCategoryTimer],
994994
PersistenceError: persistenceError,
995+
ScheduleInMemory: true,
995996
}
996997
replicationTaskInfo := &hcommon.NotifyTaskInfo{
997998
ExecutionInfo: executionInfo,

service/history/execution/context_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ func TestNotifyTasksFromWorkflowSnapshot(t *testing.T) {
289289
},
290290
},
291291
PersistenceError: true,
292+
ScheduleInMemory: true,
292293
})
293294
mockEngine.EXPECT().NotifyNewReplicationTasks(&hcommon.NotifyTaskInfo{
294295
ExecutionInfo: &persistence.WorkflowExecutionInfo{
@@ -419,6 +420,7 @@ func TestNotifyTasksFromWorkflowMutation(t *testing.T) {
419420
},
420421
},
421422
PersistenceError: true,
423+
ScheduleInMemory: true,
422424
})
423425
mockEngine.EXPECT().NotifyNewReplicationTasks(&hcommon.NotifyTaskInfo{
424426
ExecutionInfo: &persistence.WorkflowExecutionInfo{

service/history/queuev2/queue_base.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,14 @@ func (q *queueBase) processNewTasks() bool {
274274
return true
275275
}
276276

277+
func (q *queueBase) insertSingleTask(task task.Task) bool {
278+
return q.virtualQueueManager.InsertSingleTaskToRootQueue(task)
279+
}
280+
281+
func (q *queueBase) removeScheduledTasksAfter(t time.Time) {
282+
q.virtualQueueManager.RemoveScheduledTasksAfter(t)
283+
}
284+
277285
func (q *queueBase) updateQueueState(ctx context.Context) {
278286
q.metricsScope.IncCounter(metrics.AckLevelUpdateCounter)
279287
queueState := &QueueState{

service/history/queuev2/queue_scheduled.go

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,15 +143,42 @@ func (q *scheduledQueue) NotifyNewTask(clusterName string, info *hcommon.NotifyT
143143
return
144144
}
145145

146-
nextTime := info.Tasks[0].GetVisibilityTimestamp()
147-
for i := 1; i < numTasks; i++ {
148-
ts := info.Tasks[i].GetVisibilityTimestamp()
149-
if ts.Before(nextTime) {
150-
nextTime = ts
146+
q.base.logger.Debug(
147+
"New timer task notification received",
148+
tag.Dynamic("numTasks", numTasks),
149+
tag.Dynamic("scheduleInMemory", info.ScheduleInMemory),
150+
tag.Dynamic("persistenceError", info.PersistenceError),
151+
tag.Dynamic("shardId", q.base.shard.GetShardID()),
152+
)
153+
154+
tasksToBeReadFromDB := make([]persistence.Task, 0)
155+
156+
if info.ScheduleInMemory && !info.PersistenceError {
157+
for _, task := range info.Tasks {
158+
ts := task.GetVisibilityTimestamp()
159+
q.base.logger.Debug("Submitting task to an in-memory queue", tag.Dynamic("scheduledTime", ts), tag.Dynamic("shardId", q.base.shard.GetShardID()))
160+
161+
if !q.base.insertSingleTask(q.base.taskInitializer(task)) {
162+
tasksToBeReadFromDB = append(tasksToBeReadFromDB, task)
163+
}
151164
}
165+
} else {
166+
tasksToBeReadFromDB = info.Tasks
167+
}
168+
169+
var nextReadTime time.Time
170+
for _, task := range tasksToBeReadFromDB {
171+
ts := task.GetVisibilityTimestamp()
172+
if nextReadTime.IsZero() || ts.Before(nextReadTime) {
173+
nextReadTime = ts
174+
}
175+
}
176+
177+
if !nextReadTime.IsZero() {
178+
q.base.removeScheduledTasksAfter(nextReadTime)
179+
q.notify(nextReadTime)
152180
}
153181

154-
q.notify(nextTime)
155182
q.base.metricsScope.AddCounter(metrics.NewHistoryTaskCounter, int64(numTasks))
156183
}
157184

service/history/queuev2/queue_scheduled_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func TestScheduledQueue_LifeCycle(t *testing.T) {
3737
mockExecutionManager := persistence.NewMockExecutionManager(ctrl)
3838

3939
// Setup mock expectations
40+
mockShard.EXPECT().GetShardID().Return(1).AnyTimes()
4041
mockShard.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).AnyTimes()
4142
mockShard.EXPECT().GetTimeSource().Return(mockTimeSource).AnyTimes()
4243
mockShard.EXPECT().GetQueueState(persistence.HistoryTaskCategoryTimer).Return(&types.QueueState{

0 commit comments

Comments
 (0)