@@ -1285,6 +1285,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
1285
1285
confirm_enabled = ConfirmEnabled ,
1286
1286
delivery_flow = Flow
1287
1287
}) ->
1288
+ rabbit_global_counters :messages_received (amqp091 , 1 ),
1288
1289
check_msg_size (Content , MaxMessageSize , GCThreshold ),
1289
1290
ExchangeName = rabbit_misc :r (VHostPath , exchange , ExchangeNameBin ),
1290
1291
check_write_permitted (ExchangeName , User , AuthzContext ),
@@ -1302,7 +1303,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
1302
1303
{MsgSeqNo , State1 } =
1303
1304
case DoConfirm orelse Mandatory of
1304
1305
false -> {undefined , State };
1305
- true -> SeqNo = State # ch .publish_seqno ,
1306
+ true -> rabbit_global_counters :messages_received_confirm (amqp091 , 1 ),
1307
+ SeqNo = State # ch .publish_seqno ,
1306
1308
{SeqNo , State # ch {publish_seqno = SeqNo + 1 }}
1307
1309
end ,
1308
1310
case rabbit_basic :message (ExchangeName , RoutingKey , DecodedContent ) of
@@ -1314,9 +1316,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
1314
1316
Username , TraceState ),
1315
1317
DQ = {Delivery # delivery {flow = Flow }, QNames },
1316
1318
{noreply , case Tx of
1317
- none -> deliver_to_queues (DQ , State1 );
1318
- {Msgs , Acks } -> Msgs1 = ? QUEUE :in (DQ , Msgs ),
1319
- State1 # ch {tx = {Msgs1 , Acks }}
1319
+ none ->
1320
+ deliver_to_queues (DQ , State1 );
1321
+ {Msgs , Acks } ->
1322
+ Msgs1 = ? QUEUE :in (DQ , Msgs ),
1323
+ State1 # ch {tx = {Msgs1 , Acks }}
1320
1324
end };
1321
1325
{error , Reason } ->
1322
1326
precondition_failed (" invalid message: ~p " , [Reason ])
@@ -1360,14 +1364,14 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
1360
1364
DeliveryTag , QueueStates0 )
1361
1365
end ) of
1362
1366
{ok , MessageCount , Msg , QueueStates } ->
1367
+ {ok , QueueType } = rabbit_queue_type :module (QueueName , QueueStates ),
1363
1368
handle_basic_get (WriterPid , DeliveryTag , NoAck , MessageCount , Msg ,
1364
- State # ch {queue_states = QueueStates });
1369
+ QueueType , State # ch {queue_states = QueueStates });
1365
1370
{empty , QueueStates } ->
1371
+ {ok , QueueType } = rabbit_queue_type :module (QueueName , QueueStates ),
1372
+ rabbit_global_counters :messages_get_empty (amqp091 , QueueType , 1 ),
1366
1373
? INCR_STATS (queue_stats , QueueName , 1 , get_empty , State ),
1367
1374
{reply , # 'basic.get_empty' {}, State # ch {queue_states = QueueStates }};
1368
- empty ->
1369
- ? INCR_STATS (queue_stats , QueueName , 1 , get_empty , State ),
1370
- {reply , # 'basic.get_empty' {}, State };
1371
1375
{error , {unsupported , single_active_consumer }} ->
1372
1376
rabbit_misc :protocol_error (
1373
1377
resource_locked ,
@@ -1692,9 +1696,9 @@ handle_method(#'tx.select'{}, _, State) ->
1692
1696
handle_method (# 'tx.commit' {}, _ , # ch {tx = none }) ->
1693
1697
precondition_failed (" channel is not transactional" );
1694
1698
1695
- handle_method (# 'tx.commit' {}, _ , State = # ch {tx = {Msgs , Acks },
1699
+ handle_method (# 'tx.commit' {}, _ , State = # ch {tx = {Deliveries , Acks },
1696
1700
limiter = Limiter }) ->
1697
- State1 = queue_fold (fun deliver_to_queues /2 , State , Msgs ),
1701
+ State1 = queue_fold (fun deliver_to_queues /2 , State , Deliveries ),
1698
1702
Rev = fun (X ) -> lists :reverse (lists :sort (X )) end ,
1699
1703
{State2 , Actions2 } =
1700
1704
lists :foldl (fun ({ack , A }, {Acc , Actions }) ->
@@ -1954,7 +1958,7 @@ internal_reject(Requeue, Acked, Limiter,
1954
1958
ok = notify_limiter (Limiter , Acked ),
1955
1959
{State # ch {queue_states = QueueStates }, Actions }.
1956
1960
1957
- record_sent (Type , Tag , AckRequired ,
1961
+ record_sent (Type , QueueType , Tag , AckRequired ,
1958
1962
Msg = {QName , _QPid , MsgId , Redelivered , _Message },
1959
1963
State = # ch {cfg = # conf {channel = ChannelNum ,
1960
1964
trace_state = TraceState ,
@@ -1964,15 +1968,28 @@ record_sent(Type, Tag, AckRequired,
1964
1968
unacked_message_q = UAMQ ,
1965
1969
next_tag = DeliveryTag
1966
1970
}) ->
1967
- ? INCR_STATS (queue_stats , QName , 1 , case {Type , AckRequired } of
1968
- {get , true } -> get ;
1969
- {get , false } -> get_no_ack ;
1970
- {deliver , true } -> deliver ;
1971
- {deliver , false } -> deliver_no_ack
1972
- end , State ),
1971
+ rabbit_global_counters :messages_delivered (amqp091 , QueueType , 1 ),
1972
+ ? INCR_STATS (queue_stats , QName , 1 ,
1973
+ case {Type , AckRequired } of
1974
+ {get , true } ->
1975
+ rabbit_global_counters :messages_delivered_get_manual_ack (amqp091 , QueueType , 1 ),
1976
+ get ;
1977
+ {get , false } ->
1978
+ rabbit_global_counters :messages_delivered_get_auto_ack (amqp091 , QueueType , 1 ),
1979
+ get_no_ack ;
1980
+ {deliver , true } ->
1981
+ rabbit_global_counters :messages_delivered_consume_manual_ack (amqp091 , QueueType , 1 ),
1982
+ deliver ;
1983
+ {deliver , false } ->
1984
+ rabbit_global_counters :messages_delivered_consume_auto_ack (amqp091 , QueueType , 1 ),
1985
+ deliver_no_ack
1986
+ end , State ),
1973
1987
case Redelivered of
1974
- true -> ? INCR_STATS (queue_stats , QName , 1 , redeliver , State );
1975
- false -> ok
1988
+ true ->
1989
+ rabbit_global_counters :messages_redelivered (amqp091 , QueueType , 1 ),
1990
+ ? INCR_STATS (queue_stats , QName , 1 , redeliver , State );
1991
+ false ->
1992
+ ok
1976
1993
end ,
1977
1994
DeliveredAt = os :system_time (millisecond ),
1978
1995
rabbit_trace :tap_out (Msg , ConnName , ChannelNum , Username , TraceState ),
@@ -2034,8 +2051,14 @@ ack(Acked, State = #ch{queue_states = QueueStates0}) ->
2034
2051
ok = notify_limiter (State # ch .limiter , Acked ),
2035
2052
{State # ch {queue_states = QueueStates }, Actions }.
2036
2053
2037
- incr_queue_stats (QName , MsgIds , State ) ->
2054
+ incr_queue_stats (QName , MsgIds , State = # ch { queue_states = QueueStates } ) ->
2038
2055
Count = length (MsgIds ),
2056
+ case rabbit_queue_type :module (QName , QueueStates ) of
2057
+ {ok , QueueType } ->
2058
+ rabbit_global_counters :messages_acknowledged (amqp091 , QueueType , Count );
2059
+ _ ->
2060
+ noop
2061
+ end ,
2039
2062
? INCR_STATS (queue_stats , QName , Count , ack , State ).
2040
2063
2041
2064
% % {Msgs, Acks}
@@ -2108,15 +2131,16 @@ notify_limiter(Limiter, Acked) ->
2108
2131
deliver_to_queues ({# delivery {message = # basic_message {exchange_name = XName },
2109
2132
confirm = false ,
2110
2133
mandatory = false },
2111
- _RoutedToQueueNames = []}, State ) -> % % optimisation
2134
+ _RoutedToQueueNames = []}, State ) -> % % optimisation when there are no queues
2112
2135
? INCR_STATS (exchange_stats , XName , 1 , publish , State ),
2136
+ rabbit_global_counters :messages_unroutable_dropped (amqp091 , 1 ),
2113
2137
? INCR_STATS (exchange_stats , XName , 1 , drop_unroutable , State ),
2114
2138
State ;
2115
2139
deliver_to_queues ({Delivery = # delivery {message = Message = # basic_message {exchange_name = XName },
2116
2140
mandatory = Mandatory ,
2117
2141
confirm = Confirm ,
2118
2142
msg_seq_no = MsgSeqNo },
2119
- _RoutedToQueueNames = [QName ]}, State0 = # ch {queue_states = QueueStates0 }) ->
2143
+ _RoutedToQueueNames = [QName ]}, State0 = # ch {queue_states = QueueStates0 }) -> % % optimisation when there is one queue
2120
2144
AllNames = case rabbit_amqqueue :lookup (QName ) of
2121
2145
{ok , Q0 } ->
2122
2146
case amqqueue :get_options (Q0 ) of
@@ -2128,6 +2152,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
2128
2152
Qs = rabbit_amqqueue :lookup (AllNames ),
2129
2153
case rabbit_queue_type :deliver (Qs , Delivery , QueueStates0 ) of
2130
2154
{ok , QueueStates , Actions } ->
2155
+ rabbit_global_counters :messages_routed (amqp091 , length (Qs )),
2131
2156
% % NB: the order here is important since basic.returns must be
2132
2157
% % sent before confirms.
2133
2158
ok = process_routing_mandatory (Mandatory , Qs , Message , State0 ),
@@ -2164,6 +2189,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
2164
2189
end ,
2165
2190
case rabbit_queue_type :deliver (Qs , Delivery , QueueStates0 ) of
2166
2191
{ok , QueueStates , Actions } ->
2192
+ rabbit_global_counters :messages_routed (amqp091 , length (Qs )),
2167
2193
% % NB: the order here is important since basic.returns must be
2168
2194
% % sent before confirms.
2169
2195
ok = process_routing_mandatory (Mandatory , Qs , Message , State0 ),
@@ -2213,11 +2239,13 @@ infer_extra_bcc(Qs) ->
2213
2239
process_routing_mandatory (_Mandatory = true ,
2214
2240
_RoutedToQs = [],
2215
2241
Msg , State ) ->
2242
+ rabbit_global_counters :messages_unroutable_returned (amqp091 , 1 ),
2216
2243
ok = basic_return (Msg , State , no_route ),
2217
2244
ok ;
2218
2245
process_routing_mandatory (_Mandatory = false ,
2219
2246
_RoutedToQs = [],
2220
2247
# basic_message {exchange_name = ExchangeName }, State ) ->
2248
+ rabbit_global_counters :messages_unroutable_dropped (amqp091 , 1 ),
2221
2249
? INCR_STATS (exchange_stats , ExchangeName , 1 , drop_unroutable , State ),
2222
2250
ok ;
2223
2251
process_routing_mandatory (_ , _ , _ , _ ) ->
@@ -2245,6 +2273,7 @@ send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) ->
2245
2273
case rabbit_node_monitor :pause_partition_guard () of
2246
2274
ok ->
2247
2275
Confirms = lists :append (C ),
2276
+ rabbit_global_counters :messages_confirmed (amqp091 , length (Confirms )),
2248
2277
Rejects = lists :append (R ),
2249
2278
ConfirmMsgSeqNos =
2250
2279
lists :foldl (
@@ -2721,8 +2750,9 @@ handle_deliver0(ConsumerTag, AckRequired,
2721
2750
redelivered = Redelivered ,
2722
2751
exchange = ExchangeName # resource .name ,
2723
2752
routing_key = RoutingKey },
2724
- case rabbit_queue_type :module (QName , Qs ) of
2725
- {ok , rabbit_classic_queue } ->
2753
+ {ok , QueueType } = rabbit_queue_type :module (QName , Qs ),
2754
+ case QueueType of
2755
+ rabbit_classic_queue ->
2726
2756
ok = rabbit_writer :send_command_and_notify (
2727
2757
WriterPid , QPid , self (), Deliver , Content );
2728
2758
_ ->
@@ -2732,13 +2762,14 @@ handle_deliver0(ConsumerTag, AckRequired,
2732
2762
undefined -> ok ;
2733
2763
_ -> rabbit_basic :maybe_gc_large_msg (Content , GCThreshold )
2734
2764
end ,
2735
- record_sent (deliver , ConsumerTag , AckRequired , Msg , State ).
2765
+ record_sent (deliver , QueueType , ConsumerTag , AckRequired , Msg , State ).
2736
2766
2737
2767
handle_basic_get (WriterPid , DeliveryTag , NoAck , MessageCount ,
2738
2768
Msg = {_QName , _QPid , _MsgId , Redelivered ,
2739
2769
# basic_message {exchange_name = ExchangeName ,
2740
2770
routing_keys = [RoutingKey | _CcRoutes ],
2741
- content = Content }}, State ) ->
2771
+ content = Content }},
2772
+ QueueType , State ) ->
2742
2773
ok = rabbit_writer :send_command (
2743
2774
WriterPid ,
2744
2775
# 'basic.get_ok' {delivery_tag = DeliveryTag ,
@@ -2747,7 +2778,7 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
2747
2778
routing_key = RoutingKey ,
2748
2779
message_count = MessageCount },
2749
2780
Content ),
2750
- {noreply , record_sent (get , DeliveryTag , not (NoAck ), Msg , State )}.
2781
+ {noreply , record_sent (get , QueueType , DeliveryTag , not (NoAck ), Msg , State )}.
2751
2782
2752
2783
init_tick_timer (State = # ch {tick_timer = undefined }) ->
2753
2784
{ok , Interval } = application :get_env (rabbit , channel_tick_interval ),
@@ -2783,10 +2814,10 @@ get_operation_timeout_and_deadline() ->
2783
2814
Deadline = now_millis () + Timeout ,
2784
2815
{Timeout , Deadline }.
2785
2816
2786
- queue_fold (Fun , Init , Q ) ->
2787
- case ? QUEUE :out (Q ) of
2788
- {empty , _Q } -> Init ;
2789
- {{value , V }, Q1 } -> queue_fold (Fun , Fun (V , Init ), Q1 )
2817
+ queue_fold (Fun , Acc , Queue ) ->
2818
+ case ? QUEUE :out (Queue ) of
2819
+ {empty , _Queue } -> Acc ;
2820
+ {{value , Item }, Queue1 } -> queue_fold (Fun , Fun (Item , Acc ), Queue1 )
2790
2821
end .
2791
2822
2792
2823
evaluate_consumer_timeout (State0 = # ch {cfg = # conf {channel = Channel ,
0 commit comments