Skip to content

Commit de7c0f3

Browse files
0x501DLeonidVas
authored andcommitted
test: add master-replica switching tests
Follows up #120
1 parent 9f54091 commit de7c0f3

File tree

2 files changed

+349
-1
lines changed

2 files changed

+349
-1
lines changed

t/200-master-replica.t

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
#!/usr/bin/env tarantool
2+
3+
local fio = require('fio')
4+
local tnt = require('t.tnt')
5+
local test = require('tap').test('')
6+
local uuid = require('uuid')
7+
local queue = require('queue')
8+
local fiber = require('fiber')
9+
10+
local session = require('queue.abstract.queue_session')
11+
local queue_state = require('queue.abstract.queue_state')
12+
rawset(_G, 'queue', require('queue'))
13+
14+
-- Replica connection handler
15+
local conn = {}
16+
17+
test:plan(5)
18+
19+
test:test('Check master-replica setup', function(test)
20+
test:plan(8)
21+
local engine = os.getenv('ENGINE') or 'memtx'
22+
tnt.cluster.cfg{}
23+
24+
test:ok(rawget(box, 'space'), 'box started')
25+
test:ok(queue, 'queue is loaded')
26+
27+
test:ok(tnt.cluster.wait_replica(), 'wait for replica to connect')
28+
conn = tnt.cluster.connect_replica()
29+
test:ok(conn.error == nil, 'no errors on connect to replica')
30+
test:ok(conn:ping(), 'ping replica')
31+
test:is(queue.state(), 'RUNNING', 'check master queue state')
32+
conn:eval('rawset(_G, "queue", require("queue"))')
33+
test:is(conn:call('queue.state'), 'INIT', 'check replica queue state')
34+
35+
-- Setup tube. Set ttr = 0.5 for sessions expire testing.
36+
conn:call('queue.cfg', {{ttr = 0.5}})
37+
queue.cfg{ttr = 0.5}
38+
local tube = queue.create_tube('test', 'fifo', {engine = engine})
39+
test:ok(tube, 'test tube created')
40+
end)
41+
42+
test:test('Check queue state switching', function(test)
43+
test:plan(4)
44+
box.cfg{read_only = true}
45+
test:ok(queue_state.poll(queue_state.states.WAITING, 10),
46+
"queue state changed to waiting")
47+
test:is(session.expiration_fiber:status(), 'dead',
48+
"check that session expiration fiber is canceled")
49+
box.cfg{read_only = false}
50+
test:ok(queue_state.poll(queue_state.states.RUNNING, 10),
51+
"queue state changed to running")
52+
test:is(session.expiration_fiber:status(), 'suspended',
53+
"check that session expiration fiber started")
54+
end)
55+
56+
test:test('Check session resuming', function(test)
57+
test:plan(17)
58+
local client = tnt.cluster.connect_master()
59+
test:ok(client.error == nil, 'no errors on client connect to master')
60+
local session_uuid = client:call('queue.identify')
61+
local uuid_obj = uuid.frombin(session_uuid)
62+
63+
test:ok(queue.tube.test:put('testdata'), 'put task')
64+
local task_master = client:call('queue.tube.test:take')
65+
test:ok(task_master, 'task was taken')
66+
test:is(task_master[3], 'testdata', 'task.data')
67+
client:close()
68+
69+
local qt = box.space._queue_taken_2:select()
70+
test:is(uuid.frombin(qt[1][4]):str(), uuid_obj:str(),
71+
'task taken by actual uuid')
72+
73+
-- Wait for disconnect callback.
74+
local attempts = 0
75+
while true do
76+
local is = box.space._queue_inactive_sessions:select()
77+
78+
if is[1] then
79+
test:is(uuid.frombin(is[1][1]):str(), uuid_obj:str(),
80+
'check inactive sessions')
81+
break
82+
end
83+
84+
attempts = attempts + 1
85+
if attempts == 10 then
86+
test:ok(false, 'check inactive sessions')
87+
return false
88+
end
89+
fiber.sleep(0.01)
90+
end
91+
92+
-- Switch roles.
93+
box.cfg{read_only = true}
94+
queue_state.poll(queue_state.states.WAITING, 10)
95+
test:is(queue.state(), 'WAITING', 'master state is waiting')
96+
conn:eval('box.cfg{read_only=false}')
97+
conn:eval([[
98+
queue_state = require('queue.abstract.queue_state')
99+
queue_state.poll(queue_state.states.RUNNING, 10)
100+
]])
101+
test:is(conn:call('queue.state'), 'RUNNING', 'replica state is running')
102+
103+
local cfg = conn:eval('return queue.cfg')
104+
test:is(cfg.ttr, 0.5, 'check cfg applied after lazy start')
105+
106+
test:ok(conn:call('queue.identify', {session_uuid}), 'identify old session')
107+
local stat = conn:call('queue.statistics')
108+
test:is(stat.test.tasks.taken, 1, 'taken tasks count')
109+
test:is(stat.test.tasks.done, 0, 'done tasks count')
110+
local task_replica = conn:call('queue.tube.test:ack', {task_master[1]})
111+
test:is(task_replica[3], 'testdata', 'check task data')
112+
local stat = conn:call('queue.statistics')
113+
test:is(stat.test.tasks.taken, 0, 'taken tasks count after ack()')
114+
test:is(stat.test.tasks.done, 1, 'done tasks count after ack()')
115+
116+
-- Switch roles back.
117+
conn:eval('box.cfg{read_only=true}')
118+
conn:eval([[
119+
queue_state = require('queue.abstract.queue_state')
120+
queue_state.poll(queue_state.states.WAITING, 10)
121+
]])
122+
box.cfg{read_only = false}
123+
queue_state.poll(queue_state.states.RUNNING, 10)
124+
test:is(queue.state(), 'RUNNING', 'master state is running')
125+
test:is(conn:call('queue.state'), 'WAITING', 'replica state is waiting')
126+
end)
127+
128+
test:test('Check task is cleaned after migrate', function(test)
129+
test:plan(9)
130+
local client = tnt.cluster.connect_master()
131+
local session_uuid = client:call('queue.identify')
132+
local uuid_obj = uuid.frombin(session_uuid)
133+
test:ok(queue.tube.test:put('testdata'), 'put task')
134+
test:ok(client:call('queue.tube.test:take'), 'take task from master')
135+
client:close()
136+
137+
-- Wait for disconnect callback.
138+
local attempts = 0
139+
while true do
140+
local is = box.space._queue_inactive_sessions:select()
141+
142+
if is[1] then
143+
test:is(uuid.frombin(is[1][1]):str(), uuid_obj:str(),
144+
'check inactive sessions')
145+
break
146+
end
147+
148+
attempts = attempts + 1
149+
if attempts == 10 then
150+
test:ok(false, 'check inactive sessions')
151+
return false
152+
end
153+
fiber.sleep(0.01)
154+
end
155+
156+
-- Switch roles.
157+
box.cfg{read_only = true}
158+
159+
queue_state.poll(queue_state.states.WAITING, 10)
160+
test:is(queue.state(), 'WAITING', 'master state is waiting')
161+
conn:eval('box.cfg{read_only=false}')
162+
conn:eval([[
163+
queue_state = require('queue.abstract.queue_state')
164+
queue_state.poll(queue_state.states.RUNNING, 10)
165+
]])
166+
test:is(conn:call('queue.state'), 'RUNNING', 'replica state is running')
167+
168+
-- Check task.
169+
local stat = conn:call('queue.statistics')
170+
test:is(stat.test.tasks.taken, 1, 'taken tasks count before timeout')
171+
fiber.sleep(1.5)
172+
local stat = conn:call('queue.statistics')
173+
test:is(stat.test.tasks.taken, 0, 'taken tasks count after timeout')
174+
175+
-- Switch roles back.
176+
conn:eval('box.cfg{read_only=true}')
177+
conn:eval([[
178+
queue_state = require('queue.abstract.queue_state')
179+
queue_state.poll(queue_state.states.WAITING, 10)
180+
]])
181+
box.cfg{read_only = false}
182+
queue_state.poll(queue_state.states.RUNNING, 10)
183+
test:is(queue.state(), 'RUNNING', 'master state is running')
184+
test:is(conn:call('queue.state'), 'WAITING', 'replica state is waiting')
185+
end)
186+
187+
test:test('Check release_all method', function(test)
188+
test:plan(6)
189+
test:ok(queue.tube.test:put('testdata'), 'put task #0')
190+
test:ok(queue.tube.test:put('testdata'), 'put task #1')
191+
test:ok(queue.tube.test:take(), 'take task #0')
192+
test:ok(queue.tube.test:take(), 'take task #1')
193+
test:is(queue.statistics().test.tasks.taken, 2,
194+
'taken tasks count before release_all')
195+
queue.tube.test:release_all()
196+
test:is(queue.statistics().test.tasks.taken, 0,
197+
'taken tasks count after release_all')
198+
end)
199+
200+
rawset(_G, 'queue', nil)
201+
conn:eval('rawset(_G, "queue", nil)')
202+
conn:close()
203+
tnt.finish()
204+
os.exit(test:check() and 0 or 1)
205+
-- vim: set ft=lua :

