Skip to content

Commit a490861

Browse files
committed
add master-replica switching support
This patch adds the ability to use the queue in a master-replica scheme. The queue monitors the operation mode of Tarantool and performs the necessary actions accordingly. This patch adds five states for the queue: INIT, STARTUP, RUNNING, ENDING and WAITING. When Tarantool is launched for the first time, the state of the queue is always INIT until box.info.ro is false.

States switching scheme:

                  +-----------+
                  |  RUNNING  |
                  +-----------+
                     ^     | (rw -> ro)
                     |     v
+------+       +---------+  +--------+
| INIT | --->  | STARTUP |  | ENDING |
+------+       +---------+  +--------+
                     ^     |
          (ro -> rw) |     v
                  +-----------+
                  |  WAITING  |
                  +-----------+

In the STARTUP state, the queue waits for possible data synchronization with other cluster members for a period equal to the largest upstream lag multiplied by two. After that, all taken tasks are released, except for tasks whose session uuid matches an inactive session uuid. This makes it possible to take a task, switch roles on the cluster, and release the task within the timeout specified by the queue.cfg({ttr = N}) parameter. Note: all clients that take() and do not ack()/release() tasks must be disconnected before changing the role. The last step in the STARTUP state is starting the tube driver using a new method called start(). Each tube driver must implement start() and stop() methods. The start() method should start the driver fibers, if any; in other words, it should initialize all asynchronous work with the tube's space. The stop() method should stop the driver fibers, if any. In the RUNNING state, the queue works as usual. The ENDING state calls the stop() method. In the WAITING state, the queue listens for a change in the read_only flag. All states except INIT are controlled by a new fiber called 'queue_state_fiber'. A new release_all() method has also been added, which forcibly returns all taken tasks to the ready state. This method can be called per tube. Closes #120
1 parent cede945 commit a490861

File tree

10 files changed

+397
-39
lines changed

10 files changed

+397
-39
lines changed

queue/abstract.lua

Lines changed: 104 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ local log = require('log')
22
local fiber = require('fiber')
33
local uuid = require('uuid')
44

5-
local session = require('queue.abstract.queue_session')
6-
local state = require('queue.abstract.state')
5+
local session = require('queue.abstract.queue_session')
6+
local state = require('queue.abstract.state')
7+
local queue_state = require('queue.abstract.queue_state')
78

89
local util = require('queue.util')
910
local qc = require('queue.compat')
@@ -36,6 +37,9 @@ local queue = {
3637
stat = {}
3738
}
3839

40+
-- Release all tasks except tasks that have session_uuid
41+
-- stored in inactive sessions.
42+
-- They will be freed later in release_session_tasks().
3943
local function tube_release_all_tasks(tube)
4044
local prefix = ('queue: [tube "%s"] '):format(tube.name)
4145

@@ -51,8 +55,18 @@ local function tube_release_all_tasks(tube)
5155
log.info(prefix .. 'releasing all taken task (may take a while)')
5256
local released = 0
5357
for _, task in tube.raw:tasks_by_state(state.TAKEN) do
58+
local taken = box.space._queue_taken_2.index.task:get{
59+
tube.tube_id, task[1]
60+
}
61+
if taken and box.space._queue_inactive_sessions:get{taken[4]} then
62+
log.info(prefix ..
63+
('skipping task: %d, tube_id: %d'):format(task[1],
64+
tube.tube_id))
65+
goto next
66+
end
5467
tube.raw:release(task[1], {})
5568
released = released + 1
69+
::next::
5670
end
5771
log.info(prefix .. ('released %d tasks'):format(released))
5872
end
@@ -73,6 +87,9 @@ end
7387
local tube = {}
7488

7589
function tube.put(self, data, opts)
90+
if not queue_state.check() then
91+
return nil
92+
end
7693
opts = opts or {}
7794
local task = self.raw:put(data, opts)
7895
return self.raw:normalize_task(task)
@@ -82,6 +99,9 @@ local conds = {}
8299
local releasing_connections = {}
83100

84101
function tube.take(self, timeout)
102+
if not queue_state.check() then
103+
return nil
104+
end
85105
timeout = util.time(timeout or util.TIMEOUT_INFINITY)
86106
local task = self.raw:take()
87107
if task ~= nil then
@@ -120,6 +140,9 @@ function tube.take(self, timeout)
120140
end
121141

122142
function tube.touch(self, id, delta)
143+
if not queue_state.check() then
144+
return
145+
end
123146
if delta == nil then
124147
return
125148
end
@@ -143,6 +166,9 @@ function tube.touch(self, id, delta)
143166
end
144167

