Skip to content

Commit f75d2fc

Browse files
committed
add master-replica switching tests
1 parent 7900deb commit f75d2fc

File tree

2 files changed

+344
-1
lines changed

2 files changed

+344
-1
lines changed

t/200-master-replica.t

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
#!/usr/bin/env tarantool
2+
3+
local fio = require('fio')
4+
local tnt = require('t.tnt')
5+
local test = require('tap').test('')
6+
local uuid = require('uuid')
7+
local queue = require('queue')
8+
local fiber = require('fiber')
9+
10+
local queue_state = require('queue.abstract.queue_state')
11+
rawset(_G, 'queue', require('queue'))
12+
13+
-- Replica connection handler
14+
local conn = {}
15+
16+
test:plan(5)
17+
18+
test:test('Check master-replica setup', function(test)
19+
test:plan(8)
20+
local engine = os.getenv('ENGINE') or 'memtx'
21+
tnt.cluster.cfg{}
22+
23+
test:ok(rawget(box, 'space'), 'box started')
24+
test:ok(queue, 'queue is loaded')
25+
26+
test:ok(tnt.cluster.wait_replica(), 'wait for replica to connect')
27+
conn = tnt.cluster.connect_replica()
28+
test:ok(conn.error == nil, 'no errors on connect to replica')
29+
test:ok(conn:ping(), 'ping replica')
30+
test:is(queue.state(), 'RUNNING', 'check master queue state')
31+
conn:eval('rawset(_G, "queue", require("queue"))')
32+
test:is(conn:call('queue.state'), 'INIT', 'check replica queue state')
33+
34+
-- Setup tube. Set ttr = 0.5 for sessions expire testing.
35+
conn:call('queue.cfg', {{ttr = 0.5}})
36+
queue.cfg{ttr = 0.5}
37+
local tube = queue.create_tube('test', 'fifo', {engine = engine})
38+
test:ok(tube, 'test tube created')
39+
end)
40+
41+
test:test('Check queue state switching', function(test)
42+
test:plan(2)
43+
box.cfg{read_only = true}
44+
test:ok(queue_state.poll(queue_state.states.WAITING, 10),
45+
"queue state changed to waiting")
46+
box.cfg{read_only = false}
47+
test:ok(queue_state.poll(queue_state.states.RUNNING, 10),
48+
"queue state changed to running")
49+
end)
50+
51+
test:test('Check session resuming', function(test)
52+
test:plan(17)
53+
local client = tnt.cluster.connect_master()
54+
test:ok(client.error == nil, 'no errors on client connect to master')
55+
local session_uuid = client:call('queue.identify')
56+
local uuid_obj = uuid.frombin(session_uuid)
57+
58+
test:ok(queue.tube.test:put('testdata'), 'put task')
59+
local task_master = client:call('queue.tube.test:take')
60+
test:ok(task_master, 'task was taken')
61+
test:is(task_master[3], 'testdata', 'task.data')
62+
client:close()
63+
64+
local qt = box.space._queue_taken_2:select()
65+
test:is(uuid.frombin(qt[1][4]):str(), uuid_obj:str(),
66+
'task taken by actual uuid')
67+
68+
-- wait for disconnect collback
69+
local attempts = 0
70+
while true do
71+
local is = box.space._queue_inactive_sessions:select()
72+
73+
if is[1] then
74+
test:is(uuid.frombin(is[1][1]):str(), uuid_obj:str(),
75+
'check inactive sessions')
76+
break
77+
end
78+
79+
attempts = attempts + 1
80+
if attempts == 10 then
81+
test:ok(false, 'check inactive sessions')
82+
return false
83+
end
84+
fiber.sleep(0.01)
85+
end
86+
87+
-- switch roles
88+
box.cfg{read_only = true}
89+
queue_state.poll(queue_state.states.WAITING, 10)
90+
test:is(queue.state(), 'WAITING', 'master state is waiting')
91+
conn:eval('box.cfg{read_only=false}')
92+
conn:eval([[
93+
queue_state = require('queue.abstract.queue_state')
94+
queue_state.poll(queue_state.states.RUNNING, 10)
95+
]])
96+
test:is(conn:call('queue.state'), 'RUNNING', 'replica state is running')
97+
98+
local cfg = conn:eval('return queue.cfg')
99+
test:is(cfg.ttr, 0.5, 'check cfg applied after lazy start')
100+
101+
test:ok(conn:call('queue.identify', {session_uuid}), 'identify old session')
102+
local stat = conn:call('queue.statistics')
103+
test:is(stat.test.tasks.taken, 1, 'taken tasks count')
104+
test:is(stat.test.tasks.done, 0, 'done tasks count')
105+
local task_replica = conn:call('queue.tube.test:ack', {task_master[1]})
106+
test:is(task_replica[3], 'testdata', 'check task data')
107+
local stat = conn:call('queue.statistics')
108+
test:is(stat.test.tasks.taken, 0, 'taken tasks count after ack()')
109+
test:is(stat.test.tasks.done, 1, 'done tasks count after ack()')
110+
111+
-- switch roles back
112+
conn:eval('box.cfg{read_only=true}')
113+
conn:eval([[
114+
queue_state = require('queue.abstract.queue_state')
115+
queue_state.poll(queue_state.states.WAITING, 10)
116+
]])
117+
box.cfg{read_only = false}
118+
queue_state.poll(queue_state.states.RUNNING, 10)
119+
test:is(queue.state(), 'RUNNING', 'master state is running')
120+
test:is(conn:call('queue.state'), 'WAITING', 'replica state is waiting')
121+
end)
122+
123+
test:test('Check task is cleaned after migrate', function(test)
124+
test:plan(9)
125+
local client = tnt.cluster.connect_master()
126+
local session_uuid = client:call('queue.identify')
127+
local uuid_obj = uuid.frombin(session_uuid)
128+
test:ok(queue.tube.test:put('testdata'), 'put task')
129+
test:ok(client:call('queue.tube.test:take'), 'take task from master')
130+
client:close()
131+
132+
-- wait for disconnect collback
133+
local attempts = 0
134+
while true do
135+
local is = box.space._queue_inactive_sessions:select()
136+
137+
if is[1] then
138+
test:is(uuid.frombin(is[1][1]):str(), uuid_obj:str(),
139+
'check inactive sessions')
140+
break
141+
end
142+
143+
attempts = attempts + 1
144+
if attempts == 10 then
145+
test:ok(false, 'check inactive sessions')
146+
return false
147+
end
148+
fiber.sleep(0.01)
149+
end
150+
151+
-- switch roles
152+
box.cfg{read_only = true}
153+
154+
queue_state.poll(queue_state.states.WAITING, 10)
155+
test:is(queue.state(), 'WAITING', 'master state is waiting')
156+
conn:eval('box.cfg{read_only=false}')
157+
conn:eval([[
158+
queue_state = require('queue.abstract.queue_state')
159+
queue_state.poll(queue_state.states.RUNNING, 10)
160+
]])
161+
test:is(conn:call('queue.state'), 'RUNNING', 'replica state is running')
162+
163+
-- check task
164+
local stat = conn:call('queue.statistics')
165+
test:is(stat.test.tasks.taken, 1, 'taken tasks count before timeout')
166+
fiber.sleep(1)
167+
local stat = conn:call('queue.statistics')
168+
test:is(stat.test.tasks.taken, 0, 'taken tasks count after timeout')
169+
170+
-- switch roles back
171+
conn:eval('box.cfg{read_only=true}')
172+
conn:eval([[
173+
queue_state = require('queue.abstract.queue_state')
174+
queue_state.poll(queue_state.states.WAITING, 10)
175+
]])
176+
box.cfg{read_only = false}
177+
queue_state.poll(queue_state.states.RUNNING, 10)
178+
test:is(queue.state(), 'RUNNING', 'master state is running')
179+
test:is(conn:call('queue.state'), 'WAITING', 'replica state is waiting')
180+
end)
181+
182+
test:test('Check release_all method', function(test)
183+
test:plan(6)
184+
test:ok(queue.tube.test:put('testdata'), 'put task #0')
185+
test:ok(queue.tube.test:put('testdata'), 'put task #1')
186+
test:ok(queue.tube.test:take(), 'take task #0')
187+
test:ok(queue.tube.test:take(), 'take task #1')
188+
test:is(queue.statistics().test.tasks.taken, 2,
189+
'taken tasks count before release_all')
190+
queue.tube.test:release_all()
191+
test:is(queue.statistics().test.tasks.taken, 0,
192+
'taken tasks count after release_all')
193+
end)
194+
195+
rawset(_G, 'queue', nil)
196+
conn:eval('rawset(_G, "queue", nil)')
197+
conn:close()
198+
tnt.finish()
199+
os.exit(test:check() and 0 or 1)
200+
-- vim: set ft=lua :

