Skip to content

Fix task release in session disconnect #107

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions queue/abstract.lua
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,22 @@ function tube.ack(self, id)
return result
end

function tube.release(self, id, opts)
local function tube_release_internal(self, id, opts, session_id)
opts = opts or {}
local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id}
local _taken = box.space._queue_taken:get{session_id, self.tube_id, id}
if _taken == nil then
error("Task was not taken in the session")
end

box.space._queue_taken:delete{session.id(), self.tube_id, id}
box.space._queue_taken:delete{session_id, self.tube_id, id}
self:peek(id)
return self.raw:normalize_task(self.raw:release(id, opts))
end

-- Release a task on behalf of the calling session: the id of the
-- current session is looked up here and handed over to
-- tube_release_internal() together with the caller's arguments.
function tube.release(self, id, opts)
    local current_session_id = session.id()
    return tube_release_internal(self, id, opts, current_session_id)
end

function tube.peek(self, id)
local task = self.raw:peek(id)
if task == nil then
Expand Down Expand Up @@ -371,7 +375,7 @@ function method._on_consumer_disconnect()
log.warn("Consumer %s disconnected, release task %s(%s)",
id, task[3], tube[1])

queue.tube[tube[1]]:release(task[3])
tube_release_internal(queue.tube[tube[1]], task[3], nil, id)
end
end
end
Expand Down
98 changes: 98 additions & 0 deletions t/110-disconnect-trigger-check.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#!/usr/bin/env tarantool

local fiber = require('fiber')
local netbox = require('net.box')
local os = require('os')
local queue = require('queue')
local tap = require('tap')
local tnt = require('t.tnt')

-- Tube under test. It is assigned in
-- test_lost_session_id_after_yield() and read by check_result().
local tube
local test = tap.test('lost a session id after yield')

-- Both planned checks (the two test:is() calls) live in
-- check_result().
test:plan(2)

-- Verify that _queue_taken space is empty.
local function check_result()
    -- The tube has not been assigned, so there is nothing that
    -- could be verified: give up with a failure exit code.
    if tube == nil then
        os.exit(1)
    end

    -- Dropping the tube is the simplest way to find out whether
    -- _queue_taken is empty: the drop raises an error if it is
    -- not.
    local dropped, drop_result = pcall(tube.drop, tube)
    test:is(dropped, true, 'drop empty queue')
    test:is(drop_result, true, 'tube:drop() result is true')

    tnt.finish()

    -- Turn the overall test verdict into the process exit code.
    local exit_code = 1
    if test:check() then
        exit_code = 0
    end
    os.exit(exit_code)
end

-- A yield in the queue's on_disconnect trigger (which handles a
-- client disconnection) may lead to a situation where the
-- temporary _queue_taken space is not cleaned up and becomes
-- inconsistent with the 'status' field in the <tube_name> space.
-- This appears only on tarantool versions affected by gh-4627.
--
-- See https://github.com/tarantool/queue/issues/103
-- See https://github.com/tarantool/tarantool/issues/4627
local function test_lost_session_id_after_yield()
    -- The results may be checked only after the
    -- queue._on_consumer_disconnect trigger has finished, so
    -- check_result() is registered as a disconnect trigger too.
    --
    -- Triggers are run in LIFO order.
    box.session.on_disconnect(check_result)

    local uri = 'localhost:1918'
    tnt.cfg{listen = uri}

    tube = queue.create_tube('test_tube', 'fifottl',
                             {if_not_exists = true})

    rawset(_G, 'queue', require('queue'))
    tube:grant('guest', {call = true})

    -- At least two tasks are required: only then does the
    -- pre-fix queue._on_consumer_disconnect trigger call
    -- box.session.id() after a yield. Details are below.
    for _, payload in ipairs({'1', '2'}) do
        queue.tube.test_tube:put(payload)
    end

    local conn = netbox.connect(uri)
    for _ = 1, 2 do
        conn:call('queue.tube.test_tube:take')
    end

    -- Once the client disconnects, the _on_consumer_disconnect
    -- trigger runs. In a loop it changes the 'status' field of
    -- the tuples in the <tube_name> space and removes the
    -- matching tuples from the _queue_taken space. Before the
    -- fix it worked like this:
    --
    -- | <_on_consumer_disconnect>
    -- | for task in tasks of the client:
    -- |     call <task:release>
    -- |
    -- | <task:release>
    -- | delete _queue_taken tuple using box.session.id()
    -- | update <tube_name> space using task_id -- !! yield
    --
    -- As a result, for the second and the following tasks the
    -- deletion from _queue_taken may miss the right tuples,
    -- because box.session.id() may return garbage.
    conn:close()

    -- Give the check_result() trigger time to run: it verifies
    -- that the _queue_taken space is cleaned and terminates the
    -- process itself (successfully or abnormally).
    fiber.sleep(5)

    -- Getting here means check_result() never fired. A wrong
    -- session id may raise 'Task was not taken in the session'
    -- inside _on_consumer_disconnect, and then the second
    -- on_disconnect trigger (check_result) is not called.
    os.exit(1)
end

-- Run the scenario. The process is terminated from within it:
-- either by check_result() or by the fallback os.exit(1) after
-- the sleep.
test_lost_session_id_after_yield()