Skip to content

Commit 4e96760

Browse files
committed
Add a check of the session state before take a task
We need to check of a session state before take a task after wait() because session maybe in disconnecting state and task will be hang in a take state after the session will be disconnected Fixes #104
1 parent ab45a10 commit 4e96760

File tree

2 files changed

+65
-1
lines changed

2 files changed

+65
-1
lines changed

queue/abstract.lua

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ function tube.put(self, data, opts)
7373
end
7474

7575
local conds = {}
76+
local releasing_sessions = {}
7677

7778
function tube.take(self, timeout)
7879
timeout = time(timeout or TIMEOUT_INFINITY)
@@ -94,7 +95,13 @@ function tube.take(self, timeout)
9495
conds[fid]:free()
9596
box.space._queue_consumers:delete{ sid, fid }
9697

97-
task = self.raw:take()
98+
-- We don't take a task if the session is in a disconnecting state
99+
if releasing_sessions[fid] == nil then
100+
task = self.raw:take()
101+
else
102+
releasing_sessions[fid] = nil
103+
return nil
104+
end
98105

99106
if task ~= nil then
100107
return self.raw:normalize_task(task)
@@ -352,6 +359,7 @@ function method._on_consumer_disconnect()
352359
box.space._queue_consumers:delete{ waiter[1], waiter[2] }
353360
local cond = conds[waiter[2]]
354361
if cond then
362+
releasing_sessions[waiter[2]] = true
355363
cond:signal(waiter[2])
356364
end
357365
end

t/110-take-task-after-reconnect.t

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#!/usr/bin/env tarantool
2+
3+
local fiber = require('fiber')
4+
local netbox = require('net.box')
5+
local os = require('os')
6+
local queue = require('queue')
7+
local test = require('tap').test()
8+
local tnt = require('t.tnt')
9+
10+
11+
test:plan(1)
12+
13+
local listen = 'localhost:1918'
14+
tnt.cfg{ listen = listen }
15+
16+
17+
local function test_take_task_after_disconnect(test)
18+
test:plan(1)
19+
local driver = 'fifottl'
20+
local tube = queue.create_tube('test_tube', driver,
21+
{ if_not_exists = true })
22+
rawset(_G, 'queue', require('queue'))
23+
tube:grant('guest', { call = true })
24+
queue.tube.test_tube:put('test_data')
25+
26+
local connection = netbox.connect(listen)
27+
local fiber_1 = fiber.create(function()
28+
connection:call('queue.tube.test_tube:take')
29+
connection:call('queue.tube.test_tube:take')
30+
end)
31+
32+
-- This is not a best practice but we need to use the fiber.sleep().
33+
-- Expected results from a sleep() calling:
34+
-- 1) Execute first connection:call('queue.tube.test_tube:take')
35+
-- 2) Call the second connection:call('queue.tube.test_tube:take')
36+
-- and to hang the fiber_1
37+
-- 3) Start a fiber on the server side of connection which will execute
38+
-- second queue.tube.test_tube:take call and hang because the queue
39+
-- is empty
40+
fiber.sleep(0.1)
41+
42+
connection:close()
43+
44+
-- Continue the queue.tube.test_tube:take call from a third step of the
45+
-- previous comment
46+
fiber.sleep(0.1)
47+
48+
test:is((box.space.test_tube:select()[1][2]) == 'r', true, 'Task in ready state')
49+
end
50+
51+
52+
test:test('Don\'t take a task after disconnect', test_take_task_after_disconnect)
53+
54+
55+
tnt.finish()
56+
os.exit(test:check() == true and 0 or -1)

0 commit comments

Comments
 (0)