Skip to content

Commit 7181e7d

Browse files
committed
cfg: add in_replicaset option
Running a queue in master-replica mode requires that _queue_inactive_sessions and _queue_taken_2 were not temporary spases. This results in a 20 percent performance loss. The `in_replicaset` option allows you to get the same performance in single mode. Follows up #120
1 parent 1eb7cd9 commit 7181e7d

File tree

6 files changed

+222
-39
lines changed

6 files changed

+222
-39
lines changed

README.md

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,9 @@ tuples for each job which is processing a task in the queue.
271271
1. `session_uuid` - session UUID (string)
272272
1. `time` - the time when the client began to execute the task
273273

274-
The `_queue_session_ids` temporary space contains a map: connection id (box
275-
session id) to the session UUID.
274+
The `_queue_session_ids` space contains a map: connection id (box
275+
session id) to the session UUID. This space is temporary if `in_replicaset`
276+
is set to false.
276277

277278
## Fields of the `_queue_session_ids` space
278279

@@ -284,6 +285,8 @@ session id) to the session UUID.
284285
1. `uuid` - session UUID (string)
285286
2. `exp_time` - session expiration time (numeric)
286287

288+
This space is temporary if `in_replicaset` is set to false.
289+
287290
Also, there is a space which is associated with each queue,
288291
which is named in the `space` field of the `_queue` space.
289292
The associated space contains one tuple for each task.
@@ -441,6 +444,8 @@ If an invalid value or an unknown option is used, an error will be thrown.
441444
Available `options`:
442445
* `ttr` - time to release in seconds. The time after which, if there is no active
443446
connection in the session, it will be released with all its tasks.
447+
* `in_replicaset` - enable replication mode. Must be true if the queue is used
448+
in master and replica mode. Default value is false.
444449

445450
## Session identify
446451

@@ -731,6 +736,11 @@ Usage example:
731736

732737
```lua
733738
-- Instance file for the master.
739+
queue = require("queue")
740+
-- Queue is in replicaset.
741+
-- Clean up session after 5 minutes after disconnect.
742+
queue.cfg({ttr = 300, in_replicaset = true})
743+
734744
box.cfg{
735745
listen = 3301,
736746
replication = {'replicator:password@127.0.0.1:3301', -- Master URI.
@@ -743,23 +753,23 @@ box.once("schema", function()
743753
box.schema.user.grant('replicator', 'replication') -- grant replication role
744754
end)
745755

746-
queue = require("queue")
747-
queue.cfg({ttr = 300}) -- Clean up session after 5 minutes after disconnect.
748756
require('console').start()
749757
os.exit()
750758
```
751759

752760
```lua
753761
-- Instance file for the replica.
762+
queue = require("queue")
763+
-- Queue is in replicaset.
764+
-- Clean up session after 5 minutes after disconnect.
765+
queue.cfg({ttr = 300, in_replicaset = true})
754766
box.cfg{
755767
listen = 3302,
756768
replication = {'replicator:password@127.0.0.1:3301', -- Master URI.
757769
'replicator:password@127.0.0.1:3302'}, -- Replica URI.
758770
read_only = true
759771
}
760772

761-
queue = require("queue")
762-
queue.cfg({ttr = 300}) -- Clean up session after 5 minutes after disconnect.
763773
require('console').start()
764774
os.exit()
765775
```
@@ -846,9 +856,6 @@ Driver class must implement the following API:
846856
1. `create_space` - creates the supporting space. The arguments are:
847857
* space name
848858
* space options
849-
850-
Driver class should implement the following API:
851-
852859
1. `start` - initialize internal resources if any, e.g. start fibers.
853860
1. `stop` - clean up internal resources if any, e.g. stop fibers.
854861

queue/abstract.lua

Lines changed: 85 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -398,13 +398,14 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
398398
-- task was removed
399399
if task == nil then return end
400400