t/tnt/init.lua

Lines changed: 144 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ local fio = require('fio')
22
local log = require('log')
33
local yaml = require('yaml')
44
local errno = require('errno')
5+
local fiber = require('fiber')
6+
local popen = require('popen')
7+
local netbox = require('net.box')
58

69
local dir = os.getenv('QUEUE_TMP')
710
local cleanup = false
@@ -11,6 +14,19 @@ local vinyl_name = qc.vinyl_name
1114
local snapdir_optname = qc.snapdir_optname
1215
local logger_optname = qc.logger_optname
1316

17+
local bind_master = os.getenv('QUEUE_MASTER_ADDR')
18+
local bind_replica = os.getenv('QUEUE_REPLICA_ADDR')
19+
local dir_replica = nil
20+
local replica = nil
21+
22+
if bind_master == nil then
23+
bind_master = '127.0.0.1:3398'
24+
end
25+
26+
if bind_replica == nil then
27+
bind_replica = '127.0.0.1:3399'
28+
end
29+
1430
if dir == nil then
1531
dir = fio.tempdir()
1632
cleanup = true
@@ -37,6 +53,116 @@ local function tnt_prepare(cfg_args)
3753
box.cfg(cfg_args)
3854
end
3955

56+
-- Creates master and replica setup for queue states switching tests.
57+
local function tnt_cluster_prepare(cfg_args)
58+
-- Prepare master.
59+
cfg_args = cfg_args or {}
60+
local files = fio.glob(fio.pathjoin(dir, '*'))
61+
for _, file in pairs(files) do
62+
if fio.basename(file) ~= 'tarantool.log' then
63+
log.info("skip removing %s", file)
64+
fio.unlink(file)
65+
end
66+
end
67+
68+
cfg_args['wal_dir'] = dir
69+
cfg_args['read_only'] = false
70+
cfg_args[snapdir_optname()] = dir
71+
cfg_args[logger_optname()] = fio.pathjoin(dir, 'tarantool.log')
72+
cfg_args['listen'] = bind_master
73+
cfg_args['replication'] = {'replicator:password@' .. bind_replica,
74+
'replicator:password@' .. bind_master}
75+
if vinyl_name() then
76+
local vinyl_optname = vinyl_name() .. '_dir'
77+
cfg_args[vinyl_optname] = dir
78+
end
79+
cfg_args['replication_connect_quorum'] = 1
80+
cfg_args['replication_connect_timeout'] = 0.01
81+
82+
box.cfg(cfg_args)
83+
-- Allow guest all operations.
84+
box.schema.user.grant('guest', 'read, write, execute', 'universe')
85+
box.schema.user.create('replicator', {password = 'password'})
86+
box.schema.user.grant('replicator', 'replication')
87+
88+
-- Prepare replica.
89+
dir_replica = fio.tempdir()
90+
91+
local vinyl_opt = nil
92+
if vinyl_name() then
93+
vinyl_opt = ', ' .. vinyl_name() .. '_dir = \'' .. dir_replica .. '\''
94+
else
95+
vinyl_opt = ''
96+
end
97+
98+
local cmd_replica = {
99+
arg[-1],
100+
'-e',
101+
[[
102+
box.cfg {
103+
read_only = true,
104+
replication = 'replicator:password@]] .. bind_master ..
105+
'\', listen = \'' .. bind_replica ..
106+
'\', wal_dir = \'' .. dir_replica ..
107+
'\', ' .. snapdir_optname() .. ' = \'' .. dir_replica ..
108+
'\', ' .. logger_optname() .. ' = \'' ..
109+
fio.pathjoin(dir_replica, 'tarantool.log') .. '\'' ..
110+
vinyl_opt ..
111+
'}'
112+
}
113+
114+
replica = popen.new(cmd_replica, {
115+
stdin = 'devnull',
116+
stdout = 'devnull',
117+
stderr = 'devnull',
118+
})
119+
120+
-- Wait for replica to connect.
121+
local id = (box.info.replication[1].uuid ~= box.info.uuid and 1) or 2
122+
local attempts = 0
123+
124+
while true do
125+
if #box.info.replication == 2 and box.info.replication[id].upstream then
126+
break
127+
end
128+
attempts = attempts + 1
129+
if attempts == 30 then
130+
error('wait for replica failed')
131+
end
132+
fiber.sleep(0.1)
133+
end
134+
end
135+
136+
local function connect_replica()
137+
if not replica then
138+
return nil
139+
end
140+
141+
return netbox.connect(bind_replica)
142+
end
143+
144+
local function connect_master()
145+
return netbox.connect(bind_master)
146+
end
147+
148+
-- Wait for replica to connect.
149+
local function wait_replica()
150+
local attempts = 0
151+
152+
while true do
153+
if #box.info.replication == 2 then
154+
return true
155+
end
156+
attempts = attempts + 1
157+
if attempts == 10 then
158+
return false
159+
end
160+
fiber.sleep(0.1)
161+
end
162+
163+
return false
164+
end
165+
40166
return {
41167
finish = function(code)
42168
local files = fio.glob(fio.pathjoin(dir, '*'))
@@ -52,6 +178,17 @@ return {
52178
log.info("rmdir %s", dir)
53179
fio.rmdir(dir)
54180
end
181+
if dir_replica then
182+
local files = fio.glob(fio.pathjoin(dir, '*'))
183+
for _, file in pairs(files) do
184+
log.info("remove %s", file)
185+
fio.unlink(file)
186+
end
187+
end
188+
if replica then
189+
replica:kill()
190+
replica:wait()
191+
end
55192
end,
56193

57194
dir = function()
@@ -77,5 +214,11 @@ return {
77214
return data
78215
end,
79216

80-
cfg = tnt_prepare
217+
cfg = tnt_prepare,
218+
cluster = {
219+
cfg = tnt_cluster_prepare,
220+
wait_replica = wait_replica,
221+
connect_replica = connect_replica,
222+
connect_master = connect_master
223+
}
81224
}

0 commit comments

Comments
 (0)