Skip to content

Commit 2344c46

Browse files
committed
Add a check of the session state before take a task
We need to check of a session state before take a task after wait() because session maybe in disconnecting state and task will be hang in a take state after the session will be disconnected Fixes #104
1 parent de0a346 commit 2344c46

File tree

2 files changed

+71
-1
lines changed

2 files changed

+71
-1
lines changed

queue/abstract.lua

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ function tube.put(self, data, opts)
7373
end
7474

7575
local conds = {}
76+
local releasing_sessions = {}
7677

7778
function tube.take(self, timeout)
7879
timeout = time(timeout or TIMEOUT_INFINITY)
@@ -94,7 +95,13 @@ function tube.take(self, timeout)
9495
conds[fid]:free()
9596
box.space._queue_consumers:delete{ sid, fid }
9697

97-
task = self.raw:take()
98+
-- We don't take a task if the session is in a disconnecting state
99+
if releasing_sessions[fid] == nil then
100+
task = self.raw:take()
101+
else
102+
releasing_sessions[fid] = nil
103+
return nil
104+
end
98105

99106
if task ~= nil then
100107
return self.raw:normalize_task(task)
@@ -356,6 +363,7 @@ function method._on_consumer_disconnect()
356363
box.space._queue_consumers:delete{ waiter[1], waiter[2] }
357364
local cond = conds[waiter[2]]
358365
if cond then
366+
releasing_sessions[waiter[2]] = true
359367
cond:signal(waiter[2])
360368
end
361369
end

t/120-take-task-after-reconnect.t

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#!/usr/bin/env tarantool
2+
3+
local fiber = require('fiber')
4+
local netbox = require('net.box')
5+
local os = require('os')
6+
local queue = require('queue')
7+
local tap = require('tap')
8+
local tnt = require('t.tnt')
9+
10+
11+
local test = tap.test('take a task after reconnect')
12+
test:plan(1)
13+
14+
local listen = 'localhost:1918'
15+
tnt.cfg{listen = listen}
16+
17+
18+
local function test_take_task_after_disconnect(test)
19+
test:plan(1)
20+
local driver = 'fifottl'
21+
local tube = queue.create_tube('test_tube', driver,
22+
{if_not_exists = true})
23+
rawset(_G, 'queue', require('queue'))
24+
tube:grant('guest', {call = true})
25+
task_id = tube:put('test_data')[1]
26+
-- Now we have one task in a ready state
27+
28+
local connection = netbox.connect(listen)
29+
local fiber_1 = fiber.create(function()
30+
connection:call('queue.tube.test_tube:take')
31+
connection:call('queue.tube.test_tube:take')
32+
end)
33+
34+
-- This is not a best practice but we need to use the fiber.sleep()
35+
-- (not fiber.yield()).
36+
-- Expected results from a sleep() calling:
37+
-- 1) Execute first connection:call('queue.tube.test_tube:take')
38+
-- Now one task in a taken state
39+
-- 2) Call the second connection:call('queue.tube.test_tube:take')
40+
-- and to hang the fiber_1
41+
-- 3) Start a fiber on the server side of connection which will execute
42+
-- second queue.tube.test_tube:take call and hang because the queue
43+
-- is empty
44+
fiber.sleep(0.1)
45+
46+
connection:close()
47+
48+
fiber.sleep(0.1)
49+
-- The taken task will be released (cause - disconnection).
50+
-- After that the fiber which waiting of a ready task (into take procedure)
51+
-- will try to take this task (before the fix).
52+
53+
54+
test:is(tube:peek(task_id)[2] == 'r', true, 'Task in ready state')
55+
end
56+
57+
58+
test:test('Don\'t take a task after disconnect', test_take_task_after_disconnect)
59+
60+
61+
tnt.finish()
62+
os.exit(test:check() and 0 or 1)

0 commit comments

Comments
 (0)