Skip to content

Commit 41d8618

Browse files
committed
add master replica switching tests
1 parent 3e2adb6 commit 41d8618

File tree

3 files changed

+322
-9
lines changed

3 files changed

+322
-9
lines changed

t/040-utubettl.t

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,7 @@ test:test('read_only test', function(test)
226226
tube:put('abc', { delay = 0.1 })
227227
local ttl_fiber = tube.raw.fiber
228228
box.cfg{ read_only = true }
229-
local rc = queue_state.poll_waiting(10)
230-
test:ok(rc, "queue state changed to waiting")
229+
test:ok(queue_state.poll_waiting(10), "queue state changed to waiting")
231230
fiber.sleep(0.11)
232231
test:is(ttl_fiber:status(), 'dead',
233232
"check that background fiber is canceled")
@@ -236,8 +235,7 @@ test:test('read_only test', function(test)
236235
local stat, err = pcall(tube.take, tube, 0.2)
237236
test:is(stat, false, "check that task wasn't taken")
238237
box.cfg{ read_only = false }
239-
local rc = queue_state.poll_running(10)
240-
test:ok(rc, "queue state changed to running")
238+
test:ok(queue_state.poll_running(10), "queue state changed to running")
241239
test:is(tube.raw.fiber:status(), 'suspended',
242240
"check that background fiber started")
243241
local task = tube:take()

t/200-master-replica.t

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
#!/usr/bin/env tarantool
2+
3+
local fio = require('fio')
4+
local tnt = require('t.tnt')
5+
local test = require('tap').test('')
6+
local uuid = require('uuid')
7+
local queue = require('queue')
8+
local fiber = require('fiber')
9+
10+
local queue_state = require('queue.abstract.queue_state')
11+
rawset(_G, 'queue', require('queue'))
12+
13+
-- Eight top-level checks in this section plus the four subtests that follow.
test:plan(12)

-- Storage engine used for the test tube; can be overridden via ENGINE.
local engine = os.getenv('ENGINE') or 'memtx'

-- Configure this process as the master and spawn the replica instance.
tnt.cfg_cluster({})

test:ok(rawget(box, 'space'), 'box started')
test:ok(queue, 'queue is loaded')

local replica_joined = tnt.wait_for_replica()
test:ok(replica_joined, 'wait for replica to connect')

-- net.box connection to the replica, shared by every subtest below.
local conn = tnt.connect_replica()
test:ok(conn.error == nil, 'no errors on connect to replica')
test:ok(conn:ping(), 'ping replica')

local master_state = queue.state()
test:is(master_state, 'RUNNING', 'check master queue state')

local replica_state = conn:eval('return require("queue").state()')
test:is(replica_state, 'INIT', 'check replica queue state')

-- Publish the queue module as a global on the replica so that later
-- conn:call('queue....') requests can resolve it, then configure it there.
conn:eval('rawset(_G, "queue", require("queue"))')
conn:call('queue.cfg', { { ttr = 0.5 } })

-- setup tube
queue.cfg({ ttr = 0.5 })
local tube = queue.create_tube('test', 'fifo', { engine = engine })
test:ok(tube, 'test tube created')
37+
38+
test:test('Check queue state switching', function(test)
    test:plan(2)

    -- Making the instance read-only is expected to move the queue to waiting.
    box.cfg({ read_only = true })
    local became_waiting = queue_state.poll_waiting(10)
    test:ok(became_waiting, "queue state changed to waiting")

    -- Making it writable again is expected to bring the queue back to running.
    box.cfg({ read_only = false })
    local became_running = queue_state.poll_running(10)
    test:ok(became_running, "queue state changed to running")
end)
45+
46+
-- Poll _queue_inactive_sessions until the disconnect callback has recorded a
-- session, then check that the first recorded session is `expected_uuid`.
-- Emits exactly one check named 'check inactive sessions'.
-- Returns true if a session showed up, false if none did after ten polls.
local function check_inactive_session(test, expected_uuid)
    for attempt = 1, 10 do
        local sessions = box.space._queue_inactive_sessions:select()
        if sessions[1] then
            test:is(uuid.frombin(sessions[1][1]):str(), expected_uuid:str(),
                'check inactive sessions')
            return true
        end
        -- No sleep after the final poll: fail right away.
        if attempt < 10 then
            fiber.sleep(0.01)
        end
    end
    test:ok(false, 'check inactive sessions')
    return false
end

-- Switch roles: make the master read-only and the replica writable, checking
-- that the queue states follow. Emits two checks.
local function switch_to_replica(test)
    box.cfg{read_only = true}
    queue_state.poll_waiting(10)
    test:is(queue.state(), 'WAITING', 'master state is waiting')
    conn:eval('box.cfg{read_only=false}')
    conn:eval('require("queue.abstract.queue_state").poll_running(10)')
    test:is(conn:call('queue.state'), 'RUNNING', 'replica state is running')
end

-- Switch roles back: make the replica read-only and the master writable,
-- checking that the queue states follow. Emits two checks.
local function switch_to_master(test)
    conn:eval('box.cfg{read_only=true}')
    conn:eval('require("queue.abstract.queue_state").poll_waiting(10)')
    box.cfg{read_only = false}
    queue_state.poll_running(10)
    test:is(queue.state(), 'RUNNING', 'master state is running')
    test:is(conn:call('queue.state'), 'WAITING', 'replica state is waiting')
end

-- A task taken through the master must be ack-able through the replica after
-- the roles are switched, provided the client re-identifies with its old
-- session uuid.
test:test('Check session resuming', function(test)
    test:plan(17)
    local client = tnt.connect_master()
    test:ok(client.error == nil, 'no errors on client connect to master')
    local session_uuid = client:call('queue.identify')
    local uuid_obj = uuid.frombin(session_uuid)

    test:ok(queue.tube.test:put('testdata'), 'put task')
    local task_master = client:call('queue.tube.test:take')
    test:ok(task_master, 'task was taken')
    test:is(task_master[3], 'testdata', 'task.data')
    client:close()

    local qt = box.space._queue_taken_2:select()
    test:is(uuid.frombin(qt[1][4]):str(), uuid_obj:str(),
        'task taken by actual uuid')

    -- wait for disconnect callback
    if not check_inactive_session(test, uuid_obj) then
        return false
    end

    switch_to_replica(test)

    local cfg = conn:eval('return queue.cfg')
    test:is(cfg.ttr, 0.5, 'check cfg applied after lazy start')

    test:ok(conn:call('queue.identify', {session_uuid}), 'identify old session')
    local stat = conn:call('queue.statistics')
    test:is(stat.test.tasks.taken, 1, 'taken tasks count')
    test:is(stat.test.tasks.done, 0, 'done tasks count')
    local task_replica = conn:call('queue.tube.test:ack', {task_master[1]})
    test:is(task_replica[3], 'testdata', 'check task data')
    stat = conn:call('queue.statistics')
    test:is(stat.test.tasks.taken, 0, 'taken tasks count after ack()')
    test:is(stat.test.tasks.done, 1, 'done tasks count after ack()')

    switch_to_master(test)
end)

-- A task taken through the master by a client that never comes back must no
-- longer be counted as taken on the replica one second after the switch.
test:test('Check task is cleaned after migrate', function(test)
    test:plan(9)
    local client = tnt.connect_master()
    local session_uuid = client:call('queue.identify')
    local uuid_obj = uuid.frombin(session_uuid)
    test:ok(queue.tube.test:put('testdata'), 'put task')
    test:ok(client:call('queue.tube.test:take'), 'take task from master')
    client:close()

    -- wait for disconnect callback
    if not check_inactive_session(test, uuid_obj) then
        return false
    end

    switch_to_replica(test)

    -- check task
    local stat = conn:call('queue.statistics')
    test:is(stat.test.tasks.taken, 1, 'taken tasks count before timeout')
    fiber.sleep(1)
    stat = conn:call('queue.statistics')
    test:is(stat.test.tasks.taken, 0, 'taken tasks count after timeout')

    switch_to_master(test)
end)
164+
165+
test:test('Check release_all method', function(test)
    test:plan(6)

    local test_tube = queue.tube.test

    -- Put two tasks, then take both of them.
    for i = 0, 1 do
        test:ok(test_tube:put('testdata'), 'put task #' .. i)
    end
    for i = 0, 1 do
        test:ok(test_tube:take(), 'take task #' .. i)
    end

    local taken_before = queue.statistics().test.tasks.taken
    test:is(taken_before, 2, 'taken tasks count before release_all')

    test_tube:release_all()

    local taken_after = queue.statistics().test.tasks.taken
    test:is(taken_after, 0, 'taken tasks count after release_all')
end)
177+
178+
-- Drop the global `queue` on both instances, then shut everything down.
rawset(_G, 'queue', nil)
conn:eval('rawset(_G, "queue", nil)')
conn:close()
tnt.finish()

-- Exit status 0 only when every planned check passed.
local all_passed = test:check()
os.exit(all_passed and 0 or 1)
-- vim: set ft=lua :

t/tnt/init.lua

Lines changed: 137 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1-
local fio = require('fio')
2-
local log = require('log')
3-
local yaml = require('yaml')
4-
local errno = require('errno')
1+
local fio = require('fio')
2+
local log = require('log')
3+
local yaml = require('yaml')
4+
local errno = require('errno')
5+
local fiber = require('fiber')
6+
local popen = require('popen')
7+
local netbox = require('net.box')
58

69
local dir = os.getenv('QUEUE_TMP')
710
local cleanup = false
@@ -11,6 +14,11 @@ local vinyl_name = qc.vinyl_name
1114
local snapdir_optname = qc.snapdir_optname
1215
local logger_optname = qc.logger_optname
1316

17+
local bind_master = '127.0.0.1:3398'
18+
local bind_replica = '127.0.0.1:3399'
19+
local dir_replica = nil
20+
local replica = nil
21+
1422
if dir == nil then
1523
dir = fio.tempdir()
1624
cleanup = true
@@ -37,6 +45,114 @@ local function tnt_prepare(cfg_args)
3745
box.cfg(cfg_args)
3846
end
3947

48+
-- Remove every file directly inside `path` except the Tarantool log.
local function cluster_clean_dir(path)
    for _, file in pairs(fio.glob(fio.pathjoin(path, '*'))) do
        if fio.basename(file) ~= 'tarantool.log' then
            log.info("remove %s", file)
            fio.unlink(file)
        end
    end
end

-- Build the argv that starts the replica: the current interpreter running a
-- one-line `box.cfg{...}` script that stores all of its data in `replica_dir`.
-- Values are emitted with %q so that paths containing quotes stay valid Lua.
local function cluster_replica_cmd(replica_dir)
    local opts = {
        'read_only = true',
        string.format('replication = %q',
            'replicator:password@' .. bind_master),
        string.format('listen = %q', bind_replica),
        string.format('wal_dir = %q', replica_dir),
        string.format('%s = %q', snapdir_optname(), replica_dir),
        string.format('%s = %q', logger_optname(),
            fio.pathjoin(replica_dir, 'tarantool.log')),
    }
    if vinyl_name() then
        opts[#opts + 1] = string.format('%s_dir = %q', vinyl_name(),
            replica_dir)
    end

    -- arg[-1] is the executable that runs the current script.
    return { arg[-1], '-e', 'box.cfg { ' .. table.concat(opts, ', ') .. ' }' }
end

-- Block until the replica shows up in box.info.replication with an upstream.
-- Polls 30 times, 0.1 s apart, and raises 'wait for replica failed' otherwise.
local function cluster_wait_replica()
    -- Slot of the peer: whichever of the first two entries is not this instance.
    local id = (box.info.replication[1].uuid ~= box.info.uuid and 1) or 2

    for attempt = 1, 30 do
        local replication = box.info.replication
        if #replication == 2 and replication[id].upstream then
            return
        end
        if attempt < 30 then
            fiber.sleep(0.1)
        end
    end
    error('wait for replica failed')
end

-- Configure this process as a writable master listening on `bind_master`,
-- start a read-only replica on `bind_replica` as a child process and wait
-- until the two are connected.
--
-- @param cfg_args optional table of extra box.cfg options. The table is
--                 modified in place: directory, logger, listen, read_only and
--                 replication options are overwritten.
-- Raises an error if the replica cannot be spawned or does not connect.
-- Sets the module-level `dir_replica` and `replica`.
local function tnt_cluster_prepare(cfg_args)
    -- prepare master
    cfg_args = cfg_args or {}
    cluster_clean_dir(dir)

    cfg_args['wal_dir'] = dir
    cfg_args['read_only'] = false
    cfg_args[snapdir_optname()] = dir
    cfg_args[logger_optname()] = fio.pathjoin(dir, 'tarantool.log')
    cfg_args['listen'] = bind_master
    cfg_args['replication'] = {'replicator:password@' .. bind_replica,
                               'replicator:password@' .. bind_master}
    if vinyl_name() then
        local vinyl_optname = vinyl_name() .. '_dir'
        cfg_args[vinyl_optname] = dir
    end
    -- NOTE(review): presumably a quorum of 1 and a tiny timeout keep box.cfg
    -- from blocking on the replica, which is not started yet -- confirm.
    cfg_args['replication_connect_quorum'] = 1
    cfg_args['replication_connect_timeout'] = 0.01

    box.cfg(cfg_args)
    -- Test clients connect without credentials, so guest gets full access.
    box.schema.user.grant('guest', 'read, write, execute', 'universe')
    box.schema.user.create('replicator', {password = 'password'})
    box.schema.user.grant('replicator', 'replication')

    -- prepare replica
    dir_replica = fio.tempdir()

    local handle, err = popen.new(cluster_replica_cmd(dir_replica), {
        stdin = 'devnull',
        stdout = 'devnull',
        stderr = 'devnull',
    })
    if handle == nil then
        -- Fail immediately instead of waiting for a replica that never started.
        error('failed to start replica: ' .. tostring(err))
    end
    replica = handle

    -- wait for replica to connect
    cluster_wait_replica()
end
125+
126+
-- Open a net.box connection to the replica.
-- Returns nil when no replica process has been started.
local function connect_replica()
    if replica then
        return netbox.connect(bind_replica)
    end

    return nil
end
133+
134+
-- Open a net.box connection to the master's listen address.
local function connect_master()
    local master_conn = netbox.connect(bind_master)
    return master_conn
end
137+
138+
-- Wait for replica to connect.
-- Polls box.info.replication up to ten times, 0.1 s apart.
-- Returns true as soon as it lists two instances, false otherwise.
local function wait_for_replica()
    local max_attempts = 10

    for attempt = 1, max_attempts do
        if #box.info.replication == 2 then
            return true
        end
        -- The last failed poll returns without sleeping.
        if attempt ~= max_attempts then
            fiber.sleep(0.1)
        end
    end

    return false
end
155+
40156
return {
41157
finish = function(code)
42158
local files = fio.glob(fio.pathjoin(dir, '*'))
@@ -52,6 +168,18 @@ return {
52168
log.info("rmdir %s", dir)
53169
fio.rmdir(dir)
54170
end
171+
-- Stop the replica first so that it no longer writes into its directory.
if replica then
    replica:kill()
    replica:wait()
end
-- Clean the replica's own temporary directory. The glob must use dir_replica:
-- globbing `dir` would only revisit the master's directory and leave the
-- replica's files behind.
if dir_replica then
    local files = fio.glob(fio.pathjoin(dir_replica, '*'))
    for _, file in pairs(files) do
        log.info("remove %s", file)
        fio.unlink(file)
        print("clean replica " .. file)
    end
    -- dir_replica always comes from fio.tempdir(), so remove it as well.
    log.info("rmdir %s", dir_replica)
    fio.rmdir(dir_replica)
end
55183
end,
56184

57185
dir = function()
@@ -77,5 +205,9 @@ return {
77205
return data
78206
end,
79207

80-
cfg = tnt_prepare
208+
cfg = tnt_prepare,
209+
cfg_cluster = tnt_cluster_prepare,
210+
wait_for_replica = wait_for_replica,
211+
connect_replica = connect_replica,
212+
connect_master = connect_master
81213
}

0 commit comments

Comments
 (0)