Skip to content

Commit cede945

Browse files
committed
utubettl: fix work with "ttl" of buried task
Before the patch if a task has been "buried" after it was "taken" (and the task has "ttr") when the time in `i_next_event` will be interpreted as "ttl" in `utubettl_fiber_iteration` and the task will be deleted.
1 parent 8c0c535 commit cede945

File tree

2 files changed

+47
-31
lines changed

2 files changed

+47
-31
lines changed

queue/abstract/driver/utubettl.lua

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -343,13 +343,22 @@ end
343343

344344
-- bury task
345345
function method.bury(self, id)
346-
local task = self.space:update(id, {{ '=', i_status, state.BURIED }})
347-
if task ~= nil then
348-
return process_neighbour(
349-
self, task:transform(i_status, 1, state.BURIED), 'bury'
350-
)
346+
-- The `i_next_event` should be updated because if the task has been
347+
-- "buried" after it was "taken" (and the task has "ttr") when the time in
348+
-- `i_next_event` will be interpreted as "ttl" in `utubettl_fiber_iteration`
349+
-- and the task will be deleted.
350+
local task = self.space:get{id}
351+
if task == nil then
352+
return
351353
end
352-
self:on_task_change(task, 'bury')
354+
task = self.space:update(id, {
355+
{ '=', i_status, state.BURIED },
356+
{ '=', i_next_event, task[i_created] + task[i_ttl] }
357+
})
358+
359+
return process_neighbour(
360+
self, task:transform(i_status, 1, state.BURIED), 'bury'
361+
)
353362
end
354363

355364
-- unbury several tasks

t/190-work-with-ttl-buried-task.t

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,35 +22,42 @@ tnt.cfg{}
2222
test:test('test work with "ttl", when "bury" after "take"', function(test)
2323
-- Before the patch if a task has been "buried" after it was "taken"
2424
-- (and the task has "ttr") when the time in `i_next_event` will be
25-
-- interpreted as "ttl" in `fifottl_fiber_iteration` and the task will
26-
-- be deleted.
27-
test:plan(3)
25+
-- interpreted as "ttl" in `{fifottl,utubettl}_fiber_iteration` and
26+
-- the task will be deleted.
27+
local drivers = {'fifottl', 'utubettl'}
28+
test:plan(3 * table.getn(drivers))
2829

2930
local TTR = 0.2
3031
local TTL = 1
3132

32-
local driver = 'fifottl'
33-
local tube = queue.create_tube('test_tube', driver, {if_not_exists = true})
34-
local task = tube:put('task1', {ttl = TTL, ttr = TTR})
35-
36-
-- "Take" a task and "bury" it.
37-
task = tube:take(0)
38-
local id = task[TASK_ID]
39-
tube:bury(id)
40-
41-
-- Check status of the task.
42-
task = tube:peek(id)
43-
test:is(task[TASK_STATE], state.BURIED, 'task "buried"')
44-
45-
-- Check status of the task after "ttr" has expired.
46-
fiber.sleep(TTR * 2)
47-
task = tube:peek(id)
48-
test:is(task[TASK_STATE], state.BURIED, 'task is still "buried"')
49-
50-
-- Check status of the task after "ttl" has expired.
51-
fiber.sleep(TTL * 2)
52-
task, err = pcall(tube.peek, tube, id)
53-
test:ok(err:match(string.format('Task %d not found', id)), 'task done')
33+
for _, driver in pairs(drivers) do
34+
local tube = queue.create_tube('test_tube', driver, {if_not_exists = true})
35+
local task = tube:put('task1', {ttl = TTL, ttr = TTR})
36+
37+
-- "Take" a task and "bury" it.
38+
task = tube:take(0)
39+
local id = task[TASK_ID]
40+
tube:bury(id)
41+
42+
-- Check status of the task.
43+
task = tube:peek(id)
44+
test:is(task[TASK_STATE], state.BURIED,
45+
('task "buried", driver: "%s"'):format(driver))
46+
47+
-- Check status of the task after "ttr" has expired.
48+
fiber.sleep(TTR * 2)
49+
task = tube:peek(id)
50+
test:is(task[TASK_STATE], state.BURIED,
51+
('task is still "buried", driver: "%s"'):format(driver))
52+
53+
-- Check status of the task after "ttl" has expired.
54+
fiber.sleep(TTL * 2)
55+
ok, res = pcall(tube.peek, tube, id)
56+
test:ok(res:match(string.format('Task %d not found', id)),
57+
('task done, driver: "%s"'):format(driver))
58+
59+
tube:drop()
60+
end
5461
end)
5562

5663
tnt.finish()

0 commit comments

Comments
 (0)