- 
                Notifications
    
You must be signed in to change notification settings  - Fork 867
 
feat: [WIP] Task Processing Optimization #7332
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
c98c02e    to
    b89632f      
    Compare
  
    Signed-off-by: Ignat Tubylov <tubignat@uber.com>
0b87db0    to
    4c4f7de      
    Compare
  
    Signed-off-by: Ignat Tubylov <tubignat@uber.com>
| if progress.NextTaskKey.Compare(key) > 0 { | ||
| s.progress[i] = &GetTaskProgress{ | ||
| Range: Range{ | ||
| InclusiveMinTaskKey: key, | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the new inclusive min should be the max of key and old inclusive min.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Besides, the items in the array are sorted and have no overlaps, we can merge items coming after the first item whose nextTaskKey is greater than key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the new inclusive min should be the max of key and old inclusive min.
Do you mean 'min' of the key and the old inclusive min? Because if we have already read past the key, the inclusive min will be bigger than the key, and that's exactly why we want to reset it to a lower value.
we can merge items coming after the first item whose nextTaskKey is greater than key.
What do you mean by merge items?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we find the first item whose next task key is greater than reset key, we know that the following item's next task key is also greater than reset key. We can set the item's range to be [reset key, exclusive max key of last item) and drop the following items.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find a better way. We can find the first item whose exclusive Max Key is greater than reset key. And set the next task key of that item to the min of next task key and reset task key.
We also need to make sure that the inclusive min of that progress item and the inclusive min of the virtual slice is decreased if the reset task key is less than those 2.
Signed-off-by: Ignat Tubylov <tubignat@uber.com>
| if progress.NextTaskKey.Compare(key) > 0 { | ||
| s.progress[i] = &GetTaskProgress{ | ||
| Range: Range{ | ||
| InclusiveMinTaskKey: key, | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we find the first item whose next task key is greater than reset key, we know that the following item's next task key is also greater than reset key. We can set the item's range to be [reset key, exclusive max key of last item) and drop the following items.
Signed-off-by: Ignat Tubylov <tubignat@uber.com>
| r := s.state.Range | ||
| key := task.GetTaskKey() | ||
| 
               | 
          ||
| if key.Compare(r.InclusiveMinTaskKey) < 0 || key.Compare(r.ExclusiveMaxTaskKey) >= 0 { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
VirtualSliceState has Contains method to check if the task belongs to this slice. https://github.com/cadence-workflow/cadence/blob/master/service/history/queuev2/queue_state.go#L41C29-L41C37
| inserted := false | ||
| for _, vq := range m.virtualQueues { | ||
| if vq.InsertSingleTask(t) { | ||
| inserted = true | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we break here? The task only needs to be processed once, and it should only belong to one virtual queue.
| s.progress = []*GetTaskProgress{ | ||
| { | ||
| Range: Range{ | ||
| InclusiveMinTaskKey: key, | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if key is greater than or equal to ExclusiveMaxTaskKey, we don't need to reset
| if progress.NextTaskKey.Compare(key) > 0 { | ||
| s.progress[i] = &GetTaskProgress{ | ||
| Range: Range{ | ||
| InclusiveMinTaskKey: key, | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find a better way. We can find the first item whose exclusive Max Key is greater than reset key. And set the next task key of that item to the min of next task key and reset task key.
We also need to make sure that the inclusive min of that progress item and the inclusive min of the virtual slice is decreased if the reset task key is less than those 2.
What changed?
Timer task queue now keeps incoming tasks in an in-memory queue until the next execution iteration
Why?
To reduce load on DB by avoiding agressively polling it for next timer tasks
How did you test it?
Unit tests and manual testing