Skip to content

Commit ac1a9af

Browse files
0x501DLeonidVas
authored andcommitted
cfg: add in_replicaset option
Running a queue in master-replica mode requires that _queue_inactive_sessions and _queue_taken_2 were not temporary spases. This results in a 20 percent performance loss. The `in_replicaset` option allows you to get the same performance in single mode. Follows up #120
1 parent d0c89d6 commit ac1a9af

File tree

6 files changed

+223
-36
lines changed

6 files changed

+223
-36
lines changed

README.md

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,9 @@ tuples for each job which is processing a task in the queue.
271271
1. `session_uuid` - session UUID (string)
272272
1. `time` - the time when the client began to execute the task
273273

274-
The `_queue_session_ids` temporary space contains a map: connection id (box
275-
session id) to the session UUID.
274+
The `_queue_session_ids` space contains a map: connection id (box
275+
session id) to the session UUID. This space is temporary if `in_replicaset`
276+
is set to false.
276277

277278
## Fields of the `_queue_session_ids` space
278279

@@ -284,6 +285,8 @@ session id) to the session UUID.
284285
1. `uuid` - session UUID (string)
285286
2. `exp_time` - session expiration time (numeric)
286287

288+
This space is temporary if `in_replicaset` is set to false.
289+
287290
Also, there is a space which is associated with each queue,
288291
which is named in the `space` field of the `_queue` space.
289292
The associated space contains one tuple for each task.
@@ -456,6 +459,9 @@ If an invalid value or an unknown option is used, an error will be thrown.
456459
Available `options`:
457460
* `ttr` - time to release in seconds. The time after which, if there is no active
458461
connection in the session, it will be released with all its tasks.
462+
* `in_replicaset` - enable replication mode. Must be true if the queue is used
463+
in master and replica mode. With replication mode enabled, the potential loss of
464+
performance can be ~20% compared to single mode. Default value is false.
459465

460466
## Session identify
461467

@@ -746,6 +752,11 @@ Usage example:
746752

747753
```lua
748754
-- Instance file for the master.
755+
queue = require("queue")
756+
-- Queue is in replicaset.
757+
-- Clean up session after 5 minutes after disconnect.
758+
queue.cfg({ttr = 300, in_replicaset = true})
759+
749760
box.cfg{
750761
listen = 3301,
751762
replication = {'replicator:password@127.0.0.1:3301', -- Master URI.
@@ -758,23 +769,23 @@ box.once("schema", function()
758769
box.schema.user.grant('replicator', 'replication') -- grant replication role
759770
end)
760771

761-
queue = require("queue")
762-
queue.cfg({ttr = 300}) -- Clean up session after 5 minutes after disconnect.
763772
require('console').start()
764773
os.exit()
765774
```
766775

767776
```lua
768777
-- Instance file for the replica.
778+
queue = require("queue")
779+
-- Queue is in replicaset.
780+
-- Clean up session after 5 minutes after disconnect.
781+
queue.cfg({ttr = 300, in_replicaset = true})
769782
box.cfg{
770783
listen = 3302,
771784
replication = {'replicator:password@127.0.0.1:3301', -- Master URI.
772785
'replicator:password@127.0.0.1:3302'}, -- Replica URI.
773786
read_only = true
774787
}
775788

776-
queue = require("queue")
777-
queue.cfg({ttr = 300}) -- Clean up session after 5 minutes after disconnect.
778789
require('console').start()
779790
os.exit()
780791
```

queue/abstract.lua

Lines changed: 85 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -406,13 +406,14 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
406406
-- task was removed
407407
if task == nil then return end
408408

409+
-- We cannot use a local variable to access the space `_queue_taken_2`
410+
-- because it can be recreated in `switch_in_replicaset()`.
409411
local queue_consumers = box.space._queue_consumers
410-
local queue_taken = box.space._queue_taken_2
411412

