Skip to content

Commit fd3616c

Browse files
0x501DLeonidVas
authored andcommitted
session: full support for shared sessions
In order for the user to be able to finish servicing the task after the queue switched from replica to master, the `_queue_inactive_session` space was renamed to `_queue_shared_sessions` and had an `active` field in it. When the `take` method is called, the session uuid is automatically added to the `_queue_shared_sessions` space with `exp_time` set to zero and `active` field set to true. When the client closes the session, this entry becomes inactive (`active` = false) and `exp_time` is set. At the start of the queue, we make all active sessions inactive, and set an expiration time for them. If the `ttr` setting is not set, then we delete all sessions. If `ttr` is not set shared sessions is disabled. Follows up #120
1 parent ac1a9af commit fd3616c

File tree

6 files changed

+224
-88
lines changed

6 files changed

+224
-88
lines changed

README.md

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -280,10 +280,11 @@ is set to false.
280280
1. `connection_id` - connection id (numeric)
281281
2. `session_uuid` - session UUID (string)
282282

283-
## Fields of the `_queue_inactive_sessions` space
283+
## Fields of the `_queue_shared_sessions` space
284284

285285
1. `uuid` - session UUID (string)
286286
2. `exp_time` - session expiration time (numeric)
287+
3. `active` - session state (boolean)
287288

288289
This space is temporary if `in_replicaset` is set to false.
289290

@@ -378,12 +379,10 @@ Current queue state can be shown by using `queue.state()` method.
378379
In the `STARTUP` state, the queue is waiting for possible data synchronization
379380
with other cluster members by the time of the largest upstream lag multiplied
380381
by two. After that, all taken tasks are released, except for tasks with
381-
session uuid matching inactive sessions uuids. This makes possible to take
382+
session uuid matching shared sessions uuids. This makes possible to take
382383
a task, switch roles on the cluster, and release the task within the timeout
383-
specified by the `queue.cfg({ttr = N})` parameter. Note: all clients that `take()`
384-
and do not `ack()/release()` tasks must be disconnected before changing the role.
385-
And the last step in the `STARTUP` state is starting tube driver using new
386-
method called `start()`.
384+
specified by the `queue.cfg({ttr = N})` parameter. And the last step in the
385+
`STARTUP` state is starting tube driver using new method called `start()`.
387386

388387
In the `RUNNING` state, the queue is working as usually. The `ENDING` state calls
389388
`stop()` method. in the `WAITING` state, the queue listens for a change in the

queue/abstract.lua

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ local function tube_release_all_orphaned_tasks(tube)
5656
local taken = box.space._queue_taken_2.index.task:get{
5757
tube.tube_id, task[1]
5858
}
59-
if taken and session.exist_inactive(taken[4]) then
59+
if taken and session.exist_shared(taken[4]) then
6060
log.info(prefix ..
6161
('skipping task: %d, tube_id: %d'):format(task[1],
6262
tube.tube_id))
@@ -619,26 +619,26 @@ end
619619
-- When the queue is running in single mode,
620620
-- the space is converted to temporary mode to increase performance.
621621
--
622-
local function switch_in_replicaset(mode)
623-
if mode == nil then
622+
local function switch_in_replicaset(replicaset_mode)
623+
if replicaset_mode == nil then
624624
log.warn('queue: queue required after box.cfg{}')
625-
mode = false
625+
replicaset_mode = false
626626
end
627627

628628
if not box.space._queue_taken_2 then
629629
return
630630
end
631631

632-
if box.space._queue_taken_2.temporary and mode == false then
632+
if box.space._queue_taken_2.temporary and replicaset_mode == false then
633633
return
634634
end
635635

636-
if not box.space._queue_taken_2.temporary and mode == true then
636+
if not box.space._queue_taken_2.temporary and replicaset_mode == true then
637637
return
638638
end
639639