145168
function tube.ack(self, id)
169+
if not queue_state.check() then
170+
return nil
171+
end
146172
check_task_is_taken(self.tube_id, id)
147173
local tube = box.space._queue:get{self.name}
148174
local space_name = tube[3]
@@ -169,10 +195,32 @@ local function tube_release_internal(self, id, opts, session_uuid)
169195
end
170196

171197
function tube.release(self, id, opts)
198+
if not queue_state.check() then
199+
return nil
200+
end
172201
return tube_release_internal(self, id, opts)
173202
end
174203

204+
-- release all tasks (force release, session_id will not be used)
205+
function tube.release_all(self)
206+
if not queue_state.check() then
207+
return
208+
end
209+
local prefix = ('queue: [tube "%s"] '):format(self.name)
210+
211+
log.info(prefix .. 'releasing all taken task (may take a while)')
212+
local released = 0
213+
for _, task in self.raw:tasks_by_state(state.TAKEN) do
214+
self.raw:release(task[1], {})
215+
released = released + 1
216+
end
217+
log.info(prefix .. ('released %d tasks'):format(released))
218+
end
219+
175220
function tube.peek(self, id)
221+
if not queue_state.check() then
222+
return nil
223+
end
176224
local task = self.raw:peek(id)
177225
if task == nil then
178226
error(("Task %s not found"):format(tostring(id)))
@@ -181,6 +229,9 @@ function tube.peek(self, id)
181229
end
182230

183231
function tube.bury(self, id)
232+
if not queue_state.check() then
233+
return nil
234+
end
184235
local task = self:peek(id)
185236
local is_taken, _ = pcall(check_task_is_taken, self.tube_id, id)
186237
if is_taken then
@@ -193,17 +244,46 @@ function tube.bury(self, id)
193244
end
194245

195246
function tube.kick(self, count)
247+
if not queue_state.check() then
248+
return nil
249+
end
196250
count = count or 1
197251
return self.raw:kick(count)
198252
end
199253

254+
function tube.start(self)
255+
if not self.raw.start then
256+
log.warn('queue: [tube "%s"] method start %s',
257+
self.name, 'is not implemented')
258+
return
259+
end
260+
261+
return self.raw:start()
262+
end
263+
264+
function tube.stop(self)
265+
if not self.raw.stop then
266+
log.warn('queue: [tube "%s"] method stop %s',
267+
self.name, 'is not implemented')
268+
return
269+
end
270+
271+
return self.raw:stop()
272+
end
273+
200274
function tube.delete(self, id)
275+
if not queue_state.check() then
276+
return nil
277+
end
201278
self:peek(id)
202279
return self.raw:normalize_task(self.raw:delete(id))
203280
end
204281

205282
-- drop tube
206283
function tube.drop(self)
284+
if not queue_state.check() then
285+
return nil
286+
end
207287
local tube_name = self.name
208288

209289
local tube = box.space._queue:get{tube_name}
@@ -238,6 +318,9 @@ end
238318
-- truncate tube
239319
-- (delete everything from tube)
240320
function tube.truncate(self)
321+
if not queue_state.check() then
322+
return
323+
end
241324
self.raw:truncate()
242325
end
243326

@@ -248,6 +331,9 @@ function tube.on_task_change(self, cb)
248331
end
249332