412413
-- if task was taken and become other state
413-
local taken = queue_taken.index.task:get{tube_id, task[1]}
414+
local taken = box.space._queue_taken_2.index.task:get{tube_id, task[1]}
414415
if taken ~= nil then
415-
queue_taken:delete{taken[1], taken[2]}
416+
box.space._queue_taken_2:delete{taken[1], taken[2]}
416417
end
417418
-- task switched to ready (or new task)
418419
if task[2] == state.READY then
@@ -432,7 +433,7 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
432433
elseif task[2] == state.TAKEN then
433434
local conn_id = connection.id()
434435
local session_uuid = session.identify(conn_id)
435-
queue_taken:insert{
436+
box.space._queue_taken_2:insert{
436437
self.tube_id,
437438
task[1],
438439
conn_id,
@@ -610,6 +611,67 @@ function method.create_tube(tube_name, tube_type, opts)
610611
return self
611612
end
612613

614+
--
615+
-- Replicaset mode switch.
616+
--
617+
-- Running a queue in master-replica mode requires that
618+
-- `_queue_taken_2` space was not temporary.
619+
-- When the queue is running in single mode,
620+
-- the space is converted to temporary mode to increase performance.
621+
--
622+
local function switch_in_replicaset(mode)
623+
if mode == nil then
624+
log.warn('queue: queue required after box.cfg{}')
625+
mode = false
626+
end
627+
628+
if not box.space._queue_taken_2 then
629+
return
630+
end
631+
632+
if box.space._queue_taken_2.temporary and mode == false then
633+
return
634+
end
635+
636+
if not box.space._queue_taken_2.temporary and mode == true then
637+
return
638+
end
639+
640+
box.schema.create_space('_queue_taken_2_mgr', {
641+
temporary = not mode,
642+
format = {
643+
{name = 'tube_id', type = num_type()},
644+
{name = 'task_id', type = num_type()},
645+
{name = 'connection_id', type = num_type()},
646+
{name = 'session_uuid', type = str_type()},
647+
{name = 'taken_time', type = num_type()}
648+
}})
649+
650+
box.space._queue_taken_2_mgr:create_index('task', {
651+
type = 'tree',
652+
parts = {1, num_type(), 2, num_type()},
653+
unique = true
654+
})
655+
box.space._queue_taken_2_mgr:create_index('uuid', {
656+
type = 'tree',
657+
parts = {4, str_type()},
658+
unique = false
659+
})
660+
661+
box.begin() -- Disable implicit yields until the transaction ends.
662+
for _, tuple in box.space._queue_taken_2:pairs() do
663+
box.space._queue_taken_2_mgr:insert(tuple)
664+
end
665+
666+
box.space._queue_taken_2:drop()
667+
box.space._queue_taken_2_mgr:rename('_queue_taken_2')
668+
669+
local status, err = pcall(box.commit)
670+
if not status then
671+
error(('Error migrate _queue_taken_2: %s'):format(tostring(err)))
672+
end
673+
end
674+
613675
-- create or join infrastructure
614676
function method.start()
615677
-- tube_name, tube_id, space_name, tube_type, opts
@@ -666,11 +728,11 @@ function method.start()
666728
box.space._queue_taken:drop()
667729
end
668730

669-
local _taken = box.space._queue_taken_2
670-
if _taken == nil then
731+
local _mode = queue.cfg['in_replicaset'] or false
732+
if box.space._queue_taken_2 == nil then
671733
-- tube_id, task_id, connection_id, session_uuid, time
672-
_taken = box.schema.create_space('_queue_taken_2', {
673-
temporary = false,
734+
box.schema.create_space('_queue_taken_2', {
735+
temporary = not _mode,
674736
format = {
675737
{name = 'tube_id', type = num_type()},
676738
{name = 'task_id', type = num_type()},
@@ -679,22 +741,18 @@ function method.start()
679741
{name = 'taken_time', type = num_type()}
680742
}})
681743

682-
_taken:create_index('task', {
744+
box.space._queue_taken_2:create_index('task', {
683745
type = 'tree',
684746
parts = {1, num_type(), 2, num_type()},
685747
unique = true
686748
})
687-
_taken:create_index('uuid', {
749+
box.space._queue_taken_2:create_index('uuid', {
688750
type = 'tree',
689751
parts = {4, str_type()},
690752
unique = false
691753
})
692754
else
693-
-- Upgrade space, queue states require that this space
694-
-- was not temporary.
695-
if _taken.temporary then
696-
_taken:alter{temporary = false}
697-
end
755+
switch_in_replicaset(queue.cfg['in_replicaset'])
698756
end
699757

700758
for _, tube_tuple in _queue:pairs() do
@@ -787,16 +845,28 @@ local function cfg(self, opts)
787845
opts = opts or {}
788846
local session_opts = {}
789847

848+
-- Set default in_replicaset value.
849+
if opts['in_replicaset'] == nil then
850+
opts['in_replicaset'] = false
851+
end
852+
790853
-- Check all options before configuring so that
791854
-- the configuration is done transactionally.
792855
for key, val in pairs(opts) do
793856
if key == 'ttr' then
794857
session_opts[key] = val
858+
elseif key == 'in_replicaset' then
859+
if type(val) ~= 'boolean' then
860+
error('Invalid value of in_replicaset: ' .. tostring(val))
861+
end
862+
session_opts[key] = val
795863
else
796864
error('Unknown option ' .. tostring(key))
797865
end
798866
end
799867

868+
switch_in_replicaset(opts['in_replicaset'])
869+
800870
-- Configuring the queue_session module.
801871
session.cfg(session_opts)
802872

queue/abstract/queue_session.lua

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,59 @@ local str_type = qc.str_type
1010

1111
local queue_session = {}
1212

13+
--
14+
-- Replicaset mode switch.
15+
--
16+
-- Running a queue in master-replica mode requires that
17+
-- `_queue_inactive_sessions` space was not temporary.
18+
-- When the queue is running in single mode,
19+
-- the space is converted to temporary mode to increase performance.
20+
--
21+
local function switch_in_replicaset(mode)
22+
if mode == nil then
23+
mode = false
24+
end
25+
26+
if not box.space._queue_inactive_sessions then
27+
return
28+
end
29+
30+
if box.space._queue_inactive_sessions.temporary and mode == false then
31+
return
32+
end
33+
34+
if not box.space._queue_inactive_sessions.temporary and mode == true then
35+
return
36+
end
37+
38+
box.schema.create_space('_queue_inactive_sessions_mgr', {
39+
temporary = not mode,
40+
format = {
41+
{ name = 'uuid', type = str_type() },
42+
{ name = 'exp_time', type = num_type() }
43+
}
44+
})
45+
46+
box.space._queue_inactive_sessions_mgr:create_index('uuid', {
47+
type = 'tree',
48+
parts = { 1, str_type() },
49+
unique = true
50+
})
51+
52+
box.begin() -- Disable implicit yields until the transaction ends.
53+
for _, tuple in box.space._queue_inactive_sessions:pairs() do
54+
box.space._queue_inactive_sessions_mgr:insert(tuple)
55+
end
56+
57+
box.space._queue_inactive_sessions:drop()
58+
box.space._queue_inactive_sessions_mgr:rename('_queue_inactive_sessions')
59+
60+
local status, err = pcall(box.commit)
61+
if not status then
62+
error(('Error migrate _queue_inactive_sessions: %s'):format(tostring(err)))
63+
end
64+
end
65+
1366
--- Create everything that's needed to work with "shared" sessions.
1467
local function identification_init()
1568
local queue_session_ids = box.space._queue_session_ids
@@ -34,21 +87,23 @@ local function identification_init()
3487
})
3588
end
3689

37-
local queue_inactive_sessions = box.space._queue_inactive_sessions
38-
if queue_inactive_sessions == nil then
39-
queue_inactive_sessions = box.schema.create_space('_queue_inactive_sessions', {
40-
temporary = false,
90+
local _mode = queue_session.cfg['in_replicaset'] or false
91+
if box.space._queue_inactive_sessions == nil then
92+
box.schema.create_space('_queue_inactive_sessions', {
93+
temporary = not _mode,
4194
format = {
4295
{ name = 'uuid', type = str_type() },
4396
{ name = 'exp_time', type = num_type() }
4497
}
4598
})
4699

47-
queue_inactive_sessions:create_index('uuid', {
100+
box.space._queue_inactive_sessions:create_index('uuid', {
48101
type = 'tree',
49102
parts = { 1, str_type() },
50103
unique = true
51104
})
105+
else
106+
switch_in_replicaset(queue_session.cfg['in_replicaset'])
52107
end
53108
end
54109

@@ -95,7 +150,6 @@ end
95150
-- an error will be thrown.
96151
local function identify(conn_id, session_uuid)
97152
local queue_session_ids = box.space._queue_session_ids
98-
local queue_inactive_sessions = box.space._queue_inactive_sessions
99153
local session_ids = queue_session_ids:get(conn_id)
100154
local cur_uuid = session_ids and session_ids[2]
101155

@@ -116,7 +170,7 @@ local function identify(conn_id, session_uuid)
116170
-- Check that a session with this uuid exists.
117171
local ids_by_uuid = queue_session_ids.index.uuid:select(
118172
session_uuid, { limit = 1 })[1]
119-
local inactive_session = queue_inactive_sessions:get(session_uuid)
173+
local inactive_session = box.space._queue_inactive_sessions:get(session_uuid)
120174
if ids_by_uuid == nil and inactive_session == nil then
121175
error('The UUID ' .. uuid.frombin(session_uuid):str() ..
122176
' is unknown.')
@@ -132,7 +186,7 @@ local function identify(conn_id, session_uuid)
132186
end
133187

134188
-- Exclude the session from inactive.
135-
queue_inactive_sessions:delete{cur_uuid}
189+
box.space._queue_inactive_sessions:delete{cur_uuid}
136190

137191
return cur_uuid
138192
end
@@ -148,7 +202,6 @@ end
148202
-- release its tasks if necessary.
149203
local function disconnect(conn_id)
150204
local queue_session_ids = box.space._queue_session_ids
151-
local queue_inactive_sessions = box.space._queue_inactive_sessions
152205
local session_uuid = queue_session.identify(conn_id)
153206

154207
queue_session_ids:delete{conn_id}
@@ -160,7 +213,7 @@ local function disconnect(conn_id)
160213
if session_ids == nil then
161214
local ttr = queue_session.cfg['ttr'] or 0
162215
if ttr > 0 then
163-
queue_inactive_sessions:insert{session_uuid, util.event_time(ttr)}
216+
box.space._queue_inactive_sessions:insert{session_uuid, util.event_time(ttr)}
164217
elseif queue_session._on_session_remove ~= nil then
165218
queue_session._on_session_remove(session_uuid)
166219
end
@@ -192,6 +245,8 @@ local function validate_opts(opts)
192245
if type(val) ~= 'number' or val < 0 then
193246
error('Invalid value of ttr: ' .. tostring(val))
194247
end
248+
elseif key == 'in_replicaset' then
249+
-- do nothing
195250
else
196251
error('Unknown option ' .. tostring(key))
197252
end
@@ -210,6 +265,8 @@ local function cfg(self, opts)
210265
for key, val in pairs(opts) do
211266
self[key] = val
212267
end
268+
269+
switch_in_replicaset(self['in_replicaset'])
213270
end
214271

215272
local function exist_inactive(session_uuid)

queue/abstract/queue_state.lua

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ end
4444

4545
local function max_lag()
4646
local max_lag = 0
47+
local n_replica = table.maxn(box.info.replication)
4748

48-
for i = 1, #box.info.replication do
49+
for i = 1, n_replica do
4950
if box.info.replication[i].upstream then
5051
local lag = box.info.replication[i].upstream.lag
5152
if lag > max_lag then

0 commit comments

Comments
 (0)