640640
box.schema.create_space('_queue_taken_2_mgr', {
641-
temporary = not mode,
641+
temporary = not replicaset_mode,
642642
format = {
643643
{name = 'tube_id', type = num_type()},
644644
{name = 'task_id', type = num_type()},
@@ -728,11 +728,11 @@ function method.start()
728728
box.space._queue_taken:drop()
729729
end
730730

731-
local _mode = queue.cfg['in_replicaset'] or false
731+
local replicaset_mode = queue.cfg['in_replicaset'] or false
732732
if box.space._queue_taken_2 == nil then
733733
-- tube_id, task_id, connection_id, session_uuid, time
734734
box.schema.create_space('_queue_taken_2', {
735-
temporary = not _mode,
735+
temporary = not replicaset_mode,
736736
format = {
737737
{name = 'tube_id', type = num_type()},
738738
{name = 'task_id', type = num_type()},

queue/abstract/driver/fifottl.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ local function fifottl_fiber(self)
170170
else
171171
-- When switching the master to the replica, the fiber will be stopped.
172172
if self.sync_chan:get(0.1) ~= nil then
173-
print("Queue fifottl was stopped")
173+
log.info("Queue fifottl fiber was stopped")
174174
break
175175
end
176176
end

queue/abstract/driver/utubettl.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ local function utubettl_fiber(self)
178178
else
179179
-- When switching the master to the replica, the fiber will be stopped.
180180
if self.sync_chan:get(0.1) ~= nil then
181-
print("Queue utubettl was stopped")
181+
log.info("Queue utubettl fiber was stopped")
182182
break
183183
end
184184
end

queue/abstract/queue_session.lua

Lines changed: 88 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -14,52 +14,60 @@ local queue_session = {}
1414
-- Replicaset mode switch.
1515
--
1616
-- Running a queue in master-replica mode requires that
17-
-- `_queue_inactive_sessions` space was not temporary.
17+
-- `_queue_shared_sessions` space was not temporary.
1818
-- When the queue is running in single mode,
1919
-- the space is converted to temporary mode to increase performance.
2020
--
21-
local function switch_in_replicaset(mode)
22-
if mode == nil then
23-
mode = false
21+
local function switch_in_replicaset(replicaset_mode)
22+
if replicaset_mode == nil then
23+
replicaset_mode = false
2424
end
2525

26-
if not box.space._queue_inactive_sessions then
26+
if not box.space._queue_shared_sessions then
2727
return
2828
end
2929

30-
if box.space._queue_inactive_sessions.temporary and mode == false then
30+
if box.space._queue_shared_sessions.temporary
31+
and replicaset_mode == false then
3132
return
3233
end
3334

34-
if not box.space._queue_inactive_sessions.temporary and mode == true then
35+
if not box.space._queue_shared_sessions.temporary
36+
and replicaset_mode == true then
3537
return
3638
end
3739

38-
box.schema.create_space('_queue_inactive_sessions_mgr', {
39-
temporary = not mode,
40+
box.schema.create_space('_queue_shared_sessions_mgr', {
41+
temporary = not replicaset_mode,
4042
format = {
4143
{ name = 'uuid', type = str_type() },
42-
{ name = 'exp_time', type = num_type() }
44+
{ name = 'exp_time', type = num_type() },
45+
{ name = 'active', type = 'boolean' },
4346
}
4447
})
4548

46-
box.space._queue_inactive_sessions_mgr:create_index('uuid', {
49+
box.space._queue_shared_sessions_mgr:create_index('uuid', {
4750
type = 'tree',
4851
parts = { 1, str_type() },
4952
unique = true
5053
})
54+
box.space._queue_shared_sessions_mgr:create_index('active', {
55+
type = 'tree',
56+
parts = { 3, 'boolean', 1, str_type() },
57+
unique = true
58+
})
5159

5260
box.begin() -- Disable implicit yields until the transaction ends.
53-
for _, tuple in box.space._queue_inactive_sessions:pairs() do
54-
box.space._queue_inactive_sessions_mgr:insert(tuple)
61+
for _, tuple in box.space._queue_shared_sessions:pairs() do
62+
box.space._queue_shared_sessions_mgr:insert(tuple)
5563
end
5664

57-
box.space._queue_inactive_sessions:drop()
58-
box.space._queue_inactive_sessions_mgr:rename('_queue_inactive_sessions')
65+
box.space._queue_shared_sessions:drop()
66+
box.space._queue_shared_sessions_mgr:rename('_queue_shared_sessions')
5967

6068
local status, err = pcall(box.commit)
6169
if not status then
62-
error(('Error migrate _queue_inactive_sessions: %s'):format(tostring(err)))
70+
error(('Error migrate _queue_shared_sessions: %s'):format(tostring(err)))
6371
end
6472
end
6573

@@ -87,37 +95,63 @@ local function identification_init()
8795
})
8896
end
8997

90-
local _mode = queue_session.cfg['in_replicaset'] or false
91-
if box.space._queue_inactive_sessions == nil then
92-
box.schema.create_space('_queue_inactive_sessions', {
93-
temporary = not _mode,
98+
local replicaset_mode = queue_session.cfg['in_replicaset'] or false
99+
if box.space._queue_shared_sessions == nil then
100+
box.schema.create_space('_queue_shared_sessions', {
101+
temporary = not replicaset_mode,
94102
format = {
95103
{ name = 'uuid', type = str_type() },
96-
{ name = 'exp_time', type = num_type() }
104+
{ name = 'exp_time', type = num_type() },
105+
{ name = 'active', type = 'boolean' },
97106
}
98107
})
99108

100-
box.space._queue_inactive_sessions:create_index('uuid', {
109+
box.space._queue_shared_sessions:create_index('uuid', {
101110
type = 'tree',
102111
parts = { 1, str_type() },
103112
unique = true
104113
})
114+
box.space._queue_shared_sessions:create_index('active', {
115+
type = 'tree',
116+
parts = { 3, 'boolean', 1, str_type() },
117+
unique = true
118+
})
105119
else
106120
switch_in_replicaset(queue_session.cfg['in_replicaset'])
121+
122+
-- At the start of the queue, we make all active sessions inactive,
123+
-- and set an expiration time for them. If the ttr setting is not set,
124+
-- then we delete all sessions.
125+
local ttr = queue_session.cfg['ttr'] or 0
126+
if ttr > 0 then
127+
for _, tuple in box.space._queue_shared_sessions.index.active:pairs{true} do
128+
box.space._queue_shared_sessions:update(tuple[1], {
129+
{'=', 2, util.event_time(ttr)},
130+
{'=', 3, false},
131+
})
132+
end
133+
else
134+
if queue_session._on_session_remove ~= nil then
135+
for _, tuple in box.space._queue_shared_sessions.index.uuid:pairs() do
136+
queue_session._on_session_remove(tuple[1])
137+
end
138+
end
139+
box.space._queue_shared_sessions:truncate()
140+
end
107141
end
108142
end
109143

110144
local function cleanup_inactive_sessions()
111145
local cur_time = util.time()
112146

113-
for _, val in box.space._queue_inactive_sessions:pairs() do
147+
for _, val in box.space._queue_shared_sessions.index.active:pairs{false} do
114148
local session_uuid = val[1]
115149
local exp_time = val[2]
116150
if cur_time >= exp_time then
117151
if queue_session._on_session_remove ~= nil then
118152
queue_session._on_session_remove(session_uuid)
119153
end
120-
box.space._queue_inactive_sessions:delete{session_uuid}
154+
box.space._queue_shared_sessions:delete{session_uuid}
121155
end
122156
end
123157
end
@@ -160,6 +194,7 @@ local function identify(conn_id, session_uuid)
160194
-- Generate new UUID for the session.
161195
cur_uuid = uuid.bin()
162196
queue_session_ids:insert{conn_id, cur_uuid}
197+
box.space._queue_shared_sessions:insert{cur_uuid, 0, true}
163198
elseif session_uuid ~= nil then
164199
-- Validate UUID.
165200
if not pcall(uuid.frombin, session_uuid) then
@@ -170,8 +205,8 @@ local function identify(conn_id, session_uuid)
170205
-- Check that a session with this uuid exists.
171206
local ids_by_uuid = queue_session_ids.index.uuid:select(
172207
session_uuid, { limit = 1 })[1]
173-
local inactive_session = box.space._queue_inactive_sessions:get(session_uuid)
174-
if ids_by_uuid == nil and inactive_session == nil then
208+
local shared_session = box.space._queue_shared_sessions:get(session_uuid)
209+
if ids_by_uuid == nil and shared_session == nil then
175210
error('The UUID ' .. uuid.frombin(session_uuid):str() ..
176211
' is unknown.')
177212
end
@@ -183,10 +218,15 @@ local function identify(conn_id, session_uuid)
183218
queue_session_ids:insert({conn_id, session_uuid})
184219
cur_uuid = session_uuid
185220
end
186-
end
187221

188-
-- Exclude the session from inactive.
189-
box.space._queue_inactive_sessions:delete{cur_uuid}
222+
-- Make session active.
223+
if shared_session then
224+
box.space._queue_shared_sessions:update(shared_session[1], {
225+
{'=', 2, 0},
226+
{'=', 3, true},
227+
})
228+
end
229+
end
190230

191231
return cur_uuid
192232
end
@@ -209,21 +249,33 @@ local function disconnect(conn_id)
209249
{ limit = 1 })[1]
210250

211251
-- If a queue session doesn't have any active connections it should be
212-
-- removed (if ttr is absent) or moved to the "inactive sessions" list.
252+
-- removed (if ttr is absent) or change the `active` flag to make the
253+
-- session inactive.
213254
if session_ids == nil then
214255
local ttr = queue_session.cfg['ttr'] or 0
215256
if ttr > 0 then
216-
box.space._queue_inactive_sessions:insert{session_uuid, util.event_time(ttr)}
257+
local tuple = box.space._queue_shared_sessions:get{session_uuid}
258+
if tuple == nil then
259+
box.space._queue_shared_sessions:insert{
260+
session_uuid, util.event_time(ttr), false
261+
}
262+
else
263+
box.space._queue_shared_sessions:update(tuple[1], {
264+
{'=', 2, util.event_time(ttr)},
265+
{'=', 3, false},
266+
})
267+
end
217268
elseif queue_session._on_session_remove ~= nil then
218269
queue_session._on_session_remove(session_uuid)
270+
box.space._queue_shared_sessions.index.uuid:delete{session_uuid}
219271
end
220272
end
221273
end
222274

223275
local function grant(user)
224276
box.schema.user.grant(user, 'read, write', 'space', '_queue_session_ids',
225277
{ if_not_exists = true })
226-
box.schema.user.grant(user, 'read, write', 'space', '_queue_inactive_sessions',
278+
box.schema.user.grant(user, 'read, write', 'space', '_queue_shared_sessions',
227279
{ if_not_exists = true })
228280
end
229281

@@ -246,7 +298,7 @@ local function validate_opts(opts)
246298
error('Invalid value of ttr: ' .. tostring(val))
247299
end
248300
elseif key == 'in_replicaset' then
249-
-- do nothing
301+
-- Do nothing.
250302
else
251303
error('Unknown option ' .. tostring(key))
252304
end
@@ -269,8 +321,8 @@ local function cfg(self, opts)
269321
switch_in_replicaset(self['in_replicaset'])
270322
end
271323

272-
local function exist_inactive(session_uuid)
273-
if box.space._queue_inactive_sessions:get{session_uuid} then
324+
local function exist_shared(session_uuid)
325+
if box.space._queue_shared_sessions:get{session_uuid} then
274326
return true
275327
end
276328

@@ -287,7 +339,7 @@ local method = {
287339
on_session_remove = on_session_remove,
288340
start = start,
289341
stop = stop,
290-
exist_inactive = exist_inactive
342+
exist_shared = exist_shared
291343
}
292344

293345
return setmetatable(queue_session, { __index = method })

0 commit comments

Comments
 (0)