Skip to content

Conversation

@tubignat
Copy link
Contributor

@tubignat tubignat commented Oct 13, 2025

What changed?
Timer task queue now keeps incoming tasks in an in-memory queue until the next execution iteration

Why?
To reduce load on DB by avoiding agressively polling it for next timer tasks

How did you test it?
Unit tests and manual testing

@tubignat tubignat force-pushed the master branch 2 times, most recently from c98c02e to b89632f Compare October 21, 2025 07:58
@tubignat tubignat changed the title [WIP] Task Processing Optimization feature: [WIP] Task Processing Optimization Oct 21, 2025
Signed-off-by: Ignat Tubylov <tubignat@uber.com>
@tubignat tubignat changed the title feature: [WIP] Task Processing Optimization feat: [WIP] Task Processing Optimization Oct 21, 2025
@tubignat tubignat force-pushed the master branch 2 times, most recently from 0b87db0 to 4c4f7de Compare October 21, 2025 10:19
Signed-off-by: Ignat Tubylov <tubignat@uber.com>
Signed-off-by: Ignat Tubylov <tubignat@uber.com>
if progress.NextTaskKey.Compare(key) > 0 {
s.progress[i] = &GetTaskProgress{
Range: Range{
InclusiveMinTaskKey: key,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the new inclusive min should be the max of key and old inclusive min.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides, the items in the array are sorted and have no overlaps, we can merge items coming after the first item whose nextTaskKey is greater than key.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the new inclusive min should be the max of key and old inclusive min.

Do you mean 'min' of the key and the old inclusive min? Because if we have already read past the key, the inclusive min will be bigger than the key, and that's exactly why we want to reset it to a lower value.

we can merge items coming after the first item whose nextTaskKey is greater than key.

What do you mean by merge items?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we find the first item whose next task key is greater than reset key, we know that the following item's next task key is also greater than reset key. We can set the item's range to be [reset key, exclusive max key of last item) and drop the following items.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find a better way. We can find the first item whose exclusive Max Key is greater than reset key. And set the next task key of that item to the min of next task key and reset task key.

We also need to make sure that the inclusive min of that progress item and the inclusive min of the virtual slice is decreased if the reset task key is less than those 2.

Signed-off-by: Ignat Tubylov <tubignat@uber.com>
if progress.NextTaskKey.Compare(key) > 0 {
s.progress[i] = &GetTaskProgress{
Range: Range{
InclusiveMinTaskKey: key,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we find the first item whose next task key is greater than reset key, we know that the following item's next task key is also greater than reset key. We can set the item's range to be [reset key, exclusive max key of last item) and drop the following items.

Signed-off-by: Ignat Tubylov <tubignat@uber.com>
r := s.state.Range
key := task.GetTaskKey()

if key.Compare(r.InclusiveMinTaskKey) < 0 || key.Compare(r.ExclusiveMaxTaskKey) >= 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VirtualSliceState has Contains method to check if the task belongs to this slice. https://github.com/cadence-workflow/cadence/blob/master/service/history/queuev2/queue_state.go#L41C29-L41C37

inserted := false
for _, vq := range m.virtualQueues {
if vq.InsertSingleTask(t) {
inserted = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we break here? The task only needs to be processed once, and it should only belong to one virtual queue.

s.progress = []*GetTaskProgress{
{
Range: Range{
InclusiveMinTaskKey: key,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if key is greater than or equal to ExclusiveMaxTaskKey, we don't need to reset

if progress.NextTaskKey.Compare(key) > 0 {
s.progress[i] = &GetTaskProgress{
Range: Range{
InclusiveMinTaskKey: key,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find a better way. We can find the first item whose exclusive Max Key is greater than reset key. And set the next task key of that item to the min of next task key and reset task key.

We also need to make sure that the inclusive min of that progress item and the inclusive min of the virtual slice is decreased if the reset task key is less than those 2.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants