Skip to content

Commit 6c2aa6a

Browse files
committed
Fix task release in session disconnect
Fixes #103
1 parent a812c10 commit 6c2aa6a

File tree

2 files changed

+74
-4
lines changed

2 files changed

+74
-4
lines changed

queue/abstract.lua

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,18 +151,22 @@ function tube.ack(self, id)
151151
return result
152152
end
153153

154-
function tube.release(self, id, opts)
154+
local function tube_release_internal(self, id, opts, session_id)
155155
opts = opts or {}
156-
local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id}
156+
local _taken = box.space._queue_taken:get{session_id, self.tube_id, id}
157157
if _taken == nil then
158158
error("Task was not taken in the session")
159159
end
160160

161-
box.space._queue_taken:delete{session.id(), self.tube_id, id}
161+
box.space._queue_taken:delete{session_id, self.tube_id, id}
162162
self:peek(id)
163163
return self.raw:normalize_task(self.raw:release(id, opts))
164164
end
165165

166+
function tube.release(self, id, opts)
167+
return tube_release_internal(self, id, opts, session.id())
168+
end
169+
166170
function tube.peek(self, id)
167171
local task = self.raw:peek(id)
168172
if task == nil then
@@ -371,7 +375,7 @@ function method._on_consumer_disconnect()
371375
log.warn("Consumer %s disconnected, release task %s(%s)",
372376
id, task[3], tube[1])
373377

374-
queue.tube[tube[1]]:release(task[3])
378+
tube_release_internal(queue.tube[tube[1]], task[3], nil, id)
375379
end
376380
end
377381
end

t/110-disconnect-trigger-check.t

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#!/usr/bin/env tarantool
2+
3+
local fiber = require('fiber')
4+
local netbox = require('net.box')
5+
local os = require('os')
6+
local queue = require('queue')
7+
local test = require('tap').test()
8+
local tnt = require('t.tnt')
9+
10+
11+
local tube
12+
13+
14+
local function check_result()
15+
test:plan(2)
16+
if tube == nil then
17+
os.exit(-1)
18+
end
19+
20+
local ok, res = pcall(tube.drop, tube)
21+
test:is(ok, true, 'drop empty queue')
22+
test:is(res, true, 'tube:drop() result is true')
23+
24+
tnt.finish()
25+
os.exit(test:check() == true and 0 or -1)
26+
end
27+
28+
29+
local function test_lost_session_id_after_yield(test)
30+
-- See
31+
-- https://github.com/tarantool/queue/issues/103
32+
-- https://github.com/tarantool/tarantool/issues/4627
33+
34+
-- We must check the results of a test after
35+
-- the queue._on_consumer_disconnect trigger
36+
-- has been done.
37+
-- The type of a triggers queue is LIFO
38+
box.session.on_disconnect(check_result)
39+
40+
local listen = 'localhost:1918'
41+
tnt.cfg{ listen = listen }
42+
43+
local driver = 'fifottl'
44+
tube = queue.create_tube('test_tube', driver,
45+
{ if_not_exists = true })
46+
47+
rawset(_G, 'queue', require('queue'))
48+
tube:grant('guest', { call = true })
49+
50+
-- Needed for yielding into
51+
-- the queue._on_consumer_disconnect trigger
52+
queue.tube.test_tube:put('1')
53+
queue.tube.test_tube:put('2')
54+
local connection = netbox.connect(listen)
55+
connection:call('queue.tube.test_tube:take')
56+
connection:call('queue.tube.test_tube:take')
57+
58+
connection:close()
59+
60+
fiber.sleep(5)
61+
-- Fail. Trigger check_result() is a valid exit point
62+
os.exit(-1)
63+
end
64+
65+
66+
test:test('Lost a session id after yield', test_lost_session_id_after_yield)

0 commit comments

Comments
 (0)