Skip to content

Commit 79a544f

Browse files
committed
Add a check of the session state before take a task
We need to check of a session state before take a task after wait() because session maybe in disconnecting state and task will be hang in a take state after the session will be disconnected Fixes: #104
1 parent a812c10 commit 79a544f

File tree

2 files changed

+56
-1
lines changed

2 files changed

+56
-1
lines changed

queue/abstract.lua

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ function tube.put(self, data, opts)
7373
end
7474

7575
local conds = {}
76+
local releasing_sessions = {}
7677

7778
function tube.take(self, timeout)
7879
timeout = time(timeout or TIMEOUT_INFINITY)
@@ -94,7 +95,13 @@ function tube.take(self, timeout)
9495
conds[fid]:free()
9596
box.space._queue_consumers:delete{ sid, fid }
9697

97-
task = self.raw:take()
98+
-- We don't take a task if the session is in a disconnecting state
99+
if releasing_sessions[fid] == nil then
100+
task = self.raw:take()
101+
else
102+
releasing_sessions[fid] = nil
103+
return nil
104+
end
98105

99106
if task ~= nil then
100107
return self.raw:normalize_task(task)
@@ -352,6 +359,7 @@ function method._on_consumer_disconnect()
352359
box.space._queue_consumers:delete{ waiter[1], waiter[2] }
353360
local cond = conds[waiter[2]]
354361
if cond then
362+
releasing_sessions[waiter[2]] = true
355363
cond:signal(waiter[2])
356364
end
357365
end

t/110-take-task-after-reconnect.t

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#!/usr/bin/env tarantool
2+
3+
local fiber = require('fiber')
4+
local netbox = require('net.box')
5+
local os = require('os')
6+
local queue = require('queue')
7+
local test = require('tap').test()
8+
local tnt = require('t.tnt')
9+
10+
11+
test:plan(1)
12+
13+
local listen = 'localhost:1918'
14+
tnt.cfg{ listen = listen }
15+
16+
17+
local function test_take_task_after_disconnect(test)
18+
test:plan(1)
19+
local driver = 'fifottl'
20+
local tube = queue.create_tube('test_tube', driver,
21+
{ if_not_exists = true })
22+
rawset(_G, 'queue', require('queue'))
23+
tube:grant('guest', { call = true })
24+
queue.tube.test_tube:put('test_data')
25+
26+
local connection = netbox.connect(listen)
27+
local fiber_1 = fiber.create(function()
28+
connection:call('queue.tube.test_tube:take')
29+
connection:call('queue.tube.test_tube:take')
30+
end)
31+
32+
fiber.sleep(0.1)
33+
connection:close()
34+
fiber.set_joinable(fiber_1, true)
35+
fiber.kill(fiber_1)
36+
fiber.join(fiber_1)
37+
fiber.sleep(0.1)
38+
39+
test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task in ready state')
40+
end
41+
42+
43+
test:test('Don\'t take a task after disconnect', test_take_task_after_disconnect)
44+
45+
46+
tnt.finish()
47+
os.exit(test:check() == true and 0 or -1)

0 commit comments

Comments
 (0)