1212 delete_immediately /1 , delete_exclusive /2 , delete /4 , purge /1 ,
1313 forget_all_durable /1 ]).
1414-export ([pseudo_queue /2 , pseudo_queue /3 , immutable /1 ]).
15- -export ([exists /1 , lookup /1 , lookup /2 , lookup_many /1 ,
15+ -export ([exists /1 , lookup /1 , lookup /2 , lookup_many /1 , lookup_durable_queue / 1 ,
1616 not_found_or_absent_dirty /1 ,
1717 with /2 , with /3 , with_or_die /2 ,
1818 assert_equivalence /5 ,
@@ -162,24 +162,28 @@ start(Qs) ->
162162 amqqueue :is_classic (Q )],
163163 ok .
164164
165- mark_local_durable_queues_stopped (VHost ) ->
166- Qs0 = find_local_durable_queues (VHost ),
167- Qs = [amqqueue :set_state (Q , stopped )
168- || Q <- Qs0 , amqqueue :get_type (Q ) =:= rabbit_classic_queue ,
169- amqqueue :get_state (Q ) =/= stopped ],
170- rabbit_db_queue :insert (Qs ).
171-
172- find_local_durable_queues (VHost ) ->
173- Qs = rabbit_db_queue :get_all_durable (VHost ),
174- lists :filter (fun (Q ) ->
175- rabbit_queue_type :is_recoverable (Q )
176- end , Qs ).
165+ mark_local_durable_queues_stopped (VHostName ) ->
166+ rabbit_db_queue :update_durable (
167+ fun (Q ) ->
168+ amqqueue :set_state (Q , stopped )
169+ end ,
170+ fun (Q ) ->
171+ amqqueue :get_vhost (Q ) =:= VHostName andalso
172+ rabbit_queue_type :is_recoverable (Q ) andalso
173+ amqqueue :get_type (Q ) =:= rabbit_classic_queue andalso
174+ amqqueue :get_state (Q ) =/= stopped
175+ end ).
176+
177+ find_local_durable_queues (VHostName ) ->
178+ rabbit_db_queue :filter_all_durable (fun (Q ) ->
179+ amqqueue :get_vhost (Q ) =:= VHostName andalso
180+ rabbit_queue_type :is_recoverable (Q )
181+ end ).
177182
178183find_recoverable_queues () ->
179- Qs = rabbit_db_queue :get_all_durable (),
180- lists :filter (fun (Q ) ->
181- rabbit_queue_type :is_recoverable (Q )
182- end , Qs ).
184+ rabbit_db_queue :filter_all_durable (fun (Q ) ->
185+ rabbit_queue_type :is_recoverable (Q )
186+ end ).
183187
184188- spec declare (name (),
185189 boolean (),
@@ -248,13 +252,12 @@ internal_declare(Q, Recover) ->
248252
249253do_internal_declare (Q0 , true ) ->
250254 Q = amqqueue :set_state (Q0 , live ),
251- store_queue (Q ),
255+ ok = store_queue (Q ),
252256 {created , Q0 };
253257do_internal_declare (Q0 , false ) ->
254258 Q = rabbit_policy :set (amqqueue :set_state (Q0 , live )),
255259 Queue = rabbit_queue_decorator :set (Q ),
256- DurableQueue = amqqueue :reset_mirroring_and_decorators (Q ),
257- rabbit_db_queue :create_or_get (DurableQueue , Queue ).
260+ rabbit_db_queue :create_or_get (Queue ).
258261
259262- spec update
260263 (name (), fun ((amqqueue :amqqueue ()) -> amqqueue :amqqueue ())) ->
@@ -272,8 +275,7 @@ ensure_rabbit_queue_record_is_initialized(Q) ->
272275
273276store_queue (Q0 ) ->
274277 Q = rabbit_queue_decorator :set (Q0 ),
275- DurableQ = amqqueue :reset_mirroring_and_decorators (Q0 ),
276- rabbit_db_queue :insert (DurableQ , Q ).
278+ rabbit_db_queue :set (Q ).
277279
278280- spec update_decorators (name ()) -> 'ok' .
279281
@@ -316,14 +318,17 @@ is_server_named_allowed(Args) ->
316318 ([name ()]) ->
317319 [amqqueue :amqqueue ()].
318320
319- lookup ([]) -> []; % % optimisation
320- lookup (Names ) ->
321- rabbit_db_queue :get (Names ).
321+ lookup (Name ) when is_record (Name , resource ) ->
322+ rabbit_db_queue :get (Name ).
323+
324+ lookup_durable_queue (QName ) ->
325+ rabbit_db_queue :get_durable (QName ).
322326
323327- spec lookup_many ([name ()]) -> [amqqueue :amqqueue ()].
324328
329+ lookup_many ([]) -> []; % % optimisation
325330lookup_many (Names ) when is_list (Names ) ->
326- lookup (Names ).
331+ rabbit_db_queue : get_many (Names ).
327332
328333- spec lookup (binary (), binary ()) ->
329334 rabbit_types :ok (amqqueue :amqqueue ()) |
@@ -341,7 +346,15 @@ exists(Name) ->
341346- spec not_found_or_absent_dirty (name ()) -> not_found_or_absent ().
342347
343348not_found_or_absent_dirty (Name ) ->
344- rabbit_db_queue :not_found_or_absent_queue_dirty (Name ).
349+ % % We should read from both tables inside a tx, to get a
350+ % % consistent view. But the chances of an inconsistency are small,
351+ % % and only affect the error kind.
352+ case rabbit_db_queue :get_durable (Name ) of
353+ {error , not_found } ->
354+ not_found ;
355+ {ok , Q } ->
356+ {absent , Q , nodedown }
357+ end .
345358
346359- spec get_rebalance_lock (pid ()) ->
347360 {true , {rebalance_queues , pid ()}} | false .
@@ -542,7 +555,7 @@ with(#resource{} = Name, F, E, RetriesLeft) ->
542555 fun () -> retry_wait (Q , F , E , RetriesLeft ) end ,
543556 fun () -> F (Q ) end );
544557 {error , not_found } ->
545- E (rabbit_db_queue : not_found_or_absent_queue_dirty (Name ))
558+ E (not_found_or_absent_dirty (Name ))
546559 end .
547560
548561- spec retry_wait (amqqueue :amqqueue (),
@@ -1239,16 +1252,18 @@ list_down(VHostPath) ->
12391252 false -> [];
12401253 true ->
12411254 Alive = sets :from_list ([amqqueue :get_name (Q ) || Q <- list (VHostPath )]),
1242- Durable = rabbit_db_queue :get_all_durable (VHostPath ),
12431255 NodesRunning = rabbit_nodes :all_running (),
1244- lists :filter (fun (Q ) ->
1245- N = amqqueue :get_name (Q ),
1246- Pid = amqqueue :get_pid (Q ),
1247- St = amqqueue :get_state (Q ),
1248- (St =:= stopped andalso not lists :member (node (Pid ), NodesRunning ))
1249- orelse
1250- (not sets :is_element (N , Alive ))
1251- end , Durable )
1256+ rabbit_db_queue :filter_all_durable (
1257+ fun (Q ) ->
1258+ N = amqqueue :get_name (Q ),
1259+ Pid = amqqueue :get_pid (Q ),
1260+ St = amqqueue :get_state (Q ),
1261+ amqqueue :get_vhost (Q ) =:= VHostPath
1262+ andalso
1263+ ((St =:= stopped andalso not lists :member (node (Pid ), NodesRunning ))
1264+ orelse
1265+ (not sets :is_element (N , Alive )))
1266+ end )
12521267 end .
12531268
12541269count (VHost ) ->
@@ -1671,7 +1686,7 @@ internal_delete(QueueName, ActingUser, Reason) ->
16711686 ok ->
16721687 ok ;
16731688 Deletions ->
1674- rabbit_binding :process_deletions (Deletions ),
1689+ _ = rabbit_binding :process_deletions (Deletions ),
16751690 rabbit_binding :notify_deletions (Deletions , ? INTERNAL_USER ),
16761691 rabbit_core_metrics :queue_deleted (QueueName ),
16771692 ok = rabbit_event :notify (queue_deleted ,
@@ -1683,12 +1698,12 @@ internal_delete(QueueName, ActingUser, Reason) ->
16831698
16841699forget_all_durable (Node ) ->
16851700 UpdateFun = fun (Q ) ->
1686- forget_node_for_queue (Node , Q )
1687- end ,
1701+ forget_node_for_queue (Node , Q )
1702+ end ,
16881703 FilterFun = fun (Q ) ->
16891704 is_local_to_node (amqqueue :get_pid (Q ), Node )
16901705 end ,
1691- rabbit_db_queue :match_and_update ( amqqueue : pattern_match_all (), UpdateFun , FilterFun ).
1706+ rabbit_db_queue :foreach_durable ( UpdateFun , FilterFun ).
16921707
16931708% % Try to promote a mirror while down - it should recover as a
16941709% % leader. We try to take the oldest mirror here for best chance of
@@ -1717,7 +1732,11 @@ forget_node_for_queue(DeadNode, [H|T], Q) when ?is_amqqueue(Q) ->
17171732 {false , _ } -> forget_node_for_queue (DeadNode , T , Q );
17181733 {true , rabbit_classic_queue } ->
17191734 Q1 = amqqueue :set_pid (Q , rabbit_misc :node_to_fake_pid (H )),
1720- ok = rabbit_db_queue :insert ([Q1 ]);
1735+ % % rabbit_db_queue:set_many/1 just stores a durable queue record,
1736+ % % that is the only one required here.
1737+ % % rabbit_db_queue:set/1 writes both durable and transient, thus
1738+ % % can't be used for this operation.
1739+ ok = rabbit_db_queue :set_many ([Q1 ]);
17211740 {true , rabbit_quorum_queue } ->
17221741 ok
17231742 end .
@@ -1809,43 +1828,45 @@ has_synchronised_mirrors_online(Q) ->
18091828- spec on_node_up (node ()) -> 'ok' .
18101829
18111830on_node_up (Node ) ->
1812- rabbit_db_queue :on_node_up (Node , fun maybe_clear_recoverable_node /2 ).
1813-
1814- maybe_clear_recoverable_node (Node , Q ) ->
1815- SPids = amqqueue :get_sync_slave_pids (Q ),
1816- RSs = amqqueue :get_recoverable_slaves (Q ),
1817- case lists :member (Node , RSs ) of
1818- true ->
1819- % % There is a race with
1820- % % rabbit_mirror_queue_slave:record_synchronised/1 called
1821- % % by the incoming mirror node and this function, called
1822- % % by the leader node. If this function is executed after
1823- % % record_synchronised/1, the node is erroneously removed
1824- % % from the recoverable mirror list.
1825- % %
1826- % % We check if the mirror node's queue PID is alive. If it is
1827- % % the case, then this function is executed after. In this
1828- % % situation, we don't touch the queue record, it is already
1829- % % correct.
1830- DoClearNode =
1831- case [SP || SP <- SPids , node (SP ) =:= Node ] of
1832- [SPid ] -> not rabbit_misc :is_process_alive (SPid );
1833- _ -> true
1834- end ,
1835- if
1836- DoClearNode -> RSs1 = RSs -- [Node ],
1837- store_queue (
1838- amqqueue :set_recoverable_slaves (Q , RSs1 ));
1839- true -> ok
1840- end ;
1841- false ->
1842- ok
1831+ rabbit_db_queue :foreach_transient (maybe_clear_recoverable_node (Node )).
1832+
1833+ maybe_clear_recoverable_node (Node ) ->
1834+ fun (Q ) ->
1835+ SPids = amqqueue :get_sync_slave_pids (Q ),
1836+ RSs = amqqueue :get_recoverable_slaves (Q ),
1837+ case lists :member (Node , RSs ) of
1838+ true ->
1839+ % % There is a race with
1840+ % % rabbit_mirror_queue_slave:record_synchronised/1 called
1841+ % % by the incoming mirror node and this function, called
1842+ % % by the leader node. If this function is executed after
1843+ % % record_synchronised/1, the node is erroneously removed
1844+ % % from the recoverable mirror list.
1845+ % %
1846+ % % We check if the mirror node's queue PID is alive. If it is
1847+ % % the case, then this function is executed after. In this
1848+ % % situation, we don't touch the queue record, it is already
1849+ % % correct.
1850+ DoClearNode =
1851+ case [SP || SP <- SPids , node (SP ) =:= Node ] of
1852+ [SPid ] -> not rabbit_misc :is_process_alive (SPid );
1853+ _ -> true
1854+ end ,
1855+ if
1856+ DoClearNode -> RSs1 = RSs -- [Node ],
1857+ store_queue (
1858+ amqqueue :set_recoverable_slaves (Q , RSs1 ));
1859+ true -> ok
1860+ end ;
1861+ false ->
1862+ ok
1863+ end
18431864 end .
18441865
18451866- spec on_node_down (node ()) -> 'ok' .
18461867
18471868on_node_down (Node ) ->
1848- {Time , Ret } = timer :tc (fun () -> rabbit_db_queue :on_node_down ( Node , fun filter_transient_queues_to_delete / 2 ) end ),
1869+ {Time , Ret } = timer :tc (fun () -> rabbit_db_queue :delete_transient ( filter_transient_queues_to_delete ( Node ) ) end ),
18491870 case Ret of
18501871 ok -> ok ;
18511872 {QueueNames , Deletions } ->
@@ -1859,12 +1880,14 @@ on_node_down(Node) ->
18591880 ok
18601881 end .
18611882
1862- filter_transient_queues_to_delete (Node , Q ) ->
1863- amqqueue :qnode (Q ) == Node andalso
1864- not rabbit_mnesia :is_process_alive (amqqueue :get_pid (Q ))
1865- andalso (not amqqueue :is_classic (Q ) orelse not amqqueue :is_durable (Q ))
1866- andalso (not rabbit_amqqueue :is_replicated (Q )
1867- orelse rabbit_amqqueue :is_dead_exclusive (Q )).
1883+ filter_transient_queues_to_delete (Node ) ->
1884+ fun (Q ) ->
1885+ amqqueue :qnode (Q ) == Node andalso
1886+ not rabbit_mnesia :is_process_alive (amqqueue :get_pid (Q ))
1887+ andalso (not amqqueue :is_classic (Q ) orelse not amqqueue :is_durable (Q ))
1888+ andalso (not rabbit_amqqueue :is_replicated (Q )
1889+ orelse rabbit_amqqueue :is_dead_exclusive (Q ))
1890+ end .
18681891
18691892notify_queue_binding_deletions (QueueDeletions ) when is_list (QueueDeletions ) ->
18701893 Deletions = rabbit_binding :process_deletions (
0 commit comments