Skip to content

replicaset_mode: fix queue.tube indexes #203

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions queue/abstract.lua
Original file line number Diff line number Diff line change
Expand Up @@ -520,10 +520,39 @@ function method._on_consumer_disconnect()
session.disconnect(conn_id)
end

-- function takes tuples and recreates tube
local function recreate_tube(tube_tuple)
local name, id, space_name, tube_type, opts = tube_tuple:unpack()

local driver = queue.driver[tube_type]
if driver == nil then
error("Unknown tube type " .. tostring(tube_type))
end

local space = box.space[space_name]
if space == nil then
error(("Space '%s' doesn't exists"):format(space_name))
end
return make_self(driver, space, name, tube_type, id, opts)
end

-- Function takes new queue state.
-- The "RUNNING" and "WAITING" states do not require additional actions.
local function on_state_change(state)
if state == queue_state.states.STARTUP then
local replicaset_mode = queue.cfg['in_replicaset'] or false
-- gh-202: In replicaset mode, tubes can be created and deleted on different nodes.
-- Accordingly, it is necessary to rebuild the queue.tube index.
if replicaset_mode then
for _, tube_name in pairs(queue.tube()) do
queue.tube[tube_name] = nil
end
for _, tube_tuple in box.space._queue:pairs() do
if queue.driver[tube_tuple[4]] ~= nil then
recreate_tube(tube_tuple)
end
end
Comment on lines +550 to +554
Copy link
Contributor

@oleg-jukovec oleg-jukovec Mar 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could extract it in a separate function: recreate_tubes. The code appears at least two times in the file.

end
for name, tube in pairs(queue.tube) do
tube_release_all_orphaned_tasks(tube)
log.info('queue: [tube "%s"] start driver', name)
Expand Down Expand Up @@ -551,22 +580,6 @@ local function on_state_change(state)
end
end

-- function takes tuples and recreates tube
local function recreate_tube(tube_tuple)
local name, id, space_name, tube_type, opts = tube_tuple:unpack()

local driver = queue.driver[tube_type]
if driver == nil then
error("Unknown tube type " .. tostring(tube_type))
end

local space = box.space[space_name]
if space == nil then
error(("Space '%s' doesn't exists"):format(space_name))
end
return make_self(driver, space, name, tube_type, id, opts)
end

-------------------------------------------------------------------------------
-- create tube
function method.create_tube(tube_name, tube_type, opts)
Expand Down
53 changes: 52 additions & 1 deletion t/200-master-replica.t
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ end
-- Replica connection handler.
local conn = {}

test:plan(7)
test:plan(8)

test:test('Check master-replica setup', function(test)
test:plan(8)
Expand Down Expand Up @@ -337,6 +337,57 @@ test:test('Check in_replicaset switching', function(test)
'taken tasks count after release_all')
end)

-- gh-202
test:test('Check that tubes indexes is actual after role change', function(test)
local engine = os.getenv('ENGINE') or 'memtx'
test:plan(10)
box.cfg{read_only = true}
queue_state.poll(queue_state.states.WAITING, 10)
test:is(queue.state(), 'WAITING', 'master state is waiting')
conn:eval('box.cfg{read_only=false}')
conn:eval([[
queue_state = require('queue.abstract.queue_state')
queue_state.poll(queue_state.states.RUNNING, 10)
]])
test:is(conn:call('queue.state'), 'RUNNING', 'replica state is running')
conn:eval([[queue.create_tube('repl_tube', 'fifo', {engine =]] .. engine .. [[})]])

-- Switch roles back.
conn:eval('box.cfg{read_only=true}')
conn:eval([[
queue_state = require('queue.abstract.queue_state')
queue_state.poll(queue_state.states.WAITING, 10)
]])
box.cfg{read_only = false}
queue_state.poll(queue_state.states.RUNNING, 10)
test:is(queue.state(), 'RUNNING', 'master state is running')
test:is(conn:call('queue.state'), 'WAITING', 'replica state is waiting')
test:ok(queue.tube.repl_tube, 'repl_tube is accessible')

box.cfg{read_only = true}
queue_state.poll(queue_state.states.WAITING, 10)
test:is(queue.state(), 'WAITING', 'master state is waiting')
conn:eval('box.cfg{read_only=false}')
conn:eval([[
queue_state = require('queue.abstract.queue_state')
queue_state.poll(queue_state.states.RUNNING, 10)
]])
test:is(conn:call('queue.state'), 'RUNNING', 'replica state is running')
conn:eval('queue.tube.repl_tube:drop()')

-- Switch roles back.
conn:eval('box.cfg{read_only=true}')
conn:eval([[
queue_state = require('queue.abstract.queue_state')
queue_state.poll(queue_state.states.WAITING, 10)
]])
box.cfg{read_only = false}
queue_state.poll(queue_state.states.RUNNING, 10)
test:is(queue.state(), 'RUNNING', 'master state is running')
test:is(conn:call('queue.state'), 'WAITING', 'replica state is waiting')
test:isnil(queue.tube.repl_tube, "repl_tube is not indexed")
end)

rawset(_G, 'queue', nil)
conn:eval('rawset(_G, "queue", nil)')
conn:close()
Expand Down
2 changes: 1 addition & 1 deletion t/tnt/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ local function tnt_cluster_prepare(cfg_args)

box.cfg(cfg_args)
-- Allow guest all operations.
box.schema.user.grant('guest', 'read, write, execute', 'universe')
box.schema.user.grant('guest', 'read, write, execute, create, drop', 'universe')
box.schema.user.create('replicator', {password = 'password'})
box.schema.user.grant('replicator', 'replication')

Expand Down