@@ -22,6 +22,7 @@ align="right">
22
22
* [ Fields of the \_ queue\_ session\_ ids space] ( #fields-of-the-_queue_session_ids-space )
23
23
* [ Fields of the space associated with each queue] ( #fields-of-the-space-associated-with-each-queue )
24
24
* [ Task state diagram] ( #task-state-diagram )
25
+ * [ Queue state diagram] ( #queue-state-diagram )
25
26
* [ Installing] ( #installing )
26
27
* [ Using the queue module] ( #using-the-queue-module )
27
28
* [ Initialization] ( #initialization )
@@ -37,7 +38,9 @@ align="right">
37
38
* [ Kicking a number of tasks] ( #kicking-a-number-of-tasks )
38
39
* [ Deleting a task] ( #deleting-a-task )
39
40
* [ Dropping a queue] ( #dropping-a-queue )
41
+ * [ Releasing all taken tasks] ( #releasing-all-taken-tasks )
40
42
* [ Getting statistics] ( #getting-statistics )
43
+ * [ Queue and replication] ( #queue-and-replication )
41
44
* [ Implementation details] ( #implementation-details )
42
45
* [ Queue drivers] ( #queue-drivers )
43
46
* [ Driver API] ( #driver-api )
@@ -256,7 +259,7 @@ space; the client waits for tasks in this queue
256
259
1 . ` timeout ` - the client wait timeout
257
260
1 . ` time ` - the time when the client took a task
258
261
259
- The ` _queue_taken_2 ` (` _queue_taken ` is deprecated) temporary space contains
262
+ The ` _queue_taken_2 ` (` _queue_taken ` is deprecated) space contains
260
263
tuples for each job which is processing a task in the queue.
261
264
262
265
## Fields of the ` _queue_taken_2 ` space
@@ -276,6 +279,11 @@ session id) to the session UUID.
276
279
1 . ` connection_id ` - connection id (numeric)
277
280
2 . ` session_uuid ` - session UUID (string)
278
281
282
+ ## Fields of the ` _queue_inactive_sessions ` space
283
+
284
+ 1 . ` uuid ` - session UUID (string)
285
+ 2 . ` exp_time ` - session expiration time (numeric)
286
+
279
287
Also, there is a space which is associated with each queue,
280
288
which is named in the ` space ` field of the ` _queue ` space.
281
289
The associated space contains one tuple for each task.
@@ -323,6 +331,48 @@ the sections of the corresponding [queue types](#queue-types).
323
331
324
332
![ Task state diagram] ( ./doc/images/statediagram.svg )
325
333
334
+ # Queue state diagram
335
+
336
+ Queue can be used in a master-replica scheme:
337
+
338
+ There are five states for queue:
339
+ * INIT
340
+ * STARTUP
341
+ * RUNNING
342
+ * ENDING
343
+ * WAITING
344
+
345
+ When the tarantool is launched for the first time,
346
+ the state of the queue is always ` INIT ` until ` box.info.ro ` is false.
347
+
348
+ States switching scheme:
349
+ ``` mermaid
350
+ flowchart LR
351
+ I(("init"))-->S[startup]
352
+ S[startup]-->R[running]
353
+ W[waiting]--> |"(ro ->rw)"| S[startup]
354
+ R[running]--> |"(ro ->rw)"| E[ending]
355
+ E[ending]-->W[waiting]
356
+ ```
357
+
358
+ Current queue state can be shown by using ` queue.state() ` method.
359
+
360
+ In the ` STARTUP ` state, the queue is waiting for possible data synchronization
361
+ with other cluster members by the time of the largest upstream lag multiplied
362
+ by two. After that, all taken tasks are released, except for tasks with
363
+ session uuid matching inactive sessions uuids. This makes possible to take
364
+ a task, switch roles on the cluster, and release the task within the timeout
365
+ specified by the ` queue.cfg({ttr = N}) ` parameter. Note: all clients that ` take() `
366
+ and do not ` ack()/release() ` tasks must be disconnected before changing the role.
367
+ And the last step in the ` STARTUP ` state is starting tube driver using new
368
+ method called ` start() ` .
369
+
370
+ In the ` RUNNING ` state, the queue is working as usually. The ` ENDING ` state calls
371
+ ` stop() ` method. in the ` WAITING ` state, the queue listens for a change in the
372
+ read_only flag.
373
+
374
+ All states except ` INIT ` is controlled by new fiber called ` queue_state_fiber ` .
375
+
326
376
# Installing
327
377
328
378
There are three alternative ways of installation.
@@ -625,6 +675,14 @@ Reverse the effect of a `create` request.
625
675
Effect: remove the tuple from the ` _queue ` space,
626
676
and drop the space associated with the queue.
627
677
678
+ ## Releasing all taken tasks
679
+
680
+ ``` lua
681
+ queue .tube .tube_name :release_all ()
682
+ ```
683
+
684
+ Forcibly returns all taken tasks to a ready state.
685
+
628
686
## Getting statistics
629
687
630
688
``` lua
@@ -667,6 +725,84 @@ queue.statistics('list_of_sites')
667
725
...
668
726
```
669
727
728
+ ## Queue and replication
729
+
730
+ Usage example:
731
+
732
+ ``` lua
733
+ -- Instance file for the master.
734
+ box .cfg {
735
+ listen = 3301 ,
736
+ replication = {' replicator:password@127.0.0.1:3301' , -- Master URI.
737
+ ' replicator:password@127.0.0.1:3302' }, -- Replica URI.
738
+ read_only = false ,
739
+ }
740
+
741
+ box .once (" schema" , function ()
742
+ box .schema .user .create (' replicator' , {password = ' password' })
743
+ box .schema .user .grant (' replicator' , ' replication' ) -- grant replication role
744
+ end )
745
+
746
+ queue = require (" queue" )
747
+ queue .cfg ({ttr = 300 }) -- Clean up session after 5 minutes after disconnect.
748
+ require (' console' ).start ()
749
+ os.exit ()
750
+ ```
751
+
752
+ ``` lua
753
+ -- Instance file for the replica.
754
+ box .cfg {
755
+ listen = 3302 ,
756
+ replication = {' replicator:password@127.0.0.1:3301' , -- Master URI.
757
+ ' replicator:password@127.0.0.1:3302' }, -- Replica URI.
758
+ read_only = true
759
+ }
760
+
761
+ queue = require (" queue" )
762
+ queue .cfg ({ttr = 300 }) -- Clean up session after 5 minutes after disconnect.
763
+ require (' console' ).start ()
764
+ os.exit ()
765
+ ```
766
+
767
+ Start master and replica instances and check queue state:
768
+
769
+ Master:
770
+ ``` sh
771
+ tarantool> queue.state ()
772
+ ---
773
+ - RUNNING
774
+ ...
775
+ ```
776
+
777
+ Replica:
778
+ ``` sh
779
+ tarantool> queue.state ()
780
+ ---
781
+ - INIT
782
+ ...
783
+ ```
784
+
785
+ Now reverse the ` read_only ` setting of the master and replica and check the
786
+ status of the queue again.
787
+
788
+ Master:
789
+ ``` sh
790
+ tarantool> box.cfg({read_only = true})
791
+ tarantool> queue.state ()
792
+ ---
793
+ - WAITING
794
+ ...
795
+ ```
796
+
797
+ Replica:
798
+ ``` sh
799
+ tarantool> box.cfg({read_only = false})
800
+ tarantool> queue.state ()
801
+ ---
802
+ - RUNNING
803
+ ...
804
+ ```
805
+
670
806
# Implementation details
671
807
672
808
The implementation is based on the common functions for all queues:
@@ -711,6 +847,11 @@ Driver class must implement the following API:
711
847
* space name
712
848
* space options
713
849
850
+ Driver class should implement the following API:
851
+
852
+ 1 . ` start ` - initialize internal resources if any, e.g. start fibers.
853
+ 1 . ` stop ` - clean up internal resources if any, e.g. stop fibers.
854
+
714
855
To sum up, when the user creates a new queue, the queue framework
715
856
passes the request to the driver, asking it to create a space to
716
857
support this queue, and then creates a driver instance, passing to it
0 commit comments