t/tnt/init.lua

Lines changed: 144 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ local fio = require('fio')
22
local log = require('log')
33
local yaml = require('yaml')
44
local errno = require('errno')
5+
local fiber = require('fiber')
6+
local popen = require('popen')
7+
local netbox = require('net.box')
58

69
local dir = os.getenv('QUEUE_TMP')
710
local cleanup = false
@@ -11,6 +14,19 @@ local vinyl_name = qc.vinyl_name
1114
local snapdir_optname = qc.snapdir_optname
1215
local logger_optname = qc.logger_optname
1316

17+
local bind_master = os.getenv('QUEUE_MASTER_ADDR')
18+
local bind_replica = os.getenv('QUEUE_REPLICA_ADDR')
19+
local dir_replica = nil
20+
local replica = nil
21+
22+
if bind_master == nil then
23+
bind_master = '127.0.0.1:3398'
24+
end
25+
26+
if bind_replica == nil then
27+
bind_replica = '127.0.0.1:3399'
28+
end
29+
1430
if dir == nil then
1531
dir = fio.tempdir()
1632
cleanup = true
@@ -37,6 +53,116 @@ local function tnt_prepare(cfg_args)
3753
box.cfg(cfg_args)
3854
end
3955

56+
-- Creates master and replica setup for queue states switching tests.
57+
local function tnt_cluster_prepare(cfg_args)
58+
-- Prepare master.
59+
cfg_args = cfg_args or {}
60+
local files = fio.glob(fio.pathjoin(dir, '*'))
61+
for _, file in pairs(files) do
62+
if fio.basename(file) ~= 'tarantool.log' then
63+
log.info("skip removing %s", file)
64+
fio.unlink(file)
65+
end
66+
end
67+
68+
cfg_args['wal_dir'] = dir
69+
cfg_args['read_only'] = false
70+
cfg_args[snapdir_optname()] = dir
71+
cfg_args[logger_optname()] = fio.pathjoin(dir, 'tarantool.log')
72+
cfg_args['listen'] = bind_master
73+
cfg_args['replication'] = {'replicator:password@' .. bind_replica,
74+
'replicator:password@' .. bind_master}
75+
if vinyl_name() then
76+
local vinyl_optname = vinyl_name() .. '_dir'
77+
cfg_args[vinyl_optname] = dir
78+
end
79+
cfg_args['replication_connect_quorum'] = 1
80+
cfg_args['replication_connect_timeout'] = 0.01
81+
82+
box.cfg(cfg_args)
83+
-- Allow guest all operations.
84+
box.schema.user.grant('guest', 'read, write, execute', 'universe')
85+
box.schema.user.create('replicator', {password = 'password'})
86+
box.schema.user.grant('replicator', 'replication')
87+
88+
-- Prepare replica.
89+
dir_replica = fio.tempdir()
90+
91+
local vinyl_opt = nil
92+
if vinyl_name() then
93+
vinyl_opt = ', ' .. vinyl_name() .. '_dir = \'' .. dir_replica .. '\''
94+
else
95+
vinyl_opt = ''
96+
end
97+
98+
local cmd_replica = {
99+
arg[-1],
100+
'-e',
101+
[[
102+
box.cfg {
103+
read_only = true,
104+
replication = 'replicator:password@]] .. bind_master ..
105+
'\', listen = \'' .. bind_replica ..
106+
'\', wal_dir = \'' .. dir_replica ..
107+
'\', ' .. snapdir_optname() .. ' = \'' .. dir_replica ..
108+
'\', ' .. logger_optname() .. ' = \'' ..
109+
fio.pathjoin(dir_replica, 'tarantool.log') .. '\'' ..
110+
vinyl_opt ..
111+
'}'
112+
}
113+
114+
replica = popen.new(cmd_replica, {
115+
stdin = 'devnull',
116+
stdout = 'devnull',
117+
stderr = 'devnull',
118+
})
119+
120+
-- Wait for replica to connect.
121+
local id = (box.info.replication[1].uuid ~= box.info.uuid and 1) or 2
122+
local attempts = 0
123+
124+
while true do
125+
if #box.info.replication == 2 and box.info.replication[id].upstream then
126+
break
127+
end
128+
attempts = attempts + 1
129+
if attempts == 30 then
130+
error('wait for replica failed')
131+
end
132+
fiber.sleep(0.1)
133+
end
134+
end
135+
136+
local function connect_replica()
137+
if not replica then
138+
return nil
139+
end
140+
141+
return netbox.connect(bind_replica)
142+
end
143+
144+
local function connect_master()
145+
return netbox.connect(bind_master)
146+
end
147+
148+
-- Wait for replica to connect.
149+
local function wait_replica()
150+
local attempts = 0
151+
152+
while true do
153+
if #box.info.replication == 2 then
154+
return true
155+
end
156+
attempts = attempts + 1
157+
if attempts == 10 then
158+
return false
159+
end
160+
fiber.sleep(0.1)
161+
end
162+
163+
return false
164+
end
165+
40166
return {
41167
finish = function(code)
42168
local files = fio.glob(fio.pathjoin(dir, '*'))
@@ -52,6 +178,17 @@ return {
52178
log.info("rmdir %s", dir)
53179
fio.rmdir(dir)
54180
end
181+
if dir_replica then
182+
local files = fio.glob(fio.pathjoin(dir, '*'))
183+
for _, file in pairs(files) do
184+
log.info("remove %s", file)
185+
fio.unlink(file)
186+
end
187+
end
188+
if replica then
189+
replica:kill()
190+
replica:wait()
191+
end
55192
end,
56193

57194
dir = function()
@@ -77,5 +214,11 @@ return {
77214
return data
78215
end,
79216

80-
cfg = tnt_prepare
217+
cfg = tnt_prepare,
218+
cluster = {
219+
cfg = tnt_cluster_prepare,
220+
wait_replica = wait_replica,
221+
connect_replica = connect_replica,
222+
connect_master = connect_master
223+
}
81224
}

0 commit comments

Comments
 (0)