@@ -22,6 +22,7 @@ align="right">
22
22
* [ Fields of the \_ queue\_ session\_ ids space] ( #fields-of-the-_queue_session_ids-space )
23
23
* [ Fields of the space associated with each queue] ( #fields-of-the-space-associated-with-each-queue )
24
24
* [ Task state diagram] ( #task-state-diagram )
25
+ * [ Queue state diagram] ( #queue-state-diagram )
25
26
* [ Installing] ( #installing )
26
27
* [ Using the queue module] ( #using-the-queue-module )
27
28
* [ Initialization] ( #initialization )
@@ -37,7 +38,9 @@ align="right">
37
38
* [ Kicking a number of tasks] ( #kicking-a-number-of-tasks )
38
39
* [ Deleting a task] ( #deleting-a-task )
39
40
* [ Dropping a queue] ( #dropping-a-queue )
41
+ * [ Releasing all taken tasks] ( #releasing-all-taken-tasks )
40
42
* [ Getting statistics] ( #getting-statistics )
43
+ * [ Queue and replication] ( #queue-and-replication )
41
44
* [ Implementation details] ( #implementation-details )
42
45
* [ Queue drivers] ( #queue-drivers )
43
46
* [ Driver API] ( #driver-api )
@@ -256,7 +259,7 @@ space; the client waits for tasks in this queue
256
259
1 . ` timeout ` - the client wait timeout
257
260
1 . ` time ` - the time when the client took a task
258
261
259
- The ` _queue_taken_2 ` (` _queue_taken ` is deprecated) temporary space contains
262
+ The ` _queue_taken_2 ` (` _queue_taken ` is deprecated) space contains
260
263
tuples for each job which is processing a task in the queue.
261
264
262
265
## Fields of the ` _queue_taken_2 ` space
@@ -276,6 +279,11 @@ session id) to the session UUID.
276
279
1 . ` connection_id ` - connection id (numeric)
277
280
2 . ` session_uuid ` - session UUID (string)
278
281
282
+ ## Fields of the ` _queue_inactive_sessions ` space
283
+
284
+ 1 . ` uuid ` - session UUID (string)
285
+ 2 . ` exp_time ` - session expiration time (numeric)
286
+
279
287
Also, there is a space which is associated with each queue,
280
288
which is named in the ` space ` field of the ` _queue ` space.
281
289
The associated space contains one tuple for each task.
@@ -338,6 +346,48 @@ flowchart LR
338
346
DELAYED--> |"delete()"| DONE
339
347
```
340
348
349
+ # Queue state diagram
350
+
351
+ Queue can be used in a master-replica scheme:
352
+
353
+ There are five states for queue:
354
+ * INIT
355
+ * STARTUP
356
+ * RUNNING
357
+ * ENDING
358
+ * WAITING
359
+
360
+ When the tarantool is launched for the first time,
361
+ the state of the queue is always ` INIT ` until ` box.info.ro ` is false.
362
+
363
+ States switching scheme:
364
+ ``` mermaid
365
+ flowchart LR
366
+ I(("init"))-->S[startup]
367
+ S[startup]-->R[running]
368
+ W[waiting]--> |"(ro ->rw)"| S[startup]
369
+ R[running]--> |"(ro ->rw)"| E[ending]
370
+ E[ending]-->W[waiting]
371
+ ```
372
+
373
+ Current queue state can be shown by using ` queue.state() ` method.
374
+
375
+ In the ` STARTUP ` state, the queue is waiting for possible data synchronization
376
+ with other cluster members by the time of the largest upstream lag multiplied
377
+ by two. After that, all taken tasks are released, except for tasks with
378
+ session uuid matching inactive sessions uuids. This makes possible to take
379
+ a task, switch roles on the cluster, and release the task within the timeout
380
+ specified by the ` queue.cfg({ttr = N}) ` parameter. Note: all clients that ` take() `
381
+ and do not ` ack()/release() ` tasks must be disconnected before changing the role.
382
+ And the last step in the ` STARTUP ` state is starting tube driver using new
383
+ method called ` start() ` .
384
+
385
+ In the ` RUNNING ` state, the queue is working as usually. The ` ENDING ` state calls
386
+ ` stop() ` method. in the ` WAITING ` state, the queue listens for a change in the
387
+ read_only flag.
388
+
389
+ All states except ` INIT ` is controlled by new fiber called ` queue_state_fiber ` .
390
+
341
391
# Installing
342
392
343
393
There are three alternative ways of installation.
@@ -640,6 +690,14 @@ Reverse the effect of a `create` request.
640
690
Effect: remove the tuple from the ` _queue ` space,
641
691
and drop the space associated with the queue.
642
692
693
+ ## Releasing all taken tasks
694
+
695
+ ``` lua
696
+ queue .tube .tube_name :release_all ()
697
+ ```
698
+
699
+ Forcibly returns all taken tasks to a ready state.
700
+
643
701
## Getting statistics
644
702
645
703
``` lua
@@ -682,6 +740,84 @@ queue.statistics('list_of_sites')
682
740
...
683
741
```
684
742
743
+ ## Queue and replication
744
+
745
+ Usage example:
746
+
747
+ ``` lua
748
+ -- Instance file for the master.
749
+ box .cfg {
750
+ listen = 3301 ,
751
+ replication = {' replicator:password@127.0.0.1:3301' , -- Master URI.
752
+ ' replicator:password@127.0.0.1:3302' }, -- Replica URI.
753
+ read_only = false ,
754
+ }
755
+
756
+ box .once (" schema" , function ()
757
+ box .schema .user .create (' replicator' , {password = ' password' })
758
+ box .schema .user .grant (' replicator' , ' replication' ) -- grant replication role
759
+ end )
760
+
761
+ queue = require (" queue" )
762
+ queue .cfg ({ttr = 300 }) -- Clean up session after 5 minutes after disconnect.
763
+ require (' console' ).start ()
764
+ os.exit ()
765
+ ```
766
+
767
+ ``` lua
768
+ -- Instance file for the replica.
769
+ box .cfg {
770
+ listen = 3302 ,
771
+ replication = {' replicator:password@127.0.0.1:3301' , -- Master URI.
772
+ ' replicator:password@127.0.0.1:3302' }, -- Replica URI.
773
+ read_only = true
774
+ }
775
+
776
+ queue = require (" queue" )
777
+ queue .cfg ({ttr = 300 }) -- Clean up session after 5 minutes after disconnect.
778
+ require (' console' ).start ()
779
+ os.exit ()
780
+ ```
781
+
782
+ Start master and replica instances and check queue state:
783
+
784
+ Master:
785
+ ``` sh
786
+ tarantool> queue.state ()
787
+ ---
788
+ - RUNNING
789
+ ...
790
+ ```
791
+
792
+ Replica:
793
+ ``` sh
794
+ tarantool> queue.state ()
795
+ ---
796
+ - INIT
797
+ ...
798
+ ```
799
+
800
+ Now reverse the ` read_only ` setting of the master and replica and check the
801
+ status of the queue again.
802
+
803
+ Master:
804
+ ``` sh
805
+ tarantool> box.cfg({read_only = true})
806
+ tarantool> queue.state ()
807
+ ---
808
+ - WAITING
809
+ ...
810
+ ```
811
+
812
+ Replica:
813
+ ``` sh
814
+ tarantool> box.cfg({read_only = false})
815
+ tarantool> queue.state ()
816
+ ---
817
+ - RUNNING
818
+ ...
819
+ ```
820
+
685
821
# Implementation details
686
822
687
823
The implementation is based on the common functions for all queues:
@@ -725,6 +861,8 @@ Driver class must implement the following API:
725
861
1 . ` create_space ` - creates the supporting space. The arguments are:
726
862
* space name
727
863
* space options
864
+ 1 . ` start ` - initialize internal resources if any, e.g. start fibers.
865
+ 1 . ` stop ` - clean up internal resources if any, e.g. stop fibers.
728
866
729
867
To sum up, when the user creates a new queue, the queue framework
730
868
passes the request to the driver, asking it to create a space to
0 commit comments