-
Notifications
You must be signed in to change notification settings - Fork 5
/
retry_queue.go
69 lines (58 loc) · 1.38 KB
/
retry_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package task
import (
"container/heap"
"sync"
)
type retryQueue struct {
retrySlice
retryQueueShutDownFlag bool
retryQueueWait *sync.WaitGroup
}
func newRetryQueue(retryQueueWait *sync.WaitGroup) *retryQueue {
retrySlice := retrySlice{}
heap.Init(&retrySlice)
return &retryQueue{
retrySlice: retrySlice,
retryQueueShutDownFlag: false,
retryQueueWait: retryQueueWait,
}
}
func (retryQueue *retryQueue) sendToRetryQueue(task *task) {
if task != nil {
heap.Push(retryQueue, task)
}
}
func (retryQueue *retryQueue) getRetryTaskList() []*task {
var taskList []*task
for retryQueue.Len() > 0 {
t := heap.Pop(retryQueue)
if t.(*task).nextRetryMs < getTimeMs() {
taskList = append(taskList, t.(*task))
} else {
heap.Push(retryQueue, t.(*task))
break
}
}
return taskList
}
type retrySlice []*task
func (retryQueue retrySlice) Len() int {
return len(retryQueue)
}
func (retryQueue retrySlice) Less(i, j int) bool {
return retryQueue[i].nextRetryMs < retryQueue[j].nextRetryMs
}
func (retryQueue retrySlice) Swap(i, j int) {
retryQueue[i], retryQueue[j] = retryQueue[j], retryQueue[i]
}
func (retryQueue *retrySlice) Push(x interface{}) {
item := x.(*task)
*retryQueue = append(*retryQueue, item)
}
func (retryQueue *retrySlice) Pop() interface{} {
old := *retryQueue
n := len(old)
item := old[n-1]
*retryQueue = old[0 : n-1]
return item
}