Skip to content

Commit 7900deb

Browse files
committed
add master-replica switching support
This patch adds ability to use queue in a master-replica scheme. The queue will monitor the operation mode of the tarantool and perform the necessary actions accordingly. This patch adds five states for queue: INIT, STARTUP, RUNNING, ENDING and WAITING. When the tarantool is launched for the first time, the state of the queue is always INIT until box.info.ro is false. States switching scheme: +-----------+ | RUNNING | +-----------+ ^ | (rw -> ro) | v +------+ +---------+ +--------+ | INIT | ---> | STARTUP | | ENDING | +------+ +---------+ +--------+ ^ | (ro -> rw) | v +-----------+ | WAITING | +-----------+ In the STARTUP state, the queue is waiting for possible data synchronization with other cluster members by the time of the largest upstream lag multiplied by two. After that, all taken tasks are released, except for tasks with session uuid matching inactive sessions uuids. This makes possible to take a task, switch roles on the cluster, and release the task within the timeout specified by the queue.cfg({ttr = N}) parameter. Note: all clients that take() and do not ack()/release() tasks must be disconnected before changing the role. And the last step in the STARTUP state is starting tube driver using new method called start(). Each tube driver must implement start() and stop() methods. The start() method should start the driver fibers, if any, in other words, initialize all asynchronous work with the tubes space. The stop() methods shuld stop the driver fibers, if any, respectively. In the RUNNING state, the queue is working as usually. The ENDING state calls stop() method. in the WAITING state, the queue listens for a change in the read_only flag. All states except INIT is controlled by new fiber called 'queue_state_fiber'. A new release_all() method has also been added, which forcibly returns all taken tasks to a ready state. This method can be called per tube. Closes #120
1 parent b4289e6 commit 7900deb

File tree

10 files changed

+384
-29
lines changed

10 files changed

+384
-29
lines changed

queue/abstract.lua

Lines changed: 117 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ local uuid = require('uuid')
44

55
local session = require('queue.abstract.queue_session')
66
local state = require('queue.abstract.state')
7+
local queue_state = require('queue.abstract.queue_state')
78

89
local util = require('queue.util')
910
local qc = require('queue.compat')
@@ -36,7 +37,10 @@ local queue = {
3637
stat = {}
3738
}
3839

39-
local function tube_release_all_tasks(tube)
40+
-- Release all tasks except tasks that have session_uuid
41+
-- stored in inactive sessions.
42+
-- They will be freed later in release_session_tasks().
43+
local function tube_release_all_orphaned_tasks(tube)
4044
local prefix = ('queue: [tube "%s"] '):format(tube.name)
4145

4246
-- We lean on stable iterators in this function.
@@ -51,8 +55,17 @@ local function tube_release_all_tasks(tube)
5155
log.info(prefix .. 'releasing all taken task (may take a while)')
5256
local released = 0
5357
for _, task in tube.raw:tasks_by_state(state.TAKEN) do
54-
tube.raw:release(task[1], {})
55-
released = released + 1
58+
local taken = box.space._queue_taken_2.index.task:get{
59+
tube.tube_id, task[1]
60+
}
61+
if taken and session.exist_inactive(taken[4]) then
62+
log.info(prefix ..
63+
('skipping task: %d, tube_id: %d'):format(task[1],
64+
tube.tube_id))
65+
else
66+
tube.raw:release(task[1], {})
67+
released = released + 1
68+
end
5669
end
5770
log.info(prefix .. ('released %d tasks'):format(released))
5871
end
@@ -72,7 +85,20 @@ end
7285
-- tube methods
7386
local tube = {}
7487

88+
-- This check must be called from all public tube methods.
89+
local function check_state()
90+
if queue_state.get() ~= queue_state.states.RUNNING then
91+
error(('Queue is in %s state'):format(queue_state.show()))
92+
return false
93+
end
94+
95+
return true
96+
end
97+
7598
function tube.put(self, data, opts)
99+
if not check_state() then
100+
return nil
101+
end
76102
opts = opts or {}
77103
local task = self.raw:put(data, opts)
78104
return self.raw:normalize_task(task)
@@ -82,6 +108,9 @@ local conds = {}
82108
local releasing_connections = {}
83109

84110
function tube.take(self, timeout)
111+
if not check_state() then
112+
return nil
113+
end
85114
timeout = util.time(timeout or util.TIMEOUT_INFINITY)
86115
local task = self.raw:take()
87116
if task ~= nil then
@@ -120,6 +149,9 @@ function tube.take(self, timeout)
120149
end
121150

122151
function tube.touch(self, id, delta)
152+
if not check_state() then
153+
return
154+
end
123155
if delta == nil then
124156
return
125157
end
@@ -143,6 +175,9 @@ function tube.touch(self, id, delta)
143175
end
144176

