forked from wellle/rmq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
redis_scripts.go
145 lines (112 loc) · 3.05 KB
/
redis_scripts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package rmq
var redisScripts = map[string]string{
"publish": `
local call = redis.call
local readyQueue = KEYS[1]
local priorityQueue = KEYS[2]
local value = ARGV[1]
local jobPriority = tonumber(ARGV[2])
local jobId = call("INCR", "increasing_id")
call('set', jobId .. "_value", value);
call('set', jobId .. "_priority", jobPriority);
if jobPriority == 0 then
call('LPUSH', readyQueue, jobId);
else
call("ZADD", priorityQueue, jobPriority, jobId)
end
`,
"consume": `
local call = redis.call
local readyQueue = KEYS[1]
local unackedQueue = KEYS[2]
local priorityQueue = KEYS[3]
local jobId
local results = call("ZPopMax", priorityQueue)
local length = #results
if length == 2 then
jobId = results[1]
call("LPush", unackedQueue, jobId)
else
jobId = call("RPopLPush", readyQueue, unackedQueue)
end
return jobId
`,
"ack": `
local call = redis.call
local unackedQueue = KEYS[1]
local jobId = tonumber(ARGV[1])
local count = call("LREM", unackedQueue, 1, jobId)
call("DEL", jobId .. "_value")
call("DEL", jobId .. "_priority")
return count
`,
"move": `
local call = redis.call
local sourceQueue = KEYS[1]
local destinationQueue = KEYS[2]
local priorityQueueSource = KEYS[3]
local priorityQueueDestination = KEYS[4]
local jobId = tonumber(ARGV[1])
call("LREM", sourceQueue, 1, jobId)
if priorityQueueSource then
call("ZREM", priorityQueueSource, jobId)
end
if priorityQueueDestination then
local jobPriority = call("GET", jobId .. "_priority")
if jobPriority == 0 then
call('LPUSH', destinationQueue, jobId);
else
call("ZADD", priorityQueueDestination, jobPriority, jobId)
end
else
call("LPUSH", destinationQueue, jobId)
end
`,
"return": `
local call = redis.call
local fromQueue = KEYS[1]
local readyQueue = KEYS[2]
local priorityQueue = KEYS[3]
local count = tonumber(ARGV[1])
local countAffected = 0
if not count then
count = call("LLen", fromQueue)
end
for i=1,count do
local jobId = call("RPop", fromQueue)
if jobId then
local jobPriority = call("GET", jobId .. "_priority")
if jobPriority == 0 then
call('LPUSH', readyQueue, jobId);
else
call("ZADD", priorityQueue, jobPriority, jobId)
end
countAffected = countAffected + 1
end
end
return countAffected
`,
"purge": `
local call = redis.call
local queueToPurge = KEYS[1]
local priorityQueue = KEYS[2]
local countAffected = 0
local countOfPrioritizedAffected = 0
local countAffected = call("LLen", queueToPurge)
for i=1,countAffected do
local jobId = call("RPop", queueToPurge)
call("DEL", jobId .. "_value")
call("DEL", jobId .. "_priority")
end
if priorityQueue then
local countOfPrioritizedAffected = call("ZCount", priorityQueue, '-inf', '+inf')
countAffected = countAffected + countOfPrioritizedAffected
for i=1,countOfPrioritizedAffected do
local jobId = call("ZPOPMAX", priorityQueue)
call("DEL", jobId .. "_value")
call("DEL", jobId .. "_priority")
end
end
return countAffected
`,
}