Skip to content

Commit 32ff608

Browse files
committed
Delete TTL tasks using delete
And add states for: * delayed (delayed -> ready) * TTL (* -> done) * TTR (taken -> ready) closes gh-51
1 parent 6b26929 commit 32ff608

File tree

3 files changed

+13
-16
lines changed

3 files changed

+13
-16
lines changed

queue/abstract.lua

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -432,14 +432,11 @@ end
432432

433433
local function build_stats(space)
434434
local stats = {tasks = {}, calls = {
435-
ack = 0,
436-
bury = 0,
437-
delete = 0,
438-
kick = 0,
439-
put = 0,
440-
release = 0,
441-
take = 0,
442-
touch = 0
435+
ack = 0, bury = 0, delete = 0,
436+
kick = 0, put = 0, release = 0,
437+
take = 0, touch = 0,
438+
-- for *ttl queues only
439+
ttl = 0, ttr = 0, delay = 0,
443440
}}
444441

445442
local st = rawget(queue.stat, space) or {}

queue/abstract/driver/fifottl.lua

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ function method._fiber(self)
122122
{ '=', i_status, state.READY },
123123
{ '=', i_next_event, task[i_created] + task[i_ttl] }
124124
})
125-
self:on_task_change(task)
125+
self:on_task_change(task, 'delay')
126126
estimated = 0
127127
processed = processed + 1
128128
else
@@ -135,8 +135,8 @@ function method._fiber(self)
135135
task = self.space.index.watch:min{ state }
136136
if task ~= nil and task[i_status] == state then
137137
if now >= task[i_next_event] then
138-
self.space:delete(task[i_id])
139-
self:on_task_change(task:transform(2, 1, state.DONE))
138+
task = self:delete(task[i_id]):transform(2, 1, state.DONE)
139+
self:on_task_change(task, 'ttl')
140140
estimated = 0
141141
processed = processed + 1
142142
else
@@ -154,7 +154,7 @@ function method._fiber(self)
154154
{ '=', i_status, state.READY },
155155
{ '=', i_next_event, task[i_created] + task[i_ttl] }
156156
})
157-
self:on_task_change(task)
157+
self:on_task_change(task, 'ttr')
158158
estimated = 0
159159
processed = processed + 1
160160
else

queue/abstract/driver/utubettl.lua

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ function method._fiber(self)
138138
{ '=', i_status, state.READY },
139139
{ '=', i_next_event, task[i_created] + task[i_ttl] }
140140
})
141-
self:on_task_change(task)
141+
self:on_task_change(task, 'delayed')
142142
estimated = 0
143143
processed = processed + 1
144144
else
@@ -151,8 +151,8 @@ function method._fiber(self)
151151
task = self.space.index.watch:min{ state }
152152
if task ~= nil and task[i_status] == state then
153153
if now >= task[i_next_event] then
154-
self.space:delete(task[i_id])
155-
self:on_task_change(task:transform(2, 1, state.DONE))
154+
task = self:delete(task[i_id]):transform(2, 1, state.DONE)
155+
self:on_task_change(task, 'ttl')
156156
estimated = 0
157157
processed = processed + 1
158158
else
@@ -170,7 +170,7 @@ function method._fiber(self)
170170
{ '=', i_status, state.READY },
171171
{ '=', i_next_event, task[i_created] + task[i_ttl] }
172172
})
173-
self:on_task_change(task)
173+
self:on_task_change(task, 'ttr')
174174
estimated = 0
175175
processed = processed + 1
176176
else

0 commit comments

Comments
 (0)