145177
function tube.ack(self, id)
178+
if not check_state() then
179+
return nil
180+
end
146181
check_task_is_taken(self.tube_id, id)
147182
local tube = box.space._queue:get{self.name}
148183
local space_name = tube[3]
@@ -169,10 +204,32 @@ local function tube_release_internal(self, id, opts, session_uuid)
169204
end
170205

171206
function tube.release(self, id, opts)
207+
if not check_state() then
208+
return nil
209+
end
172210
return tube_release_internal(self, id, opts)
173211
end
174212

213+
-- Release all tasks (force release, session_id will not be used).
214+
function tube.release_all(self)
215+
if not check_state() then
216+
return
217+
end
218+
local prefix = ('queue: [tube "%s"] '):format(self.name)
219+
220+
log.info(prefix .. 'releasing all taken task (may take a while)')
221+
local released = 0
222+
for _, task in self.raw:tasks_by_state(state.TAKEN) do
223+
self.raw:release(task[1], {})
224+
released = released + 1
225+
end
226+
log.info(('%s released %d tasks'):format(prefix, released))
227+
end
228+
175229
function tube.peek(self, id)
230+
if not check_state() then
231+
return nil
232+
end
176233
local task = self.raw:peek(id)
177234
if task == nil then
178235
error(("Task %s not found"):format(tostring(id)))
@@ -181,6 +238,9 @@ function tube.peek(self, id)
181238
end
182239

183240
function tube.bury(self, id)
241+
if not check_state() then
242+
return nil
243+
end
184244
local task = self:peek(id)
185245
local is_taken, _ = pcall(check_task_is_taken, self.tube_id, id)
186246
if is_taken then
@@ -193,17 +253,46 @@ function tube.bury(self, id)
193253
end
194254

195255
function tube.kick(self, count)
256+
if not check_state() then
257+
return nil
258+
end
196259
count = count or 1
197260
return self.raw:kick(count)
198261
end
199262

263+
function tube._start(self)
264+
if not self.raw.start then
265+
log.warn('queue: [tube "%s"] method start %s is not implemented',
266+
self.name)
267+
return
268+
end
269+
270+
return self.raw:start()
271+
end
272+
273+
function tube._stop(self)
274+
if not self.raw.stop then
275+
log.warn('queue: [tube "%s"] method stop %s is not implemented',
276+
self.name)
277+
return
278+
end
279+
280+
return self.raw:stop()
281+
end
282+
200283
function tube.delete(self, id)
284+
if not check_state() then
285+
return nil
286+
end
201287
self:peek(id)
202288
return self.raw:normalize_task(self.raw:delete(id))
203289
end
204290

205291
-- drop tube
206292
function tube.drop(self)
293+
if not check_state() then
294+
return nil
295+
end
207296
local tube_name = self.name
208297

209298
local tube = box.space._queue:get{tube_name}
@@ -238,6 +327,9 @@ end
238327
-- truncate tube
239328
-- (delete everything from tube)
240329
function tube.truncate(self)
330+
if not check_state() then
331+
return
332+
end
241333
self.raw:truncate()
242334
end
243335

@@ -248,6 +340,9 @@ function tube.on_task_change(self, cb)
248340
end
249341