401+
-- We cannot use a local variable to access the space `_queue_taken_2`
402+
-- because it can be recreated in `switch_in_replicaset()`.
401403
local queue_consumers = box.space._queue_consumers
402-
local queue_taken = box.space._queue_taken_2
403404

404405
-- if task was taken and become other state
405-
local taken = queue_taken.index.task:get{tube_id, task[1]}
406+
local taken = box.space._queue_taken_2.index.task:get{tube_id, task[1]}
406407
if taken ~= nil then
407-
queue_taken:delete{taken[1], taken[2]}
408+
box.space._queue_taken_2:delete{taken[1], taken[2]}
408409
end
409410
-- task switched to ready (or new task)
410411
if task[2] == state.READY then
@@ -424,7 +425,7 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
424425
elseif task[2] == state.TAKEN then
425426
local conn_id = connection.id()
426427
local session_uuid = session.identify(conn_id)
427-
queue_taken:insert{
428+
box.space._queue_taken_2:insert{
428429
self.tube_id,
429430
task[1],
430431
conn_id,
@@ -601,6 +602,67 @@ function method.create_tube(tube_name, tube_type, opts)
601602
return self
602603
end
603604

605+
--
606+
-- Replicaset mode switch.
607+
--
608+
-- Running a queue in master-replica mode requires that
609+
-- `_queue_taken_2` space was not temporary.
610+
-- When the queue is running in single mode,
611+
-- the space is converted to temporary mode to increase performance.
612+
--
613+
local function switch_in_replicaset(mode)
614+
if mode == nil then
615+
log.warn('queue: queue required after box.cfg{}')
616+
mode = false
617+
end
618+
619+
if not box.space._queue_taken_2 then
620+
return
621+
end
622+
623+
if box.space._queue_taken_2.temporary and mode == false then
624+
return
625+
end
626+
627+
if not box.space._queue_taken_2.temporary and mode == true then
628+
return
629+
end
630+
631+
box.schema.create_space('_queue_taken_2_mgr', {
632+
temporary = not mode,
633+
format = {
634+
{name = 'tube_id', type = num_type()},
635+
{name = 'task_id', type = num_type()},
636+
{name = 'connection_id', type = num_type()},
637+
{name = 'session_uuid', type = str_type()},
638+
{name = 'taken_time', type = num_type()}
639+
}})
640+
641+
box.space._queue_taken_2_mgr:create_index('task', {
642+
type = 'tree',
643+
parts = {1, num_type(), 2, num_type()},
644+
unique = true
645+
})
646+
box.space._queue_taken_2_mgr:create_index('uuid', {
647+
type = 'tree',
648+
parts = {4, str_type()},
649+
unique = false
650+
})
651+
652+
box.begin() -- Disable implicit yields until the transaction ends.
653+
for _, tuple in box.space._queue_taken_2:pairs() do
654+
box.space._queue_taken_2_mgr:insert(tuple)
655+
end
656+
657+
box.space._queue_taken_2:drop()
658+
box.space._queue_taken_2_mgr:rename('_queue_taken_2')
659+
660+
local status, err = pcall(box.commit)
661+
if not status then
662+
error(('Error migrate _queue_taken_2: %s'):format(tostring(err)))
663+
end
664+
end
665+
604666
-- create or join infrastructure
605667
function method.start()
606668
-- tube_name, tube_id, space_name, tube_type, opts
@@ -657,11 +719,11 @@ function method.start()
657719
box.space._queue_taken:drop()
658720
end
659721

660-
local _taken = box.space._queue_taken_2
661-
if _taken == nil then
722+
local _mode = queue.cfg['in_replicaset'] or false
723+
if box.space._queue_taken_2 == nil then
662724
-- tube_id, task_id, connection_id, session_uuid, time
663-
_taken = box.schema.create_space('_queue_taken_2', {
664-
temporary = false,
725+
box.schema.create_space('_queue_taken_2', {
726+
temporary = not _mode,
665727
format = {
666728
{name = 'tube_id', type = num_type()},
667729
{name = 'task_id', type = num_type()},
@@ -670,22 +732,18 @@ function method.start()
670732
{name = 'taken_time', type = num_type()}
671733
}})
672734

673-
_taken:create_index('task', {
735+
box.space._queue_taken_2:create_index('task', {
674736
type = 'tree',
675737
parts = {1, num_type(), 2, num_type()},
676738
unique = true
677739
})
678-
_taken:create_index('uuid', {
740+
box.space._queue_taken_2:create_index('uuid', {
679741
type = 'tree',
680742
parts = {4, str_type()},
681743
unique = false
682744
})
683745
else
684-
-- Upgrade space, queue states require that this space
685-
-- was not temporary.
686-
if _taken.temporary then
687-
_taken:alter{temporary = false}
688-
end
746+
switch_in_replicaset(queue.cfg['in_replicaset'])
689747
end
690748

691749
for _, tube_tuple in _queue:pairs() do
@@ -778,16 +836,28 @@ local function cfg(self, opts)
778836
opts = opts or {}
779837
local session_opts = {}
780838

839+
-- Set default in_replicaset value.
840+
if opts['in_replicaset'] == nil then
841+
opts['in_replicaset'] = false
842+
end
843+
781844
-- Check all options before configuring so that
782845
-- the configuration is done transactionally.
783846
for key, val in pairs(opts) do
784847
if key == 'ttr' then
785848
session_opts[key] = val
849+
elseif key == 'in_replicaset' then
850+
if type(val) ~= 'boolean' then
851+
error('Invalid value of in_replicaset: ' .. tostring(val))
852+
end
853+
session_opts[key] = val
786854
else
787855
error('Unknown option ' .. tostring(key))
788856
end
789857
end
790858

859+
switch_in_replicaset(opts['in_replicaset'])
860+
791861
-- Configuring the queue_session module.
792862
session.cfg(session_opts)
793863

queue/abstract/queue_session.lua

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,59 @@ local str_type = qc.str_type
1010

1111
local queue_session = {}
1212

13+
--
14+
-- Replicaset mode switch.
15+
--
16+
-- Running a queue in master-replica mode requires that
17+
-- `_queue_inactive_sessions` space was not temporary.
18+
-- When the queue is running in single mode,
19+
-- the space is converted to temporary mode to increase performance.
20+
--
21+
local function switch_in_replicaset(mode)
22+
if mode == nil then
23+
mode = false
24+
end
25+
26+
if not box.space._queue_inactive_sessions then
27+
return
28+
end
29+
30+
if box.space._queue_inactive_sessions.temporary and mode == false then
31+
return
32+
end
33+
34+
if not box.space._queue_inactive_sessions.temporary and mode == true then
35+
return
36+
end
37+
38+
box.schema.create_space('_queue_inactive_sessions_mgr', {
39+
temporary = not mode,
40+
format = {
41+
{ name = 'uuid', type = str_type() },
42+
{ name = 'exp_time', type = num_type() }
43+
}
44+
})
45+
46+
box.space._queue_inactive_sessions_mgr:create_index('uuid', {
47+
type = 'tree',
48+
parts = { 1, str_type() },
49+
unique = true
50+
})
51+
52+
box.begin() -- Disable implicit yields until the transaction ends.
53+
for _, tuple in box.space._queue_inactive_sessions:pairs() do
54+
box.space._queue_inactive_sessions_mgr:insert(tuple)
55+
end
56+
57+
box.space._queue_inactive_sessions:drop()
58+
box.space._queue_inactive_sessions_mgr:rename('_queue_inactive_sessions')
59+
60+
local status, err = pcall(box.commit)
61+
if not status then
62+
error(('Error migrate _queue_inactive_sessions: %s'):format(tostring(err)))
63+
end
64+
end
65+
1366
--- Create everything that's needed to work with "shared" sessions.
1467
local function identification_init()
1568
local queue_session_ids = box.space._queue_session_ids
@@ -34,21 +87,23 @@ local function identification_init()
3487
})
3588
end
3689

37-
local queue_inactive_sessions = box.space._queue_inactive_sessions
38-
if queue_inactive_sessions == nil then
39-
queue_inactive_sessions = box.schema.create_space('_queue_inactive_sessions', {
40-
temporary = false,
90+
local _mode = queue_session.cfg['in_replicaset'] or false
91+
if box.space._queue_inactive_sessions == nil then
92+
box.schema.create_space('_queue_inactive_sessions', {
93+
temporary = not _mode,
4194
format = {
4295
{ name = 'uuid', type = str_type() },
4396
{ name = 'exp_time', type = num_type() }
4497
}
4598
})
4699

47-
queue_inactive_sessions:create_index('uuid', {
100+
box.space._queue_inactive_sessions:create_index('uuid', {
48101
type = 'tree',
49102
parts = { 1, str_type() },
50103
unique = true
51104
})
105+
else
106+
switch_in_replicaset(queue_session.cfg['in_replicaset'])
52107
end
53108
end
54109

@@ -93,7 +148,6 @@ end
93148
-- an error will be thrown.
94149
local function identify(conn_id, session_uuid)
95150
local queue_session_ids = box.space._queue_session_ids
96-
local queue_inactive_sessions = box.space._queue_inactive_sessions
97151
local session_ids = queue_session_ids:get(conn_id)
98152
local cur_uuid = session_ids and session_ids[2]
99153

@@ -114,7 +168,7 @@ local function identify(conn_id, session_uuid)
114168
-- Check that a session with this uuid exists.
115169
local ids_by_uuid = queue_session_ids.index.uuid:select(
116170
session_uuid, { limit = 1 })[1]
117-
local inactive_session = queue_inactive_sessions:get(session_uuid)
171+
local inactive_session = box.space._queue_inactive_sessions:get(session_uuid)
118172
if ids_by_uuid == nil and inactive_session == nil then
119173
error('The UUID ' .. uuid.frombin(session_uuid):str() ..
120174
' is unknown.')
@@ -130,7 +184,7 @@ local function identify(conn_id, session_uuid)
130184
end
131185

132186
-- Exclude the session from inactive.
133-
queue_inactive_sessions:delete{cur_uuid}
187+
box.space._queue_inactive_sessions:delete{cur_uuid}
134188

135189
return cur_uuid
136190
end
@@ -146,7 +200,6 @@ end
146200
-- release its tasks if necessary.
147201
local function disconnect(conn_id)
148202
local queue_session_ids = box.space._queue_session_ids
149-
local queue_inactive_sessions = box.space._queue_inactive_sessions
150203
local session_uuid = queue_session.identify(conn_id)
151204

152205
queue_session_ids:delete{conn_id}
@@ -158,7 +211,7 @@ local function disconnect(conn_id)
158211
if session_ids == nil then
159212
local ttr = queue_session.cfg['ttr'] or 0
160213
if ttr > 0 then
161-
queue_inactive_sessions:insert{session_uuid, util.event_time(ttr)}
214+
box.space._queue_inactive_sessions:insert{session_uuid, util.event_time(ttr)}
162215
elseif queue_session._on_session_remove ~= nil then
163216
queue_session._on_session_remove(session_uuid)
164217
end
@@ -189,6 +242,8 @@ local function validate_opts(opts)
189242
if type(val) ~= 'number' or val < 0 then
190243
error('Invalid value of ttr: ' .. tostring(val))
191244
end
245+
elseif key == 'in_replicaset' then
246+
-- do nothing
192247
else
193248
error('Unknown option ' .. tostring(key))
194249
end
@@ -207,6 +262,8 @@ local function cfg(self, opts)
207262
for key, val in pairs(opts) do
208263
self[key] = val
209264
end
265+
266+
switch_in_replicaset(self['in_replicaset'])
210267
end
211268

212269
local function exist_inactive(session_uuid)

0 commit comments

Comments
 (0)