250333
function tube.grant(self, user, args)
334+
if not queue_state.check() then
335+
return
336+
end
251337
local function tube_grant_space(user, name, tp)
252338
box.schema.user.grant(user, tp or 'read,write', 'space', name, {
253339
if_not_exists = true,
@@ -296,6 +382,8 @@ local required_driver_methods = {
296382
'bury',
297383
'kick',
298384
'peek',
385+
'stop',
386+
'start',
299387
'touch',
300388
'truncate',
301389
'tasks_by_state'
@@ -406,6 +494,10 @@ local function release_session_tasks(session_uuid)
406494
end
407495
end
408496

497+
function method.state()
498+
return queue_state.show()
499+
end
500+
409501
function method._on_consumer_disconnect()
410502
local conn_id = connection.id()
411503

@@ -449,6 +541,9 @@ end
449541
-------------------------------------------------------------------------------
450542
-- create tube
451543
function method.create_tube(tube_name, tube_type, opts)
544+
if not queue_state.check() then
545+
return
546+
end
452547
opts = opts or {}
453548
if opts.if_not_exists == nil then
454549
opts.if_not_exists = false
@@ -551,7 +646,7 @@ function method.start()
551646
if _taken == nil then
552647
-- tube_id, task_id, connection_id, session_uuid, time
553648
_taken = box.schema.create_space('_queue_taken_2', {
554-
temporary = true,
649+
temporary = false,
555650
format = {
556651
{name = 'tube_id', type = num_type()},
557652
{name = 'task_id', type = num_type()},
@@ -570,6 +665,10 @@ function method.start()
570665
parts = {4, str_type()},
571666
unique = false
572667
})
668+
else
669+
if _taken.temporary then
670+
_taken:alter{temporary = false}
671+
end
573672
end
574673

575674
for _, tube_tuple in _queue:pairs() do
@@ -586,6 +685,8 @@ function method.start()
586685
session.start()
587686

588687
connection.on_disconnect(queue._on_consumer_disconnect)
688+
queue_state.set_running()
689+
queue_state.create_state_fiber(queue.tube)
589690
return queue
590691
end
591692

queue/abstract/driver/fifo.lua

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,14 @@ function method.truncate(self)
148148
self.space:truncate()
149149
end
150150

151+
-- this driver has no background activity
152+
-- implement dummy methods for the API requirement
153+
function method.start()
154+
return
155+
end
156+
157+
function method.stop()
158+
return
159+
end
160+
151161
return tube

queue/abstract/driver/fifottl.lua

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
local log = require('log')
2+
local ffi = require('ffi')
23
local fiber = require('fiber')
34
local state = require('queue.abstract.state')
45

@@ -159,6 +160,14 @@ local function fifottl_fiber(self)
159160
while true do
160161
if box.info.ro == false then
161162
local stat, err = pcall(fifottl_fiber_iteration, self, processed)
163+
164+
if ffi.istype("struct error", err) then
165+
if err.type == "FiberIsCancelled" then
166+
log.info("fifottl was cancelled")
167+
break
168+
end
169+
end
170+
162171
if not stat and not (err.code == box.error.READONLY) then
163172
log.error("error catched: %s", tostring(err))
164173
log.error("exiting fiber '%s'", fiber.name())
@@ -365,4 +374,21 @@ function method.truncate(self)
365374
self.space:truncate()
366375
end
367376

377+
function method.start(self)
378+
if self.fiber then
379+
log.info('fifottl fiber is already started')
380+
return
381+
end
382+
self.fiber = fiber.create(fifottl_fiber, self)
383+
end
384+
385+
function method.stop(self)
386+
if not self.fiber then
387+
log.info('fifottl fiber is not started')
388+
return
389+
end
390+
self.fiber:cancel()
391+
self.fiber = nil
392+
end
393+
368394
return tube

queue/abstract/driver/utube.lua

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,4 +175,14 @@ function method.truncate(self)
175175
self.space:truncate()
176176
end
177177

178+
-- this driver has no background activity
179+
-- implement dummy methods for the API requirement
180+
function method.start()
181+
return
182+
end
183+
184+
function method.stop()
185+
return
186+
end
187+
178188
return tube

queue/abstract/driver/utubettl.lua

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
local log = require('log')
2+
local ffi = require('ffi')
23
local fiber = require('fiber')
34

45
local state = require('queue.abstract.state')
@@ -167,6 +168,14 @@ local function utubettl_fiber(self)
167168
while true do
168169
if box.info.ro == false then
169170
local stat, err = pcall(utubettl_fiber_iteration, self, processed)
171+
172+
if ffi.istype("struct error", err) then
173+
if err.type == "FiberIsCancelled" then
174+
log.info("utubettl was cancelled")
175+
break
176+
end
177+
end
178+
170179
if not stat and not err.code == box.error.READONLY then
171180
log.error("error catched: %s", tostring(err))
172181
log.error("exiting fiber '%s'", fiber.name())
@@ -392,4 +401,21 @@ function method.truncate(self)
392401
self.space:truncate()
393402
end
394403

404+
function method.start(self)
405+
if self.fiber then
406+
log.info('utubettl fiber is already started')
407+
return
408+
end
409+
self.fiber = fiber.create(utubettl_fiber, self)
410+
end
411+
412+
function method.stop(self)
413+
if not self.fiber then
414+
log.info('utubettl fiber is not started')
415+
return
416+
end
417+
self.fiber:cancel()
418+
self.fiber = nil
419+
end
420+
395421
return tube

0 commit comments

Comments
 (0)