250342
function tube.grant(self, user, args)
343+
if not check_state() then
344+
return
345+
end
251346
local function tube_grant_space(user, name, tp)
252347
box.schema.user.grant(user, tp or 'read,write', 'space', name, {
253348
if_not_exists = true,
@@ -296,6 +391,8 @@ local required_driver_methods = {
296391
'bury',
297392
'kick',
298393
'peek',
394+
'stop',
395+
'start',
299396
'touch',
300397
'truncate',
301398
'tasks_by_state'
@@ -406,6 +503,10 @@ local function release_session_tasks(session_uuid)
406503
end
407504
end
408505

506+
function method.state()
507+
return queue_state.show()
508+
end
509+
409510
function method._on_consumer_disconnect()
410511
local conn_id = connection.id()
411512

@@ -449,6 +550,9 @@ end
449550
-------------------------------------------------------------------------------
450551
-- create tube
451552
function method.create_tube(tube_name, tube_type, opts)
553+
if not check_state() then
554+
return
555+
end
452556
opts = opts or {}
453557
if opts.if_not_exists == nil then
454558
opts.if_not_exists = false
@@ -551,7 +655,7 @@ function method.start()
551655
if _taken == nil then
552656
-- tube_id, task_id, connection_id, session_uuid, time
553657
_taken = box.schema.create_space('_queue_taken_2', {
554-
temporary = true,
658+
temporary = false,
555659
format = {
556660
{name = 'tube_id', type = num_type()},
557661
{name = 'task_id', type = num_type()},
@@ -570,6 +674,12 @@ function method.start()
570674
parts = {4, str_type()},
571675
unique = false
572676
})
677+
else
678+
-- Upgrade space, queue states require that this space
679+
-- was not temporary.
680+
if _taken.temporary then
681+
_taken:alter{temporary = false}
682+
end
573683
end
574684

575685
for _, tube_tuple in _queue:pairs() do
@@ -578,14 +688,15 @@ function method.start()
578688
if queue.driver[tube_tuple[4]] ~= nil then
579689
local tube = recreate_tube(tube_tuple)
580690
-- gh-66: release all taken tasks on start
581-
tube_release_all_tasks(tube)
691+
tube_release_all_orphaned_tasks(tube)
582692
end
583693
end
584694

585695
session.on_session_remove(release_session_tasks)
586696
session.start()
587697

588698
connection.on_disconnect(queue._on_consumer_disconnect)
699+
queue_state.init(queue.tube)
589700
return queue
590701
end
591702

@@ -609,7 +720,7 @@ function method.register_driver(driver_name, tube_ctr)
609720
if tube_tuple[4] == driver_name then
610721
local tube = recreate_tube(tube_tuple)
611722
-- Release all task for tube on start.
612-
tube_release_all_tasks(tube)
723+
tube_release_all_orphaned_tasks(tube)
613724
end
614725
end
615726
end

queue/abstract/driver/fifo.lua

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,14 @@ function method.truncate(self)
148148
self.space:truncate()
149149
end
150150

151+
-- This driver has no background activity.
152+
-- Implement dummy methods for the API requirement.
153+
function method.start()
154+
return
155+
end
156+
157+
function method.stop()
158+
return
159+
end
160+
151161
return tube

queue/abstract/driver/fifottl.lua

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
local log = require('log')
2+
local ffi = require('ffi')
23
local fiber = require('fiber')
34
local state = require('queue.abstract.state')
45

@@ -159,6 +160,16 @@ local function fifottl_fiber(self)
159160
while true do
160161
if box.info.ro == false then
161162
local stat, err = pcall(fifottl_fiber_iteration, self, processed)
163+
164+
-- Errors from tarantool has different type.
165+
-- We must check it by calling ffi.istype()
166+
if ffi.istype("struct error", err) then
167+
if err.type == "FiberIsCancelled" then
168+
log.info("fifottl was cancelled")
169+
break
170+
end
171+
end
172+
162173
if not stat and not (err.code == box.error.READONLY) then
163174
log.error("error catched: %s", tostring(err))
164175
log.error("exiting fiber '%s'", fiber.name())
@@ -365,4 +376,19 @@ function method.truncate(self)
365376
self.space:truncate()
366377
end
367378

379+
function method.start(self)
380+
if self.fiber then
381+
return
382+
end
383+
self.fiber = fiber.create(fifottl_fiber, self)
384+
end
385+
386+
function method.stop(self)
387+
if not self.fiber then
388+
return
389+
end
390+
self.fiber:cancel()
391+
self.fiber = nil
392+
end
393+
368394
return tube

queue/abstract/driver/utube.lua

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,4 +175,14 @@ function method.truncate(self)
175175
self.space:truncate()
176176
end
177177

178+
-- This driver has no background activity.
179+
-- Implement dummy methods for the API requirement.
180+
function method.start()
181+
return
182+
end
183+
184+
function method.stop()
185+
return
186+
end
187+
178188
return tube

queue/abstract/driver/utubettl.lua

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
local log = require('log')
2+
local ffi = require('ffi')
23
local fiber = require('fiber')
34

45
local state = require('queue.abstract.state')
@@ -167,6 +168,16 @@ local function utubettl_fiber(self)
167168
while true do
168169
if box.info.ro == false then
169170
local stat, err = pcall(utubettl_fiber_iteration, self, processed)
171+
172+
-- Errors from tarantool has different type.
173+
-- We must check it by calling ffi.istype()
174+
if ffi.istype("struct error", err) then
175+
if err.type == "FiberIsCancelled" then
176+
log.info("utubettl was cancelled")
177+
break
178+
end
179+
end
180+
170181
if not stat and not err.code == box.error.READONLY then
171182
log.error("error catched: %s", tostring(err))
172183
log.error("exiting fiber '%s'", fiber.name())
@@ -392,4 +403,19 @@ function method.truncate(self)
392403
self.space:truncate()
393404
end
394405

406+
function method.start(self)
407+
if self.fiber then
408+
return
409+
end
410+
self.fiber = fiber.create(utubettl_fiber, self)
411+
end
412+
413+
function method.stop(self)
414+
if not self.fiber then
415+
return
416+
end
417+
self.fiber:cancel()
418+
self.fiber = nil
419+
end
420+
395421
return tube

0 commit comments

Comments
 (0)